diff options
author | Aleš Smodiš <aless@guru.si> | 2015-09-14 14:49:54 +0200 |
---|---|---|
committer | Aleš Smodiš <aless@guru.si> | 2015-09-14 14:49:54 +0200 |
commit | afdb3c0b28715b3d9a0982e4e0504a0cbcf11e70 (patch) | |
tree | d052125c6fc4fb4803b10dd20d3c24fc3a2711f9 | |
parent | d82013c214021d6e5480d18105760fa70cfc708b (diff) |
Reimplemented communication with the client side.
* Implemented a node web server supporting asynchronous websocket and long-polling communication with clients.
* Implemented TCP communication between python middleware and node web server.
-rw-r--r-- | client/__init__.py | 2 | ||||
-rw-r--r-- | daemon.py | 6 | ||||
-rw-r--r-- | errors/session.py | 3 | ||||
-rw-r--r-- | readme.md | 11 | ||||
-rw-r--r-- | server/__init__.py | 7 | ||||
-rw-r--r-- | server/handlers.py | 302 | ||||
-rw-r--r-- | server/problems.py | 3 | ||||
-rw-r--r-- | server/socket.py | 286 | ||||
-rw-r--r-- | server/user_session.py | 11 | ||||
-rw-r--r-- | web/.gitignore | 1 | ||||
-rw-r--r-- | web/main.js | 552 | ||||
-rw-r--r-- | web/package.json | 9 | ||||
-rw-r--r-- | wsgi_server.py | 3 |
13 files changed, 1189 insertions, 7 deletions
diff --git a/client/__init__.py b/client/__init__.py index 2cffb52..1c5d693 100644 --- a/client/__init__.py +++ b/client/__init__.py @@ -1,5 +1,7 @@ # coding=utf-8 +# TODO: this module is deprecated, remove it + import multiprocessing.managers __all__ = ['get_session_by_id', 'get_or_create_session'] @@ -2,5 +2,7 @@ # coding=utf-8 if __name__ == '__main__': - import server - server.start() + # import server + # server.start() + import server.socket + server.socket.serve_forever() diff --git a/errors/session.py b/errors/session.py index d173fbf..c5170d8 100644 --- a/errors/session.py +++ b/errors/session.py @@ -5,3 +5,6 @@ class NoSuchSession(Exception): class AuthenticationFailed(Exception): pass + +class RequestProcessingError(Exception): + pass
\ No newline at end of file @@ -10,22 +10,25 @@ Install the following packages: - python3-termcolor - python3-waitress - swi-prolog-nox + - nodejs (>= 10.0.22) Settings: - point webroot to codeq-web - - set up reverse proxy for /svc/ to wsgi server: - ProxyPass /svc/ http://localhost:8082/ - ProxyPassReverse /svc/ http://localhost:8082/ + - set up reverse proxy for /ws/ to the node server: + ProxyPass /ws/ http://localhost:8083/ + ProxyPassReverse /ws/ http://localhost:8083/ - set _path_prefix in server.problems - set DB settings in db + - run "npm install" inside the "web" directory to install all dependencies + (they will be installed inside the "web" directory) Running: - run prolog/runner/main.pl - run python/runner/main.py - run daemon.py - - run wsgi_server.py + - start the node process (node web/main.js) Misc. ===== diff --git a/server/__init__.py b/server/__init__.py index 284c162..861a7df 100644 --- a/server/__init__.py +++ b/server/__init__.py @@ -3,9 +3,14 @@ import multiprocessing.managers from . import user_session from . import prolog_session +from . import python_session +from . import socket import server.problems -__all__ = ['user_session', 'prolog_session', 'problems', 'start'] +__all__ = ['socket', 'handlers', 'user_session', 'prolog_session', 'python_session', 'problems', 'start'] + + +# TODO: everything below is deprecated, remove it class Codeq(object): _method_to_typeid_ = { diff --git a/server/handlers.py b/server/handlers.py new file mode 100644 index 0000000..e9cf7c8 --- /dev/null +++ b/server/handlers.py @@ -0,0 +1,302 @@ +# coding=utf-8 + +from concurrent.futures import ThreadPoolExecutor +import traceback +from errors.session import * +import server + + +class CodeqService(object): + """Base class for all CodeQ services. + """ + session_is_optional = False + + def process(self, request): + pass + + +class ProblemList(CodeqService): + """List all available problems to the client. + """ + session_is_optional = True + + def process(self, request): + request.reply({'code': 0, 'message': 'ok', 'problems': server.problems.list_problems()}) + + +class Login(CodeqService): + """Logs in a client, creating a new session. + """ + session_is_optional = True + + def process(self, request): + js = request.data + username = js.get('username') + password = js.get('password') + if username is None: + request.reply({'code': 1, 'message': 'Username was not provided'}) + elif password is None: + request.reply({'code': 2, 'message': 'Password was not provided'}) + else: + try: + session = server.user_session.authenticate_and_create_session(username, password) + except AuthenticationFailed: + request.reply({'code': 3, 'message': 'Username or password do not match'}) + else: + if request.session: + request.session.destroy() + request.reply({'code': 0, 'message': 'OK', 'sid':session.get_sid()}) + + +class Activity(CodeqService): + def process(self, request): + js = request.data + trace = js.get('trace') + solution = js.get('solution') + problem_id = js.get('problem_id') + if (trace is not None) or (solution is not None): + # we have something to do + if problem_id is None: + request.reply({'code': 1, 'message': 'Problem ID is missing'}) + else: + request.session.update_solution(problem_id, trace, solution) + request.end() # no feedback, just acknowledge the reception + + +class Query(CodeqService): + def process(self, request): + js = request.data + step = js.get('step') + if step is None: + request.reply({'code': 1, 'message': '"step" is not set'}) + else: + problem_id = js.get('problem_id') + if problem_id is None: + request.reply({'code': 4, 'message': 'Problem ID not given'}) + else: + session = request.session + trace = js.get('trace') + prolog = session.get_prolog() + program = None + if step == 'run': + program = js.get('program') + query = js.get('query') + if program is None: + result = {'code': 2, 'message': 'No program specified'} + elif query is None: + result = {'code': 3, 'message': 'No query specified'} + else: + messages, status, have_more = prolog.run_for_user(session.get_uid(), problem_id, program, query) + result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + elif step == 'next': + messages, status, have_more = prolog.step() + result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + elif step == 'end': + messages, status, have_more = prolog.end() + result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + else: + result = {'code': 5, 'message': 'Unknown prolog step: {0}'.format(step)} + if program or trace: + session.update_solution(problem_id, trace, program) + request.reply(result) + + +# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling +class PythonPull(CodeqService): + def process(self, request): + python = request.session.get_python() + output = python.pull() + request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}}) + + +# Push stdin to the session's Python interpreter. TODO: convert to async handling +class PythonPush(CodeqService): + def process(self, request): + text = request.data.get('text') + if text is None: + request.reply({'code': 1, 'message': 'No input specified'}) + else: + python = request.session.get_python() + python.push(text) + request.reply({'code': 0, 'message': 'ok'}) + + +class Hint(CodeqService): + def process(self, request): + js = request.data + language = js.get('language') + problem_id = js.get('problem_id') + program = js.get('program') + + if problem_id is None: + request.reply({'code': 1, 'message': 'No problem ID specified'}) + elif program is None: + request.reply({'code': 2, 'message': 'No program specified'}) + else: + session = request.session + lang_session = None + if language == 'prolog': + lang_session = session.get_prolog() + elif language == 'python': + lang_session = session.get_python() + + if lang_session is None: + request.reply({'code': 3, 'message': 'Unknown language specified'}) + else: + hints = lang_session.hint(session.get_sid(), problem_id, program) + request.reply({'code': 0, 'message': 'ok', 'hints': hints}) + + +class Test(CodeqService): + def process(self, request): + js = request.data + language = js.get('language') + problem_id = js.get('problem_id') + program = js.get('program') + + if problem_id is None: + request.reply({'code': 1, 'message': 'No problem ID specified'}) + elif program is None: + request.reply({'code': 2, 'message': 'No program specified'}) + else: + session = request.session + lang_session = None + if language == 'prolog': + lang_session = session.get_prolog() + elif language == 'python': + lang_session = session.get_python() + + if lang_session is None: + request.reply({'code': 3, 'message': 'Unknown language specified'}) + else: + hints = lang_session.test(session.get_sid(), problem_id, program) + request.reply({'code': 0, 'message': 'ok', 'hints': hints}) + + +class GetProblem(CodeqService): + def process(self, request): + js = request.data + language = js.get('language') + problem_group = js.get('problem_group') + problem = js.get('problem') + if language is None: + request.reply({'code': 1, 'message': 'Language identifier not given'}) + elif problem_group is None: + request.reply({'code': 2, 'message': 'Problem group identifier not given'}) + elif problem is None: + request.reply({'code': 3, 'message': 'Problem identifier not given'}) + else: + request.reply({'code': 0, 'message': 'ok', 'data': request.session.get_problem_data(language, problem_group, problem)}) + + +# maps actions to their handlers +incoming_handlers = { + 'list_problems': ProblemList(), + 'login': Login(), + 'get_problem': GetProblem(), + 'logout': None, + 'activity': Activity(), + 'query': Query(), + 'hint': Hint(), + 'test': Test() +} + + +class Request(object): + def __init__(self, tid, original_sid, session, data): + """Creates a new request + + :param tid: communicator-level transaction ID (global relative to the specific communicator where it originated) + :param original_sid: session ID, optional + :param session: the actual session with the original_sid, if it exists; the processor may swap it for a new session + :param data: the request data from the client + :return: new instance + """ + self._tid = tid + self._original_sid = original_sid + self.session = session + self.data = data + self.is_finished = False + + def reply(self, data): + """Reply to this request. + + :param data: the dictionary representing the reply, that will be converted to JSON + :return: None + """ + if data is None: + self.end() + if self._original_sid is not None: + sid = data.get('sid') + if sid is None: + data['sid'] = self._original_sid + elif sid != self._original_sid: + data['sid'] = self._original_sid + data['new_sid'] = sid + # it is important to reply with the same tid and sid parameters as were in the request, so message accounting doesn't get confused + send(self._tid, self._original_sid, data) + self.is_finished = True + + def end(self): + """Conclude the request, without sending a response. + + This is to acknowledge that the response has been received. + :return: None + """ + send(self._tid, self._original_sid, None) + self.is_finished = True + + +########## low-level machinery, subject to change to support more than the single socket communicator ########## + +_executor = ThreadPoolExecutor(max_workers=100) + +def _invoke_handler(handler, request): + try: + print('Worker thread processing data={}'.format(str(request.data))) + handler.process(request) + if not request.is_finished: + print('ERROR: the request was not concluded!') + request.reply({'code': -1, 'message': 'Request processing did not provide a reply'}) + print('Processing finished') + except Exception as e: + print('ERROR: data processing failed: ' + str(e)) + traceback.print_exc() + request.reply({'code': -1, 'message': 'Internal error: ' + str(e)}) + +def serve_request(json_obj): + if not isinstance(json_obj, dict): + raise RequestProcessingError('Require a request represented as a dict, instead got: ' + str(type(json_obj))) + tid = json_obj.get('tid') # session ID and transaction ID uniquely identify a transaction + sid = json_obj.get('sid') + action = json_obj.get('action') + if action is None: + raise RequestProcessingError('Request does not contain an action') + if not isinstance(action, str): + raise RequestProcessingError('Requested action must be a string, got: ' + str(type(action))) + handler = incoming_handlers.get(action) + if handler is None: + raise RequestProcessingError('No handler for ' + action) + print("Attempting to serve action={}".format(action)) + session = None + if sid is None: + if not handler.session_is_optional: + raise RequestProcessingError('Request is missing a session-ID') + else: + del json_obj['sid'] + try: + session = server.user_session.get_session_by_id(sid) + except NoSuchSession: + if not handler.session_is_optional: + raise RequestProcessingError('This user session has expired. Please log-in again.') + _executor.submit(_invoke_handler, handler, Request(tid, sid, session, json_obj)) + +def send(tid, sid, json_obj): + # just a proxy function for now + print('Sending reply: {}'.format(str(json_obj))) + server.socket.sendPacket(tid, sid, json_obj) + +def stop(): + global _executor + _executor.shutdown() + _executor = None diff --git a/server/problems.py b/server/problems.py index 02f0307..1c87345 100644 --- a/server/problems.py +++ b/server/problems.py @@ -19,6 +19,9 @@ def load_module(fullname): d = os.path.join(_path_prefix, *parts[:-1]) ff = importlib.machinery.FileFinder(d, (importlib.machinery.SourceFileLoader, ['.py'])) spec = ff.find_spec(fullname) + if spec is None: + print('ERROR: there is no problem module {0}'.format(fullname)) + return None mod = type(sys)(fullname) mod.__loader__ = spec.loader mod.__package__ = spec.parent diff --git a/server/socket.py b/server/socket.py new file mode 100644 index 0000000..c248056 --- /dev/null +++ b/server/socket.py @@ -0,0 +1,286 @@ +# coding=utf-8 + +import socket +import selectors +import json +import threading +import traceback +from server.handlers import serve_request + +# TODO: add a whole lot of try..except blocks, and just make it overall error resistant +# TODO: add logging + +_mapping_lock = threading.Lock() # the lock guarding access to the two mappings below + +_sessions_to_socket = {} # keyed by sid: what session is bound to what socket + +_transactions_to_socket = {} # keyed by tid, used only when there is no sid available, so a request can be replied + + +def processIncomingPacket(receiving_socket, packet): + print('Decocoding JSON: {}'.format(packet)) + obj = json.loads(packet) + if obj.get('type') == 'connect': + return # TODO: first packet is 'connect', providing the list of connected sessions to the peer + tid = obj.get('tid') # transaction ID + if tid is None: + raise Exception('Transaction ID is missing from the request') + sid = obj.get('sid') # session ID + _mapping_lock.acquire() + try: + if sid is None: + _transactions_to_socket[tid] = receiving_socket + else: + _sessions_to_socket[sid] = receiving_socket + finally: + _mapping_lock.release() + serve_request(obj) + + +def sendPacket(tid, sid, json_obj): + if sid is None: + if tid is None: + raise Exception('Cannot send a message without a tid and/or a sid') + _mapping_lock.acquire() + try: + socket = _transactions_to_socket.get(tid) + del _transactions_to_socket[tid] # this mapping is not relevant anymore + finally: + _mapping_lock.release() + if socket is None: + raise Exception('Cannot send in transaction, it is not registered: ' + tid) + else: + _mapping_lock.acquire() + try: + socket = _sessions_to_socket.get(sid) + finally: + _mapping_lock.release() + if socket is None: + raise Exception('Cannot send to session, it is not registered: ' + sid) + if json_obj is None: + # a special case: this is only an acknowledgment, so any request/response accounting can be cleared + json_obj = {} + if sid is not None: + json_obj['sid'] = sid + if tid is not None: + json_obj['tid'] = tid + socket.send(json_obj) + + +class SocketHandler(object): + + def read(self): + pass + + def destroy(self): + pass + + +class ServerSocket(SocketHandler): + + def __init__(self, communication, hostname='localhost', port=19732): + self.communication = communication + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind((hostname, port)) + server_socket.listen(5) + server_socket.setblocking(False) + self.socket = server_socket + + def read(self): + conn, addr = self.socket.accept() + self.communication.add_socket(JsonClientSocket(self.communication, conn, addr)) + + def destroy(self): + self.communication.remove_socket(self) + try: + self.socket.close() + except: + pass + + +class JsonClientSocket(SocketHandler): + + def __init__(self, communication, socket, address): + self.communication = communication + self.socket = socket + self.address = address + socket.setblocking(False) + self.status = 0 # registering + self.receiving_header = True + self.buffer = [] + self.received_length = 0 # the cumulative length of strings in buffer + self.body_length = 0 # the length of the next packet, given in the header + self._read_lock = threading.Lock() + self._write_lock = threading.Lock() + + def read(self): + lock = self._read_lock + lock.acquire() + try: + data = self.socket.recv(8192) + if not data: + self.destroy() + else: + # packet decode loop + data = data.decode('utf-8') + print("received: {}".format(data)) + offset = 0 + N = len(data) + while (offset < N): + if self.receiving_header: + i = data.find(':', offset) + if i < 0: + self.buffer.append(data[offset:]) + break + if len(self.buffer) == 0: + self.body_length = int(data[offset:i]) + else: + self.buffer.append(data[offset:i]) + self.body_length = int(''.join(self.buffer)) + self.buffer.clear() + offset = i + 1 + self.receiving_header = False + continue + elif (self.received_length + N - offset) <= self.body_length: + if self.received_length == 0: + # an optimized case + s = data[offset:offset+self.body_length] + offset += len(s) + else: + s = data[offset:offset+self.body_length - self.received_length] # TODO: is this correct? + offset += len(s) + self.buffer.append(s) + s = ''.join(self.buffer) + self.buffer.clear() + self.received_length = 0 + self.receiving_header = True + try: + processIncomingPacket(self, s) + except Exception as e: + # any exception that propagates to here means a possible protocol error, we have to disconnect + traceback.print_exc() + self.destroy() + return + else: + # incomplete packet body, buffer it until the next time + s = data[offset:] + self.buffer.append(s) + self.received_length += len(s) + break + finally: + lock.release() + + def destroy(self): + self.communication.remove_socket(self) + try: + self.socket.close() + except: + pass + # TODO: unregister from any internal mechanisms + + def send(self, json_obj): + js = json.dumps(json_obj) + m = str(len(js)) + u':' + js + bytes = m.encode('utf-8') + lock = self._write_lock + lock.acquire() + try: + self.socket.sendall(bytes) + finally: + lock.release() + + +class Communication(SocketHandler): + def __init__(self): + # the selector to register sockets with, the serve_forever() uses it to handle any incoming data + self._selector = selectors.DefaultSelector() # to wait until at least on socket is readable + self._lock = threading.Lock() # the access lock, because of multi-threading + self._remove_queue = [] # what sockets to remove + self._add_queue = [] # what sockets to add + notifier, receiver = socket.socketpair() # the notification mechanism for the serving thread, using the selector + receiver.setblocking(False) + self._notifier = notifier + self._receiver = receiver + self._registered_handlers = set() + self._not_signalled = True + self._destroying = False + + def serve_forever(self): + self._selector.register(self._receiver, selectors.EVENT_READ, self) + while True: + events = self._selector.select() + for key, mask in events: + if mask & selectors.EVENT_READ: + key.data.read() + + def add_socket(self, socket_handler): + lock = self._lock + lock.acquire() + try: + self._add_queue.append(socket_handler) + self._signal() + finally: + lock.release() + + def remove_socket(self, socket_handler): + lock = self._lock + lock.acquire() + try: + self._remove_queue.append(socket_handler) + self._signal() + finally: + lock.release() + + def _signal(self): # invoke holding the lock + if self._not_signalled: + self._notifier.send(b'1') + self._not_signalled = False + + def read(self): + # we have been invoked to process some new additions and/or removals + lock = self._lock + lock.acquire() + try: + self._receiver.recv(1024) # clear the queue + self._not_signalled = True + selector = self._selector + for handler in self._remove_queue: + selector.unregister(handler.socket) + self._registered_handlers.remove(handler) + self._remove_queue.clear() + for handler in self._add_queue: + selector.register(handler.socket, selectors.EVENT_READ, handler) + self._registered_handlers.add(handler) + self._add_queue.clear() + if self._destroying: + self._notifier.close() + self._receiver.close() + self._selector.close() + finally: + lock.release() + + def destroy(self): + lock = self._lock + # make a copy of handlers, because as we're destroying them each will invoke remove_socket(), and for that the lock must be available and the iterating set must not change + lock.acquire() + try: + handlers = set(self._registered_handlers) + finally: + lock.release() + for handler in handlers: + handler.destroy() # destroy them all, even the server socket + # this is where everything is destroyed but us, so what follows is self-destruct + lock.acquire() + try: + self._destroying = True + self._signal() + finally: + lock.release() + + +def serve_forever(): + communication = Communication() + server = ServerSocket(communication) + communication.add_socket(server) + communication.serve_forever() diff --git a/server/user_session.py b/server/user_session.py index f21102c..5eec743 100644 --- a/server/user_session.py +++ b/server/user_session.py @@ -8,6 +8,7 @@ import random from . import prolog_session from . import python_session from . import problems +from . import handlers import db from errors.session import NoSuchSession, AuthenticationFailed import psycopg2.extras @@ -128,6 +129,16 @@ class UserSession(object): finally: db.return_connection(conn) + def send(self, json_obj): + """Sends a message to the user. + + This method may be used only for messages that are not replies to requests. + For replies use the reply() method on the Request object. + :param json_obj: a dict representing the json message + :return: None + """ + handlers.send(None, self.sid, json_obj) + def __del__(self): # no locking needed if GC is removing us, as there cannot be any concurrent access by definition if hasattr(self, 'prolog_session') and (self.prolog_session is not None): diff --git a/web/.gitignore b/web/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/web/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/web/main.js b/web/main.js new file mode 100644 index 0000000..1a299db --- /dev/null +++ b/web/main.js @@ -0,0 +1,552 @@ +var engine = require('engine.io'), + http_server = require('http').createServer(), + net = require('net'), + Promise = require('bluebird'), + log4js = require('log4js'); + +var logger = log4js.getLogger(); // TODO: log to a file + +http_server.listen(8083); + +var server = new engine.Server({ + 'pingTimeout': 60000, // in ms + 'pingInterval': 25000, // in ms + 'maxHttpBufferSize': 100000, // in bytes, for polling + 'transports': ['polling', 'websocket'], + 'cookie': false // no cookies +}); + +http_server.on('request', function (request, response) { + var uriParts = request.url.split('/'); // uriParts[0] will be an empty string, because uri must start with a / + + logger.debug('HTTP server request, URL: ' + request.url); + + if ((uriParts.length <= 1) || (uriParts[1] === 'ws')) { + server.handleRequest(request, response); + } + else { + response.writeHead(404, {'Content-Type': 'text/plain'}); + response.write('Not found'); + response.end(); + } +}); + +http_server.on('upgrade', function (request, socket, head) { + server.handleUpgrade(request, socket, head); +}); + + +// connected clients +var sessions = { + // sid: sessions description {sid: string, lastActivity: Date.now(), socket: net.Socket} +}; + +// GUI action handlers, keyed by the action name, values are functions that take the session and the message +var guiHandlers = { + // TODO: list_problems is outside sessions for now; later it must become session-aware + 'list_problems': function actionListProblems(session, message) { + var tid = message['tid']; // remember any TID that is set + delete message['tid']; // delete any TID, we must use an internal one when sending to Python, because it must be unique over all connected sessions + logger.debug('Received a list_problems request from GUI'); + sendDataToPython(message).then( + function listProblemsRequestOK(response) { + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + else delete response['tid']; + session.send(response); + }, + function listProblemsRequestFailed(err) { + var response = { + 'code': -30 + }, + reason; + logger.debug('Failed to request list_problems from Python'); + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + if ((typeof err === 'object') && (err !== null)) reason = err.toString(); + else reason = '' + err; + response['message'] = 'List-problems request failed: ' + reason; + session.end(response); + } + ).done(); + }, + + 'login': function actionLogin(session, message) { + // first-time connect: login + var tid = message['tid']; // remember any TID that is set + delete message['tid']; // delete any TID, we must use an internal one + logger.debug('Received a login request from GUI'); + sendDataToPython(message).then( + function loginRequestOK(response) { + var sid, existingSession; + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + else delete response['tid']; + if (response.code !== 0) { + logger.debug('Python rejected login request from GUI'); + session.end(response); + } + else { + logger.debug('Python accepted login request from GUI'); + session.sid = sid = response.sid; + existingSession = sessions[sid]; + sessions[sid] = session; + session.send(response); + if (existingSession) { + existingSession.end({'code': -40, 'message': 'Supplanted with a new connection for the same session', 'sid': sid}); + } + } + }, + function loginRequestFailed(err) { + var response = { + 'code': -30 + }, + reason; + logger.debug('Failed to request login from Python'); + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + if ((typeof err === 'object') && (err !== null)) reason = err.toString(); + else reason = '' + err; + response['message'] = 'Login request failed: ' + reason; + session.end(response); + } + ).done(); + }, + + //'resume': function actionResume(session, message) { + // // reconnect: resume the session after unexpected disconnect + // + //}, + + 'logout': function actionLogout(session, message) { + // logout, the user quit the app + logger.debug('Logout GUI'); + sendDataToPython(message).finally(function () { + session.end({'code': 9999, 'message': 'Bye.'}); + }).done(); + }, + + 'activity': function actionActivity(session, message) { + logger.debug('Received activity from GUI'); + sendDataToPython(message).catch(session.end).done(); + }, + + 'query': function actionQuery(session, message) { + logger.debug('Received query from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + }, + + 'hint': function actionHint(session, message) { + logger.debug('Received hint from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + }, + + 'test': function actionTest(session, message) { + logger.debug('Received test from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + }, + + 'get_problem': function actionTest(session, message) { + logger.debug('Received get_problem from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + } +}; + +server.on('connection', function (socket) { + var session, + fatal = function (jsonObj) { + // we need to preserve the socket object in a variable, so we can invoke close() on it later + var s = socket, m; + if (s === null) return; + if (jsonObj instanceof Error) jsonObj = {'code': -50, 'message': jsonObj.toString()}; + m = JSON.stringify(jsonObj); + logger.debug('Sending to GUI from fatal(): ' + m); + try { + s.send(m, function () { + s.close(); + s = null; + }); + } + catch (e) {} // TODO: log + socket = null; // this socket will no longer be processed + session.socket = null; + }, + reply = function (jsonObj) { + var m; + if (!socket) return false; + m = JSON.stringify(jsonObj); + logger.debug('Sending to GUI from reply(): ' + m); + try { + socket.send(m); + return true; + } + catch (e) { + socket = null; // this socket will no longer be processed + session.socket = null; + return false; + } + }; + + logger.debug('A new client connection established'); + + session = { + 'sid': null, // session id, null means the python hasn't authenticated us yet + 'lastActivity': Date.now(), + 'socket': socket, + 'lastTID': -1, // last transaction ID, anything less than this we've already handled + 'send': reply, + 'end': fatal + }; + + socket.on('close', function (reason, description) { // description is optional + logger.debug('GUI socket closed'); + if (session.sid !== null) { + if (sessions[session.sid] === session) delete sessions[session.sid]; + sendDataToPython({'type': 'unregister', 'sid': session.sid}); + } + }); + + socket.on('message', function (data) { + var m, tid, action, handler; + + if (socket === null) { + // this socket is not being processed anymore + logger.debug('Received something from GUI, but the socket is not being processed anymore'); + return; + } + + if (typeof data !== 'string') { + logger.debug('Received from GUI a binary message'); + fatal({"code": -1, "message": "Can handle only text messages."}); + return; + } + + logger.debug('Received from GUI: ' + data); + + try { + m = JSON.parse(data); + } + catch (e) { + fatal({"code": -2, "message": "Not a JSON message."}); + return; + } + + tid = m['tid']; // transaction ID + if (typeof tid !== 'number') { + fatal({"code": -3, "message": "Transaction ID is missing or is not an integer."}); + return; + } + + if (Math.floor(tid) !== tid) { + fatal({"code": -4, "message": "Transaction ID is not an integer."}); + return; + } + + session.lastActivity = Date.now(); + + if (tid <= session.lastTID) { + // we already met this message, ignore it + return; + } + + action = m['action']; + if (typeof action !== 'string') { + if (typeof action === 'undefined') { + reply({"code": -10, "tid": tid, "message": "Action is not set."}); + } + else if (action === null) { + reply({"code": -11, "tid": tid, "message": "Action is null."}); + } + else { + reply({"code": -12, "tid": tid, "message": "Action is not a string."}); + } + return; + } + + if (!action) { + reply({"code": -13, "tid": tid, "message": "Action is an empty string."}); + return; + } + + handler = guiHandlers[action]; + if (!handler) { + reply({"code": -14, "tid": tid, "message": "Action is not understood."}); + return; + } + + try { + handler(session, m); + session.lastTID = tid; + } + catch (e) { + reply({"code": -20, "tid": tid, "message": "Processing failed: " + e.toString()}); + } + }); +}); + + +// ========== Python server connection ========== + +var pythonClient = null, // the Socket for communication with the python server + pythonSaturated = false, // whether the kernel-level buffer is full, in this case only push into the pythonQueue + pythonQueue = [], // queue of packets, waiting to be sent + pythonPromises = {}; // promises waiting to be resolved with incoming data from python; key is TID, or TID:SID if SID is available, value is {'created': Date.now(), resolve: function, reject: function} + +var pythonPromisesTimeout = setInterval(function () { + var floor = Date.now() - 300000, // timeout whatever is waiting for more than 5 minutes + key, resolver; + for (key in pythonPromises) { + if (pythonPromises.hasOwnProperty(key)) { + resolver = pythonPromises[key]; + if ((typeof resolver === 'object') && (resolver !== null) && (resolver.created < floor)) { + delete pythonPromises[key]; + resolver.reject(new Error('Timeout waiting for reply from server')); + logger.warn('Python promise timed out at key ' + key); + } + } + } +}, 60000); // once every minute + +var processPacketFromPython = function processPacketFromPythonFunc(packetString) { + var m, session, isDummy, + key = null, + resolver = null; + + logger.debug('Received from Python: ' + packetString); + + try { + m = JSON.parse(packetString); + } + catch (e) { + logger.error('Failed to parse JSON data from Python: ' + e.toString() + '\n * The packet that caused the error: ' + packetString); + return; + } + + if ((typeof m.tid !== 'undefined') && (m.tid !== null)) { + if ((typeof m.tid === 'string') && (m.tid.charAt(0) === 'i')) { + // internal TID: only one way to key this one + key = m.tid; + resolver = pythonPromises[key]; + } + else if (m.sid) { + key = m.tid + ':' + m.sid; + resolver = pythonPromises[key]; + if (!resolver) { + key = '' + m.tid + resolver = pythonPromises[key]; + } + } + else { + key = '' + m.tid; + resolver = pythonPromises[key]; + } + if (resolver) delete pythonPromises[key]; + } + + if (!m.tid || !resolver) { + // no promise is waiting, because this is a generated message, not a response to a previously sent message + isDummy = true; + for (key in m) { + if (m.hasOwnProperty(key) && (key !== 'tid') && (key != 'sid')) { + isDummy = false; + break; + } + } + if (isDummy) { + logger.debug('Message from Python is a dummy message, ignoring'); + return; + } + if (!m.sid) { + // no session ID -> nowhere to send the message + logger.error('Message from Python dropped because there is no outstanding promise for it, and no session ID has been set'); + return; + } + session = sessions[m.sid]; + if (session) { + logger.debug('Proxying a message from Python directly to GUI, as there is no outstanding promise for it (probably a generated message)'); + session.send(m); + } + else { + logger.error('Message from Python dropped because there is no outstanding promise for it, and no active session with ID ' + m.sid); + } + return; + } + + logger.debug('Invoking the promise that is waiting for this Python message'); + resolver.resolve(m); +}; + +// this is internal function to python connection handler +var sendPacketToPython = function sendPacketToPythonFunc(packet) { + if (pythonSaturated || (pythonClient === null)) return false; + logger.debug('Attempting to send a packet to Python: ' + packet); + try { + pythonSaturated = !pythonClient.write(packet, 'utf8'); + return true; + } + catch (e) { + // an error occurred: disconnect; the close event handler will trigger a connection retry + logger.error('Failed to write to the Python client connection: ' + e.toString()); + try { + pythonClient.close(); + } + catch (e2) { + logger.error('Failed to close() the Python client connection: ' + e2.toString()); + } + pythonClient = null; + return false; + } +}; + +var internalTID = 0; +var sendDataToPython = function sendDataToPythontFunc(packetObj) { + var s, key; + if ((typeof packetObj.tid !== 'undefined') && (packetObj.tid !== null)) { + if ((typeof packetObj.tid === 'string') && (packetObj.tid.charAt(0) === 'i')) key = packetObj.tid; // it bears a TID that is marked as internal one, there's only one way to key this + else if (packetObj.sid) key = packetObj.tid + ':' + packetObj.sid; + else key = '' + packetObj.tid; + } + else { + // no TID exists: set the internal one, not being a number, so it differs from GUI TIDs + packetObj.tid = key = 'i' + internalTID; + internalTID++; + } + + s = JSON.stringify(packetObj); + s = s.length + ':' + s; + + return new Promise(function (resolve, reject) { + var existingResolver = pythonPromises[key]; + pythonPromises[key] = { + 'created': Date.now(), + 'resolve': resolve, + 'reject': reject + }; + if (!sendPacketToPython(s)) { + // we can't send right now, queue it for later + pythonQueue.push(s); + } + if (existingResolver) { + existingResolver.reject(new Error('Supplanted by a new request with the same TID')); + } + }); +}; + +// this is internal function to python connection handler +var sendPythonQueue = function sendPythonQueueFunc() { + while (pythonQueue.length > 0) { + if (sendPacketToPython(pythonQueue[0])) { + // remove the packet only after it has been successfully sent + pythonQueue.shift(); + } + else { + // no more sending possible + break; + } + } +}; + +var connectPython = function connectPythonFunc() { + var receivingHeader = true, // whether we're receiving the length of the next packet, or the packet itself + receiveBuffer = [], // chunks of the next packet (header or body) + bodyLength = 0, // the required length of the next packet + receivedLength = 0; // the size of the received body inside receiveBuffer up to now + + logger.debug('Attempting to connect to Python'); + + pythonClient = net.connect({ + 'host': 'localhost', + 'port': 19732 + }); + + pythonClient.setEncoding('utf8'); + + pythonClient.on('connect', function () { + // login to the python server by sending it the list of currently handled sessions + var sid, sessions = [], packet; + logger.debug('Connection to Python established'); + for (sid in sessions) { + if (sessions.hasOwnProperty(sid)) { + sessions.push({'sid': sid, 'lastActivity': sessions[sid].lastActivity}); + } + } + // this should/must succeed in sending right now, as it is the first write ever made on the socket + packet = JSON.stringify({'type': 'connect', 'sessions': sessions}); + packet = packet.length + ':' + packet; + sendPacketToPython(packet); + }); + + pythonClient.on('data', function (chunk) { + var i, s, + offset = 0, + N = chunk.length; + + logger.debug('Received a chunk from Python: ' + chunk); + while (offset < N) { // while there's data left to process + if (receivingHeader) { + // receiving the length of the packet + i = chunk.indexOf(':', offset); + if (i < 0) { + // incomplete header, buffer it for the next 'data' event + receiveBuffer.push(chunk.substring(offset)); + break; + } + if (receiveBuffer.length == 0) bodyLength = +chunk.substring(offset, i); + else { + receiveBuffer.push(chunk.substring(offset, i)); + bodyLength = +receiveBuffer.join(''); + receiveBuffer.length = 0; + } + offset = i + 1; + receivingHeader = false; + continue; // process the packet body in the next iteration + } + else if ((receivedLength + N - offset) <= bodyLength) { + if (receivedLength == 0) { + // an optimization in case the buffer is empty + s = chunk.substr(offset, bodyLength); + offset += s.length; + } + else { + s = chunk.substr(offset, bodyLength - receivedLength); + offset += s.length; + receiveBuffer.push(s); + s = receiveBuffer.join(''); + receiveBuffer.length = 0; + receivedLength = 0; + } + receivingHeader = true; // process the packet header in the next iteration + try { + processPacketFromPython(s); + } + catch (e) {} // TODO: log the error + } + else { + // incomplete packet body, buffer it for the next 'data' event + s = chunk.substring(offset); + receiveBuffer.push(s); + receivedLength += s.length; + break; + } + } + }); + + pythonClient.on('drain', function () { + logger.debug('The Python client connection has been drained, resuming with sending any queued packets'); + pythonSaturated = false; + sendPythonQueue(); + }); + + pythonClient.on('end', function () { + logger.debug('The Python peer closed the connection'); + pythonClient = null; // make it unavailable for further communication + }); + + pythonClient.on('error', function () { + logger.debug('There was an error on Python client connection'); + pythonClient = null; // make it unavailable for further communication + }); + + pythonClient.on('close', function () { + logger.debug('The Python client connection was closed, attempting to reconnect after half a second'); + setTimeout(connectPython, 500); // reconnect after half a second + }); +}; + +connectPython(); // the initial connection attempt diff --git a/web/package.json b/web/package.json new file mode 100644 index 0000000..d1eba7d --- /dev/null +++ b/web/package.json @@ -0,0 +1,9 @@ +{ + "name": "CodeQWeb", + "version": "0.0.1", + "dependencies": { + "engine.io": "1.5.x", + "bluebird": "2.9.x", + "log4js": "0.6.x" + } +}
\ No newline at end of file diff --git a/wsgi_server.py b/wsgi_server.py index 324f338..f754b14 100644 --- a/wsgi_server.py +++ b/wsgi_server.py @@ -1,6 +1,9 @@ #!/usr/bin/python3 # coding=utf-8 +# TODO: this module is deprecated, remove it +# TODO: all new development should occur in server/handlers.py instead + import falcon import json import client |