summaryrefslogtreecommitdiff
path: root/server/socket.py
diff options
context:
space:
mode:
Diffstat (limited to 'server/socket.py')
-rw-r--r--server/socket.py286
1 files changed, 286 insertions, 0 deletions
diff --git a/server/socket.py b/server/socket.py
new file mode 100644
index 0000000..c248056
--- /dev/null
+++ b/server/socket.py
@@ -0,0 +1,286 @@
+# coding=utf-8
+
+import socket
+import selectors
+import json
+import threading
+import traceback
+from server.handlers import serve_request
+
+# TODO: add a whole lot of try..except blocks, and just make it overall error resistant
+# TODO: add logging
+
+_mapping_lock = threading.Lock() # the lock guarding access to the two mappings below
+
+_sessions_to_socket = {} # keyed by sid: what session is bound to what socket
+
+_transactions_to_socket = {} # keyed by tid, used only when there is no sid available, so a request can be replied
+
+
+def processIncomingPacket(receiving_socket, packet):
+ print('Decocoding JSON: {}'.format(packet))
+ obj = json.loads(packet)
+ if obj.get('type') == 'connect':
+ return # TODO: first packet is 'connect', providing the list of connected sessions to the peer
+ tid = obj.get('tid') # transaction ID
+ if tid is None:
+ raise Exception('Transaction ID is missing from the request')
+ sid = obj.get('sid') # session ID
+ _mapping_lock.acquire()
+ try:
+ if sid is None:
+ _transactions_to_socket[tid] = receiving_socket
+ else:
+ _sessions_to_socket[sid] = receiving_socket
+ finally:
+ _mapping_lock.release()
+ serve_request(obj)
+
+
+def sendPacket(tid, sid, json_obj):
+ if sid is None:
+ if tid is None:
+ raise Exception('Cannot send a message without a tid and/or a sid')
+ _mapping_lock.acquire()
+ try:
+ socket = _transactions_to_socket.get(tid)
+ del _transactions_to_socket[tid] # this mapping is not relevant anymore
+ finally:
+ _mapping_lock.release()
+ if socket is None:
+ raise Exception('Cannot send in transaction, it is not registered: ' + tid)
+ else:
+ _mapping_lock.acquire()
+ try:
+ socket = _sessions_to_socket.get(sid)
+ finally:
+ _mapping_lock.release()
+ if socket is None:
+ raise Exception('Cannot send to session, it is not registered: ' + sid)
+ if json_obj is None:
+ # a special case: this is only an acknowledgment, so any request/response accounting can be cleared
+ json_obj = {}
+ if sid is not None:
+ json_obj['sid'] = sid
+ if tid is not None:
+ json_obj['tid'] = tid
+ socket.send(json_obj)
+
+
+class SocketHandler(object):
+
+ def read(self):
+ pass
+
+ def destroy(self):
+ pass
+
+
+class ServerSocket(SocketHandler):
+
+ def __init__(self, communication, hostname='localhost', port=19732):
+ self.communication = communication
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_socket.bind((hostname, port))
+ server_socket.listen(5)
+ server_socket.setblocking(False)
+ self.socket = server_socket
+
+ def read(self):
+ conn, addr = self.socket.accept()
+ self.communication.add_socket(JsonClientSocket(self.communication, conn, addr))
+
+ def destroy(self):
+ self.communication.remove_socket(self)
+ try:
+ self.socket.close()
+ except:
+ pass
+
+
+class JsonClientSocket(SocketHandler):
+
+ def __init__(self, communication, socket, address):
+ self.communication = communication
+ self.socket = socket
+ self.address = address
+ socket.setblocking(False)
+ self.status = 0 # registering
+ self.receiving_header = True
+ self.buffer = []
+ self.received_length = 0 # the cumulative length of strings in buffer
+ self.body_length = 0 # the length of the next packet, given in the header
+ self._read_lock = threading.Lock()
+ self._write_lock = threading.Lock()
+
+ def read(self):
+ lock = self._read_lock
+ lock.acquire()
+ try:
+ data = self.socket.recv(8192)
+ if not data:
+ self.destroy()
+ else:
+ # packet decode loop
+ data = data.decode('utf-8')
+ print("received: {}".format(data))
+ offset = 0
+ N = len(data)
+ while (offset < N):
+ if self.receiving_header:
+ i = data.find(':', offset)
+ if i < 0:
+ self.buffer.append(data[offset:])
+ break
+ if len(self.buffer) == 0:
+ self.body_length = int(data[offset:i])
+ else:
+ self.buffer.append(data[offset:i])
+ self.body_length = int(''.join(self.buffer))
+ self.buffer.clear()
+ offset = i + 1
+ self.receiving_header = False
+ continue
+ elif (self.received_length + N - offset) <= self.body_length:
+ if self.received_length == 0:
+ # an optimized case
+ s = data[offset:offset+self.body_length]
+ offset += len(s)
+ else:
+ s = data[offset:offset+self.body_length - self.received_length] # TODO: is this correct?
+ offset += len(s)
+ self.buffer.append(s)
+ s = ''.join(self.buffer)
+ self.buffer.clear()
+ self.received_length = 0
+ self.receiving_header = True
+ try:
+ processIncomingPacket(self, s)
+ except Exception as e:
+ # any exception that propagates to here means a possible protocol error, we have to disconnect
+ traceback.print_exc()
+ self.destroy()
+ return
+ else:
+ # incomplete packet body, buffer it until the next time
+ s = data[offset:]
+ self.buffer.append(s)
+ self.received_length += len(s)
+ break
+ finally:
+ lock.release()
+
+ def destroy(self):
+ self.communication.remove_socket(self)
+ try:
+ self.socket.close()
+ except:
+ pass
+ # TODO: unregister from any internal mechanisms
+
+ def send(self, json_obj):
+ js = json.dumps(json_obj)
+ m = str(len(js)) + u':' + js
+ bytes = m.encode('utf-8')
+ lock = self._write_lock
+ lock.acquire()
+ try:
+ self.socket.sendall(bytes)
+ finally:
+ lock.release()
+
+
+class Communication(SocketHandler):
+ def __init__(self):
+ # the selector to register sockets with, the serve_forever() uses it to handle any incoming data
+ self._selector = selectors.DefaultSelector() # to wait until at least on socket is readable
+ self._lock = threading.Lock() # the access lock, because of multi-threading
+ self._remove_queue = [] # what sockets to remove
+ self._add_queue = [] # what sockets to add
+ notifier, receiver = socket.socketpair() # the notification mechanism for the serving thread, using the selector
+ receiver.setblocking(False)
+ self._notifier = notifier
+ self._receiver = receiver
+ self._registered_handlers = set()
+ self._not_signalled = True
+ self._destroying = False
+
+ def serve_forever(self):
+ self._selector.register(self._receiver, selectors.EVENT_READ, self)
+ while True:
+ events = self._selector.select()
+ for key, mask in events:
+ if mask & selectors.EVENT_READ:
+ key.data.read()
+
+ def add_socket(self, socket_handler):
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._add_queue.append(socket_handler)
+ self._signal()
+ finally:
+ lock.release()
+
+ def remove_socket(self, socket_handler):
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._remove_queue.append(socket_handler)
+ self._signal()
+ finally:
+ lock.release()
+
+ def _signal(self): # invoke holding the lock
+ if self._not_signalled:
+ self._notifier.send(b'1')
+ self._not_signalled = False
+
+ def read(self):
+ # we have been invoked to process some new additions and/or removals
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._receiver.recv(1024) # clear the queue
+ self._not_signalled = True
+ selector = self._selector
+ for handler in self._remove_queue:
+ selector.unregister(handler.socket)
+ self._registered_handlers.remove(handler)
+ self._remove_queue.clear()
+ for handler in self._add_queue:
+ selector.register(handler.socket, selectors.EVENT_READ, handler)
+ self._registered_handlers.add(handler)
+ self._add_queue.clear()
+ if self._destroying:
+ self._notifier.close()
+ self._receiver.close()
+ self._selector.close()
+ finally:
+ lock.release()
+
+ def destroy(self):
+ lock = self._lock
+ # make a copy of handlers, because as we're destroying them each will invoke remove_socket(), and for that the lock must be available and the iterating set must not change
+ lock.acquire()
+ try:
+ handlers = set(self._registered_handlers)
+ finally:
+ lock.release()
+ for handler in handlers:
+ handler.destroy() # destroy them all, even the server socket
+ # this is where everything is destroyed but us, so what follows is self-destruct
+ lock.acquire()
+ try:
+ self._destroying = True
+ self._signal()
+ finally:
+ lock.release()
+
+
+def serve_forever():
+ communication = Communication()
+ server = ServerSocket(communication)
+ communication.add_socket(server)
+ communication.serve_forever()