From afdb3c0b28715b3d9a0982e4e0504a0cbcf11e70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ale=C5=A1=20Smodi=C5=A1?= Date: Mon, 14 Sep 2015 14:49:54 +0200 Subject: Reimplemented communication with the client side. * Implemented a node web server supporting asynchronous websocket and long-polling communication with clients. * Implemented TCP communication between python middleware and node web server. --- server/__init__.py | 7 +- server/handlers.py | 302 +++++++++++++++++++++++++++++++++++++++++++++++++ server/problems.py | 3 + server/socket.py | 286 ++++++++++++++++++++++++++++++++++++++++++++++ server/user_session.py | 11 ++ 5 files changed, 608 insertions(+), 1 deletion(-) create mode 100644 server/handlers.py create mode 100644 server/socket.py (limited to 'server') diff --git a/server/__init__.py b/server/__init__.py index 284c162..861a7df 100644 --- a/server/__init__.py +++ b/server/__init__.py @@ -3,9 +3,14 @@ import multiprocessing.managers from . import user_session from . import prolog_session +from . import python_session +from . import socket import server.problems -__all__ = ['user_session', 'prolog_session', 'problems', 'start'] +__all__ = ['socket', 'handlers', 'user_session', 'prolog_session', 'python_session', 'problems', 'start'] + + +# TODO: everything below is deprecated, remove it class Codeq(object): _method_to_typeid_ = { diff --git a/server/handlers.py b/server/handlers.py new file mode 100644 index 0000000..e9cf7c8 --- /dev/null +++ b/server/handlers.py @@ -0,0 +1,302 @@ +# coding=utf-8 + +from concurrent.futures import ThreadPoolExecutor +import traceback +from errors.session import * +import server + + +class CodeqService(object): + """Base class for all CodeQ services. + """ + session_is_optional = False + + def process(self, request): + pass + + +class ProblemList(CodeqService): + """List all available problems to the client. + """ + session_is_optional = True + + def process(self, request): + request.reply({'code': 0, 'message': 'ok', 'problems': server.problems.list_problems()}) + + +class Login(CodeqService): + """Logs in a client, creating a new session. + """ + session_is_optional = True + + def process(self, request): + js = request.data + username = js.get('username') + password = js.get('password') + if username is None: + request.reply({'code': 1, 'message': 'Username was not provided'}) + elif password is None: + request.reply({'code': 2, 'message': 'Password was not provided'}) + else: + try: + session = server.user_session.authenticate_and_create_session(username, password) + except AuthenticationFailed: + request.reply({'code': 3, 'message': 'Username or password do not match'}) + else: + if request.session: + request.session.destroy() + request.reply({'code': 0, 'message': 'OK', 'sid':session.get_sid()}) + + +class Activity(CodeqService): + def process(self, request): + js = request.data + trace = js.get('trace') + solution = js.get('solution') + problem_id = js.get('problem_id') + if (trace is not None) or (solution is not None): + # we have something to do + if problem_id is None: + request.reply({'code': 1, 'message': 'Problem ID is missing'}) + else: + request.session.update_solution(problem_id, trace, solution) + request.end() # no feedback, just acknowledge the reception + + +class Query(CodeqService): + def process(self, request): + js = request.data + step = js.get('step') + if step is None: + request.reply({'code': 1, 'message': '"step" is not set'}) + else: + problem_id = js.get('problem_id') + if problem_id is None: + request.reply({'code': 4, 'message': 'Problem ID not given'}) + else: + session = request.session + trace = js.get('trace') + prolog = session.get_prolog() + program = None + if step == 'run': + program = js.get('program') + query = js.get('query') + if program is None: + result = {'code': 2, 'message': 'No program specified'} + elif query is None: + result = {'code': 3, 'message': 'No query specified'} + else: + messages, status, have_more = prolog.run_for_user(session.get_uid(), problem_id, program, query) + result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + elif step == 'next': + messages, status, have_more = prolog.step() + result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + elif step == 'end': + messages, status, have_more = prolog.end() + result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + else: + result = {'code': 5, 'message': 'Unknown prolog step: {0}'.format(step)} + if program or trace: + session.update_solution(problem_id, trace, program) + request.reply(result) + + +# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling +class PythonPull(CodeqService): + def process(self, request): + python = request.session.get_python() + output = python.pull() + request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}}) + + +# Push stdin to the session's Python interpreter. TODO: convert to async handling +class PythonPush(CodeqService): + def process(self, request): + text = request.data.get('text') + if text is None: + request.reply({'code': 1, 'message': 'No input specified'}) + else: + python = request.session.get_python() + python.push(text) + request.reply({'code': 0, 'message': 'ok'}) + + +class Hint(CodeqService): + def process(self, request): + js = request.data + language = js.get('language') + problem_id = js.get('problem_id') + program = js.get('program') + + if problem_id is None: + request.reply({'code': 1, 'message': 'No problem ID specified'}) + elif program is None: + request.reply({'code': 2, 'message': 'No program specified'}) + else: + session = request.session + lang_session = None + if language == 'prolog': + lang_session = session.get_prolog() + elif language == 'python': + lang_session = session.get_python() + + if lang_session is None: + request.reply({'code': 3, 'message': 'Unknown language specified'}) + else: + hints = lang_session.hint(session.get_sid(), problem_id, program) + request.reply({'code': 0, 'message': 'ok', 'hints': hints}) + + +class Test(CodeqService): + def process(self, request): + js = request.data + language = js.get('language') + problem_id = js.get('problem_id') + program = js.get('program') + + if problem_id is None: + request.reply({'code': 1, 'message': 'No problem ID specified'}) + elif program is None: + request.reply({'code': 2, 'message': 'No program specified'}) + else: + session = request.session + lang_session = None + if language == 'prolog': + lang_session = session.get_prolog() + elif language == 'python': + lang_session = session.get_python() + + if lang_session is None: + request.reply({'code': 3, 'message': 'Unknown language specified'}) + else: + hints = lang_session.test(session.get_sid(), problem_id, program) + request.reply({'code': 0, 'message': 'ok', 'hints': hints}) + + +class GetProblem(CodeqService): + def process(self, request): + js = request.data + language = js.get('language') + problem_group = js.get('problem_group') + problem = js.get('problem') + if language is None: + request.reply({'code': 1, 'message': 'Language identifier not given'}) + elif problem_group is None: + request.reply({'code': 2, 'message': 'Problem group identifier not given'}) + elif problem is None: + request.reply({'code': 3, 'message': 'Problem identifier not given'}) + else: + request.reply({'code': 0, 'message': 'ok', 'data': request.session.get_problem_data(language, problem_group, problem)}) + + +# maps actions to their handlers +incoming_handlers = { + 'list_problems': ProblemList(), + 'login': Login(), + 'get_problem': GetProblem(), + 'logout': None, + 'activity': Activity(), + 'query': Query(), + 'hint': Hint(), + 'test': Test() +} + + +class Request(object): + def __init__(self, tid, original_sid, session, data): + """Creates a new request + + :param tid: communicator-level transaction ID (global relative to the specific communicator where it originated) + :param original_sid: session ID, optional + :param session: the actual session with the original_sid, if it exists; the processor may swap it for a new session + :param data: the request data from the client + :return: new instance + """ + self._tid = tid + self._original_sid = original_sid + self.session = session + self.data = data + self.is_finished = False + + def reply(self, data): + """Reply to this request. + + :param data: the dictionary representing the reply, that will be converted to JSON + :return: None + """ + if data is None: + self.end() + if self._original_sid is not None: + sid = data.get('sid') + if sid is None: + data['sid'] = self._original_sid + elif sid != self._original_sid: + data['sid'] = self._original_sid + data['new_sid'] = sid + # it is important to reply with the same tid and sid parameters as were in the request, so message accounting doesn't get confused + send(self._tid, self._original_sid, data) + self.is_finished = True + + def end(self): + """Conclude the request, without sending a response. + + This is to acknowledge that the response has been received. + :return: None + """ + send(self._tid, self._original_sid, None) + self.is_finished = True + + +########## low-level machinery, subject to change to support more than the single socket communicator ########## + +_executor = ThreadPoolExecutor(max_workers=100) + +def _invoke_handler(handler, request): + try: + print('Worker thread processing data={}'.format(str(request.data))) + handler.process(request) + if not request.is_finished: + print('ERROR: the request was not concluded!') + request.reply({'code': -1, 'message': 'Request processing did not provide a reply'}) + print('Processing finished') + except Exception as e: + print('ERROR: data processing failed: ' + str(e)) + traceback.print_exc() + request.reply({'code': -1, 'message': 'Internal error: ' + str(e)}) + +def serve_request(json_obj): + if not isinstance(json_obj, dict): + raise RequestProcessingError('Require a request represented as a dict, instead got: ' + str(type(json_obj))) + tid = json_obj.get('tid') # session ID and transaction ID uniquely identify a transaction + sid = json_obj.get('sid') + action = json_obj.get('action') + if action is None: + raise RequestProcessingError('Request does not contain an action') + if not isinstance(action, str): + raise RequestProcessingError('Requested action must be a string, got: ' + str(type(action))) + handler = incoming_handlers.get(action) + if handler is None: + raise RequestProcessingError('No handler for ' + action) + print("Attempting to serve action={}".format(action)) + session = None + if sid is None: + if not handler.session_is_optional: + raise RequestProcessingError('Request is missing a session-ID') + else: + del json_obj['sid'] + try: + session = server.user_session.get_session_by_id(sid) + except NoSuchSession: + if not handler.session_is_optional: + raise RequestProcessingError('This user session has expired. Please log-in again.') + _executor.submit(_invoke_handler, handler, Request(tid, sid, session, json_obj)) + +def send(tid, sid, json_obj): + # just a proxy function for now + print('Sending reply: {}'.format(str(json_obj))) + server.socket.sendPacket(tid, sid, json_obj) + +def stop(): + global _executor + _executor.shutdown() + _executor = None diff --git a/server/problems.py b/server/problems.py index 02f0307..1c87345 100644 --- a/server/problems.py +++ b/server/problems.py @@ -19,6 +19,9 @@ def load_module(fullname): d = os.path.join(_path_prefix, *parts[:-1]) ff = importlib.machinery.FileFinder(d, (importlib.machinery.SourceFileLoader, ['.py'])) spec = ff.find_spec(fullname) + if spec is None: + print('ERROR: there is no problem module {0}'.format(fullname)) + return None mod = type(sys)(fullname) mod.__loader__ = spec.loader mod.__package__ = spec.parent diff --git a/server/socket.py b/server/socket.py new file mode 100644 index 0000000..c248056 --- /dev/null +++ b/server/socket.py @@ -0,0 +1,286 @@ +# coding=utf-8 + +import socket +import selectors +import json +import threading +import traceback +from server.handlers import serve_request + +# TODO: add a whole lot of try..except blocks, and just make it overall error resistant +# TODO: add logging + +_mapping_lock = threading.Lock() # the lock guarding access to the two mappings below + +_sessions_to_socket = {} # keyed by sid: what session is bound to what socket + +_transactions_to_socket = {} # keyed by tid, used only when there is no sid available, so a request can be replied + + +def processIncomingPacket(receiving_socket, packet): + print('Decocoding JSON: {}'.format(packet)) + obj = json.loads(packet) + if obj.get('type') == 'connect': + return # TODO: first packet is 'connect', providing the list of connected sessions to the peer + tid = obj.get('tid') # transaction ID + if tid is None: + raise Exception('Transaction ID is missing from the request') + sid = obj.get('sid') # session ID + _mapping_lock.acquire() + try: + if sid is None: + _transactions_to_socket[tid] = receiving_socket + else: + _sessions_to_socket[sid] = receiving_socket + finally: + _mapping_lock.release() + serve_request(obj) + + +def sendPacket(tid, sid, json_obj): + if sid is None: + if tid is None: + raise Exception('Cannot send a message without a tid and/or a sid') + _mapping_lock.acquire() + try: + socket = _transactions_to_socket.get(tid) + del _transactions_to_socket[tid] # this mapping is not relevant anymore + finally: + _mapping_lock.release() + if socket is None: + raise Exception('Cannot send in transaction, it is not registered: ' + tid) + else: + _mapping_lock.acquire() + try: + socket = _sessions_to_socket.get(sid) + finally: + _mapping_lock.release() + if socket is None: + raise Exception('Cannot send to session, it is not registered: ' + sid) + if json_obj is None: + # a special case: this is only an acknowledgment, so any request/response accounting can be cleared + json_obj = {} + if sid is not None: + json_obj['sid'] = sid + if tid is not None: + json_obj['tid'] = tid + socket.send(json_obj) + + +class SocketHandler(object): + + def read(self): + pass + + def destroy(self): + pass + + +class ServerSocket(SocketHandler): + + def __init__(self, communication, hostname='localhost', port=19732): + self.communication = communication + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind((hostname, port)) + server_socket.listen(5) + server_socket.setblocking(False) + self.socket = server_socket + + def read(self): + conn, addr = self.socket.accept() + self.communication.add_socket(JsonClientSocket(self.communication, conn, addr)) + + def destroy(self): + self.communication.remove_socket(self) + try: + self.socket.close() + except: + pass + + +class JsonClientSocket(SocketHandler): + + def __init__(self, communication, socket, address): + self.communication = communication + self.socket = socket + self.address = address + socket.setblocking(False) + self.status = 0 # registering + self.receiving_header = True + self.buffer = [] + self.received_length = 0 # the cumulative length of strings in buffer + self.body_length = 0 # the length of the next packet, given in the header + self._read_lock = threading.Lock() + self._write_lock = threading.Lock() + + def read(self): + lock = self._read_lock + lock.acquire() + try: + data = self.socket.recv(8192) + if not data: + self.destroy() + else: + # packet decode loop + data = data.decode('utf-8') + print("received: {}".format(data)) + offset = 0 + N = len(data) + while (offset < N): + if self.receiving_header: + i = data.find(':', offset) + if i < 0: + self.buffer.append(data[offset:]) + break + if len(self.buffer) == 0: + self.body_length = int(data[offset:i]) + else: + self.buffer.append(data[offset:i]) + self.body_length = int(''.join(self.buffer)) + self.buffer.clear() + offset = i + 1 + self.receiving_header = False + continue + elif (self.received_length + N - offset) <= self.body_length: + if self.received_length == 0: + # an optimized case + s = data[offset:offset+self.body_length] + offset += len(s) + else: + s = data[offset:offset+self.body_length - self.received_length] # TODO: is this correct? + offset += len(s) + self.buffer.append(s) + s = ''.join(self.buffer) + self.buffer.clear() + self.received_length = 0 + self.receiving_header = True + try: + processIncomingPacket(self, s) + except Exception as e: + # any exception that propagates to here means a possible protocol error, we have to disconnect + traceback.print_exc() + self.destroy() + return + else: + # incomplete packet body, buffer it until the next time + s = data[offset:] + self.buffer.append(s) + self.received_length += len(s) + break + finally: + lock.release() + + def destroy(self): + self.communication.remove_socket(self) + try: + self.socket.close() + except: + pass + # TODO: unregister from any internal mechanisms + + def send(self, json_obj): + js = json.dumps(json_obj) + m = str(len(js)) + u':' + js + bytes = m.encode('utf-8') + lock = self._write_lock + lock.acquire() + try: + self.socket.sendall(bytes) + finally: + lock.release() + + +class Communication(SocketHandler): + def __init__(self): + # the selector to register sockets with, the serve_forever() uses it to handle any incoming data + self._selector = selectors.DefaultSelector() # to wait until at least on socket is readable + self._lock = threading.Lock() # the access lock, because of multi-threading + self._remove_queue = [] # what sockets to remove + self._add_queue = [] # what sockets to add + notifier, receiver = socket.socketpair() # the notification mechanism for the serving thread, using the selector + receiver.setblocking(False) + self._notifier = notifier + self._receiver = receiver + self._registered_handlers = set() + self._not_signalled = True + self._destroying = False + + def serve_forever(self): + self._selector.register(self._receiver, selectors.EVENT_READ, self) + while True: + events = self._selector.select() + for key, mask in events: + if mask & selectors.EVENT_READ: + key.data.read() + + def add_socket(self, socket_handler): + lock = self._lock + lock.acquire() + try: + self._add_queue.append(socket_handler) + self._signal() + finally: + lock.release() + + def remove_socket(self, socket_handler): + lock = self._lock + lock.acquire() + try: + self._remove_queue.append(socket_handler) + self._signal() + finally: + lock.release() + + def _signal(self): # invoke holding the lock + if self._not_signalled: + self._notifier.send(b'1') + self._not_signalled = False + + def read(self): + # we have been invoked to process some new additions and/or removals + lock = self._lock + lock.acquire() + try: + self._receiver.recv(1024) # clear the queue + self._not_signalled = True + selector = self._selector + for handler in self._remove_queue: + selector.unregister(handler.socket) + self._registered_handlers.remove(handler) + self._remove_queue.clear() + for handler in self._add_queue: + selector.register(handler.socket, selectors.EVENT_READ, handler) + self._registered_handlers.add(handler) + self._add_queue.clear() + if self._destroying: + self._notifier.close() + self._receiver.close() + self._selector.close() + finally: + lock.release() + + def destroy(self): + lock = self._lock + # make a copy of handlers, because as we're destroying them each will invoke remove_socket(), and for that the lock must be available and the iterating set must not change + lock.acquire() + try: + handlers = set(self._registered_handlers) + finally: + lock.release() + for handler in handlers: + handler.destroy() # destroy them all, even the server socket + # this is where everything is destroyed but us, so what follows is self-destruct + lock.acquire() + try: + self._destroying = True + self._signal() + finally: + lock.release() + + +def serve_forever(): + communication = Communication() + server = ServerSocket(communication) + communication.add_socket(server) + communication.serve_forever() diff --git a/server/user_session.py b/server/user_session.py index f21102c..5eec743 100644 --- a/server/user_session.py +++ b/server/user_session.py @@ -8,6 +8,7 @@ import random from . import prolog_session from . import python_session from . import problems +from . import handlers import db from errors.session import NoSuchSession, AuthenticationFailed import psycopg2.extras @@ -128,6 +129,16 @@ class UserSession(object): finally: db.return_connection(conn) + def send(self, json_obj): + """Sends a message to the user. + + This method may be used only for messages that are not replies to requests. + For replies use the reply() method on the Request object. + :param json_obj: a dict representing the json message + :return: None + """ + handlers.send(None, self.sid, json_obj) + def __del__(self): # no locking needed if GC is removing us, as there cannot be any concurrent access by definition if hasattr(self, 'prolog_session') and (self.prolog_session is not None): -- cgit v1.2.1