# CodeQ: an online programming tutor. # Copyright (C) 2015 UL FRI # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import socket import selectors import json import threading import traceback from server.handlers import serve_request import logging # TODO: add a whole lot of try..except blocks, and just make it overall error resistant _mapping_lock = threading.Lock() # the lock guarding access to the two mappings below _sessions_to_socket = {} # keyed by sid: what session is bound to what socket _transactions_to_socket = {} # keyed by tid, used only when there is no sid available, so a request can be replied def handle_connect(data): """The first packet is 'connect', providing the list of connected sessions to the peer (that's us).""" pass def handle_unregister(data): """When the client disconnects from the peer, but doesn't logout first, leaving the possibility that it will reconnect in the same session, possibly via other channel (nodejs instance).""" pass # TODO: meta handlers implement system protocol to support horizontal scaling, implement them when it becomes relevant _meta_handlers = { 'connect': handle_connect, 'unregister': handle_unregister } def processIncomingPacket(receiving_socket, packet): logging.debug('Decoding JSON: {}'.format(packet)) obj = json.loads(packet) req_type = obj.get('type') # private (meta) requests have the 'type' if req_type is None: # user protocol, internally map the request so response can be delivered correctly, and forward it to the higher layer tid = obj.get('tid') # transaction ID if tid is None: raise Exception('Transaction ID is missing from the request') sid = obj.get('sid') # session ID _mapping_lock.acquire() try: if sid is None: _transactions_to_socket[tid] = receiving_socket else: _sessions_to_socket[sid] = receiving_socket finally: _mapping_lock.release() serve_request(obj) else: # system protocol, connection-specific meta data handler = _meta_handlers.get(req_type) if handler is None: logging.error('Received an unknown meta packet: {}'.format(req_type)) else: handler(obj) def sendPacket(tid, sid, json_obj): if sid is None: if tid is None: raise Exception('Cannot send a message without a tid and/or a sid') _mapping_lock.acquire() try: socket = _transactions_to_socket.get(tid) del _transactions_to_socket[tid] # this mapping is not relevant anymore finally: _mapping_lock.release() if socket is None: raise Exception('Cannot send in transaction, it is not registered: ' + tid) else: _mapping_lock.acquire() try: socket = _sessions_to_socket.get(sid) finally: _mapping_lock.release() if socket is None: raise Exception('Cannot send to session, it is not registered: ' + sid) if json_obj is None: # a special case: this is only an acknowledgment, so any request/response accounting can be cleared json_obj = {} if sid is not None: json_obj['sid'] = sid if tid is not None: json_obj['tid'] = tid socket.send(json_obj) class SocketHandler(object): def read(self): pass def destroy(self): pass class ServerSocket(SocketHandler): def __init__(self, communication, hostname='localhost', port=19732): self.communication = communication server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind((hostname, port)) server_socket.listen(5) server_socket.setblocking(False) self.socket = server_socket def read(self): conn, addr = self.socket.accept() self.communication.add_socket(JsonClientSocket(self.communication, conn, addr)) def destroy(self): self.communication.remove_socket(self) try: self.socket.close() except: pass class JsonClientSocket(SocketHandler): def __init__(self, communication, socket, address): self.communication = communication self.socket = socket self.address = address socket.setblocking(False) self.status = 0 # registering self.receiving_header = True # whether we're receiving the length of the next packet, or the packet itself self.buffer = [] # chunks of the next packet (header or body) self.received_length = 0 # the cumulative length of strings in buffer self.body_length = 0 # the length of the next packet, which was given in the header self._read_lock = threading.Lock() self._write_lock = threading.Lock() def read(self): lock = self._read_lock lock.acquire() try: data = self.socket.recv(8192) if not data: self.destroy() else: # packet decode loop logging.debug("received: {}".format(data)) offset = 0 N = len(data) while (offset < N): if self.receiving_header: i = data.find(b':', offset) if i < 0: self.buffer.append(data[offset:]) break if len(self.buffer) == 0: self.body_length = int(data[offset:i].decode('utf-8')) else: self.buffer.append(data[offset:i]) self.body_length = int(b''.join(self.buffer)) self.buffer.clear() offset = i + 1 self.receiving_header = False continue elif (self.received_length + N - offset) >= self.body_length: if self.received_length == 0: # an optimized case s = data[offset:offset+self.body_length] offset += len(s) else: s = data[offset:offset+self.body_length - self.received_length] # TODO: is this correct? offset += len(s) self.buffer.append(s) s = b''.join(self.buffer) self.buffer.clear() self.received_length = 0 self.receiving_header = True try: processIncomingPacket(self, s.decode('utf-8')) except Exception as e: # any exception that propagates to here means a possible protocol error, we have to disconnect logging.critical(traceback.format_exc()) self.destroy() return else: # incomplete packet body, buffer it until the next time s = data[offset:] self.buffer.append(s) self.received_length += len(s) break finally: lock.release() def destroy(self): self.communication.remove_socket(self) try: self.socket.close() except: pass # TODO: unregister from any internal mechanisms def send(self, json_obj): b = json.dumps(json_obj).encode('utf-8') b = bytes(str(len(b)), 'utf-8') + b':' + b lock = self._write_lock lock.acquire() try: self.socket.sendall(b) finally: lock.release() class Communication(SocketHandler): def __init__(self): # the selector to register sockets with, the serve_forever() uses it to handle any incoming data self._selector = selectors.DefaultSelector() # to wait until at least on socket is readable self._lock = threading.Lock() # the access lock, because of multi-threading self._remove_queue = [] # what sockets to remove self._add_queue = [] # what sockets to add notifier, receiver = socket.socketpair() # the notification mechanism for the serving thread, using the selector receiver.setblocking(False) self._notifier = notifier self._receiver = receiver self._registered_handlers = set() self._not_signalled = True self._destroying = False def serve_forever(self): self._selector.register(self._receiver, selectors.EVENT_READ, self) while True: events = self._selector.select() for key, mask in events: if mask & selectors.EVENT_READ: key.data.read() def add_socket(self, socket_handler): lock = self._lock lock.acquire() try: self._add_queue.append(socket_handler) self._signal() finally: lock.release() def remove_socket(self, socket_handler): lock = self._lock lock.acquire() try: self._remove_queue.append(socket_handler) self._signal() finally: lock.release() def _signal(self): # invoke holding the lock if self._not_signalled: self._notifier.send(b'1') self._not_signalled = False def read(self): # we have been invoked to process some new additions and/or removals lock = self._lock lock.acquire() try: self._receiver.recv(1024) # clear the queue self._not_signalled = True selector = self._selector for handler in self._remove_queue: selector.unregister(handler.socket) self._registered_handlers.remove(handler) self._remove_queue.clear() for handler in self._add_queue: selector.register(handler.socket, selectors.EVENT_READ, handler) self._registered_handlers.add(handler) self._add_queue.clear() if self._destroying: self._notifier.close() self._receiver.close() self._selector.close() finally: lock.release() def destroy(self): lock = self._lock # make a copy of handlers, because as we're destroying them each will invoke remove_socket(), and for that the lock must be available and the iterating set must not change lock.acquire() try: handlers = set(self._registered_handlers) finally: lock.release() for handler in handlers: handler.destroy() # destroy them all, even the server socket # this is where everything has been destroyed but us, so what follows is self-destruct lock.acquire() try: self._destroying = True self._signal() finally: lock.release() def serve_forever(): communication = Communication() server = ServerSocket(communication) communication.add_socket(server) communication.serve_forever()