From 251c5e2ba0e85103c55cf31026739b2e7e9d4b90 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Wed, 16 Sep 2015 16:10:59 +0200 Subject: Implement async. comm. with Python interpreter Creating, destroying and communicationg with the interpreter subprocess is now handled by a thread attached to PythonSession. Interpreter is sandboxed using libseccomp. --- python/interpreter.py | 40 ++++++++++++ python/runner/interpreter.py | 8 --- python/runner/main.py | 117 --------------------------------- readme.md | 40 +++++++++--- server/handlers.py | 9 --- server/python_session.py | 151 ++++++++++++++++++++++++++++++++----------- server/user_session.py | 3 +- web/main.js | 5 -- 8 files changed, 186 insertions(+), 187 deletions(-) create mode 100755 python/interpreter.py delete mode 100755 python/runner/interpreter.py delete mode 100755 python/runner/main.py diff --git a/python/interpreter.py b/python/interpreter.py new file mode 100755 index 0000000..87de3aa --- /dev/null +++ b/python/interpreter.py @@ -0,0 +1,40 @@ +#!/usr/bin/python3 + +import code +import sys + +import seccomp + +f = seccomp.SyscallFilter(defaction=seccomp.KILL) +# Necessary for Python. +f.add_rule(seccomp.ALLOW, "exit_group") +f.add_rule(seccomp.ALLOW, "rt_sigaction") +f.add_rule(seccomp.ALLOW, "brk") + +# Mostly harmless. +f.add_rule(seccomp.ALLOW, "mprotect") + +# Allow reading from stdin and writing to stdout/stderr. +f.add_rule(seccomp.ALLOW, "read", seccomp.Arg(0, seccomp.EQ, sys.stdin.fileno())) +f.add_rule(seccomp.ALLOW, "write", seccomp.Arg(0, seccomp.EQ, sys.stdout.fileno())) +f.add_rule(seccomp.ALLOW, "write", seccomp.Arg(0, seccomp.EQ, sys.stderr.fileno())) + +f.add_rule(seccomp.ALLOW, "ioctl") +f.add_rule(seccomp.ALLOW, "mmap") +f.add_rule(seccomp.ALLOW, "munmap") + +# Needed for finding source code for exceptions. +f.add_rule(seccomp.ALLOW, "stat") +f.add_rule(seccomp.ALLOW, "open", seccomp.Arg(1, seccomp.MASKED_EQ, 0x3, 0)) +f.add_rule(seccomp.ALLOW, "fcntl") +f.add_rule(seccomp.ALLOW, "fstat") +f.add_rule(seccomp.ALLOW, "lseek") +f.add_rule(seccomp.ALLOW, "read") +f.add_rule(seccomp.ALLOW, "close") + +# Needed for code.InteractiveConsole. +f.add_rule(seccomp.ALLOW, "access") +f.add_rule(seccomp.ALLOW, "select") +f.load() + +code.interact(banner='') diff --git a/python/runner/interpreter.py b/python/runner/interpreter.py deleted file mode 100755 index 5fa320a..0000000 --- a/python/runner/interpreter.py +++ /dev/null @@ -1,8 +0,0 @@ -#!/usr/bin/python3 - -# Apparently there is no (working) way to get a non-blocking stdout if we call -# the Python interpreter directly with subprocess.Popen. For some reason, this -# works. - -import code -code.interact(banner='') diff --git a/python/runner/main.py b/python/runner/main.py deleted file mode 100755 index 4e1af53..0000000 --- a/python/runner/main.py +++ /dev/null @@ -1,117 +0,0 @@ -#!/usr/bin/python3 - -from fcntl import fcntl, F_GETFL, F_SETFL -import io -import multiprocessing -import multiprocessing.managers -import os -import subprocess -import sys -import threading -import time -import uuid - -interpreters = {} -module_access_lock = threading.Lock() - -# Execute [code] and evaluate [expr]. Input is given by the string [stdin]. -# Return result of evaluation, the contents of stdout and stderr, and the -# exception traceback. -def _run_exec(conn, code, expr=None, stdin=''): - result, out, err, exc = None, None, None, None - sys.stdin = io.StringIO(stdin) - sys.stdout = io.StringIO() - sys.stderr = io.StringIO() - try: - env = {} - if code: - exec(code, env) - if expr: - result = eval(expr, env) - except Exception as ex: - # Exception is not JSON serializable, so return traceback as string. - import traceback - exc = traceback.format_exc() - finally: - out = sys.stdout.getvalue() - err = sys.stderr.getvalue() - sys.stdin.close() - sys.stdout.close() - sys.stderr.close() - conn.send((result, out, err, exc)) - -class Python(object): - # Call run_exec in a separate process for each input and return a list of - # results from those calls. If a call times out, 'timed out' is returned in - # place of exception traceback. - def run(self, code, inputs, timeout): - # Launch processes. - futures = [] - for expr, stdin in inputs: - conn_parent, conn_child = multiprocessing.Pipe() - p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) - p.start() - futures.append((p, conn_parent)) - - # Wait for results. - results = [] - start = time.monotonic() - for p, conn in futures: - now = time.monotonic() - real_timeout = max(0, timeout - (now - start)) - if conn.poll(real_timeout): - results.append(conn.recv()) - else: - results.append('timed out') - p.terminate() - return results - - # Start and return a new Python interpreter process. - def create(self): - directory = os.path.dirname(os.path.realpath(__file__)) - script = os.path.join(directory, 'interpreter.py') - p = subprocess.Popen([script], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - # Set the non-blocking flag for stdout. - flags = fcntl(p.stdout, F_GETFL) - fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK) - - interpreter_id = uuid.uuid4().hex - with module_access_lock: - interpreters[interpreter_id] = p - return interpreter_id - - # Read any available bytes from the interpreter's stdout. - def pull(self, interpreter_id): - with module_access_lock: - interpreter = interpreters[interpreter_id] - stdout = interpreter.stdout.read() - if stdout: - stdout = stdout.decode('utf-8') - return stdout - - # Push a string to the interpreter's stdin. - def push(self, interpreter_id, stdin): - with module_access_lock: - interpreter = interpreters[interpreter_id] - interpreter.stdin.write(stdin.encode('utf-8')) - interpreter.stdin.flush() - - # Kill an interpreter process. - def destroy(self, interpreter_id): - with module_access_lock: - interpreter = interpreters[interpreter_id] - del interpreters[interpreter_id] - interpreter.kill() - -class PythonManager(multiprocessing.managers.BaseManager): - pass - -PythonManager.register('Python', callable=Python) - -if __name__ == '__main__': - print('Python engine started.') - m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python') - m.get_server().serve_forever() diff --git a/readme.md b/readme.md index a7e11ca..61b0ac6 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,19 @@ Deployment ========== +Install the following packages: + + - apache2 + - python3 (>= 3.4) + - python3-ply + - python3-psycopg2 + - python3-termcolor + - swi-prolog-nox (>= 7.2) + - nodejs (>= 10.0.22) + +SWI prolog +---------- + To use the correct SWI prolog package (>= 7.2) on Debian, add a custom repository by creating the file /etc/apt/sources.list.d/prolog.list containing the following 2 lines: @@ -16,17 +29,23 @@ apt-get update apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys EF8406856DBFCA18 apt-get update -Install the following packages: +libseccomp +---------- - - apache2 - - python3 (>= 3.4) - - python3-ply - - python3-psycopg2 - - python3-termcolor - - swi-prolog-nox (>= 7.2) - - nodejs (>= 10.0.22) +A new version of libseccomp (≥ 2.2) with Python bindings is needed to sandbox +Python interpreters. This is not available yet in most distributions, so fetch +the latest sources with + + git clone https://github.com/seccomp/libseccomp + +then build and install it with + + ./configure --enable-python + make + make install -Settings: +Settings +-------- - point webroot to codeq-web - set up reverse proxy for /ws/ to the node server: @@ -43,7 +62,8 @@ Settings: - run "npm install" inside the "web" directory to install all dependencies (they will be installed inside the "web" directory) -Running: +Running +------- - run the prolog interpreter server: swipl prolog/runner/main.pl - run the python interpreter server: python3 python/runner/main.py diff --git a/server/handlers.py b/server/handlers.py index fb50ff4..0cc474f 100644 --- a/server/handlers.py +++ b/server/handlers.py @@ -101,14 +101,6 @@ class Query(CodeqService): request.reply(result) -# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling -class PythonPull(CodeqService): - def process(self, request): - python = request.session.get_python() - output = python.pull() - request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}}) - - # Push stdin to the session's Python interpreter. TODO: convert to async handling class PythonPush(CodeqService): def process(self, request): @@ -198,7 +190,6 @@ incoming_handlers = { 'activity': Activity(), 'query': Query(), 'python_push': PythonPush(), - 'python_pull': PythonPull(), 'hint': Hint(), 'test': Test() } diff --git a/server/python_session.py b/server/python_session.py index 4af455f..33fe7cc 100644 --- a/server/python_session.py +++ b/server/python_session.py @@ -1,8 +1,15 @@ # coding=utf-8 import ast -import multiprocessing.managers +from fcntl import fcntl, F_GETFL, F_SETFL +import io +import multiprocessing +import os +import queue +import subprocess +import sys import threading +import time import server.user_session from db.models import Problem @@ -17,53 +24,45 @@ class PythonSession(object): No properties are accessible; use getters and setters instead. Values are passed by value instead of by reference (deep copy!). """ - def __init__(self): + def __init__(self, output_cb=None): self._access_lock = threading.Lock() self._sent_hints = [] - self._interpreter = None - # Proxy for calling the Python runner. We use a separate connection for - # each session so the runner can be restarted without affecting the - # server. - _m = multiprocessing.managers.BaseManager(address=('localhost', 3031), - authkey=b'c0d3q3y-python') - _m.register('Python') - _m.connect() - self._python = _m.Python() - - self.create() + self._control = queue.Queue() + self._interpreter = threading.Thread(target=_interpreter, + kwargs={'control': self._control, 'callback': output_cb}) + self._interpreter.start() def run(self, code=None, inputs=None, timeout=1.0): - with self._access_lock: - return self._python.run(code, inputs, timeout) - - def create(self): - with self._access_lock: - if self._interpreter is None: - self._interpreter = self._python.create() - - def pull(self): - with self._access_lock: - if self._interpreter is None: - return None - return self._python.pull(self._interpreter) + # Launch processes. + futures = [] + for expr, stdin in inputs: + conn_parent, conn_child = multiprocessing.Pipe() + p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) + p.start() + futures.append((p, conn_parent)) + + # Wait for results. + results = [] + start = time.monotonic() + for p, conn in futures: + now = time.monotonic() + real_timeout = max(0, timeout - (now - start)) + if conn.poll(real_timeout): + results.append(conn.recv()) + else: + results.append('timed out') + p.terminate() + return results def push(self, stdin): - with self._access_lock: - if self._interpreter is not None: - self._python.push(self._interpreter, stdin) + self._control.put_nowait(('push', stdin)) def destroy(self): - with self._access_lock: - if self._interpreter is not None: - self._python.destroy(self._interpreter) - self._interpreter = None + self._control.put_nowait(('done', None)) def __del__(self): - # no locking needed if GC is removing us, as there cannot be any concurrent access by definition - if hasattr(self, '_interpreter') and self._interpreter is not None: - self._python.destroy(self._interpreter) - self._interpreter = None + self.destroy() def hint(self, sid, problem_id, program): language, problem_group, problem = Problem.get_identifier(problem_id) @@ -105,3 +104,81 @@ class PythonSession(object): hint_type = mod.hint_type[hint['id']] hint_type.instantiate(hint, self._sent_hints) self._sent_hints.extend(hints) + +def _interpreter(control, callback): + directory = os.path.dirname(os.path.realpath(__file__)) + # TODO drop privileges using a wrapper + script = os.path.join(directory, '..', 'python', 'interpreter.py') + + proc = None + while True: + # Ensure the interpreter process is running. + if proc is None: + proc = subprocess.Popen([script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Set the non-blocking flag for stdout. + flags = fcntl(proc.stdout.fileno(), F_GETFL) + fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK) + + # Get a control command. + try: + cmd, data = control.get_nowait() + if cmd == 'push': + proc.stdin.write(data.encode('utf-8')) + proc.stdin.flush() + elif cmd == 'done': + break + except: + pass + + # Communicate with child process. + retcode = proc.poll() + if retcode is None: + data = proc.stdout.read() + if data: + # NOTE this might fail if read() stops in the middle of utf8 sequence + text = data.decode('utf-8') + if text: + callback(text) + else: + if retcode == -31: + callback('Child killed due to sandbox misbehavior.\n') + else: + callback('Child exited with status "{}".\n'.format(retcode)) + proc = None + + # TODO we should select() on control and proc.stdout instead of polling + time.sleep(0.1) + + # We are done, kill the child. + if proc is not None: + proc.kill() + +# Execute [code] and evaluate [expr]. Input is given by the string [stdin]. +# Return result of evaluation, the contents of stdout and stderr, and the +# exception traceback. +# TODO sandbox this +def _run_exec(conn, code, expr=None, stdin=''): + result, out, err, exc = None, None, None, None + sys.stdin = io.StringIO(stdin) + sys.stdout = io.StringIO() + sys.stderr = io.StringIO() + try: + env = {} + if code: + exec(code, env) + if expr: + result = eval(expr, env) + except Exception as ex: + # Exception is not JSON serializable, so return traceback as string. + import traceback + exc = traceback.format_exc() + finally: + out = sys.stdout.getvalue() + err = sys.stderr.getvalue() + sys.stdin.close() + sys.stdout.close() + sys.stderr.close() + conn.send((result, out, err, exc)) diff --git a/server/user_session.py b/server/user_session.py index 2cc629c..0d8535c 100644 --- a/server/user_session.py +++ b/server/user_session.py @@ -62,7 +62,8 @@ class UserSession(object): def get_python(self): with self._access_lock: if self.python_session is None: - self.python_session = python_session.PythonSession() # lazy init + self.python_session = python_session.PythonSession( + output_cb=lambda text: self.send({'event': 'terminal_output', 'text': text})) return self.python_session def get_problem_data(self, language, problem_group, problem): diff --git a/web/main.js b/web/main.js index c4c8691..03e9dad 100644 --- a/web/main.js +++ b/web/main.js @@ -137,11 +137,6 @@ var guiHandlers = { sendDataToPython(message).then(session.send, session.end).done(); }, - 'python_pull': function actionPythonPull(session, message) { - logger.debug('Received python_pull from GUI'); - sendDataToPython(message).then(session.send, session.end).done(); - }, - 'hint': function actionHint(session, message) { logger.debug('Received hint from GUI'); sendDataToPython(message).then(session.send, session.end).done(); -- cgit v1.2.1 From 404e1123d5ab743435b2736f94c4e2a055526c73 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Wed, 16 Sep 2015 16:26:04 +0200 Subject: Update readme --- readme.md | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/readme.md b/readme.md index 61b0ac6..0b16214 100644 --- a/readme.md +++ b/readme.md @@ -1,15 +1,15 @@ -Deployment -========== +Installation +============ Install the following packages: - apache2 + - nodejs (>= 10.0.22) - python3 (>= 3.4) - python3-ply - python3-psycopg2 - python3-termcolor - swi-prolog-nox (>= 7.2) - - nodejs (>= 10.0.22) SWI prolog ---------- @@ -44,8 +44,14 @@ then build and install it with make make install +nodejs +------ + +Run "npm install" inside the "web" directory to install all dependencies (they +will be installed inside the "web" directory) + Settings --------- +======== - point webroot to codeq-web - set up reverse proxy for /ws/ to the node server: @@ -59,14 +65,11 @@ Settings ProxyPassReverse /ws/ http://localhost:8083/ws/ - set _path_prefix in server.problems - set DB settings in db - - run "npm install" inside the "web" directory to install all dependencies - (they will be installed inside the "web" directory) Running -------- +======= - run the prolog interpreter server: swipl prolog/runner/main.pl - - run the python interpreter server: python3 python/runner/main.py - run the session daemon: python3 daemon.py - run the web server for client communication: node web/main.js -- cgit v1.2.1 From dcdfe0563ab269807f66304107f9123cfc2dd37e Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Thu, 17 Sep 2015 18:01:08 +0200 Subject: Replace prolog.engine.ask_all with check_answers When testing a program we can stop searching for new solutions after we have received a wrong one. --- prolog/engine.py | 43 ++++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/prolog/engine.py b/prolog/engine.py index 50582c8..eca0826 100644 --- a/prolog/engine.py +++ b/prolog/engine.py @@ -6,7 +6,6 @@ import http.client import json from operator import itemgetter import re -import socket import time import urllib @@ -112,46 +111,45 @@ def pretty_vars(data): result += [strip_html(b) for b in data['residuals']] return ',\n'.join(result) if result else 'true' -# Run [query] in the pengine with id [engine] and return the list of answers -# found within [timeout] seconds. If a timeout occurs before the query is done, -# 'timed out' is appended as the last answer. -def ask_all(engine, query, timeout=10): - # Returns a tuple ((bindings, constraints), error, more?) for one answer. +# Run [query] in pengine with ID [engine] and check whether all solutions +# specified by [answers] (a list of binding dictionaries) are returned. This +# function succeeds if [query] finds each solution in [answers] at least once +# within [timeout] seconds, and fails when it finds any other solution. +def check_answers(engine, query, answers, timeout=10): + seen = [] start = time.monotonic() - answers, messages = [], [] try: # Run the query. reply, output = ask(engine, query, timeout) - messages += output - if 'error' in map(itemgetter(0), output): - return answers, messages answer, error, more = process_answer(reply) if answer: - answers.append(answer) - if error: - messages.append(error) + bindings, constraints = answer + if bindings not in answers: + return False + if bindings not in seen: + seen.append(bindings) # Continue while there are more potential answers and time remaining. while more: real_timeout = timeout - (time.monotonic()-start) if real_timeout <= 0: - raise socket.timeout() + break reply, output = next(engine, timeout=real_timeout) - messages += output answer, error, more = process_answer(reply) if answer: - answers.append(answer) - if error: - messages.append(error) - except socket.timeout as ex: - answers.append('timed out') - return answers, messages + bindings, constraints = answer + if bindings not in answers: + return False + if bindings not in seen: + seen.append(bindings) + except: + pass + return len(seen) == len(answers) # Run [query] in the pengine with id [engine] and return the first answer only # found within [timeout] seconds. # used for quick hint debugging by Sasha def ask_one(engine, query, timeout=1): - # quicker than ask_all as there could be many hint-triggering tests # Returns either an error message, true, false, timeout (see below), or bindings # Timeout is "returned" as an unhandled exception -- deliberately so @@ -175,7 +173,6 @@ def ask_one(engine, query, timeout=1): return 'false' def ask_truth(engine, query, timeout=1): - # quicker than ask_all as there could be many hint-triggering tests # Returns either True or False # (Runtime) error is False! # Timeout is an unhandled exception -- deliberately so -- cgit v1.2.1 From 1720db308bf4481d6be45d4f7f611bab576b1184 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Fri, 18 Sep 2015 13:53:58 +0200 Subject: Simplify exceptions returned by PythonSession.run Don't include the first stack entry or the filename (which is ). --- server/python_session.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/python_session.py b/server/python_session.py index 33fe7cc..62fcbf8 100644 --- a/server/python_session.py +++ b/server/python_session.py @@ -172,9 +172,17 @@ def _run_exec(conn, code, expr=None, stdin=''): if expr: result = eval(expr, env) except Exception as ex: - # Exception is not JSON serializable, so return traceback as string. + # Exception is not JSON serializable, so return traceback as string + # (without the first entry, which is this function). import traceback - exc = traceback.format_exc() + e_type, e_value, e_tb = sys.exc_info() + stack = traceback.extract_tb(e_tb) + exc = ''.join( + ['Traceback (most recent call last):\n'] + + [' line {}, in {}\n'.format(lineno, name) + (line+'\n' if line else '') + for filename, lineno, name, line in stack[1:]] + + traceback.format_exception_only(e_type, e_value) + ).rstrip() finally: out = sys.stdout.getvalue() err = sys.stderr.getvalue() -- cgit v1.2.1