From afdb3c0b28715b3d9a0982e4e0504a0cbcf11e70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ale=C5=A1=20Smodi=C5=A1?= Date: Mon, 14 Sep 2015 14:49:54 +0200 Subject: Reimplemented communication with the client side. * Implemented a node web server supporting asynchronous websocket and long-polling communication with clients. * Implemented TCP communication between python middleware and node web server. --- server/socket.py | 286 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 286 insertions(+) create mode 100644 server/socket.py (limited to 'server/socket.py') diff --git a/server/socket.py b/server/socket.py new file mode 100644 index 0000000..c248056 --- /dev/null +++ b/server/socket.py @@ -0,0 +1,286 @@ +# coding=utf-8 + +import socket +import selectors +import json +import threading +import traceback +from server.handlers import serve_request + +# TODO: add a whole lot of try..except blocks, and just make it overall error resistant +# TODO: add logging + +_mapping_lock = threading.Lock() # the lock guarding access to the two mappings below + +_sessions_to_socket = {} # keyed by sid: what session is bound to what socket + +_transactions_to_socket = {} # keyed by tid, used only when there is no sid available, so a request can be replied + + +def processIncomingPacket(receiving_socket, packet): + print('Decocoding JSON: {}'.format(packet)) + obj = json.loads(packet) + if obj.get('type') == 'connect': + return # TODO: first packet is 'connect', providing the list of connected sessions to the peer + tid = obj.get('tid') # transaction ID + if tid is None: + raise Exception('Transaction ID is missing from the request') + sid = obj.get('sid') # session ID + _mapping_lock.acquire() + try: + if sid is None: + _transactions_to_socket[tid] = receiving_socket + else: + _sessions_to_socket[sid] = receiving_socket + finally: + _mapping_lock.release() + serve_request(obj) + + +def sendPacket(tid, sid, json_obj): + if sid is None: + if tid is None: + raise Exception('Cannot send a message without a tid and/or a sid') + _mapping_lock.acquire() + try: + socket = _transactions_to_socket.get(tid) + del _transactions_to_socket[tid] # this mapping is not relevant anymore + finally: + _mapping_lock.release() + if socket is None: + raise Exception('Cannot send in transaction, it is not registered: ' + tid) + else: + _mapping_lock.acquire() + try: + socket = _sessions_to_socket.get(sid) + finally: + _mapping_lock.release() + if socket is None: + raise Exception('Cannot send to session, it is not registered: ' + sid) + if json_obj is None: + # a special case: this is only an acknowledgment, so any request/response accounting can be cleared + json_obj = {} + if sid is not None: + json_obj['sid'] = sid + if tid is not None: + json_obj['tid'] = tid + socket.send(json_obj) + + +class SocketHandler(object): + + def read(self): + pass + + def destroy(self): + pass + + +class ServerSocket(SocketHandler): + + def __init__(self, communication, hostname='localhost', port=19732): + self.communication = communication + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind((hostname, port)) + server_socket.listen(5) + server_socket.setblocking(False) + self.socket = server_socket + + def read(self): + conn, addr = self.socket.accept() + self.communication.add_socket(JsonClientSocket(self.communication, conn, addr)) + + def destroy(self): + self.communication.remove_socket(self) + try: + self.socket.close() + except: + pass + + +class JsonClientSocket(SocketHandler): + + def __init__(self, communication, socket, address): + self.communication = communication + self.socket = socket + self.address = address + socket.setblocking(False) + self.status = 0 # registering + self.receiving_header = True + self.buffer = [] + self.received_length = 0 # the cumulative length of strings in buffer + self.body_length = 0 # the length of the next packet, given in the header + self._read_lock = threading.Lock() + self._write_lock = threading.Lock() + + def read(self): + lock = self._read_lock + lock.acquire() + try: + data = self.socket.recv(8192) + if not data: + self.destroy() + else: + # packet decode loop + data = data.decode('utf-8') + print("received: {}".format(data)) + offset = 0 + N = len(data) + while (offset < N): + if self.receiving_header: + i = data.find(':', offset) + if i < 0: + self.buffer.append(data[offset:]) + break + if len(self.buffer) == 0: + self.body_length = int(data[offset:i]) + else: + self.buffer.append(data[offset:i]) + self.body_length = int(''.join(self.buffer)) + self.buffer.clear() + offset = i + 1 + self.receiving_header = False + continue + elif (self.received_length + N - offset) <= self.body_length: + if self.received_length == 0: + # an optimized case + s = data[offset:offset+self.body_length] + offset += len(s) + else: + s = data[offset:offset+self.body_length - self.received_length] # TODO: is this correct? + offset += len(s) + self.buffer.append(s) + s = ''.join(self.buffer) + self.buffer.clear() + self.received_length = 0 + self.receiving_header = True + try: + processIncomingPacket(self, s) + except Exception as e: + # any exception that propagates to here means a possible protocol error, we have to disconnect + traceback.print_exc() + self.destroy() + return + else: + # incomplete packet body, buffer it until the next time + s = data[offset:] + self.buffer.append(s) + self.received_length += len(s) + break + finally: + lock.release() + + def destroy(self): + self.communication.remove_socket(self) + try: + self.socket.close() + except: + pass + # TODO: unregister from any internal mechanisms + + def send(self, json_obj): + js = json.dumps(json_obj) + m = str(len(js)) + u':' + js + bytes = m.encode('utf-8') + lock = self._write_lock + lock.acquire() + try: + self.socket.sendall(bytes) + finally: + lock.release() + + +class Communication(SocketHandler): + def __init__(self): + # the selector to register sockets with, the serve_forever() uses it to handle any incoming data + self._selector = selectors.DefaultSelector() # to wait until at least on socket is readable + self._lock = threading.Lock() # the access lock, because of multi-threading + self._remove_queue = [] # what sockets to remove + self._add_queue = [] # what sockets to add + notifier, receiver = socket.socketpair() # the notification mechanism for the serving thread, using the selector + receiver.setblocking(False) + self._notifier = notifier + self._receiver = receiver + self._registered_handlers = set() + self._not_signalled = True + self._destroying = False + + def serve_forever(self): + self._selector.register(self._receiver, selectors.EVENT_READ, self) + while True: + events = self._selector.select() + for key, mask in events: + if mask & selectors.EVENT_READ: + key.data.read() + + def add_socket(self, socket_handler): + lock = self._lock + lock.acquire() + try: + self._add_queue.append(socket_handler) + self._signal() + finally: + lock.release() + + def remove_socket(self, socket_handler): + lock = self._lock + lock.acquire() + try: + self._remove_queue.append(socket_handler) + self._signal() + finally: + lock.release() + + def _signal(self): # invoke holding the lock + if self._not_signalled: + self._notifier.send(b'1') + self._not_signalled = False + + def read(self): + # we have been invoked to process some new additions and/or removals + lock = self._lock + lock.acquire() + try: + self._receiver.recv(1024) # clear the queue + self._not_signalled = True + selector = self._selector + for handler in self._remove_queue: + selector.unregister(handler.socket) + self._registered_handlers.remove(handler) + self._remove_queue.clear() + for handler in self._add_queue: + selector.register(handler.socket, selectors.EVENT_READ, handler) + self._registered_handlers.add(handler) + self._add_queue.clear() + if self._destroying: + self._notifier.close() + self._receiver.close() + self._selector.close() + finally: + lock.release() + + def destroy(self): + lock = self._lock + # make a copy of handlers, because as we're destroying them each will invoke remove_socket(), and for that the lock must be available and the iterating set must not change + lock.acquire() + try: + handlers = set(self._registered_handlers) + finally: + lock.release() + for handler in handlers: + handler.destroy() # destroy them all, even the server socket + # this is where everything is destroyed but us, so what follows is self-destruct + lock.acquire() + try: + self._destroying = True + self._signal() + finally: + lock.release() + + +def serve_forever(): + communication = Communication() + server = ServerSocket(communication) + communication.add_socket(server) + communication.serve_forever() -- cgit v1.2.1