From a4f639242f9f6221a486e0e91adeb75ba6096f45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ale=C5=A1=20Smodi=C5=A1?= Date: Mon, 24 Aug 2015 19:09:41 +0200 Subject: Split the development into daemon and wsgi_server. Implemented basic infrastructure for daemon (Prolog), and partial support for services in wsgi_server. --- client/__init__.py | 26 ++++++++++ daemon.py | 6 +++ db/__init__.py | 20 +++++--- errors/__init__.py | 1 + errors/session.py | 7 +++ readme.txt | 1 + server/__init__.py | 23 +++++++++ server/problems.py | 83 ++++++++++++++++++++++++++++++ server/prolog_session.py | 117 +++++++++++++++++++++++++++++++++++++++++++ server/user_session.py | 122 ++++++++++++++++++++++++++++++++++++++++++++ wsgi_server.py | 128 +++++++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 528 insertions(+), 6 deletions(-) create mode 100644 client/__init__.py create mode 100644 daemon.py create mode 100644 errors/__init__.py create mode 100644 errors/session.py create mode 100644 server/__init__.py create mode 100644 server/problems.py create mode 100644 server/prolog_session.py create mode 100644 server/user_session.py create mode 100644 wsgi_server.py diff --git a/client/__init__.py b/client/__init__.py new file mode 100644 index 0000000..8781a96 --- /dev/null +++ b/client/__init__.py @@ -0,0 +1,26 @@ +# coding=utf-8 + +import multiprocessing.managers + +__all__ = ['get_session_by_id', 'get_or_create_session'] + +class CodeqManager(multiprocessing.managers.BaseManager): + pass + +CodeqManager.register('PrologSession') +CodeqManager.register('UserSession') +CodeqManager.register('get_session_by_id') +CodeqManager.register('get_or_create_session') +CodeqManager.register('authenticate_and_create_session') + +m = CodeqManager(address=('localhost', 16231), authkey=b'c0d3q3y') +m.connect() + +def get_session_by_id(sid): + return m.get_session_by_id(sid) + +def get_or_create_session(uid, username, sid=None): + return m.get_or_create_session(uid, username, sid) + +def authenticate_and_create_session(username, password): + return m.authenticate_and_create_session(username, password) diff --git a/daemon.py b/daemon.py new file mode 100644 index 0000000..0809004 --- /dev/null +++ b/daemon.py @@ -0,0 +1,6 @@ +#!/usr/bin/python3 +# coding=utf-8 + +if __name__ == '__main__': + import server + server.start() diff --git a/db/__init__.py b/db/__init__.py index e02a1fe..def4bb8 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -1,11 +1,13 @@ # coding=utf-8 import os - +import threading import psycopg2 __all__ = ['get_connection', 'return_connection', 'setup', 'models'] +_module_access_lock = threading.Lock() + _host = None # the database hostname/IP _port = None # the database port number _database = None # the name of the database @@ -44,16 +46,22 @@ _connection_pool = [] def get_connection(): """Retrieves a database connection from the connection pool.""" - if _host is None: - setup() # lazy init - if len(_connection_pool) > 0: - return _connection_pool.pop() + with _module_access_lock: + if _host is None: + setup() # lazy init + if len(_connection_pool) > 0: + return _connection_pool.pop() return psycopg2.connect(host=_host, port=_port, database=_database, user=_username, password=_password) def return_connection(connection): """Returns the given database connection to the pool.""" - _connection_pool.append(connection) + try: + connection.rollback() # sanity check + except: + return + with _module_access_lock: + _connection_pool.append(connection) if __name__ == '__main__': diff --git a/errors/__init__.py b/errors/__init__.py new file mode 100644 index 0000000..9bad579 --- /dev/null +++ b/errors/__init__.py @@ -0,0 +1 @@ +# coding=utf-8 diff --git a/errors/session.py b/errors/session.py new file mode 100644 index 0000000..d173fbf --- /dev/null +++ b/errors/session.py @@ -0,0 +1,7 @@ +# coding=utf-8 + +class NoSuchSession(Exception): + pass + +class AuthenticationFailed(Exception): + pass diff --git a/readme.txt b/readme.txt index 3c9f4d1..6f92562 100644 --- a/readme.txt +++ b/readme.txt @@ -2,4 +2,5 @@ Python3 packages required: - python3-ply - python3-termcolor - python3-psycopg2 +- python3-falcon diff --git a/server/__init__.py b/server/__init__.py new file mode 100644 index 0000000..f2c73b7 --- /dev/null +++ b/server/__init__.py @@ -0,0 +1,23 @@ +# coding=utf-8 + +import multiprocessing.managers +from . import user_session +from . import prolog_session + +__all__ = ['user_session'] + +class CodeqManager(multiprocessing.managers.BaseManager): + pass + +class UserSessionProxy(multiprocessing.managers.BaseProxy): + _method_to_typeid_ = {'get_prolog':'PrologSession'} + +CodeqManager.register('PrologSession') +CodeqManager.register('UserSession', proxytype=UserSessionProxy) +CodeqManager.register('get_session_by_id', callable=user_session.get_session_by_id, proxytype=UserSessionProxy) +CodeqManager.register('get_or_create_session', callable=user_session.get_or_create_session, proxytype=UserSessionProxy) +CodeqManager.register('authenticate_and_create_session', callable=user_session.authenticate_and_create_session, proxytype=UserSessionProxy) + +def start(): + m = CodeqManager(address=('localhost', 16231), authkey=b'c0d3q3y') + m.get_server().serve_forever() diff --git a/server/problems.py b/server/problems.py new file mode 100644 index 0000000..7fb2606 --- /dev/null +++ b/server/problems.py @@ -0,0 +1,83 @@ +# coding=utf-8 + +import sys +import importlib.machinery +import threading + +#sys.path.append('/home/aless/job/codeq/source/codeq-problems/') +_path_prefix = '/home/aless/job/codeq/source/codeq-problems/' +_module_loading_lock = threading.RLock() # TODO: make a more fine-grained locking import machinery + +def load_module(fullname): +# return importlib.import_module(fullname) + with _module_loading_lock: + mod = sys.modules.get(fullname, None) + if mod is None: + parts = fullname.split('.') + d = _path_prefix + '/'.join(parts[0:-1]) + ff = importlib.machinery.FileFinder(d, (importlib.machinery.SourceFileLoader, ['.py'])) + spec = ff.find_spec(fullname) + mod = type(sys)(fullname) + mod.__loader__ = spec.loader + mod.__package__ = spec.parent + mod.__spec__ = spec + if spec.has_location: + mod.__file__ = spec.origin + mod.__cached__ = spec.cached + sys.modules[fullname] = mod + try: + spec.loader.exec_module(mod) + except: + try: + del sys.modules[fullname] + except KeyError: + pass + raise + return mod + +def load_problem(language, problem_group, problem, tail_module): + return load_module('{0}.problems.{1}.{2}.{3}'.format(language, problem_group, problem, tail_module)) + +def load_facts(language, fact_module): + return load_module('{0}.facts.{1}'.format(language, fact_module)) + +def load_problems(language, tuples, tail_module): + modules = [] + for problem_group, problem in tuples: + mod = '{0}.problems.{1}.{2}.{3}'.format(language, problem_group, problem, tail_module) + print('importing {}'.format(mod)) + modules.append(load_module(mod)) + return modules + +def get_facts(language, problem_module): + try: + facts = problem_module.facts + except AttributeError as e: + return None + if facts is None: + return None + module = load_facts(language, facts) + if module: + try: + return module.facts + except AttributeError as e: + return None + return None + +def solutions_for_problems(language, tuples): + if not tuples: + return '' + modules = load_problems(language, tuples, 'common') + solutions = set() + facts = set() + for module in modules: + try: + solution = module.solution + except AttributeError as me: + pass + else: + solutions.add(solution) + f = get_facts(language, module) + if f: + facts.add(f) + return '\n'.join(facts) + '\n' + '\n'.join(solutions) diff --git a/server/prolog_session.py b/server/prolog_session.py new file mode 100644 index 0000000..e00afd8 --- /dev/null +++ b/server/prolog_session.py @@ -0,0 +1,117 @@ +# coding=utf-8 + +import operator +import threading +import prolog.engine +import db +from . import problems + +__all__ = ['PrologSession'] + +def format_prolog_output(reply, output): + messages = [text for text in map(operator.itemgetter(1), output)] + # When an engine is destroyed, a nested data object has the actual query result. + event = reply['event'] + if event == 'destroy': + reply = reply['data'] + event = reply['event'] + + if event == 'success': + messages.append(prolog.engine.pretty_vars(reply['data'][0])) + return messages, 'ok', True and reply['more'] + if event == 'failure': + messages.append('false') + return messages, 'ok', False + if event == 'error': + # Remove potential module name (engine ID) from the error message. + messages.append('error: ' + reply['data'].replace("'{}':".format(reply['id']), '')) + return messages, 'error', False + + return messages, 'ok', False # TODO: is it possible to reach this return statement? + + +class PrologSession(object): + """Abstracts a Prolog session. + Only public methods are available to the outside world due to the use of multiprocessing managers. + Therefore prefix any private methods with an underscore (_). + No properties are accessible; use getters and setters instead. + Values are passed by value instead of by reference (deep copy!). + """ + def __init__(self): + self._access_lock = threading.Lock() + self._engine_id = None + + def run(self, code): + with self._access_lock: + if self._engine_id is not None: + prolog.engine.stop(self._engine_id) + self._engine_id = None + reply, output = prolog.engine.create(code=code) + if reply.get('event') != 'create': + raise Exception('System error: could not create a prolog engine') + self._engine_id = reply['id'] + messages = [text for text in map(operator.itemgetter(1), output)] + status = 'error' if 'error' in map(operator.itemgetter(0), output) else 'ok' + return messages, status, False + + def query(self, query): + with self._access_lock: + if self._engine_id is None: + return ['Prolog is not running'], 'error', False + try: + return format_prolog_output(*prolog.engine.ask(self._engine_id, query)) + except Exception as e: + return [str(e)], 'error', False + + def step(self): + with self._access_lock: + if self._engine_id is None: + return ['Prolog is not running'], 'error', False + try: + return format_prolog_output(*prolog.engine.next(self._engine_id)) + except Exception as e: + return [str(e)], 'error', False + + def end(self): + """Stops the Prolog engine.""" + with self._access_lock: + if self._engine_id is not None: + prolog.engine.stop(self._engine_id) + self._engine_id = None + return [], 'ok', False + + def __del__(self): + # no locking needed if GC is removing us, as there cannot be any concurrent access by definition + if hasattr(self, '_engine_id') and (self._engine_id is not None): + prolog.engine.stop(self._engine_id) + self._engine_id = None + + def run_for_user(self, user_id, language, problem_group, problem, program, query): + """A "shorthand" method to start a Prolog session, load correct solutions of all user's solved + problems and the given program, and ask a query. + """ + conn = db.get_connection() + try: + cur = conn.cursor() + try: + cur.execute('select l.id, p.id from problem p inner join problem_group g on g.id = p.problem_group_id inner join language l on l.id = p.language_id where l.identifier = %s and g.identifier = %s and p.identifier = %s', (language, problem_group, problem)) + row = cur.fetchone() + language_id = row[0] + problem_id = row[1] + cur.execute('select g.identifier, p.identifier from solution s inner join problem p on p.id = s.problem_id inner join problem_group g on g.id = p.problem_group_id where s.codeq_user_id = %s and s.done = True and s.problem_id != %s and p.language_id = %s', (user_id, problem_id, language_id)) + solved_problems = cur.fetchall() + finally: + cur.close() + finally: + conn.commit() + db.return_connection(conn) + + other_solutions = problems.solutions_for_problems(language, solved_problems) + problem_module = problems.load_problem(language, problem_group, problem, 'common') + problem_facts = problems.get_facts(language, problem_module) or '' + code = other_solutions + problem_facts + program + messages, status, have_more = self.run(code) + if status == 'ok': + more_messages, status, have_more = self.query(query) + messages.extend(more_messages) + return messages, status, have_more diff --git a/server/user_session.py b/server/user_session.py new file mode 100644 index 0000000..e418f8d --- /dev/null +++ b/server/user_session.py @@ -0,0 +1,122 @@ +# coding=utf-8 + +import uuid +import threading # multiprocessing.managers.BaseManager uses threading to serve incoming requests +import hashlib +import base64 +import random +from . import prolog_session +import db +from errors.session import NoSuchSession, AuthenticationFailed + +__all__ = ['get_session_by_id', 'get_or_create_session', 'UserSession'] + +sessions = {} # maps session IDs to session objects + +module_access_lock = threading.Lock() # use this lock to access the sessions dictionary + +class UserSession(object): + """Abstracts a user session. + Only public methods are available to the outside world due to the use of multiprocessing managers. + Therefore prefix any private methods with an underscore (_). + No properties are accessible; use getters and setters instead. + Values are passed by value instead of by reference (deep copy!). + """ + def __init__(self, uid, username): + self._access_lock = threading.Lock() + self.sid = uuid.uuid4().hex + self.uid = uid + self.username = username + self.prolog_session = None + + def destroy(self): + """Destroys the session.""" + with self._access_lock: + with module_access_lock: + del sessions[self.sid] + if self.prolog_session is not None: + self.prolog_session.end() + self.prolog_session = None + # TODO: add any cleanups as features are added! + + def get_sid(self): + return self.sid + + def get_uid(self): + return self.uid + + def get_prolog(self): + with self._access_lock: + if self.prolog_session is None: + self.prolog_session = prolog_session.PrologSession() # lazy init + return self.prolog_session + + def __del__(self): + # no locking needed if GC is removing us, as there cannot be any concurrent access by definition + if hasattr(self, 'prolog_session') and (self.prolog_session is not None): + self.prolog_session.end() + self.prolog_session = None + # TODO: add any cleanups as features are added! + +def get_session_by_id(sid): + with module_access_lock: + s = sessions.get(sid, None) + if s is None: + raise NoSuchSession('There is no session with SID {}'.format(sid)) + return s + +def get_or_create_session(uid, username, sid=None): + with module_access_lock: + if sid is not None: + s = sessions.get(sid) + if s is not None: + return s + s = UserSession(uid, username) + sessions[s.sid] = s + return s + +def authenticate_and_create_session(username, password): + conn = db.get_connection() + try: + cur = conn.cursor() + try: + cur.execute('select id, password from codeq_user where username = %s', (username,)) + row = cur.fetchone() + if row is None: + raise AuthenticationFailed('No such user: {}'.format(username)) + if verify_password(password, row[1]): + return get_or_create_session(row[0], username) + raise AuthenticationFailed('Password mismatch') + finally: + cur.close() + finally: + conn.commit() + db.return_connection(conn) + +def verify_password(plain_password, encrypted_password): + elts = encrypted_password.split('$') + if len(elts) != 4: + return False + if elts[0] != 'pkbdf2_sha256': + return False + try: + rounds = int(elts[1]) + except: + return False + enc = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), elts[2].encode('utf-8'), rounds) + return base64.b64encode(enc).decode('utf-8') == elts[3] + +_salt_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' +_salt_chars_len = len(_salt_chars) +def encrypt_password(plain_password): + rounds = 20000 + chosen_chars = [] + for i in range(0, 12): + chosen_chars.append(_salt_chars[random.randrange(0, _salt_chars_len)]) + salt = ''.join(chosen_chars) + enc = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), salt.encode('utf-8'), rounds) + return '{0}${1}${2}${3}'.format('pkbdf2_sha256', rounds, salt, base64.b64encode(enc).decode('utf-8')) + +random.seed() + +# TODO: add a session timeout timer diff --git a/wsgi_server.py b/wsgi_server.py new file mode 100644 index 0000000..0f70b40 --- /dev/null +++ b/wsgi_server.py @@ -0,0 +1,128 @@ +#!/usr/bin/python3 +# coding=utf-8 + +import falcon +import json +import client +from errors.session import NoSuchSession, AuthenticationFailed + +MAX_REQUEST_LENGTH = 1024 * 1024 # 1 MB + +api = application = falcon.API() + + +def get_json_payload(req, session_is_optional=False): + if not ((req.content_type == 'application/json') or (req.content_type == 'text/json')): + raise falcon.HTTPUnsupportedMediaType('Unsupported content-type: {0}'.format(req.content_type)) + length = req.get_header('Content-Length') + if length is None: + raise falcon.HTTPLengthRequired('Requests without content-length are not accepted') + try: + l = int(length) + except: + raise falcon.HTTPBadRequest('Invalid Content-Length', 'The given Content-Length is not a number: {0}'.format(length)) + if l > MAX_REQUEST_LENGTH: + raise falcon.HTTPError(falcon.HTTP_413, 'Request Entity Too Large', 'Cannot accept the request of length {0}: maximum allowed content-length is {1}'.format(length, MAX_REQUEST_LENGTH)) + try: + txt = req.stream.read() + except Exception as e: + raise falcon.HTTPBadRequest('Error reading request', 'Error while reading the request body: {0}'.format(e.message)) + try: + js = json.loads(txt, encoding="utf-8") + except ValueError as e: + raise falcon.HTTPBadRequest('Error parsing JSON payload', 'Error while parsing the request as JSON: {0}'.format(e.message)) + sid = js.get('sid') + if sid is None: + if session_is_optional: + return js + raise falcon.HTTPBadRequest('No Session', 'Request is missing a session-ID') + del js['sid'] + try: + session = client.get_session_by_id(sid) + except NoSuchSession: + if session_is_optional: + return js + raise falcon.HTTPBadRequest('Session Expired', 'This user session has expired. Please log-in again.') + req.context['session'] = session + return js + + +class Login(object): + def on_post(self, req, resp): + resp.cache_control = 'private, no-cache, no-store' + js = get_json_payload(req, True) + username = js.get('username') + password = js.get('password') + if username is None: + response = {'code': 1, 'message': 'Username was not provided'} + elif password is None: + response = {'code': 2, 'message': 'Password was not provided'} + else: + try: + session = client.authenticate_and_create_session(username, password) + except AuthenticationFailed: + response = {'code': 3, 'message': 'Username or password do not match'} + else: + response = {'code': 0, 'message': 'OK', 'sid':session.get_sid()} + old_session = req.context.get('session') + if old_session: + old_session.destroy() + resp.body = json.dumps(response) + resp.content_type = 'application/json' + resp.status = falcon.HTTP_200 + +class Activity(object): + def on_get(self, req, resp): + resp.body = '{}' + resp.status = falcon.HTTP_200 + +class Query(object): + def on_post(self, req, resp): + resp.cache_control = 'private, no-cache, no-store' + js = get_json_payload(req) + session = req.context['session'] + step = js.get('step') + if step is None: + response = {'code': 1, 'message': '"step" is not set'} + else: + prolog = session.get_prolog() + if step == 'run': + program = js.get('program') + query = js.get('query') + language = js.get('language') + problem_group = js.get('problem_group') + problem = js.get('problem') + if program is None: + response = {'code': 2, 'message': 'No program specified'} + elif query is None: + response = {'code': 3, 'message': 'No query specified'} + elif language is None: + response = {'code': 4, 'message': 'Language identifier not given'} + elif problem_group is None: + response = {'code': 5, 'message': 'Problem group identifier not given'} + elif problem is None: + response = {'code': 6, 'message': 'Problem identifier not given'} + else: + messages, status, have_more = prolog.run_for_user(session.get_uid(), language, problem_group, problem, program, query) + response = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + elif step == 'next': + messages, status, have_more = prolog.step() + response = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + elif step == 'end': + messages, status, have_more = prolog.end() + response = {'code': 0, 'message': 'ok', 'terminal': {'messages': messages, 'status': status, 'have_more': have_more}} + else: + response = {'code': 7, 'message': 'Unknown prolog step: {0}'.format(step)} + + resp.body = json.dumps(response) + resp.content_type = 'application/json' + resp.status = falcon.HTTP_200 + +login = Login() +api.add_route('/login', login) + +activity = Activity() +api.add_route('/activity', activity) + +query = Query() +api.add_route('/query', query) -- cgit v1.2.1