summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authorAleš Smodiš <aless@guru.si>2015-09-14 14:49:54 +0200
committerAleš Smodiš <aless@guru.si>2015-09-14 14:49:54 +0200
commitafdb3c0b28715b3d9a0982e4e0504a0cbcf11e70 (patch)
treed052125c6fc4fb4803b10dd20d3c24fc3a2711f9 /server
parentd82013c214021d6e5480d18105760fa70cfc708b (diff)
Reimplemented communication with the client side.
* Implemented a node web server supporting asynchronous websocket and long-polling communication with clients. * Implemented TCP communication between python middleware and node web server.
Diffstat (limited to 'server')
-rw-r--r--server/__init__.py7
-rw-r--r--server/handlers.py302
-rw-r--r--server/problems.py3
-rw-r--r--server/socket.py286
-rw-r--r--server/user_session.py11
5 files changed, 608 insertions, 1 deletions
diff --git a/server/__init__.py b/server/__init__.py
index 284c162..861a7df 100644
--- a/server/__init__.py
+++ b/server/__init__.py
@@ -3,9 +3,14 @@
import multiprocessing.managers
from . import user_session
from . import prolog_session
+from . import python_session
+from . import socket
import server.problems
-__all__ = ['user_session', 'prolog_session', 'problems', 'start']
+__all__ = ['socket', 'handlers', 'user_session', 'prolog_session', 'python_session', 'problems', 'start']
+
+
+# TODO: everything below is deprecated, remove it
class Codeq(object):
_method_to_typeid_ = {
diff --git a/server/handlers.py b/server/handlers.py
new file mode 100644
index 0000000..e9cf7c8
--- /dev/null
+++ b/server/handlers.py
@@ -0,0 +1,302 @@
+# coding=utf-8
+
+from concurrent.futures import ThreadPoolExecutor
+import traceback
+from errors.session import *
+import server
+
+
+class CodeqService(object):
+ """Base class for all CodeQ services.
+ """
+ session_is_optional = False
+
+ def process(self, request):
+ pass
+
+
+class ProblemList(CodeqService):
+ """List all available problems to the client.
+ """
+ session_is_optional = True
+
+ def process(self, request):
+ request.reply({'code': 0, 'message': 'ok', 'problems': server.problems.list_problems()})
+
+
+class Login(CodeqService):
+ """Logs in a client, creating a new session.
+ """
+ session_is_optional = True
+
+ def process(self, request):
+ js = request.data
+ username = js.get('username')
+ password = js.get('password')
+ if username is None:
+ request.reply({'code': 1, 'message': 'Username was not provided'})
+ elif password is None:
+ request.reply({'code': 2, 'message': 'Password was not provided'})
+ else:
+ try:
+ session = server.user_session.authenticate_and_create_session(username, password)
+ except AuthenticationFailed:
+ request.reply({'code': 3, 'message': 'Username or password do not match'})
+ else:
+ if request.session:
+ request.session.destroy()
+ request.reply({'code': 0, 'message': 'OK', 'sid':session.get_sid()})
+
+
+class Activity(CodeqService):
+ def process(self, request):
+ js = request.data
+ trace = js.get('trace')
+ solution = js.get('solution')
+ problem_id = js.get('problem_id')
+ if (trace is not None) or (solution is not None):
+ # we have something to do
+ if problem_id is None:
+ request.reply({'code': 1, 'message': 'Problem ID is missing'})
+ else:
+ request.session.update_solution(problem_id, trace, solution)
+ request.end() # no feedback, just acknowledge the reception
+
+
+class Query(CodeqService):
+ def process(self, request):
+ js = request.data
+ step = js.get('step')
+ if step is None:
+ request.reply({'code': 1, 'message': '"step" is not set'})
+ else:
+ problem_id = js.get('problem_id')
+ if problem_id is None:
+ request.reply({'code': 4, 'message': 'Problem ID not given'})
+ else:
+ session = request.session
+ trace = js.get('trace')
+ prolog = session.get_prolog()
+ program = None
+ if step == 'run':
+ program = js.get('program')
+ query = js.get('query')
+ if program is None:
+ result = {'code': 2, 'message': 'No program specified'}
+ elif query is None:
+ result = {'code': 3, 'message': 'No query specified'}
+ else:
+ messages, status, have_more = prolog.run_for_user(session.get_uid(), problem_id, program, query)
+ result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}}
+ elif step == 'next':
+ messages, status, have_more = prolog.step()
+ result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}}
+ elif step == 'end':
+ messages, status, have_more = prolog.end()
+ result = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}}
+ else:
+ result = {'code': 5, 'message': 'Unknown prolog step: {0}'.format(step)}
+ if program or trace:
+ session.update_solution(problem_id, trace, program)
+ request.reply(result)
+
+
+# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling
+class PythonPull(CodeqService):
+ def process(self, request):
+ python = request.session.get_python()
+ output = python.pull()
+ request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}})
+
+
+# Push stdin to the session's Python interpreter. TODO: convert to async handling
+class PythonPush(CodeqService):
+ def process(self, request):
+ text = request.data.get('text')
+ if text is None:
+ request.reply({'code': 1, 'message': 'No input specified'})
+ else:
+ python = request.session.get_python()
+ python.push(text)
+ request.reply({'code': 0, 'message': 'ok'})
+
+
+class Hint(CodeqService):
+ def process(self, request):
+ js = request.data
+ language = js.get('language')
+ problem_id = js.get('problem_id')
+ program = js.get('program')
+
+ if problem_id is None:
+ request.reply({'code': 1, 'message': 'No problem ID specified'})
+ elif program is None:
+ request.reply({'code': 2, 'message': 'No program specified'})
+ else:
+ session = request.session
+ lang_session = None
+ if language == 'prolog':
+ lang_session = session.get_prolog()
+ elif language == 'python':
+ lang_session = session.get_python()
+
+ if lang_session is None:
+ request.reply({'code': 3, 'message': 'Unknown language specified'})
+ else:
+ hints = lang_session.hint(session.get_sid(), problem_id, program)
+ request.reply({'code': 0, 'message': 'ok', 'hints': hints})
+
+
+class Test(CodeqService):
+ def process(self, request):
+ js = request.data
+ language = js.get('language')
+ problem_id = js.get('problem_id')
+ program = js.get('program')
+
+ if problem_id is None:
+ request.reply({'code': 1, 'message': 'No problem ID specified'})
+ elif program is None:
+ request.reply({'code': 2, 'message': 'No program specified'})
+ else:
+ session = request.session
+ lang_session = None
+ if language == 'prolog':
+ lang_session = session.get_prolog()
+ elif language == 'python':
+ lang_session = session.get_python()
+
+ if lang_session is None:
+ request.reply({'code': 3, 'message': 'Unknown language specified'})
+ else:
+ hints = lang_session.test(session.get_sid(), problem_id, program)
+ request.reply({'code': 0, 'message': 'ok', 'hints': hints})
+
+
+class GetProblem(CodeqService):
+ def process(self, request):
+ js = request.data
+ language = js.get('language')
+ problem_group = js.get('problem_group')
+ problem = js.get('problem')
+ if language is None:
+ request.reply({'code': 1, 'message': 'Language identifier not given'})
+ elif problem_group is None:
+ request.reply({'code': 2, 'message': 'Problem group identifier not given'})
+ elif problem is None:
+ request.reply({'code': 3, 'message': 'Problem identifier not given'})
+ else:
+ request.reply({'code': 0, 'message': 'ok', 'data': request.session.get_problem_data(language, problem_group, problem)})
+
+
+# maps actions to their handlers
+incoming_handlers = {
+ 'list_problems': ProblemList(),
+ 'login': Login(),
+ 'get_problem': GetProblem(),
+ 'logout': None,
+ 'activity': Activity(),
+ 'query': Query(),
+ 'hint': Hint(),
+ 'test': Test()
+}
+
+
+class Request(object):
+ def __init__(self, tid, original_sid, session, data):
+ """Creates a new request
+
+ :param tid: communicator-level transaction ID (global relative to the specific communicator where it originated)
+ :param original_sid: session ID, optional
+ :param session: the actual session with the original_sid, if it exists; the processor may swap it for a new session
+ :param data: the request data from the client
+ :return: new instance
+ """
+ self._tid = tid
+ self._original_sid = original_sid
+ self.session = session
+ self.data = data
+ self.is_finished = False
+
+ def reply(self, data):
+ """Reply to this request.
+
+ :param data: the dictionary representing the reply, that will be converted to JSON
+ :return: None
+ """
+ if data is None:
+ self.end()
+ if self._original_sid is not None:
+ sid = data.get('sid')
+ if sid is None:
+ data['sid'] = self._original_sid
+ elif sid != self._original_sid:
+ data['sid'] = self._original_sid
+ data['new_sid'] = sid
+ # it is important to reply with the same tid and sid parameters as were in the request, so message accounting doesn't get confused
+ send(self._tid, self._original_sid, data)
+ self.is_finished = True
+
+ def end(self):
+ """Conclude the request, without sending a response.
+
+ This is to acknowledge that the response has been received.
+ :return: None
+ """
+ send(self._tid, self._original_sid, None)
+ self.is_finished = True
+
+
+########## low-level machinery, subject to change to support more than the single socket communicator ##########
+
+_executor = ThreadPoolExecutor(max_workers=100)
+
+def _invoke_handler(handler, request):
+ try:
+ print('Worker thread processing data={}'.format(str(request.data)))
+ handler.process(request)
+ if not request.is_finished:
+ print('ERROR: the request was not concluded!')
+ request.reply({'code': -1, 'message': 'Request processing did not provide a reply'})
+ print('Processing finished')
+ except Exception as e:
+ print('ERROR: data processing failed: ' + str(e))
+ traceback.print_exc()
+ request.reply({'code': -1, 'message': 'Internal error: ' + str(e)})
+
+def serve_request(json_obj):
+ if not isinstance(json_obj, dict):
+ raise RequestProcessingError('Require a request represented as a dict, instead got: ' + str(type(json_obj)))
+ tid = json_obj.get('tid') # session ID and transaction ID uniquely identify a transaction
+ sid = json_obj.get('sid')
+ action = json_obj.get('action')
+ if action is None:
+ raise RequestProcessingError('Request does not contain an action')
+ if not isinstance(action, str):
+ raise RequestProcessingError('Requested action must be a string, got: ' + str(type(action)))
+ handler = incoming_handlers.get(action)
+ if handler is None:
+ raise RequestProcessingError('No handler for ' + action)
+ print("Attempting to serve action={}".format(action))
+ session = None
+ if sid is None:
+ if not handler.session_is_optional:
+ raise RequestProcessingError('Request is missing a session-ID')
+ else:
+ del json_obj['sid']
+ try:
+ session = server.user_session.get_session_by_id(sid)
+ except NoSuchSession:
+ if not handler.session_is_optional:
+ raise RequestProcessingError('This user session has expired. Please log-in again.')
+ _executor.submit(_invoke_handler, handler, Request(tid, sid, session, json_obj))
+
+def send(tid, sid, json_obj):
+ # just a proxy function for now
+ print('Sending reply: {}'.format(str(json_obj)))
+ server.socket.sendPacket(tid, sid, json_obj)
+
+def stop():
+ global _executor
+ _executor.shutdown()
+ _executor = None
diff --git a/server/problems.py b/server/problems.py
index 02f0307..1c87345 100644
--- a/server/problems.py
+++ b/server/problems.py
@@ -19,6 +19,9 @@ def load_module(fullname):
d = os.path.join(_path_prefix, *parts[:-1])
ff = importlib.machinery.FileFinder(d, (importlib.machinery.SourceFileLoader, ['.py']))
spec = ff.find_spec(fullname)
+ if spec is None:
+ print('ERROR: there is no problem module {0}'.format(fullname))
+ return None
mod = type(sys)(fullname)
mod.__loader__ = spec.loader
mod.__package__ = spec.parent
diff --git a/server/socket.py b/server/socket.py
new file mode 100644
index 0000000..c248056
--- /dev/null
+++ b/server/socket.py
@@ -0,0 +1,286 @@
+# coding=utf-8
+
+import socket
+import selectors
+import json
+import threading
+import traceback
+from server.handlers import serve_request
+
+# TODO: add a whole lot of try..except blocks, and just make it overall error resistant
+# TODO: add logging
+
+_mapping_lock = threading.Lock() # the lock guarding access to the two mappings below
+
+_sessions_to_socket = {} # keyed by sid: what session is bound to what socket
+
+_transactions_to_socket = {} # keyed by tid, used only when there is no sid available, so a request can be replied
+
+
+def processIncomingPacket(receiving_socket, packet):
+ print('Decocoding JSON: {}'.format(packet))
+ obj = json.loads(packet)
+ if obj.get('type') == 'connect':
+ return # TODO: first packet is 'connect', providing the list of connected sessions to the peer
+ tid = obj.get('tid') # transaction ID
+ if tid is None:
+ raise Exception('Transaction ID is missing from the request')
+ sid = obj.get('sid') # session ID
+ _mapping_lock.acquire()
+ try:
+ if sid is None:
+ _transactions_to_socket[tid] = receiving_socket
+ else:
+ _sessions_to_socket[sid] = receiving_socket
+ finally:
+ _mapping_lock.release()
+ serve_request(obj)
+
+
+def sendPacket(tid, sid, json_obj):
+ if sid is None:
+ if tid is None:
+ raise Exception('Cannot send a message without a tid and/or a sid')
+ _mapping_lock.acquire()
+ try:
+ socket = _transactions_to_socket.get(tid)
+ del _transactions_to_socket[tid] # this mapping is not relevant anymore
+ finally:
+ _mapping_lock.release()
+ if socket is None:
+ raise Exception('Cannot send in transaction, it is not registered: ' + tid)
+ else:
+ _mapping_lock.acquire()
+ try:
+ socket = _sessions_to_socket.get(sid)
+ finally:
+ _mapping_lock.release()
+ if socket is None:
+ raise Exception('Cannot send to session, it is not registered: ' + sid)
+ if json_obj is None:
+ # a special case: this is only an acknowledgment, so any request/response accounting can be cleared
+ json_obj = {}
+ if sid is not None:
+ json_obj['sid'] = sid
+ if tid is not None:
+ json_obj['tid'] = tid
+ socket.send(json_obj)
+
+
+class SocketHandler(object):
+
+ def read(self):
+ pass
+
+ def destroy(self):
+ pass
+
+
+class ServerSocket(SocketHandler):
+
+ def __init__(self, communication, hostname='localhost', port=19732):
+ self.communication = communication
+ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ server_socket.bind((hostname, port))
+ server_socket.listen(5)
+ server_socket.setblocking(False)
+ self.socket = server_socket
+
+ def read(self):
+ conn, addr = self.socket.accept()
+ self.communication.add_socket(JsonClientSocket(self.communication, conn, addr))
+
+ def destroy(self):
+ self.communication.remove_socket(self)
+ try:
+ self.socket.close()
+ except:
+ pass
+
+
+class JsonClientSocket(SocketHandler):
+
+ def __init__(self, communication, socket, address):
+ self.communication = communication
+ self.socket = socket
+ self.address = address
+ socket.setblocking(False)
+ self.status = 0 # registering
+ self.receiving_header = True
+ self.buffer = []
+ self.received_length = 0 # the cumulative length of strings in buffer
+ self.body_length = 0 # the length of the next packet, given in the header
+ self._read_lock = threading.Lock()
+ self._write_lock = threading.Lock()
+
+ def read(self):
+ lock = self._read_lock
+ lock.acquire()
+ try:
+ data = self.socket.recv(8192)
+ if not data:
+ self.destroy()
+ else:
+ # packet decode loop
+ data = data.decode('utf-8')
+ print("received: {}".format(data))
+ offset = 0
+ N = len(data)
+ while (offset < N):
+ if self.receiving_header:
+ i = data.find(':', offset)
+ if i < 0:
+ self.buffer.append(data[offset:])
+ break
+ if len(self.buffer) == 0:
+ self.body_length = int(data[offset:i])
+ else:
+ self.buffer.append(data[offset:i])
+ self.body_length = int(''.join(self.buffer))
+ self.buffer.clear()
+ offset = i + 1
+ self.receiving_header = False
+ continue
+ elif (self.received_length + N - offset) <= self.body_length:
+ if self.received_length == 0:
+ # an optimized case
+ s = data[offset:offset+self.body_length]
+ offset += len(s)
+ else:
+ s = data[offset:offset+self.body_length - self.received_length] # TODO: is this correct?
+ offset += len(s)
+ self.buffer.append(s)
+ s = ''.join(self.buffer)
+ self.buffer.clear()
+ self.received_length = 0
+ self.receiving_header = True
+ try:
+ processIncomingPacket(self, s)
+ except Exception as e:
+ # any exception that propagates to here means a possible protocol error, we have to disconnect
+ traceback.print_exc()
+ self.destroy()
+ return
+ else:
+ # incomplete packet body, buffer it until the next time
+ s = data[offset:]
+ self.buffer.append(s)
+ self.received_length += len(s)
+ break
+ finally:
+ lock.release()
+
+ def destroy(self):
+ self.communication.remove_socket(self)
+ try:
+ self.socket.close()
+ except:
+ pass
+ # TODO: unregister from any internal mechanisms
+
+ def send(self, json_obj):
+ js = json.dumps(json_obj)
+ m = str(len(js)) + u':' + js
+ bytes = m.encode('utf-8')
+ lock = self._write_lock
+ lock.acquire()
+ try:
+ self.socket.sendall(bytes)
+ finally:
+ lock.release()
+
+
+class Communication(SocketHandler):
+ def __init__(self):
+ # the selector to register sockets with, the serve_forever() uses it to handle any incoming data
+ self._selector = selectors.DefaultSelector() # to wait until at least on socket is readable
+ self._lock = threading.Lock() # the access lock, because of multi-threading
+ self._remove_queue = [] # what sockets to remove
+ self._add_queue = [] # what sockets to add
+ notifier, receiver = socket.socketpair() # the notification mechanism for the serving thread, using the selector
+ receiver.setblocking(False)
+ self._notifier = notifier
+ self._receiver = receiver
+ self._registered_handlers = set()
+ self._not_signalled = True
+ self._destroying = False
+
+ def serve_forever(self):
+ self._selector.register(self._receiver, selectors.EVENT_READ, self)
+ while True:
+ events = self._selector.select()
+ for key, mask in events:
+ if mask & selectors.EVENT_READ:
+ key.data.read()
+
+ def add_socket(self, socket_handler):
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._add_queue.append(socket_handler)
+ self._signal()
+ finally:
+ lock.release()
+
+ def remove_socket(self, socket_handler):
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._remove_queue.append(socket_handler)
+ self._signal()
+ finally:
+ lock.release()
+
+ def _signal(self): # invoke holding the lock
+ if self._not_signalled:
+ self._notifier.send(b'1')
+ self._not_signalled = False
+
+ def read(self):
+ # we have been invoked to process some new additions and/or removals
+ lock = self._lock
+ lock.acquire()
+ try:
+ self._receiver.recv(1024) # clear the queue
+ self._not_signalled = True
+ selector = self._selector
+ for handler in self._remove_queue:
+ selector.unregister(handler.socket)
+ self._registered_handlers.remove(handler)
+ self._remove_queue.clear()
+ for handler in self._add_queue:
+ selector.register(handler.socket, selectors.EVENT_READ, handler)
+ self._registered_handlers.add(handler)
+ self._add_queue.clear()
+ if self._destroying:
+ self._notifier.close()
+ self._receiver.close()
+ self._selector.close()
+ finally:
+ lock.release()
+
+ def destroy(self):
+ lock = self._lock
+ # make a copy of handlers, because as we're destroying them each will invoke remove_socket(), and for that the lock must be available and the iterating set must not change
+ lock.acquire()
+ try:
+ handlers = set(self._registered_handlers)
+ finally:
+ lock.release()
+ for handler in handlers:
+ handler.destroy() # destroy them all, even the server socket
+ # this is where everything is destroyed but us, so what follows is self-destruct
+ lock.acquire()
+ try:
+ self._destroying = True
+ self._signal()
+ finally:
+ lock.release()
+
+
+def serve_forever():
+ communication = Communication()
+ server = ServerSocket(communication)
+ communication.add_socket(server)
+ communication.serve_forever()
diff --git a/server/user_session.py b/server/user_session.py
index f21102c..5eec743 100644
--- a/server/user_session.py
+++ b/server/user_session.py
@@ -8,6 +8,7 @@ import random
from . import prolog_session
from . import python_session
from . import problems
+from . import handlers
import db
from errors.session import NoSuchSession, AuthenticationFailed
import psycopg2.extras
@@ -128,6 +129,16 @@ class UserSession(object):
finally:
db.return_connection(conn)
+ def send(self, json_obj):
+ """Sends a message to the user.
+
+ This method may be used only for messages that are not replies to requests.
+ For replies use the reply() method on the Request object.
+ :param json_obj: a dict representing the json message
+ :return: None
+ """
+ handlers.send(None, self.sid, json_obj)
+
def __del__(self):
# no locking needed if GC is removing us, as there cannot be any concurrent access by definition
if hasattr(self, 'prolog_session') and (self.prolog_session is not None):