From 251c5e2ba0e85103c55cf31026739b2e7e9d4b90 Mon Sep 17 00:00:00 2001
From: Timotej Lazar <timotej.lazar@araneo.org>
Date: Wed, 16 Sep 2015 16:10:59 +0200
Subject: Implement async. comm. with Python interpreter

Creating, destroying and communicating with the interpreter subprocess
is now handled by a thread attached to PythonSession. The interpreter
is sandboxed using libseccomp.
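
Rough usage sketch from the server's point of view (illustrative only:
echo_output is a placeholder callback; the real server passes a lambda
that forwards output to the client as a terminal_output event, see
server/user_session.py):

    from server.python_session import PythonSession

    def echo_output(text):
        # Called from the worker thread whenever the subprocess prints.
        print(text, end='')

    session = PythonSession(output_cb=echo_output)  # starts the worker thread
    session.push('1 + 1\n')   # queue a line for the interpreter's stdin
    # ... output arrives asynchronously via echo_output ...
    session.destroy()         # worker thread kills the child and exits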
---
 python/interpreter.py        |  40 ++++++++++++
 python/runner/interpreter.py |   8 ---
 python/runner/main.py        | 117 ---------------------------------
 readme.md                    |  40 +++++++++---
 server/handlers.py           |   9 ---
 server/python_session.py     | 151 ++++++++++++++++++++++++++++++++-----------
 server/user_session.py       |   3 +-
 web/main.js                  |   5 --
 8 files changed, 186 insertions(+), 187 deletions(-)
 create mode 100755 python/interpreter.py
 delete mode 100755 python/runner/interpreter.py
 delete mode 100755 python/runner/main.py

diff --git a/python/interpreter.py b/python/interpreter.py
new file mode 100755
index 0000000..87de3aa
--- /dev/null
+++ b/python/interpreter.py
@@ -0,0 +1,40 @@
+#!/usr/bin/python3
+
+import code
+import sys
+
+import seccomp
+
+f = seccomp.SyscallFilter(defaction=seccomp.KILL)
+# Necessary for Python.
+f.add_rule(seccomp.ALLOW, "exit_group")
+f.add_rule(seccomp.ALLOW, "rt_sigaction")
+f.add_rule(seccomp.ALLOW, "brk")
+
+# Mostly harmless.
+f.add_rule(seccomp.ALLOW, "mprotect")
+
+# Allow reading from stdin and writing to stdout/stderr.
+f.add_rule(seccomp.ALLOW, "read", seccomp.Arg(0, seccomp.EQ, sys.stdin.fileno()))
+f.add_rule(seccomp.ALLOW, "write", seccomp.Arg(0, seccomp.EQ, sys.stdout.fileno()))
+f.add_rule(seccomp.ALLOW, "write", seccomp.Arg(0, seccomp.EQ, sys.stderr.fileno()))
+
+f.add_rule(seccomp.ALLOW, "ioctl")
+f.add_rule(seccomp.ALLOW, "mmap")
+f.add_rule(seccomp.ALLOW, "munmap")
+
+# Needed for finding source code for exceptions.
+f.add_rule(seccomp.ALLOW, "stat")
+f.add_rule(seccomp.ALLOW, "open", seccomp.Arg(1, seccomp.MASKED_EQ, 0x3, 0))
+f.add_rule(seccomp.ALLOW, "fcntl")
+f.add_rule(seccomp.ALLOW, "fstat")
+f.add_rule(seccomp.ALLOW, "lseek")
+f.add_rule(seccomp.ALLOW, "read")
+f.add_rule(seccomp.ALLOW, "close")
+
+# Needed for code.InteractiveConsole.
+f.add_rule(seccomp.ALLOW, "access")
+f.add_rule(seccomp.ALLOW, "select")
+f.load()
+
+code.interact(banner='')
diff --git a/python/runner/interpreter.py b/python/runner/interpreter.py
deleted file mode 100755
index 5fa320a..0000000
--- a/python/runner/interpreter.py
+++ /dev/null
@@ -1,8 +0,0 @@
-#!/usr/bin/python3
-
-# Apparently there is no (working) way to get a non-blocking stdout if we call
-# the Python interpreter directly with subprocess.Popen. For some reason, this
-# works.
-
-import code
-code.interact(banner='')
diff --git a/python/runner/main.py b/python/runner/main.py
deleted file mode 100755
index 4e1af53..0000000
--- a/python/runner/main.py
+++ /dev/null
@@ -1,117 +0,0 @@
-#!/usr/bin/python3
-
-from fcntl import fcntl, F_GETFL, F_SETFL
-import io
-import multiprocessing
-import multiprocessing.managers
-import os
-import subprocess
-import sys
-import threading
-import time
-import uuid
-
-interpreters = {}
-module_access_lock = threading.Lock()
-
-# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
-# Return result of evaluation, the contents of stdout and stderr, and the
-# exception traceback.
-def _run_exec(conn, code, expr=None, stdin=''):
-    result, out, err, exc = None, None, None, None
-    sys.stdin = io.StringIO(stdin)
-    sys.stdout = io.StringIO()
-    sys.stderr = io.StringIO()
-    try:
-        env = {}
-        if code:
-            exec(code, env)
-        if expr:
-            result = eval(expr, env)
-    except Exception as ex:
-        # Exception is not JSON serializable, so return traceback as string.
-        import traceback
-        exc = traceback.format_exc()
-    finally:
-        out = sys.stdout.getvalue()
-        err = sys.stderr.getvalue()
-        sys.stdin.close()
-        sys.stdout.close()
-        sys.stderr.close()
-        conn.send((result, out, err, exc))
-
-class Python(object):
-    # Call run_exec in a separate process for each input and return a list of
-    # results from those calls. If a call times out, 'timed out' is returned in
-    # place of exception traceback.
-    def run(self, code, inputs, timeout):
-        # Launch processes.
-        futures = []
-        for expr, stdin in inputs:
-            conn_parent, conn_child = multiprocessing.Pipe()
-            p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin))
-            p.start()
-            futures.append((p, conn_parent))
-
-        # Wait for results.
-        results = []
-        start = time.monotonic()
-        for p, conn in futures:
-            now = time.monotonic()
-            real_timeout = max(0, timeout - (now - start))
-            if conn.poll(real_timeout):
-                results.append(conn.recv())
-            else:
-                results.append('timed out')
-            p.terminate()
-        return results
-
-    # Start and return a new Python interpreter process.
-    def create(self):
-        directory = os.path.dirname(os.path.realpath(__file__))
-        script = os.path.join(directory, 'interpreter.py')
-        p = subprocess.Popen([script],
-                stdin=subprocess.PIPE,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.STDOUT)
-        # Set the non-blocking flag for stdout.
-        flags = fcntl(p.stdout, F_GETFL)
-        fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)
-
-        interpreter_id = uuid.uuid4().hex
-        with module_access_lock:
-            interpreters[interpreter_id] = p
-        return interpreter_id
-
-    # Read any available bytes from the interpreter's stdout.
-    def pull(self, interpreter_id):
-        with module_access_lock:
-            interpreter = interpreters[interpreter_id]
-        stdout = interpreter.stdout.read()
-        if stdout:
-            stdout = stdout.decode('utf-8')
-        return stdout
-
-    # Push a string to the interpreter's stdin.
-    def push(self, interpreter_id, stdin):
-        with module_access_lock:
-            interpreter = interpreters[interpreter_id]
-        interpreter.stdin.write(stdin.encode('utf-8'))
-        interpreter.stdin.flush()
-
-    # Kill an interpreter process.
-    def destroy(self, interpreter_id):
-        with module_access_lock:
-            interpreter = interpreters[interpreter_id]
-            del interpreters[interpreter_id]
-        interpreter.kill()
-
-class PythonManager(multiprocessing.managers.BaseManager):
-    pass
-
-PythonManager.register('Python', callable=Python)
-
-if __name__ == '__main__':
-    print('Python engine started.')
-    m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python')
-    m.get_server().serve_forever()
diff --git a/readme.md b/readme.md
index a7e11ca..61b0ac6 100644
--- a/readme.md
+++ b/readme.md
@@ -1,6 +1,19 @@
 Deployment
 ==========
 
+Install the following packages:
+
+  - apache2
+  - python3 (>= 3.4)
+  - python3-ply
+  - python3-psycopg2
+  - python3-termcolor
+  - swi-prolog-nox (>= 7.2)
+  - nodejs (>= 10.0.22)
+
+SWI prolog
+----------
+
 To use the correct SWI prolog package (>= 7.2) on Debian, add a custom
 repository by creating the file /etc/apt/sources.list.d/prolog.list
 containing the following 2 lines:
@@ -16,17 +29,23 @@ apt-get update
 apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys EF8406856DBFCA18
 apt-get update
 
-Install the following packages:
+libseccomp
+----------
 
-  - apache2
-  - python3 (>= 3.4)
-  - python3-ply
-  - python3-psycopg2
-  - python3-termcolor
-  - swi-prolog-nox (>= 7.2)
-  - nodejs (>= 10.0.22)
+A new version of libseccomp (>= 2.2) with Python bindings is needed to sandbox
+Python interpreters. This is not available yet in most distributions, so fetch
+the latest sources with
+
+    git clone https://github.com/seccomp/libseccomp
+
+then build and install it with
+
+    ./configure --enable-python
+    make
+    make install
 
-Settings:
+Settings
+--------
 
   - point webroot to codeq-web
   - set up reverse proxy for /ws/ to the node server:
@@ -43,7 +62,8 @@ Settings:
   - run "npm install" inside the "web" directory to install all dependencies
     (they will be installed inside the "web" directory)
 
-Running:
+Running
+-------
 
   - run the prolog interpreter server: swipl prolog/runner/main.pl
   - run the python interpreter server: python3 python/runner/main.py
diff --git a/server/handlers.py b/server/handlers.py
index fb50ff4..0cc474f 100644
--- a/server/handlers.py
+++ b/server/handlers.py
@@ -101,14 +101,6 @@ class Query(CodeqService):
                 request.reply(result)
 
 
-# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling
-class PythonPull(CodeqService):
-    def process(self, request):
-        python = request.session.get_python()
-        output = python.pull()
-        request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}})
-
-
 # Push stdin to the session's Python interpreter. TODO: convert to async handling
 class PythonPush(CodeqService):
     def process(self, request):
@@ -198,7 +190,6 @@ incoming_handlers = {
     'activity': Activity(),
     'query': Query(),
     'python_push': PythonPush(),
-    'python_pull': PythonPull(),
     'hint': Hint(),
     'test': Test()
 }
diff --git a/server/python_session.py b/server/python_session.py
index 4af455f..33fe7cc 100644
--- a/server/python_session.py
+++ b/server/python_session.py
@@ -1,8 +1,15 @@
 # coding=utf-8
 
 import ast
-import multiprocessing.managers
+from fcntl import fcntl, F_GETFL, F_SETFL
+import io
+import multiprocessing
+import os
+import queue
+import subprocess
+import sys
 import threading
+import time
 
 import server.user_session
 from db.models import Problem
@@ -17,53 +24,45 @@ class PythonSession(object):
     No properties are accessible; use getters and setters instead.
     Values are passed by value instead of by reference (deep copy!).
     """
-    def __init__(self):
+    def __init__(self, output_cb=None):
         self._access_lock = threading.Lock()
         self._sent_hints = []
-        self._interpreter = None
 
-        # Proxy for calling the Python runner. We use a separate connection for
-        # each session so the runner can be restarted without affecting the
-        # server.
-        _m = multiprocessing.managers.BaseManager(address=('localhost', 3031),
-                                                  authkey=b'c0d3q3y-python')
-        _m.register('Python')
-        _m.connect()
-        self._python = _m.Python()
-
-        self.create()
+        self._control = queue.Queue()
+        self._interpreter = threading.Thread(target=_interpreter,
+                                         kwargs={'control': self._control, 'callback': output_cb})
+        self._interpreter.start()
 
     def run(self, code=None, inputs=None, timeout=1.0):
-        with self._access_lock:
-            return self._python.run(code, inputs, timeout)
-
-    def create(self):
-        with self._access_lock:
-            if self._interpreter is None:
-                self._interpreter = self._python.create()
-
-    def pull(self):
-        with self._access_lock:
-            if self._interpreter is None:
-                return None
-            return self._python.pull(self._interpreter)
+        # Launch processes.
+        futures = []
+        for expr, stdin in inputs:
+            conn_parent, conn_child = multiprocessing.Pipe()
+            p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin))
+            p.start()
+            futures.append((p, conn_parent))
+
+        # Wait for results.
+        results = []
+        start = time.monotonic()
+        for p, conn in futures:
+            now = time.monotonic()
+            real_timeout = max(0, timeout - (now - start))
+            if conn.poll(real_timeout):
+                results.append(conn.recv())
+            else:
+                results.append('timed out')
+            p.terminate()
+        return results
 
     def push(self, stdin):
-        with self._access_lock:
-            if self._interpreter is not None:
-                self._python.push(self._interpreter, stdin)
+        self._control.put_nowait(('push', stdin))
 
     def destroy(self):
-        with self._access_lock:
-            if self._interpreter is not None:
-                self._python.destroy(self._interpreter)
-                self._interpreter = None
+        self._control.put_nowait(('done', None))
 
     def __del__(self):
-        # no locking needed if GC is removing us, as there cannot be any concurrent access by definition
-        if hasattr(self, '_interpreter') and self._interpreter is not None:
-            self._python.destroy(self._interpreter)
-            self._interpreter = None
+        self.destroy()
 
     def hint(self, sid, problem_id, program):
         language, problem_group, problem = Problem.get_identifier(problem_id)
@@ -105,3 +104,81 @@ class PythonSession(object):
                         hint_type = mod.hint_type[hint['id']]
                         hint_type.instantiate(hint, self._sent_hints)
             self._sent_hints.extend(hints)
+
+def _interpreter(control, callback):
+    directory = os.path.dirname(os.path.realpath(__file__))
+    # TODO drop privileges using a wrapper
+    script = os.path.join(directory, '..', 'python', 'interpreter.py')
+
+    proc = None
+    while True:
+        # Ensure the interpreter process is running.
+        if proc is None:
+            proc = subprocess.Popen([script],
+                    stdin=subprocess.PIPE,
+                    stdout=subprocess.PIPE,
+                    stderr=subprocess.STDOUT)
+            # Set the non-blocking flag for stdout.
+            flags = fcntl(proc.stdout.fileno(), F_GETFL)
+            fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK)
+
+        # Get a control command.
+        try:
+            cmd, data = control.get_nowait()
+            if cmd == 'push':
+                proc.stdin.write(data.encode('utf-8'))
+                proc.stdin.flush()
+            elif cmd == 'done':
+                break
+        except:
+            pass
+
+        # Communicate with child process.
+        retcode = proc.poll()
+        if retcode is None:
+            data = proc.stdout.read()
+            if data:
+                # NOTE this might fail if read() stops in the middle of a UTF-8 sequence
+                text = data.decode('utf-8')
+                if text:
+                    callback(text)
+        else:
+            if retcode == -31:
+                callback('Child killed due to sandbox misbehavior.\n')
+            else:
+                callback('Child exited with status "{}".\n'.format(retcode))
+            proc = None
+
+        # TODO we should select() on control and proc.stdout instead of polling
+        time.sleep(0.1)
+
+    # We are done, kill the child.
+    if proc is not None:
+        proc.kill()
+
+# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
+# Return result of evaluation, the contents of stdout and stderr, and the
+# exception traceback.
+# TODO sandbox this
+def _run_exec(conn, code, expr=None, stdin=''):
+    result, out, err, exc = None, None, None, None
+    sys.stdin = io.StringIO(stdin)
+    sys.stdout = io.StringIO()
+    sys.stderr = io.StringIO()
+    try:
+        env = {}
+        if code:
+            exec(code, env)
+        if expr:
+            result = eval(expr, env)
+    except Exception as ex:
+        # Exception is not JSON serializable, so return traceback as string.
+        import traceback
+        exc = traceback.format_exc()
+    finally:
+        out = sys.stdout.getvalue()
+        err = sys.stderr.getvalue()
+        sys.stdin.close()
+        sys.stdout.close()
+        sys.stderr.close()
+        conn.send((result, out, err, exc))
diff --git a/server/user_session.py b/server/user_session.py
index 2cc629c..0d8535c 100644
--- a/server/user_session.py
+++ b/server/user_session.py
@@ -62,7 +62,8 @@ class UserSession(object):
     def get_python(self):
         with self._access_lock:
             if self.python_session is None:
-                self.python_session = python_session.PythonSession()  # lazy init
+                self.python_session = python_session.PythonSession(
+                    output_cb=lambda text: self.send({'event': 'terminal_output', 'text': text}))
             return self.python_session
 
     def get_problem_data(self, language, problem_group, problem):
diff --git a/web/main.js b/web/main.js
index c4c8691..03e9dad 100644
--- a/web/main.js
+++ b/web/main.js
@@ -137,11 +137,6 @@ var guiHandlers = {
         sendDataToPython(message).then(session.send, session.end).done();
     },
 
-    'python_pull': function actionPythonPull(session, message) {
-        logger.debug('Received python_pull from GUI');
-        sendDataToPython(message).then(session.send, session.end).done();
-    },
-
     'hint': function actionHint(session, message) {
         logger.debug('Received hint from GUI');
         sendDataToPython(message).then(session.send, session.end).done();
-- 
cgit v1.2.1