# coding=utf-8
"""Length-prefixed JSON messaging over TCP, driven by a single selector loop.

Wire format, as implemented by JsonClientSocket: every packet is
``<length>:<json>`` where ``<length>`` is the decimal number of characters
of the JSON text that follows the colon.
"""
import codecs
import json
import selectors
import socket
import threading
import traceback

from server.handlers import serve_request

# TODO: add a whole lot of try..except blocks, and just make it overall error resistant
# TODO: add logging

_mapping_lock = threading.Lock()  # the lock guarding access to the two mappings below
_sessions_to_socket = {}  # keyed by sid: what session is bound to what socket
_transactions_to_socket = {}  # keyed by tid, used only when there is no sid available, so a request can be replied

# Upper bound on the characters accepted for a packet's length header; a peer
# that never sends the ':' separator must not be able to grow the buffer forever.
_MAX_HEADER_LENGTH = 20


def processIncomingPacket(receiving_socket, packet):
    """Decode one JSON packet, remember which socket it came from, and serve it.

    receiving_socket -- the handler the packet arrived on; it is recorded so
                        that sendPacket() can route the reply back to it
    packet           -- the JSON text of the packet (without the length header)

    Raises whatever json.loads() raises for invalid JSON, and Exception when
    a non-'connect' request carries no 'tid'.
    """
    print('Decoding JSON: {}'.format(packet))
    obj = json.loads(packet)
    req_type = obj.get('type')  # private (meta) requests have the 'type'
    if req_type == 'connect':
        return  # TODO: first packet is 'connect', providing the list of connected sessions to the peer
    tid = obj.get('tid')  # transaction ID
    if tid is None:
        raise Exception('Transaction ID is missing from the request')
    sid = obj.get('sid')  # session ID
    with _mapping_lock:
        if sid is None:
            _transactions_to_socket[tid] = receiving_socket
        else:
            _sessions_to_socket[sid] = receiving_socket
    serve_request(obj)


def sendPacket(tid, sid, json_obj):
    """Send json_obj to the socket registered for the session or transaction.

    tid      -- transaction ID, or None
    sid      -- session ID, or None; when given, the session mapping is used,
                otherwise the one-shot transaction mapping is used and cleared
    json_obj -- the dict to send, or None for a bare acknowledgment; note that
                the dict is modified in place: 'sid' and 'tid' are set on it

    Raises Exception when both IDs are None or when no socket is registered
    for the given ID.
    """
    if sid is None:
        if tid is None:
            raise Exception('Cannot send a message without a tid and/or a sid')
        with _mapping_lock:
            # pop() instead of get()+del: an unknown tid must end up in the
            # descriptive error below, not in a bare KeyError
            target = _transactions_to_socket.pop(tid, None)  # this mapping is not relevant anymore
        if target is None:
            raise Exception('Cannot send in transaction, it is not registered: {}'.format(tid))
    else:
        with _mapping_lock:
            target = _sessions_to_socket.get(sid)
        if target is None:
            raise Exception('Cannot send to session, it is not registered: {}'.format(sid))
    if json_obj is None:
        # a special case: this is only an acknowledgment, so any request/response accounting can be cleared
        json_obj = {}
    if sid is not None:
        json_obj['sid'] = sid
    if tid is not None:
        json_obj['tid'] = tid
    target.send(json_obj)


class SocketHandler(object):
    """Interface of the objects the selector loop dispatches to."""

    def read(self):
        """Called by the serving loop when the handler's socket is readable."""
        pass

    def destroy(self):
        """Release the handler's resources."""
        pass


class ServerSocket(SocketHandler):
    """The listening socket: accepts connections and registers a handler for each."""

    def __init__(self, communication, hostname='localhost', port=19732):
        self.communication = communication
        server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind((hostname, port))
        server_socket.listen(5)
        server_socket.setblocking(False)
        self.socket = server_socket

    def read(self):
        """Accept one pending connection and hand it over to the communication."""
        try:
            conn, addr = self.socket.accept()
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            # the peer went away between the readiness notification and
            # accept(); there is nothing to register
            return
        self.communication.add_socket(JsonClientSocket(self.communication, conn, addr))

    def destroy(self):
        """Queue the unregistration of the listening socket and close it."""
        self.communication.remove_socket(self)
        try:
            self.socket.close()
        except OSError:
            pass  # best effort: the socket is being discarded anyway


class JsonClientSocket(SocketHandler):
    """One client connection exchanging '<length>:<json>' framed packets."""

    def __init__(self, communication, socket, address):
        self.communication = communication
        self.socket = socket
        self.address = address
        socket.setblocking(False)
        self.status = 0  # registering
        self.receiving_header = True  # whether we're receiving the length of the next packet, or the packet itself
        self.buffer = []  # chunks of the next packet (header or body)
        self.received_length = 0  # the cumulative length of body strings in buffer
        self.body_length = 0  # the length of the next packet, which was given in the header
        # recv() may cut a multi-byte UTF-8 sequence in half; the incremental
        # decoder keeps the incomplete tail until the rest arrives
        self._decoder = codecs.getincrementaldecoder('utf-8')()
        self._read_lock = threading.Lock()
        self._write_lock = threading.Lock()

    def read(self):
        """Receive available data and process every complete packet in it.

        The connection is destroyed when the peer closed it, when receiving
        fails, or when anything goes wrong while decoding or serving a packet
        (treated as a possible protocol error).
        """
        with self._read_lock:
            try:
                data = self.socket.recv(8192)
            except (BlockingIOError, InterruptedError):
                return  # nothing to read after all, wait for the next notification
            except OSError:
                traceback.print_exc()
                self.destroy()
                return
            if not data:
                # an orderly shutdown by the peer
                self.destroy()
                return
            try:
                text = self._decoder.decode(data)
                print("received: {}".format(text))
                for packet in self._split_packets(text):
                    processIncomingPacket(self, packet)
            except Exception:
                # any exception that propagates to here means a possible protocol error, we have to disconnect
                traceback.print_exc()
                self.destroy()

    def _split_packets(self, text):
        """Yield each complete packet body in text, buffering any incomplete rest.

        The parsing state lives in receiving_header, buffer, received_length
        and body_length, and is already reset for the next packet by the time
        a body is yielded. Raises ValueError for a malformed, negative or
        oversized length header.
        """
        offset = 0
        end = len(text)
        while True:
            if self.receiving_header:
                if offset >= end:
                    return
                colon = text.find(':', offset)
                if colon < 0:
                    # the header continues in the next chunk
                    self.buffer.append(text[offset:])
                    if sum(len(part) for part in self.buffer) > _MAX_HEADER_LENGTH:
                        raise ValueError('Packet length header is too long')
                    return
                self.buffer.append(text[offset:colon])
                header = ''.join(self.buffer)
                self.buffer.clear()
                if len(header) > _MAX_HEADER_LENGTH:
                    raise ValueError('Packet length header is too long')
                length = int(header)  # ValueError for a non-numeric header
                if length < 0:
                    raise ValueError('Negative packet length: {}'.format(length))
                self.body_length = length
                self.received_length = 0
                self.receiving_header = False
                offset = colon + 1
            missing = self.body_length - self.received_length
            chunk = text[offset:offset + missing]
            offset += len(chunk)
            if len(chunk) < missing:
                # incomplete packet body, buffer it until the next time
                if chunk:
                    self.buffer.append(chunk)
                    self.received_length += len(chunk)
                return
            self.buffer.append(chunk)
            packet = ''.join(self.buffer)
            self.buffer.clear()
            self.received_length = 0
            self.receiving_header = True
            yield packet

    def destroy(self):
        """Queue the unregistration of this connection and close its socket."""
        self.communication.remove_socket(self)
        try:
            self.socket.close()
        except OSError:
            pass  # best effort: the socket is being discarded anyway
        # TODO: unregister from any internal mechanisms

    def send(self, json_obj):
        """Serialize json_obj and send it as one '<length>:<json>' packet."""
        js = json.dumps(json_obj)
        payload = (str(len(js)) + u':' + js).encode('utf-8')
        with self._write_lock:
            # NOTE(review): the socket is non-blocking, so sendall() can raise
            # BlockingIOError after a partial write when the send buffer is
            # full - confirm whether that can happen with the expected payloads
            self.socket.sendall(payload)


class Communication(SocketHandler):
    """Owns the selector and the set of registered socket handlers.

    add_socket() and remove_socket() only queue the change and wake up the
    serving loop through a socket pair; the change itself is applied by
    read(), which the serving loop invokes.
    """

    def __init__(self):
        # the selector to register sockets with, the serve_forever() uses it to handle any incoming data
        self._selector = selectors.DefaultSelector()  # to wait until at least one socket is readable
        self._lock = threading.Lock()  # the access lock, because of multi-threading
        self._remove_queue = []  # what sockets to remove
        self._add_queue = []  # what sockets to add
        notifier, receiver = socket.socketpair()  # the notification mechanism for the serving thread, using the selector
        receiver.setblocking(False)
        self._notifier = notifier
        self._receiver = receiver
        self._registered_handlers = set()
        self._not_signalled = True
        self._destroying = False
        self._closed = False  # set once read() has closed the selector

    def serve_forever(self):
        """Dispatch read events to the handlers until the communication is destroyed."""
        self._selector.register(self._receiver, selectors.EVENT_READ, self)
        while not self._closed:
            events = self._selector.select()
            for key, mask in events:
                if mask & selectors.EVENT_READ:
                    key.data.read()
                if self._closed:
                    # the selector is gone and so is every handler, the
                    # remaining events of this batch must not be served
                    break

    def add_socket(self, socket_handler):
        """Queue socket_handler for registration and wake up the serving loop."""
        with self._lock:
            self._add_queue.append(socket_handler)
            self._signal()

    def remove_socket(self, socket_handler):
        """Queue socket_handler for unregistration and wake up the serving loop."""
        with self._lock:
            self._remove_queue.append(socket_handler)
            self._signal()

    def _signal(self):
        """Wake up the serving loop, at most once per pending batch."""
        # invoke holding the lock
        if self._not_signalled:
            self._notifier.send(b'1')
            self._not_signalled = False

    def read(self):
        """Apply the queued removals and additions, and the self-destruct if requested."""
        # we have been invoked to process some new additions and/or removals
        with self._lock:
            self._receiver.recv(1024)  # clear the queue
            self._not_signalled = True
            selector = self._selector
            removed = list(self._remove_queue)
            self._remove_queue.clear()
            # removals go first: a closed socket's descriptor may already be
            # reused by a socket waiting in the add queue
            for handler in removed:
                if handler not in self._registered_handlers:
                    continue  # queued twice, or destroyed before it was ever registered
                self._registered_handlers.discard(handler)
                try:
                    selector.unregister(handler.socket)
                except (KeyError, ValueError):
                    pass  # the selector does not know the socket anymore
            for handler in self._add_queue:
                if handler in removed:
                    continue  # destroyed while waiting, its socket is already closed
                selector.register(handler.socket, selectors.EVENT_READ, handler)
                self._registered_handlers.add(handler)
            self._add_queue.clear()
            if self._destroying:
                self._notifier.close()
                self._receiver.close()
                self._selector.close()
                self._closed = True

    def destroy(self):
        """Destroy every handler, then ask the serving loop to shut itself down."""
        # make a copy of handlers, because as we're destroying them each will invoke remove_socket(),
        # and for that the lock must be available and the iterating set must not change
        with self._lock:
            handlers = set(self._registered_handlers)
            handlers.update(self._add_queue)  # not registered yet, but they hold open sockets too
        for handler in handlers:
            handler.destroy()  # destroy them all, even the server socket
        # this is where everything has been destroyed but us, so what follows is self-destruct
        with self._lock:
            self._destroying = True
            self._signal()


def serve_forever():
    """Create the communication with a default server socket and run its loop."""
    communication = Communication()
    server = ServerSocket(communication)
    communication.add_socket(server)
    communication.serve_forever()