From 251c5e2ba0e85103c55cf31026739b2e7e9d4b90 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Wed, 16 Sep 2015 16:10:59 +0200 Subject: Implement async. comm. with Python interpreter Creating, destroying and communicationg with the interpreter subprocess is now handled by a thread attached to PythonSession. Interpreter is sandboxed using libseccomp. --- python/interpreter.py | 40 ++++++++++++ python/runner/interpreter.py | 8 --- python/runner/main.py | 117 --------------------------------- readme.md | 40 +++++++++--- server/handlers.py | 9 --- server/python_session.py | 151 ++++++++++++++++++++++++++++++++----------- server/user_session.py | 3 +- web/main.js | 5 -- 8 files changed, 186 insertions(+), 187 deletions(-) create mode 100755 python/interpreter.py delete mode 100755 python/runner/interpreter.py delete mode 100755 python/runner/main.py diff --git a/python/interpreter.py b/python/interpreter.py new file mode 100755 index 0000000..87de3aa --- /dev/null +++ b/python/interpreter.py @@ -0,0 +1,40 @@ +#!/usr/bin/python3 + +import code +import sys + +import seccomp + +f = seccomp.SyscallFilter(defaction=seccomp.KILL) +# Necessary for Python. +f.add_rule(seccomp.ALLOW, "exit_group") +f.add_rule(seccomp.ALLOW, "rt_sigaction") +f.add_rule(seccomp.ALLOW, "brk") + +# Mostly harmless. +f.add_rule(seccomp.ALLOW, "mprotect") + +# Allow reading from stdin and writing to stdout/stderr. +f.add_rule(seccomp.ALLOW, "read", seccomp.Arg(0, seccomp.EQ, sys.stdin.fileno())) +f.add_rule(seccomp.ALLOW, "write", seccomp.Arg(0, seccomp.EQ, sys.stdout.fileno())) +f.add_rule(seccomp.ALLOW, "write", seccomp.Arg(0, seccomp.EQ, sys.stderr.fileno())) + +f.add_rule(seccomp.ALLOW, "ioctl") +f.add_rule(seccomp.ALLOW, "mmap") +f.add_rule(seccomp.ALLOW, "munmap") + +# Needed for finding source code for exceptions. +f.add_rule(seccomp.ALLOW, "stat") +f.add_rule(seccomp.ALLOW, "open", seccomp.Arg(1, seccomp.MASKED_EQ, 0x3, 0)) +f.add_rule(seccomp.ALLOW, "fcntl") +f.add_rule(seccomp.ALLOW, "fstat") +f.add_rule(seccomp.ALLOW, "lseek") +f.add_rule(seccomp.ALLOW, "read") +f.add_rule(seccomp.ALLOW, "close") + +# Needed for code.InteractiveConsole. +f.add_rule(seccomp.ALLOW, "access") +f.add_rule(seccomp.ALLOW, "select") +f.load() + +code.interact(banner='') diff --git a/python/runner/interpreter.py b/python/runner/interpreter.py deleted file mode 100755 index 5fa320a..0000000 --- a/python/runner/interpreter.py +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/python3 - -# Apparently there is no (working) way to get a non-blocking stdout if we call -# the Python interpreter directly with subprocess.Popen. For some reason, this -# works. - -import code -code.interact(banner='') diff --git a/python/runner/main.py b/python/runner/main.py deleted file mode 100755 index 4e1af53..0000000 --- a/python/runner/main.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/python3 - -from fcntl import fcntl, F_GETFL, F_SETFL -import io -import multiprocessing -import multiprocessing.managers -import os -import subprocess -import sys -import threading -import time -import uuid - -interpreters = {} -module_access_lock = threading.Lock() - -# Execute [code] and evaluate [expr]. Input is given by the string [stdin]. -# Return result of evaluation, the contents of stdout and stderr, and the -# exception traceback. -def _run_exec(conn, code, expr=None, stdin=''): - result, out, err, exc = None, None, None, None - sys.stdin = io.StringIO(stdin) - sys.stdout = io.StringIO() - sys.stderr = io.StringIO() - try: - env = {} - if code: - exec(code, env) - if expr: - result = eval(expr, env) - except Exception as ex: - # Exception is not JSON serializable, so return traceback as string. - import traceback - exc = traceback.format_exc() - finally: - out = sys.stdout.getvalue() - err = sys.stderr.getvalue() - sys.stdin.close() - sys.stdout.close() - sys.stderr.close() - conn.send((result, out, err, exc)) - -class Python(object): - # Call run_exec in a separate process for each input and return a list of - # results from those calls. If a call times out, 'timed out' is returned in - # place of exception traceback. - def run(self, code, inputs, timeout): - # Launch processes. - futures = [] - for expr, stdin in inputs: - conn_parent, conn_child = multiprocessing.Pipe() - p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) - p.start() - futures.append((p, conn_parent)) - - # Wait for results. - results = [] - start = time.monotonic() - for p, conn in futures: - now = time.monotonic() - real_timeout = max(0, timeout - (now - start)) - if conn.poll(real_timeout): - results.append(conn.recv()) - else: - results.append('timed out') - p.terminate() - return results - - # Start and return a new Python interpreter process. - def create(self): - directory = os.path.dirname(os.path.realpath(__file__)) - script = os.path.join(directory, 'interpreter.py') - p = subprocess.Popen([script], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - # Set the non-blocking flag for stdout. - flags = fcntl(p.stdout, F_GETFL) - fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK) - - interpreter_id = uuid.uuid4().hex - with module_access_lock: - interpreters[interpreter_id] = p - return interpreter_id - - # Read any available bytes from the interpreter's stdout. - def pull(self, interpreter_id): - with module_access_lock: - interpreter = interpreters[interpreter_id] - stdout = interpreter.stdout.read() - if stdout: - stdout = stdout.decode('utf-8') - return stdout - - # Push a string to the interpreter's stdin. - def push(self, interpreter_id, stdin): - with module_access_lock: - interpreter = interpreters[interpreter_id] - interpreter.stdin.write(stdin.encode('utf-8')) - interpreter.stdin.flush() - - # Kill an interpreter process. - def destroy(self, interpreter_id): - with module_access_lock: - interpreter = interpreters[interpreter_id] - del interpreters[interpreter_id] - interpreter.kill() - -class PythonManager(multiprocessing.managers.BaseManager): - pass - -PythonManager.register('Python', callable=Python) - -if __name__ == '__main__': - print('Python engine started.') - m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python') - m.get_server().serve_forever() diff --git a/readme.md b/readme.md index a7e11ca..61b0ac6 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,19 @@ Deployment ========== +Install the following packages: + + - apache2 + - python3 (>= 3.4) + - python3-ply + - python3-psycopg2 + - python3-termcolor + - swi-prolog-nox (>= 7.2) + - nodejs (>= 10.0.22) + +SWI prolog +---------- + To use the correct SWI prolog package (>= 7.2) on Debian, add a custom repository by creating the file /etc/apt/sources.list.d/prolog.list containing the following 2 lines: @@ -16,17 +29,23 @@ apt-get update apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys EF8406856DBFCA18 apt-get update -Install the following packages: +libseccomp +---------- - - apache2 - - python3 (>= 3.4) - - python3-ply - - python3-psycopg2 - - python3-termcolor - - swi-prolog-nox (>= 7.2) - - nodejs (>= 10.0.22) +A new version of libseccomp (≥ 2.2) with Python bindings is needed to sandbox +Python interpreters. This is not available yet in most distributions, so fetch +the latest sources with + + git clone https://github.com/seccomp/libseccomp + +then build and install it with + + ./configure --enable-python + make + make install -Settings: +Settings +-------- - point webroot to codeq-web - set up reverse proxy for /ws/ to the node server: @@ -43,7 +62,8 @@ Settings: - run "npm install" inside the "web" directory to install all dependencies (they will be installed inside the "web" directory) -Running: +Running +------- - run the prolog interpreter server: swipl prolog/runner/main.pl - run the python interpreter server: python3 python/runner/main.py diff --git a/server/handlers.py b/server/handlers.py index fb50ff4..0cc474f 100644 --- a/server/handlers.py +++ b/server/handlers.py @@ -101,14 +101,6 @@ class Query(CodeqService): request.reply(result) -# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling -class PythonPull(CodeqService): - def process(self, request): - python = request.session.get_python() - output = python.pull() - request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}}) - - # Push stdin to the session's Python interpreter. TODO: convert to async handling class PythonPush(CodeqService): def process(self, request): @@ -198,7 +190,6 @@ incoming_handlers = { 'activity': Activity(), 'query': Query(), 'python_push': PythonPush(), - 'python_pull': PythonPull(), 'hint': Hint(), 'test': Test() } diff --git a/server/python_session.py b/server/python_session.py index 4af455f..33fe7cc 100644 --- a/server/python_session.py +++ b/server/python_session.py @@ -1,8 +1,15 @@ # coding=utf-8 import ast -import multiprocessing.managers +from fcntl import fcntl, F_GETFL, F_SETFL +import io +import multiprocessing +import os +import queue +import subprocess +import sys import threading +import time import server.user_session from db.models import Problem @@ -17,53 +24,45 @@ class PythonSession(object): No properties are accessible; use getters and setters instead. Values are passed by value instead of by reference (deep copy!). """ - def __init__(self): + def __init__(self, output_cb=None): self._access_lock = threading.Lock() self._sent_hints = [] - self._interpreter = None - # Proxy for calling the Python runner. We use a separate connection for - # each session so the runner can be restarted without affecting the - # server. - _m = multiprocessing.managers.BaseManager(address=('localhost', 3031), - authkey=b'c0d3q3y-python') - _m.register('Python') - _m.connect() - self._python = _m.Python() - - self.create() + self._control = queue.Queue() + self._interpreter = threading.Thread(target=_interpreter, + kwargs={'control': self._control, 'callback': output_cb}) + self._interpreter.start() def run(self, code=None, inputs=None, timeout=1.0): - with self._access_lock: - return self._python.run(code, inputs, timeout) - - def create(self): - with self._access_lock: - if self._interpreter is None: - self._interpreter = self._python.create() - - def pull(self): - with self._access_lock: - if self._interpreter is None: - return None - return self._python.pull(self._interpreter) + # Launch processes. + futures = [] + for expr, stdin in inputs: + conn_parent, conn_child = multiprocessing.Pipe() + p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) + p.start() + futures.append((p, conn_parent)) + + # Wait for results. + results = [] + start = time.monotonic() + for p, conn in futures: + now = time.monotonic() + real_timeout = max(0, timeout - (now - start)) + if conn.poll(real_timeout): + results.append(conn.recv()) + else: + results.append('timed out') + p.terminate() + return results def push(self, stdin): - with self._access_lock: - if self._interpreter is not None: - self._python.push(self._interpreter, stdin) + self._control.put_nowait(('push', stdin)) def destroy(self): - with self._access_lock: - if self._interpreter is not None: - self._python.destroy(self._interpreter) - self._interpreter = None + self._control.put_nowait(('done', None)) def __del__(self): - # no locking needed if GC is removing us, as there cannot be any concurrent access by definition - if hasattr(self, '_interpreter') and self._interpreter is not None: - self._python.destroy(self._interpreter) - self._interpreter = None + self.destroy() def hint(self, sid, problem_id, program): language, problem_group, problem = Problem.get_identifier(problem_id) @@ -105,3 +104,81 @@ class PythonSession(object): hint_type = mod.hint_type[hint['id']] hint_type.instantiate(hint, self._sent_hints) self._sent_hints.extend(hints) + +def _interpreter(control, callback): + directory = os.path.dirname(os.path.realpath(__file__)) + # TODO drop privileges using a wrapper + script = os.path.join(directory, '..', 'python', 'interpreter.py') + + proc = None + while True: + # Ensure the interpreter process is running. + if proc is None: + proc = subprocess.Popen([script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Set the non-blocking flag for stdout. + flags = fcntl(proc.stdout.fileno(), F_GETFL) + fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK) + + # Get a control command. + try: + cmd, data = control.get_nowait() + if cmd == 'push': + proc.stdin.write(data.encode('utf-8')) + proc.stdin.flush() + elif cmd == 'done': + break + except: + pass + + # Communicate with child process. + retcode = proc.poll() + if retcode is None: + data = proc.stdout.read() + if data: + # NOTE this might fail if read() stops in the middle of utf8 sequence + text = data.decode('utf-8') + if text: + callback(text) + else: + if retcode == -31: + callback('Child killed due to sandbox misbehavior.\n') + else: + callback('Child exited with status "{}".\n'.format(retcode)) + proc = None + + # TODO we should select() on control and proc.stdout instead of polling + time.sleep(0.1) + + # We are done, kill the child. + if proc is not None: + proc.kill() + +# Execute [code] and evaluate [expr]. Input is given by the string [stdin]. +# Return result of evaluation, the contents of stdout and stderr, and the +# exception traceback. +# TODO sandbox this +def _run_exec(conn, code, expr=None, stdin=''): + result, out, err, exc = None, None, None, None + sys.stdin = io.StringIO(stdin) + sys.stdout = io.StringIO() + sys.stderr = io.StringIO() + try: + env = {} + if code: + exec(code, env) + if expr: + result = eval(expr, env) + except Exception as ex: + # Exception is not JSON serializable, so return traceback as string. + import traceback + exc = traceback.format_exc() + finally: + out = sys.stdout.getvalue() + err = sys.stderr.getvalue() + sys.stdin.close() + sys.stdout.close() + sys.stderr.close() + conn.send((result, out, err, exc)) diff --git a/server/user_session.py b/server/user_session.py index 2cc629c..0d8535c 100644 --- a/server/user_session.py +++ b/server/user_session.py @@ -62,7 +62,8 @@ class UserSession(object): def get_python(self): with self._access_lock: if self.python_session is None: - self.python_session = python_session.PythonSession() # lazy init + self.python_session = python_session.PythonSession( + output_cb=lambda text: self.send({'event': 'terminal_output', 'text': text})) return self.python_session def get_problem_data(self, language, problem_group, problem): diff --git a/web/main.js b/web/main.js index c4c8691..03e9dad 100644 --- a/web/main.js +++ b/web/main.js @@ -137,11 +137,6 @@ var guiHandlers = { sendDataToPython(message).then(session.send, session.end).done(); }, - 'python_pull': function actionPythonPull(session, message) { - logger.debug('Received python_pull from GUI'); - sendDataToPython(message).then(session.send, session.end).done(); - }, - 'hint': function actionHint(session, message) { logger.debug('Received hint from GUI'); sendDataToPython(message).then(session.send, session.end).done(); -- cgit v1.2.1