From 251c5e2ba0e85103c55cf31026739b2e7e9d4b90 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Wed, 16 Sep 2015 16:10:59 +0200 Subject: Implement async. comm. with Python interpreter Creating, destroying and communicating with the interpreter subprocess is now handled by a thread attached to PythonSession. Interpreter is sandboxed using libseccomp. --- server/handlers.py | 9 --- server/python_session.py | 151 +++++++++++++++++++++++++++++++++++------------ server/user_session.py | 3 +- 3 files changed, 116 insertions(+), 47 deletions(-) (limited to 'server') diff --git a/server/handlers.py b/server/handlers.py index fb50ff4..0cc474f 100644 --- a/server/handlers.py +++ b/server/handlers.py @@ -101,14 +101,6 @@ class Query(CodeqService): request.reply(result) -# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling -class PythonPull(CodeqService): - def process(self, request): - python = request.session.get_python() - output = python.pull() - request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}}) - - # Push stdin to the session's Python interpreter. TODO: convert to async handling class PythonPush(CodeqService): def process(self, request): @@ -198,7 +190,6 @@ incoming_handlers = { 'activity': Activity(), 'query': Query(), 'python_push': PythonPush(), - 'python_pull': PythonPull(), 'hint': Hint(), 'test': Test() } diff --git a/server/python_session.py b/server/python_session.py index 4af455f..33fe7cc 100644 --- a/server/python_session.py +++ b/server/python_session.py @@ -1,8 +1,15 @@ # coding=utf-8 import ast -import multiprocessing.managers +from fcntl import fcntl, F_GETFL, F_SETFL +import io +import multiprocessing +import os +import queue +import subprocess +import sys import threading +import time import server.user_session from db.models import Problem @@ -17,53 +24,45 @@ class PythonSession(object): No properties are accessible; use getters and setters instead. 
Values are passed by value instead of by reference (deep copy!). """ - def __init__(self): + def __init__(self, output_cb=None): self._access_lock = threading.Lock() self._sent_hints = [] - self._interpreter = None - # Proxy for calling the Python runner. We use a separate connection for - # each session so the runner can be restarted without affecting the - # server. - _m = multiprocessing.managers.BaseManager(address=('localhost', 3031), - authkey=b'c0d3q3y-python') - _m.register('Python') - _m.connect() - self._python = _m.Python() - - self.create() + self._control = queue.Queue() + self._interpreter = threading.Thread(target=_interpreter, + kwargs={'control': self._control, 'callback': output_cb}) + self._interpreter.start() def run(self, code=None, inputs=None, timeout=1.0): - with self._access_lock: - return self._python.run(code, inputs, timeout) - - def create(self): - with self._access_lock: - if self._interpreter is None: - self._interpreter = self._python.create() - - def pull(self): - with self._access_lock: - if self._interpreter is None: - return None - return self._python.pull(self._interpreter) + # Launch processes. + futures = [] + for expr, stdin in inputs: + conn_parent, conn_child = multiprocessing.Pipe() + p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) + p.start() + futures.append((p, conn_parent)) + + # Wait for results. 
+ results = [] + start = time.monotonic() + for p, conn in futures: + now = time.monotonic() + real_timeout = max(0, timeout - (now - start)) + if conn.poll(real_timeout): + results.append(conn.recv()) + else: + results.append('timed out') + p.terminate() + return results def push(self, stdin): - with self._access_lock: - if self._interpreter is not None: - self._python.push(self._interpreter, stdin) + self._control.put_nowait(('push', stdin)) def destroy(self): - with self._access_lock: - if self._interpreter is not None: - self._python.destroy(self._interpreter) - self._interpreter = None + self._control.put_nowait(('done', None)) def __del__(self): - # no locking needed if GC is removing us, as there cannot be any concurrent access by definition - if hasattr(self, '_interpreter') and self._interpreter is not None: - self._python.destroy(self._interpreter) - self._interpreter = None + self.destroy() def hint(self, sid, problem_id, program): language, problem_group, problem = Problem.get_identifier(problem_id) @@ -105,3 +104,81 @@ class PythonSession(object): hint_type = mod.hint_type[hint['id']] hint_type.instantiate(hint, self._sent_hints) self._sent_hints.extend(hints) + +def _interpreter(control, callback): + directory = os.path.dirname(os.path.realpath(__file__)) + # TODO drop privileges using a wrapper + script = os.path.join(directory, '..', 'python', 'interpreter.py') + + proc = None + while True: + # Ensure the interpreter process is running. + if proc is None: + proc = subprocess.Popen([script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Set the non-blocking flag for stdout. + flags = fcntl(proc.stdout.fileno(), F_GETFL) + fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK) + + # Get a control command. + try: + cmd, data = control.get_nowait() + if cmd == 'push': + proc.stdin.write(data.encode('utf-8')) + proc.stdin.flush() + elif cmd == 'done': + break + except: + pass + + # Communicate with child process. 
+ retcode = proc.poll() + if retcode is None: + data = proc.stdout.read() + if data: + # NOTE this might fail if read() stops in the middle of utf8 sequence + text = data.decode('utf-8') + if text: + callback(text) + else: + if retcode == -31: + callback('Child killed due to sandbox misbehavior.\n') + else: + callback('Child exited with status "{}".\n'.format(retcode)) + proc = None + + # TODO we should select() on control and proc.stdout instead of polling + time.sleep(0.1) + + # We are done, kill the child. + if proc is not None: + proc.kill() + +# Execute [code] and evaluate [expr]. Input is given by the string [stdin]. +# Return result of evaluation, the contents of stdout and stderr, and the +# exception traceback. +# TODO sandbox this +def _run_exec(conn, code, expr=None, stdin=''): + result, out, err, exc = None, None, None, None + sys.stdin = io.StringIO(stdin) + sys.stdout = io.StringIO() + sys.stderr = io.StringIO() + try: + env = {} + if code: + exec(code, env) + if expr: + result = eval(expr, env) + except Exception as ex: + # Exception is not JSON serializable, so return traceback as string. 
+ import traceback + exc = traceback.format_exc() + finally: + out = sys.stdout.getvalue() + err = sys.stderr.getvalue() + sys.stdin.close() + sys.stdout.close() + sys.stderr.close() + conn.send((result, out, err, exc)) diff --git a/server/user_session.py b/server/user_session.py index 2cc629c..0d8535c 100644 --- a/server/user_session.py +++ b/server/user_session.py @@ -62,7 +62,8 @@ class UserSession(object): def get_python(self): with self._access_lock: if self.python_session is None: - self.python_session = python_session.PythonSession() # lazy init + self.python_session = python_session.PythonSession( + output_cb=lambda text: self.send({'event': 'terminal_output', 'text': text})) return self.python_session def get_problem_data(self, language, problem_group, problem): -- cgit v1.2.1 From 1720db308bf4481d6be45d4f7f611bab576b1184 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Fri, 18 Sep 2015 13:53:58 +0200 Subject: Simplify exceptions returned by PythonSession.run Don't include the first stack entry or the filename (which is <string>). --- server/python_session.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'server') diff --git a/server/python_session.py b/server/python_session.py index 33fe7cc..62fcbf8 100644 --- a/server/python_session.py +++ b/server/python_session.py @@ -172,9 +172,17 @@ def _run_exec(conn, code, expr=None, stdin=''): if expr: result = eval(expr, env) except Exception as ex: - # Exception is not JSON serializable, so return traceback as string. + # Exception is not JSON serializable, so return traceback as string + # (without the first entry, which is this function). 
import traceback - exc = traceback.format_exc() + e_type, e_value, e_tb = sys.exc_info() + stack = traceback.extract_tb(e_tb) + exc = ''.join( + ['Traceback (most recent call last):\n'] + + [' line {}, in {}\n'.format(lineno, name) + (line+'\n' if line else '') + for filename, lineno, name, line in stack[1:]] + + traceback.format_exception_only(e_type, e_value) + ).rstrip() finally: out = sys.stdout.getvalue() err = sys.stderr.getvalue() -- cgit v1.2.1