summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleš Smodiš <aless@guru.si>2015-09-14 14:49:54 +0200
committerAleš Smodiš <aless@guru.si>2015-09-14 14:49:54 +0200
commitafdb3c0b28715b3d9a0982e4e0504a0cbcf11e70 (patch)
treed052125c6fc4fb4803b10dd20d3c24fc3a2711f9
parentd82013c214021d6e5480d18105760fa70cfc708b (diff)
Reimplemented communication with the client side.
* Implemented a node web server supporting asynchronous websocket and long-polling communication with clients. * Implemented TCP communication between python middleware and node web server.
-rw-r--r--client/__init__.py2
-rw-r--r--daemon.py6
-rw-r--r--errors/session.py3
-rw-r--r--readme.md11
-rw-r--r--server/__init__.py7
-rw-r--r--server/handlers.py302
-rw-r--r--server/problems.py3
-rw-r--r--server/socket.py286
-rw-r--r--server/user_session.py11
-rw-r--r--web/.gitignore1
-rw-r--r--web/main.js552
-rw-r--r--web/package.json9
-rw-r--r--wsgi_server.py3
13 files changed, 1189 insertions, 7 deletions
diff --git a/client/__init__.py b/client/__init__.py
index 2cffb52..1c5d693 100644
--- a/client/__init__.py
+++ b/client/__init__.py
@@ -1,5 +1,7 @@
# coding=utf-8
+# TODO: this module is deprecated, remove it
+
import multiprocessing.managers
__all__ = ['get_session_by_id', 'get_or_create_session']
diff --git a/daemon.py b/daemon.py
index 0809004..bc8df3e 100644
--- a/daemon.py
+++ b/daemon.py
@@ -2,5 +2,7 @@
# coding=utf-8
if __name__ == '__main__':
- import server
- server.start()
+ # import server
+ # server.start()
+ import server.socket
+ server.socket.serve_forever()
diff --git a/errors/session.py b/errors/session.py
index d173fbf..c5170d8 100644
--- a/errors/session.py
+++ b/errors/session.py
@@ -5,3 +5,6 @@ class NoSuchSession(Exception):
class AuthenticationFailed(Exception):
pass
+
+class RequestProcessingError(Exception):
+ pass \ No newline at end of file
diff --git a/readme.md b/readme.md
index 6f891f2..a0f8b7e 100644
--- a/readme.md
+++ b/readme.md
@@ -10,22 +10,25 @@ Install the following packages:
- python3-termcolor
- python3-waitress
- swi-prolog-nox
+ - nodejs (>= 10.0.22)
Settings:
- point webroot to codeq-web
- - set up reverse proxy for /svc/ to wsgi server:
- ProxyPass /svc/ http://localhost:8082/
- ProxyPassReverse /svc/ http://localhost:8082/
+ - set up reverse proxy for /ws/ to the node server:
+ ProxyPass /ws/ http://localhost:8083/
+ ProxyPassReverse /ws/ http://localhost:8083/
- set _path_prefix in server.problems
- set DB settings in db
+ - run "npm install" inside the "web" directory to install all dependencies
+ (they will be installed inside the "web" directory)
Running:
- run prolog/runner/main.pl
- run python/runner/main.py
- run daemon.py
- - run wsgi_server.py
+ - start the node process (node web/main.js)
Misc.
=====
diff --git a/server/__init__.py b/server/__init__.py
index 284c162..861a7df 100644
--- a/server/__init__.py
+++ b/server/__init__.py
@@ -3,9 +3,14 @@
import multiprocessing.managers
from . import user_session
from . import prolog_session
+from . import python_session
+from . import socket
import server.problems
-__all__ = ['user_session', 'prolog_session', 'problems', 'start']
+__all__ = ['socket', 'handlers', 'user_session', 'prolog_session', 'python_session', 'problems', 'start']
+
+
+# TODO: everything below is deprecated, remove it
class Codeq(object):
_method_to_typeid_ = {
diff --git a/server/handlers.py b/server/handlers.py
new file mode 100644
index 0000000..e9cf7c8
--- /dev/null
+++ b/server/handlers.py
@@ -0,0 +1,302 @@
+# coding=utf-8
+
+from concurrent.futures import ThreadPoolExecutor
+import traceback
+from errors.session import *
+import server
+
+
+class CodeqService(object):
+ """Base class for all CodeQ services.
+ """
+ session_is_optional = False
+
+ def process(self, request):
+ pass
+
+
+class ProblemList(CodeqService):
+ """List all available problems to the client.
+ """
+ session_is_optional = True
+
+ def process(self, request):
+ request.reply({'code': 0, 'message': 'ok', 'problems': server.problems.list_problems()})
+
+
+class Login(CodeqService):
+ """Logs in a client, creating a new session.
+ """
+ session_is_optional = True
+
+ def process(self, request):
+ js = request.data
+ username = js.get('username')
+ password = js.get('password')
+ if username is None:
+ request.reply({'code': 1, 'message': 'Username was not provided'})
+ elif password is None:
+ request.reply({'code': 2, 'message': 'Password was not provided'})
+ else:
+ try:
+ session = server.user_session.authenticate_and_create_session(username, password)
+ except AuthenticationFailed:
+ request.reply({'code': 3, 'message': 'Username or password do not match'})
+ else:
+ if request.session:
+ request.session.destroy()
+ request.reply({'code': 0, 'message': 'OK', 'sid':session.get_sid()})
+
+
+class Activity(CodeqService):
+ def process(self, request):
+ js = request.data
+ trace = js.get('trace')
+ solution = js.get('solution')
+ problem_id = js.get('problem_id')
+ if (trace is not None) or (solution is not None):
+ # we have something to do
+ if problem_id is None:
+ request.reply({'code': 1, 'message': 'Problem ID is missing'})
+ else:
+ request.session.update_solution(problem_id, trace, solution)
+ request.end() # no feedback, just acknowledge the reception
+
+
+class Query(CodeqService):
+ def process(self, request):
+ js = request.data
+ step = js.get('step')
+ if step is None:
+ request.reply({'code': 1, 'message': '"step" is not set'})
+ else:
+ problem_id = js.get('problem_id')
+ if problem_id is None:
+ request.reply({'code': 4, 'message': 'Problem ID not given'})
+ else:
+ session = request.session
+ trace = js.get('trace')
+ prolog = session.get_prolog()
+ program = None
+ if step == 'run':
+ program = js.get('program')
+ query = js.get('query')
+ if program is None:
+ result = {'code': 2, 'message': 'No program specified'}
+ elif query is None:
+ result = {'code': 3, 'message': 'No query specified'}
+ else:
+ messages, status, have_more = prolog.run_for_user(session.get_uid(), problem_id, program, query)
+ result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}}
+ elif step == 'next':
+ messages, status, have_more = prolog.step()
+ result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}}
+ elif step == 'end':
+ messages, status, have_more = prolog.end()
+ result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}}
+ else:
+ result = {'code': 5, 'message': 'Unknown prolog step: {0}'.format(step)}
+ if program or trace:
+ session.update_solution(problem_id, trace, program)
+ request.reply(result)
+
+
+# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling
+class PythonPull(CodeqService):
+ def process(self, request):
+ python = request.session.get_python()
+ output = python.pull()
+ request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}})
+
+
+# Push stdin to the session's Python interpreter. TODO: convert to async handling
+class PythonPush(CodeqService):
+ def process(self, request):
+ text = request.data.get('text')
+ if text is None:
+ request.reply({'code': 1, 'message': 'No input specified'})
+ else:
+ python = request.session.get_python()
+ python.push(text)
+ request.reply({'code': 0, 'message': 'ok'})
+
+
+class Hint(CodeqService):
+ def process(self, request):
+ js = request.data
+ language = js.get('language')
+ problem_id = js.get('problem_id')
+ program = js.get('program')
+
+ if problem_id is None:
+ request.reply({'code': 1, 'message': 'No problem ID specified'})
+ elif program is None:
+ request.reply({'code': 2, 'message': 'No program specified'})
+ else:
+ session = request.session
+ lang_session = None
+ if language == 'prolog':
+ lang_session = session.get_prolog()
+ elif language == 'python':
+ lang_session = session.get_python()
+
+ if lang_session is None:
+ request.reply({'code': 3, 'message': 'Unknown language specified'})
+ else:
+ hints = lang_session.hint(session.get_sid(), problem_id, program)
+ request.reply({'code': 0, 'message': 'ok', 'hints': hints})
+
+
+class Test(CodeqService):
+ def process(self, request):
+ js = request.data
+ language = js.get('language')
+ problem_id = js.get('problem_id')
+ program = js.get('program')
+
+ if problem_id is None:
+ request.reply({'code': 1, 'message': 'No problem ID specified'})
+ elif program is None:
+ request.reply({'code': 2, 'message': 'No program specified'})
+ else:
+ session = request.session
+ lang_session = None
+ if language == 'prolog':
+ lang_session = session.get_prolog()
+ elif language == 'python':
+ lang_session = session.get_python()
+
+ if lang_session is None:
+ request.reply({'code': 3, 'message': 'Unknown language specified'})
+ else:
+ hints = lang_session.test(session.get_sid(), problem_id, program)
+ request.reply({'code': 0, 'message': 'ok', 'hints': hints})
+
+
+class GetProblem(CodeqService):
+ def process(self, request):
+ js = request.data
+ language = js.get('language')
+ problem_group = js.get('problem_group')
+ problem = js.get('problem')
+ if language is None:
+ request.reply({'code': 1, 'message': 'Language identifier not given'})
+ elif problem_group is None:
+ request.reply({'code': 2, 'message': 'Problem group identifier not given'})
+ elif problem is None:
+ request.reply({'code': 3, 'message': 'Problem identifier not given'})
+ else:
+ request.reply({'code': 0, 'message': 'ok', 'data': request.session.get_problem_data(language, problem_group, problem)})
+
+
+# maps actions to their handlers
+incoming_handlers = {
+ 'list_problems': ProblemList(),
+ 'login': Login(),
+ 'get_problem': GetProblem(),
+ 'logout': None,
+ 'activity': Activity(),
+ 'query': Query(),
+ 'hint': Hint(),
+ 'test': Test()
+}
+
+
+class Request(object):
+ def __init__(self, tid, original_sid, session, data):
+ """Creates a new request
+
+ :param tid: communicator-level transaction ID (global relative to the specific communicator where it originated)
+ :param original_sid: session ID, optional
+ :param session: the actual session with the original_sid, if it exists; the processor may swap it for a new session
+ :param data: the request data from the client
+ :return: new instance
+ """
+ self._tid = tid
+ self._original_sid = original_sid
+ self.session = session
+ self.data = data
+ self.is_finished = False
+
+ def reply(self, data):
+ """Reply to this request.
+
+ :param data: the dictionary representing the reply, that will be converted to JSON
+ :return: None
+ """
+ if data is None:
+ self.end()
+ if self._original_sid is not None:
+ sid = data.get('sid')
+ if sid is None:
+ data['sid'] = self._original_sid
+ elif sid != self._original_sid:
+ data['sid'] = self._original_sid
+ data['new_sid'] = sid
+ # it is important to reply with the same tid and sid parameters as were in the request, so message accounting doesn't get confused
+ send(self._tid, self._original_sid, data)
+ self.is_finished = True
+
+ def end(self):
+ """Conclude the request, without sending a response.
+
+ This is to acknowledge that the response has been received.
+ :return: None
+ """
+ send(self._tid, self._original_sid, None)
+ self.is_finished = True
+
+
+########## low-level machinery, subject to change to support more than the single socket communicator ##########
+
+_executor = ThreadPoolExecutor(max_workers=100)
+
+def _invoke_handler(handler, request):
+ try:
+ print('Worker thread processing data={}'.format(str(request.data)))
+ handler.process(request)
+ if not request.is_finished:
+ print('ERROR: the request was not concluded!')
+ request.reply({'code': -1, 'message': 'Request processing did not provide a reply'})
+ print('Processing finished')
+ except Exception as e:
+ print('ERROR: data processing failed: ' + str(e))
+ traceback.print_exc()
+ request.reply({'code': -1, 'message': 'Internal error: ' + str(e)})
+
+def serve_request(json_obj):
+ if not isinstance(json_obj, dict):
+ raise RequestProcessingError('Require a request represented as a dict, instead got: ' + str(type(json_obj)))
+ tid = json_obj.get('tid') # session ID and transaction ID uniquely identify a transaction
+ sid = json_obj.get('sid')
+ action = json_obj.get('action')
+ if action is None:
+ raise RequestProcessingError('Request does not contain an action')
+ if not isinstance(action, str):
+ raise RequestProcessingError('Requested action must be a string, got: ' + str(type(action)))
+ handler = incoming_handlers.get(action)
+ if handler is None:
+ raise RequestProcessingError('No handler for ' + action)
+ print("Attempting to serve action={}".format(action))
+ session = None
+ if sid is None:
+ if not handler.session_is_optional:
+ raise RequestProcessingError('Request is missing a session-ID')
+ else:
+ del json_obj['sid']
+ try:
+ session = server.user_session.get_session_by_id(sid)
+ except NoSuchSession:
+ if not handler.session_is_optional:
+ raise RequestProcessingError('This user session has expired. Please log-in again.')
+ _executor.submit(_invoke_handler, handler, Request(tid, sid, session, json_obj))
+
+def send(tid, sid, json_obj):
+ # just a proxy function for now
+ print('Sending reply: {}'.format(str(json_obj)))
+ server.socket.sendPacket(tid, sid, json_obj)
+
+def stop():
+ global _executor
+ _executor.shutdown()
+ _executor = None
diff --git a/server/problems.py b/server/problems.py
index 02f0307..1c87345 100644
--- a/server/problems.py
+++ b/server/problems.py
@@ -19,6 +19,9 @@ def load_module(fullname):
d = os.path.join(_path_prefix, *parts[:-1])
ff = importlib.machinery.FileFinder(d, (importlib.machinery.SourceFileLoader, ['.py']))
spec = ff.find_spec(fullname)
+ if spec is None:
+ print('ERROR: there is no problem module {0}'.format(fullname))
+ return None
mod = type(sys)(fullname)
mod.__loader__ = spec.loader
mod.__package__ = spec.parent
diff --git a/server/socket.py b/server/socket.py
new file mode 100644
index 0000000..c248056
--- /dev/null
+++ b/server/socket.py
@@ -0,0 +1,286 @@
+# coding=utf-8
+
+import socket
+import selectors
+import json
+import threading
+import traceback
+from server.handlers import serve_request
+
+# TODO: add a whole lot of try..except blocks, and just make it overall error resistant
+# TODO: add logging
+
+_mapping_lock = threading.Lock() # the lock guarding access to the two mappings below
+
+_sessions_to_socket = {} # keyed by sid: what session is bound to what socket
+
+_transactions_to_socket = {} # keyed by tid, used only when there is no sid available, so a request can be replied
+
+
+def processIncomingPacket(receiving_socket, packet):
+ print('Decocoding JSON: {}'.format(packet))
+ obj = json.loads(packet)
+ if obj.get('type') == 'connect':
+ return # TODO: first packet is 'connect', providing the list of connected sessions to the peer
+ tid = obj.get('tid') # transaction ID
+ if tid is None:
+ raise Exception('Transaction ID is missing from the request')
+ sid = obj.get('sid') # session ID
+ _mapping_lock.acquire()
+ try:
+ if sid is None:
+ _transactions_to_socket[tid] = receiving_socket
+ else:
+ _sessions_to_socket[sid] = receiving_socket
+ finally:
+ _mapping_lock.release()
+ serve_request(obj)
+
+
+def sendPacket(tid, sid, json_obj):
+ if sid is None:
+ if tid is None:
+ raise Exception('Cannot send a message without a tid and/or a sid')
+ _mapping_lock.acquire()
+ try:
+ socket = _transactions_to_socket.get(tid)
+ del _transactions_to_socket[tid] # this mapping is not relevant anymore
+ finally:
+ _mapping_lock.release()
+ if socket is None:
+ raise Exception('Cannot send in transaction, it is not registered: ' + tid)
+ else:
+ _mapping_lock.acquire()
+ try:
+ socket = _sessions_to_socket.get(sid)
+ finally:
+ _mapping_lock.release()
+ if socket is None:
+ raise Exception('Cannot send to session, it is not registered: ' + sid)
+ if json_obj is None:
+ # a special case: this is only an acknowledgment, so any request/response accounting can be cleared
+ json_obj = {}
+ if sid is not None:
+ json_obj['sid'] = sid
+ if tid is not None:
+ json_obj['tid'] = tid
+ socket.send(json_obj)
+
+
+class SocketHandler(object):
+
+ def read(self):
+ pass
+
+ def destroy(self):
+ pass
+
+
+class ServerSocket(SocketHandler):
+
+ def __init__(self, communication, hostname='localhost', port=19732):
+ self.communication = communication
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_socket.bind((hostname, port))
+ server_socket.listen(5)
+ server_socket.setblocking(False)
+ self.socket = server_socket
+
+ def read(self):
+ conn, addr = self.socket.accept()
+ self.communication.add_socket(JsonClientSocket(self.communication, conn, addr))
+
+ def destroy(self):
+ self.communication.remove_socket(self)
+ try:
+ self.socket.close()
+ except:
+ pass
+
+
+class JsonClientSocket(SocketHandler):
+
+ def __init__(self, communication, socket, address):
+ self.communication = communication
+ self.socket = socket
+ self.address = address
+ socket.setblocking(False)
+ self.status = 0 # registering
+ self.receiving_header = True
+ self.buffer = []
+ self.received_length = 0 # the cumulative length of strings in buffer
+ self.body_length = 0 # the length of the next packet, given in the header
+ self._read_lock = threading.Lock()
+ self._write_lock = threading.Lock()
+
+ def read(self):
+ lock = self._read_lock
+ lock.acquire()
+ try:
+ data = self.socket.recv(8192)
+ if not data:
+ self.destroy()
+ else:
+ # packet decode loop
+ data = data.decode('utf-8')
+ print("received: {}".format(data))
+ offset = 0
+ N = len(data)
+ while (offset < N):
+ if self.receiving_header:
+ i = data.find(':', offset)
+ if i < 0:
+ self.buffer.append(data[offset:])
+ break
+ if len(self.buffer) == 0:
+ self.body_length = int(data[offset:i])
+ else:
+ self.buffer.append(data[offset:i])
+ self.body_length = int(''.join(self.buffer))
+ self.buffer.clear()
+ offset = i + 1
+ self.receiving_header = False
+ continue
+ elif (self.received_length + N - offset) <= self.body_length:
+ if self.received_length == 0:
+ # an optimized case
+ s = data[offset:offset+self.body_length]
+ offset += len(s)
+ else:
+ s = data[offset:offset+self.body_length - self.received_length] # TODO: is this correct?
+ offset += len(s)
+ self.buffer.append(s)
+ s = ''.join(self.buffer)
+ self.buffer.clear()
+ self.received_length = 0
+ self.receiving_header = True
+ try:
+ processIncomingPacket(self, s)
+ except Exception as e:
+ # any exception that propagates to here means a possible protocol error, we have to disconnect
+ traceback.print_exc()
+ self.destroy()
+ return
+ else:
+ # incomplete packet body, buffer it until the next time
+ s = data[offset:]
+ self.buffer.append(s)
+ self.received_length += len(s)
+ break
+ finally:
+ lock.release()
+
+ def destroy(self):
+ self.communication.remove_socket(self)
+ try:
+ self.socket.close()
+ except:
+ pass
+ # TODO: unregister from any internal mechanisms
+
+ def send(self, json_obj):
+ js = json.dumps(json_obj)
+ m = str(len(js)) + u':' + js
+ bytes = m.encode('utf-8')
+ lock = self._write_lock
+ lock.acquire()
+ try:
+ self.socket.sendall(bytes)
+ finally:
+ lock.release()
+
+
+class Communication(SocketHandler):
+ def __init__(self):
+ # the selector to register sockets with, the serve_forever() uses it to handle any incoming data
+ self._selector = selectors.DefaultSelector() # to wait until at least on socket is readable
+ self._lock = threading.Lock() # the access lock, because of multi-threading
+ self._remove_queue = [] # what sockets to remove
+ self._add_queue = [] # what sockets to add
+ notifier, receiver = socket.socketpair() # the notification mechanism for the serving thread, using the selector
+ receiver.setblocking(False)
+ self._notifier = notifier
+ self._receiver = receiver
+ self._registered_handlers = set()
+ self._not_signalled = True
+ self._destroying = False
+
+ def serve_forever(self):
+ self._selector.register(self._receiver, selectors.EVENT_READ, self)
+ while True:
+ events = self._selector.select()
+ for key, mask in events:
+ if mask & selectors.EVENT_READ:
+ key.data.read()
+
+ def add_socket(self, socket_handler):
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._add_queue.append(socket_handler)
+ self._signal()
+ finally:
+ lock.release()
+
+ def remove_socket(self, socket_handler):
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._remove_queue.append(socket_handler)
+ self._signal()
+ finally:
+ lock.release()
+
+ def _signal(self): # invoke holding the lock
+ if self._not_signalled:
+ self._notifier.send(b'1')
+ self._not_signalled = False
+
+ def read(self):
+ # we have been invoked to process some new additions and/or removals
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._receiver.recv(1024) # clear the queue
+ self._not_signalled = True
+ selector = self._selector
+ for handler in self._remove_queue:
+ selector.unregister(handler.socket)
+ self._registered_handlers.remove(handler)
+ self._remove_queue.clear()
+ for handler in self._add_queue:
+ selector.register(handler.socket, selectors.EVENT_READ, handler)
+ self._registered_handlers.add(handler)
+ self._add_queue.clear()
+ if self._destroying:
+ self._notifier.close()
+ self._receiver.close()
+ self._selector.close()
+ finally:
+ lock.release()
+
+ def destroy(self):
+ lock = self._lock
+ # make a copy of handlers, because as we're destroying them each will invoke remove_socket(), and for that the lock must be available and the iterating set must not change
+ lock.acquire()
+ try:
+ handlers = set(self._registered_handlers)
+ finally:
+ lock.release()
+ for handler in handlers:
+ handler.destroy() # destroy them all, even the server socket
+ # this is where everything is destroyed but us, so what follows is self-destruct
+ lock.acquire()
+ try:
+ self._destroying = True
+ self._signal()
+ finally:
+ lock.release()
+
+
+def serve_forever():
+ communication = Communication()
+ server = ServerSocket(communication)
+ communication.add_socket(server)
+ communication.serve_forever()
diff --git a/server/user_session.py b/server/user_session.py
index f21102c..5eec743 100644
--- a/server/user_session.py
+++ b/server/user_session.py
@@ -8,6 +8,7 @@ import random
from . import prolog_session
from . import python_session
from . import problems
+from . import handlers
import db
from errors.session import NoSuchSession, AuthenticationFailed
import psycopg2.extras
@@ -128,6 +129,16 @@ class UserSession(object):
finally:
db.return_connection(conn)
+ def send(self, json_obj):
+ """Sends a message to the user.
+
+ This method may be used only for messages that are not replies to requests.
+ For replies use the reply() method on the Request object.
+ :param json_obj: a dict representing the json message
+ :return: None
+ """
+ handlers.send(None, self.sid, json_obj)
+
def __del__(self):
# no locking needed if GC is removing us, as there cannot be any concurrent access by definition
if hasattr(self, 'prolog_session') and (self.prolog_session is not None):
diff --git a/web/.gitignore b/web/.gitignore
new file mode 100644
index 0000000..3c3629e
--- /dev/null
+++ b/web/.gitignore
@@ -0,0 +1 @@
+node_modules
diff --git a/web/main.js b/web/main.js
new file mode 100644
index 0000000..1a299db
--- /dev/null
+++ b/web/main.js
@@ -0,0 +1,552 @@
+var engine = require('engine.io'),
+ http_server = require('http').createServer(),
+ net = require('net'),
+ Promise = require('bluebird'),
+ log4js = require('log4js');
+
+var logger = log4js.getLogger(); // TODO: log to a file
+
+http_server.listen(8083);
+
+var server = new engine.Server({
+ 'pingTimeout': 60000, // in ms
+ 'pingInterval': 25000, // in ms
+ 'maxHttpBufferSize': 100000, // in bytes, for polling
+ 'transports': ['polling', 'websocket'],
+ 'cookie': false // no cookies
+});
+
+http_server.on('request', function (request, response) {
+ var uriParts = request.url.split('/'); // uriParts[0] will be an empty string, because uri must start with a /
+
+ logger.debug('HTTP server request, URL: ' + request.url);
+
+ if ((uriParts.length <= 1) || (uriParts[1] === 'ws')) {
+ server.handleRequest(request, response);
+ }
+ else {
+ response.writeHead(404, {'Content-Type': 'text/plain'});
+ response.write('Not found');
+ response.end();
+ }
+});
+
+http_server.on('upgrade', function (request, socket, head) {
+ server.handleUpgrade(request, socket, head);
+});
+
+
+// connected clients
+var sessions = {
+ // sid: sessions description {sid: string, lastActivity: Date.now(), socket: net.Socket}
+};
+
+// GUI action handlers, keyed by the action name, values are functions that take the session and the message
+var guiHandlers = {
+ // TODO: list_problems is outside sessions for now; later it must become session-aware
+ 'list_problems': function actionListProblems(session, message) {
+ var tid = message['tid']; // remember any TID that is set
+ delete message['tid']; // delete any TID, we must use an internal one when sending to Python, because it must be unique over all connected sessions
+ logger.debug('Received a list_problems request from GUI');
+ sendDataToPython(message).then(
+ function listProblemsRequestOK(response) {
+ if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid;
+ else delete response['tid'];
+ session.send(response);
+ },
+ function listProblemsRequestFailed(err) {
+ var response = {
+ 'code': -30
+ },
+ reason;
+ logger.debug('Failed to request list_problems from Python');
+ if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid;
+ if ((typeof err === 'object') && (err !== null)) reason = err.toString();
+ else reason = '' + err;
+ response['message'] = 'List-problems request failed: ' + reason;
+ session.end(response);
+ }
+ ).done();
+ },
+
+ 'login': function actionLogin(session, message) {
+ // first-time connect: login
+ var tid = message['tid']; // remember any TID that is set
+ delete message['tid']; // delete any TID, we must use an internal one
+ logger.debug('Received a login request from GUI');
+ sendDataToPython(message).then(
+ function loginRequestOK(response) {
+ var sid, existingSession;
+ if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid;
+ else delete response['tid'];
+ if (response.code !== 0) {
+ logger.debug('Python rejected login request from GUI');
+ session.end(response);
+ }
+ else {
+ logger.debug('Python accepted login request from GUI');
+ session.sid = sid = response.sid;
+ existingSession = sessions[sid];
+ sessions[sid] = session;
+ session.send(response);
+ if (existingSession) {
+ existingSession.end({'code': -40, 'message': 'Supplanted with a new connection for the same session', 'sid': sid});
+ }
+ }
+ },
+ function loginRequestFailed(err) {
+ var response = {
+ 'code': -30
+ },
+ reason;
+ logger.debug('Failed to request login from Python');
+ if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid;
+ if ((typeof err === 'object') && (err !== null)) reason = err.toString();
+ else reason = '' + err;
+ response['message'] = 'Login request failed: ' + reason;
+ session.end(response);
+ }
+ ).done();
+ },
+
+ //'resume': function actionResume(session, message) {
+ // // reconnect: resume the session after unexpected disconnect
+ //
+ //},
+
+ 'logout': function actionLogout(session, message) {
+ // logout, the user quit the app
+ logger.debug('Logout GUI');
+ sendDataToPython(message).finally(function () {
+ session.end({'code': 9999, 'message': 'Bye.'});
+ }).done();
+ },
+
+ 'activity': function actionActivity(session, message) {
+ logger.debug('Received activity from GUI');
+ sendDataToPython(message).catch(session.end).done();
+ },
+
+ 'query': function actionQuery(session, message) {
+ logger.debug('Received query from GUI');
+ sendDataToPython(message).then(session.send, session.end).done();
+ },
+
+ 'hint': function actionHint(session, message) {
+ logger.debug('Received hint from GUI');
+ sendDataToPython(message).then(session.send, session.end).done();
+ },
+
+ 'test': function actionTest(session, message) {
+ logger.debug('Received test from GUI');
+ sendDataToPython(message).then(session.send, session.end).done();
+ },
+
+ 'get_problem': function actionTest(session, message) {
+ logger.debug('Received get_problem from GUI');
+ sendDataToPython(message).then(session.send, session.end).done();
+ }
+};
+
+server.on('connection', function (socket) {
+ var session,
+ fatal = function (jsonObj) {
+ // we need to preserve the socket object in a variable, so we can invoke close() on it later
+ var s = socket, m;
+ if (s === null) return;
+ if (jsonObj instanceof Error) jsonObj = {'code': -50, 'message': jsonObj.toString()};
+ m = JSON.stringify(jsonObj);
+ logger.debug('Sending to GUI from fatal(): ' + m);
+ try {
+ s.send(m, function () {
+ s.close();
+ s = null;
+ });
+ }
+ catch (e) {} // TODO: log
+ socket = null; // this socket will no longer be processed
+ session.socket = null;
+ },
+ reply = function (jsonObj) {
+ var m;
+ if (!socket) return false;
+ m = JSON.stringify(jsonObj);
+ logger.debug('Sending to GUI from reply(): ' + m);
+ try {
+ socket.send(m);
+ return true;
+ }
+ catch (e) {
+ socket = null; // this socket will no longer be processed
+ session.socket = null;
+ return false;
+ }
+ };
+
+ logger.debug('A new client connection established');
+
+ session = {
+ 'sid': null, // session id, null means the python hasn't authenticated us yet
+ 'lastActivity': Date.now(),
+ 'socket': socket,
+ 'lastTID': -1, // last transaction ID, anything less than this we've already handled
+ 'send': reply,
+ 'end': fatal
+ };
+
+ socket.on('close', function (reason, description) { // description is optional
+ logger.debug('GUI socket closed');
+ if (session.sid !== null) {
+ if (sessions[session.sid] === session) delete sessions[session.sid];
+ sendDataToPython({'type': 'unregister', 'sid': session.sid});
+ }
+ });
+
+ socket.on('message', function (data) {
+ var m, tid, action, handler;
+
+ if (socket === null) {
+ // this socket is not being processed anymore
+ logger.debug('Received something from GUI, but the socket is not being processed anymore');
+ return;
+ }
+
+ if (typeof data !== 'string') {
+ logger.debug('Received from GUI a binary message');
+ fatal({"code": -1, "message": "Can handle only text messages."});
+ return;
+ }
+
+ logger.debug('Received from GUI: ' + data);
+
+ try {
+ m = JSON.parse(data);
+ }
+ catch (e) {
+ fatal({"code": -2, "message": "Not a JSON message."});
+ return;
+ }
+
+ tid = m['tid']; // transaction ID
+ if (typeof tid !== 'number') {
+ fatal({"code": -3, "message": "Transaction ID is missing or is not an integer."});
+ return;
+ }
+
+ if (Math.floor(tid) !== tid) {
+ fatal({"code": -4, "message": "Transaction ID is not an integer."});
+ return;
+ }
+
+ session.lastActivity = Date.now();
+
+ if (tid <= session.lastTID) {
+ // we already met this message, ignore it
+ return;
+ }
+
+ action = m['action'];
+ if (typeof action !== 'string') {
+ if (typeof action === 'undefined') {
+ reply({"code": -10, "tid": tid, "message": "Action is not set."});
+ }
+ else if (action === null) {
+ reply({"code": -11, "tid": tid, "message": "Action is null."});
+ }
+ else {
+ reply({"code": -12, "tid": tid, "message": "Action is not a string."});
+ }
+ return;
+ }
+
+ if (!action) {
+ reply({"code": -13, "tid": tid, "message": "Action is an empty string."});
+ return;
+ }
+
+ handler = guiHandlers[action];
+ if (!handler) {
+ reply({"code": -14, "tid": tid, "message": "Action is not understood."});
+ return;
+ }
+
+ try {
+ handler(session, m);
+ session.lastTID = tid;
+ }
+ catch (e) {
+ reply({"code": -20, "tid": tid, "message": "Processing failed: " + e.toString()});
+ }
+ });
+});
+
+
+// ========== Python server connection ==========
+
+var pythonClient = null, // the Socket for communication with the python server
+ pythonSaturated = false, // whether the kernel-level buffer is full, in this case only push into the pythonQueue
+ pythonQueue = [], // queue of packets, waiting to be sent
+ pythonPromises = {}; // promises waiting to be resolved with incoming data from python; key is TID, or TID:SID if SID is available, value is {'created': Date.now(), resolve: function, reject: function}
+
+var pythonPromisesTimeout = setInterval(function () {
+ var floor = Date.now() - 300000, // timeout whatever is waiting for more than 5 minutes
+ key, resolver;
+ for (key in pythonPromises) {
+ if (pythonPromises.hasOwnProperty(key)) {
+ resolver = pythonPromises[key];
+ if ((typeof resolver === 'object') && (resolver !== null) && (resolver.created < floor)) {
+ delete pythonPromises[key];
+ resolver.reject(new Error('Timeout waiting for reply from server'));
+ logger.warn('Python promise timed out at key ' + key);
+ }
+ }
+ }
+}, 60000); // once every minute
+
+var processPacketFromPython = function processPacketFromPythonFunc(packetString) {
+ var m, session, isDummy,
+ key = null,
+ resolver = null;
+
+ logger.debug('Received from Python: ' + packetString);
+
+ try {
+ m = JSON.parse(packetString);
+ }
+ catch (e) {
+ logger.error('Failed to parse JSON data from Python: ' + e.toString() + '\n * The packet that caused the error: ' + packetString);
+ return;
+ }
+
+ if ((typeof m.tid !== 'undefined') && (m.tid !== null)) {
+ if ((typeof m.tid === 'string') && (m.tid.charAt(0) === 'i')) {
+ // internal TID: only one way to key this one
+ key = m.tid;
+ resolver = pythonPromises[key];
+ }
+ else if (m.sid) {
+ key = m.tid + ':' + m.sid;
+ resolver = pythonPromises[key];
+ if (!resolver) {
+ key = '' + m.tid
+ resolver = pythonPromises[key];
+ }
+ }
+ else {
+ key = '' + m.tid;
+ resolver = pythonPromises[key];
+ }
+ if (resolver) delete pythonPromises[key];
+ }
+
+ if (!m.tid || !resolver) {
+ // no promise is waiting, because this is a generated message, not a response to a previously sent message
+ isDummy = true;
+ for (key in m) {
+ if (m.hasOwnProperty(key) && (key !== 'tid') && (key != 'sid')) {
+ isDummy = false;
+ break;
+ }
+ }
+ if (isDummy) {
+ logger.debug('Message from Python is a dummy message, ignoring');
+ return;
+ }
+ if (!m.sid) {
+ // no session ID -> nowhere to send the message
+ logger.error('Message from Python dropped because there is no outstanding promise for it, and no session ID has been set');
+ return;
+ }
+ session = sessions[m.sid];
+ if (session) {
+ logger.debug('Proxying a message from Python directly to GUI, as there is no outstanding promise for it (probably a generated message)');
+ session.send(m);
+ }
+ else {
+ logger.error('Message from Python dropped because there is no outstanding promise for it, and no active session with ID ' + m.sid);
+ }
+ return;
+ }
+
+ logger.debug('Invoking the promise that is waiting for this Python message');
+ resolver.resolve(m);
+};
+
+// this is internal function to python connection handler
+var sendPacketToPython = function sendPacketToPythonFunc(packet) {
+ if (pythonSaturated || (pythonClient === null)) return false;
+ logger.debug('Attempting to send a packet to Python: ' + packet);
+ try {
+ pythonSaturated = !pythonClient.write(packet, 'utf8');
+ return true;
+ }
+ catch (e) {
+ // an error occurred: disconnect; the close event handler will trigger a connection retry
+ logger.error('Failed to write to the Python client connection: ' + e.toString());
+ try {
+ pythonClient.close();
+ }
+ catch (e2) {
+ logger.error('Failed to close() the Python client connection: ' + e2.toString());
+ }
+ pythonClient = null;
+ return false;
+ }
+};
+
+var internalTID = 0;
+var sendDataToPython = function sendDataToPythontFunc(packetObj) {
+ var s, key;
+ if ((typeof packetObj.tid !== 'undefined') && (packetObj.tid !== null)) {
+ if ((typeof packetObj.tid === 'string') && (packetObj.tid.charAt(0) === 'i')) key = packetObj.tid; // it bears a TID that is marked as internal one, there's only one way to key this
+ else if (packetObj.sid) key = packetObj.tid + ':' + packetObj.sid;
+ else key = '' + packetObj.tid;
+ }
+ else {
+ // no TID exists: set the internal one, not being a number, so it differs from GUI TIDs
+ packetObj.tid = key = 'i' + internalTID;
+ internalTID++;
+ }
+
+ s = JSON.stringify(packetObj);
+ s = s.length + ':' + s;
+
+ return new Promise(function (resolve, reject) {
+ var existingResolver = pythonPromises[key];
+ pythonPromises[key] = {
+ 'created': Date.now(),
+ 'resolve': resolve,
+ 'reject': reject
+ };
+ if (!sendPacketToPython(s)) {
+ // we can't send right now, queue it for later
+ pythonQueue.push(s);
+ }
+ if (existingResolver) {
+ existingResolver.reject(new Error('Supplanted by a new request with the same TID'));
+ }
+ });
+};
+
+// this is internal function to python connection handler
+var sendPythonQueue = function sendPythonQueueFunc() {
+ while (pythonQueue.length > 0) {
+ if (sendPacketToPython(pythonQueue[0])) {
+ // remove the packet only after it has been successfully sent
+ pythonQueue.shift();
+ }
+ else {
+ // no more sending possible
+ break;
+ }
+ }
+};
+
+var connectPython = function connectPythonFunc() {
+ var receivingHeader = true, // whether we're receiving the length of the next packet, or the packet itself
+ receiveBuffer = [], // chunks of the next packet (header or body)
+ bodyLength = 0, // the required length of the next packet
+ receivedLength = 0; // the size of the received body inside receiveBuffer up to now
+
+ logger.debug('Attempting to connect to Python');
+
+ pythonClient = net.connect({
+ 'host': 'localhost',
+ 'port': 19732
+ });
+
+ pythonClient.setEncoding('utf8');
+
+ pythonClient.on('connect', function () {
+ // login to the python server by sending it the list of currently handled sessions
+ var sid, sessions = [], packet;
+ logger.debug('Connection to Python established');
+ for (sid in sessions) {
+ if (sessions.hasOwnProperty(sid)) {
+ sessions.push({'sid': sid, 'lastActivity': sessions[sid].lastActivity});
+ }
+ }
+ // this should/must succeed in sending right now, as it is the first write ever made on the socket
+ packet = JSON.stringify({'type': 'connect', 'sessions': sessions});
+ packet = packet.length + ':' + packet;
+ sendPacketToPython(packet);
+ });
+
+ pythonClient.on('data', function (chunk) {
+ var i, s,
+ offset = 0,
+ N = chunk.length;
+
+ logger.debug('Received a chunk from Python: ' + chunk);
+ while (offset < N) { // while there's data left to process
+ if (receivingHeader) {
+ // receiving the length of the packet
+ i = chunk.indexOf(':', offset);
+ if (i < 0) {
+ // incomplete header, buffer it for the next 'data' event
+ receiveBuffer.push(chunk.substring(offset));
+ break;
+ }
+ if (receiveBuffer.length == 0) bodyLength = +chunk.substring(offset, i);
+ else {
+ receiveBuffer.push(chunk.substring(offset, i));
+ bodyLength = +receiveBuffer.join('');
+ receiveBuffer.length = 0;
+ }
+ offset = i + 1;
+ receivingHeader = false;
+ continue; // process the packet body in the next iteration
+ }
+ else if ((receivedLength + N - offset) <= bodyLength) {
+ if (receivedLength == 0) {
+ // an optimization in case the buffer is empty
+ s = chunk.substr(offset, bodyLength);
+ offset += s.length;
+ }
+ else {
+ s = chunk.substr(offset, bodyLength - receivedLength);
+ offset += s.length;
+ receiveBuffer.push(s);
+ s = receiveBuffer.join('');
+ receiveBuffer.length = 0;
+ receivedLength = 0;
+ }
+ receivingHeader = true; // process the packet header in the next iteration
+ try {
+ processPacketFromPython(s);
+ }
+ catch (e) {} // TODO: log the error
+ }
+ else {
+ // incomplete packet body, buffer it for the next 'data' event
+ s = chunk.substring(offset);
+ receiveBuffer.push(s);
+ receivedLength += s.length;
+ break;
+ }
+ }
+ });
+
+ pythonClient.on('drain', function () {
+ logger.debug('The Python client connection has been drained, resuming with sending any queued packets');
+ pythonSaturated = false;
+ sendPythonQueue();
+ });
+
+ pythonClient.on('end', function () {
+ logger.debug('The Python peer closed the connection');
+ pythonClient = null; // make it unavailable for further communication
+ });
+
+ pythonClient.on('error', function () {
+ logger.debug('There was an error on Python client connection');
+ pythonClient = null; // make it unavailable for further communication
+ });
+
+ pythonClient.on('close', function () {
+ logger.debug('The Python client connection was closed, attempting to reconnect after half a second');
+ setTimeout(connectPython, 500); // reconnect after half a second
+ });
+};
+
+connectPython(); // the initial connection attempt
diff --git a/web/package.json b/web/package.json
new file mode 100644
index 0000000..d1eba7d
--- /dev/null
+++ b/web/package.json
@@ -0,0 +1,9 @@
+{
+ "name": "CodeQWeb",
+ "version": "0.0.1",
+ "dependencies": {
+ "engine.io": "1.5.x",
+ "bluebird": "2.9.x",
+ "log4js": "0.6.x"
+ }
+} \ No newline at end of file
diff --git a/wsgi_server.py b/wsgi_server.py
index 324f338..f754b14 100644
--- a/wsgi_server.py
+++ b/wsgi_server.py
@@ -1,6 +1,9 @@
#!/usr/bin/python3
# coding=utf-8
+# TODO: this module is deprecated, remove it
+# TODO: all new development should occur in server/handlers.py instead
+
import falcon
import json
import client