summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@araneo.org>2015-09-16 16:10:59 +0200
committerTimotej Lazar <timotej.lazar@araneo.org>2015-09-16 16:10:59 +0200
commit251c5e2ba0e85103c55cf31026739b2e7e9d4b90 (patch)
treed01abacb559c69fdfe3ef205a683496533da8369 /server
parentb9f1c57fdd6097e776235c105c58c29f84399523 (diff)
Implement async. comm. with Python interpreter
Creating, destroying and communicationg with the interpreter subprocess is now handled by a thread attached to PythonSession. Interpreter is sandboxed using libseccomp.
Diffstat (limited to 'server')
-rw-r--r--server/handlers.py9
-rw-r--r--server/python_session.py151
-rw-r--r--server/user_session.py3
3 files changed, 116 insertions, 47 deletions
diff --git a/server/handlers.py b/server/handlers.py
index fb50ff4..0cc474f 100644
--- a/server/handlers.py
+++ b/server/handlers.py
@@ -101,14 +101,6 @@ class Query(CodeqService):
request.reply(result)
-# Pull stdout/stderr from the session's Python interpreter. TODO: convert to async handling
-class PythonPull(CodeqService):
- def process(self, request):
- python = request.session.get_python()
- output = python.pull()
- request.reply({'code': 0, 'message': 'ok', 'terminal': {'text': output if output else ''}})
-
-
# Push stdin to the session's Python interpreter. TODO: convert to async handling
class PythonPush(CodeqService):
def process(self, request):
@@ -198,7 +190,6 @@ incoming_handlers = {
'activity': Activity(),
'query': Query(),
'python_push': PythonPush(),
- 'python_pull': PythonPull(),
'hint': Hint(),
'test': Test()
}
diff --git a/server/python_session.py b/server/python_session.py
index 4af455f..33fe7cc 100644
--- a/server/python_session.py
+++ b/server/python_session.py
@@ -1,8 +1,15 @@
# coding=utf-8
import ast
-import multiprocessing.managers
+from fcntl import fcntl, F_GETFL, F_SETFL
+import io
+import multiprocessing
+import os
+import queue
+import subprocess
+import sys
import threading
+import time
import server.user_session
from db.models import Problem
@@ -17,53 +24,45 @@ class PythonSession(object):
No properties are accessible; use getters and setters instead.
Values are passed by value instead of by reference (deep copy!).
"""
- def __init__(self):
+ def __init__(self, output_cb=None):
self._access_lock = threading.Lock()
self._sent_hints = []
- self._interpreter = None
- # Proxy for calling the Python runner. We use a separate connection for
- # each session so the runner can be restarted without affecting the
- # server.
- _m = multiprocessing.managers.BaseManager(address=('localhost', 3031),
- authkey=b'c0d3q3y-python')
- _m.register('Python')
- _m.connect()
- self._python = _m.Python()
-
- self.create()
+ self._control = queue.Queue()
+ self._interpreter = threading.Thread(target=_interpreter,
+ kwargs={'control': self._control, 'callback': output_cb})
+ self._interpreter.start()
def run(self, code=None, inputs=None, timeout=1.0):
- with self._access_lock:
- return self._python.run(code, inputs, timeout)
-
- def create(self):
- with self._access_lock:
- if self._interpreter is None:
- self._interpreter = self._python.create()
-
- def pull(self):
- with self._access_lock:
- if self._interpreter is None:
- return None
- return self._python.pull(self._interpreter)
+ # Launch processes.
+ futures = []
+ for expr, stdin in inputs:
+ conn_parent, conn_child = multiprocessing.Pipe()
+ p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin))
+ p.start()
+ futures.append((p, conn_parent))
+
+ # Wait for results.
+ results = []
+ start = time.monotonic()
+ for p, conn in futures:
+ now = time.monotonic()
+ real_timeout = max(0, timeout - (now - start))
+ if conn.poll(real_timeout):
+ results.append(conn.recv())
+ else:
+ results.append('timed out')
+ p.terminate()
+ return results
def push(self, stdin):
- with self._access_lock:
- if self._interpreter is not None:
- self._python.push(self._interpreter, stdin)
+ self._control.put_nowait(('push', stdin))
def destroy(self):
- with self._access_lock:
- if self._interpreter is not None:
- self._python.destroy(self._interpreter)
- self._interpreter = None
+ self._control.put_nowait(('done', None))
def __del__(self):
- # no locking needed if GC is removing us, as there cannot be any concurrent access by definition
- if hasattr(self, '_interpreter') and self._interpreter is not None:
- self._python.destroy(self._interpreter)
- self._interpreter = None
+ self.destroy()
def hint(self, sid, problem_id, program):
language, problem_group, problem = Problem.get_identifier(problem_id)
@@ -105,3 +104,81 @@ class PythonSession(object):
hint_type = mod.hint_type[hint['id']]
hint_type.instantiate(hint, self._sent_hints)
self._sent_hints.extend(hints)
+
+def _interpreter(control, callback):
+ directory = os.path.dirname(os.path.realpath(__file__))
+ # TODO drop privileges using a wrapper
+ script = os.path.join(directory, '..', 'python', 'interpreter.py')
+
+ proc = None
+ while True:
+ # Ensure the interpreter process is running.
+ if proc is None:
+ proc = subprocess.Popen([script],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ # Set the non-blocking flag for stdout.
+ flags = fcntl(proc.stdout.fileno(), F_GETFL)
+ fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK)
+
+ # Get a control command.
+ try:
+ cmd, data = control.get_nowait()
+ if cmd == 'push':
+ proc.stdin.write(data.encode('utf-8'))
+ proc.stdin.flush()
+ elif cmd == 'done':
+ break
+ except:
+ pass
+
+ # Communicate with child process.
+ retcode = proc.poll()
+ if retcode is None:
+ data = proc.stdout.read()
+ if data:
+ # NOTE this might fail if read() stops in the middle of utf8 sequence
+ text = data.decode('utf-8')
+ if text:
+ callback(text)
+ else:
+ if retcode == -31:
+ callback('Child killed due to sandbox misbehavior.\n')
+ else:
+ callback('Child exited with status "{}".\n'.format(retcode))
+ proc = None
+
+ # TODO we should select() on control and proc.stdout instead of polling
+ time.sleep(0.1)
+
+ # We are done, kill the child.
+ if proc is not None:
+ proc.kill()
+
+# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
+# Return result of evaluation, the contents of stdout and stderr, and the
+# exception traceback.
+# TODO sandbox this
+def _run_exec(conn, code, expr=None, stdin=''):
+ result, out, err, exc = None, None, None, None
+ sys.stdin = io.StringIO(stdin)
+ sys.stdout = io.StringIO()
+ sys.stderr = io.StringIO()
+ try:
+ env = {}
+ if code:
+ exec(code, env)
+ if expr:
+ result = eval(expr, env)
+ except Exception as ex:
+ # Exception is not JSON serializable, so return traceback as string.
+ import traceback
+ exc = traceback.format_exc()
+ finally:
+ out = sys.stdout.getvalue()
+ err = sys.stderr.getvalue()
+ sys.stdin.close()
+ sys.stdout.close()
+ sys.stderr.close()
+ conn.send((result, out, err, exc))
diff --git a/server/user_session.py b/server/user_session.py
index 2cc629c..0d8535c 100644
--- a/server/user_session.py
+++ b/server/user_session.py
@@ -62,7 +62,8 @@ class UserSession(object):
def get_python(self):
with self._access_lock:
if self.python_session is None:
- self.python_session = python_session.PythonSession() # lazy init
+ self.python_session = python_session.PythonSession(
+ output_cb=lambda text: self.send({'event': 'terminal_output', 'text': text}))
return self.python_session
def get_problem_data(self, language, problem_group, problem):