From 401bef2dcb434c23eb783131fb36d952043b9f31 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Thu, 3 Sep 2015 17:07:27 +0200 Subject: Use multiprocessing.managers for the Python runner --- python/engine.py | 88 ------------------------------- python/runner/main.py | 132 +++++++++++++++++++++++++++-------------------- server/python_session.py | 28 +++++++--- server/user_session.py | 3 ++ 4 files changed, 101 insertions(+), 150 deletions(-) delete mode 100755 python/engine.py diff --git a/python/engine.py b/python/engine.py deleted file mode 100755 index 51c08b4..0000000 --- a/python/engine.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/python3 - -from fcntl import fcntl, F_GETFL, F_SETFL -import http.client -import json -import os -import subprocess - -# For each input, load [code] into a new environment and run it using a Python -# runner daemon (so we can get structured results instead of parsing stdout). -# Inputs are given as a list of pairs (expression, stdin). Return a list of -# answers as tuples (result, stdout, stderr, exception). If the code times out -# on some input, 'timed out' is returned instead at the corresponding position -# in the answer list. -def run(code=None, inputs=None, timeout=1.0): - address, port = 'localhost', 3031 # TODO put this somewhere sane - def request(path, args=None): - headers = {'Content-Type': 'application/json;charset=utf-8'} - body = json.dumps(args) - try: - conn = http.client.HTTPConnection(address, port, timeout=30) - conn.request('GET', path, body=body, headers=headers) - response = conn.getresponse() - if response.status != http.client.OK: - raise Exception('server returned {}'.format(response.status)) - reply = json.loads(response.read().decode('utf-8')) - return reply - finally: - conn.close() - return request('/run', {'code': code, 'inputs': inputs, 'timeout': timeout}) - -# Start and return a new Python interpreter process. -def create(): - directory = os.path.dirname(os.path.realpath(__file__)) - script = os.path.join(directory, 'runner', 'interpreter.py') - p = subprocess.Popen([script], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - # Set the non-blocking flag for stdout. - flags = fcntl(p.stdout, F_GETFL) - fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK) - return p - -# Read any available bytes from the interpreter's stdout. -def pull(interpreter): - stdout = interpreter.stdout.read() - if stdout: - stdout = stdout.decode('utf-8') - return stdout - -# Push a string to the interpreter's stdin. -def push(interpreter, stdin): - interpreter.stdin.write(stdin.encode('utf-8')) - interpreter.stdin.flush() - -# Kill an interpreter process. -def destroy(interpreter): - interpreter.kill() - - -# Basic sanity check. -if __name__ == '__main__': - import time - - code = '''\ -import sys -def foo(x): - y = int(input()) - print(x+y) - sys.stderr.write('bar') - if x+y == 6: - while True: - pass - return x*y -''' - inputs = [ - ('foo(1)', '2\n'), - ('foo(2)', '4\n'), - ('foo(3)', '6\n'), - ] - - start = time.monotonic() - result = run(code=code, inputs=inputs, timeout=1.0) - end = time.monotonic() - for r in result['results']: - print(r) - print('time taken: {:.3f}'.format(end-start)) diff --git a/python/runner/main.py b/python/runner/main.py index b9233cd..4e1af53 100755 --- a/python/runner/main.py +++ b/python/runner/main.py @@ -1,18 +1,23 @@ #!/usr/bin/python3 -from http.server import BaseHTTPRequestHandler, HTTPServer -from socketserver import ForkingMixIn - +from fcntl import fcntl, F_GETFL, F_SETFL import io -import json import multiprocessing +import multiprocessing.managers +import os +import subprocess import sys +import threading import time +import uuid + +interpreters = {} +module_access_lock = threading.Lock() # Execute [code] and evaluate [expr]. Input is given by the string [stdin]. # Return result of evaluation, the contents of stdout and stderr, and the # exception traceback. -def run_exec(conn, code, expr=None, stdin=''): +def _run_exec(conn, code, expr=None, stdin=''): result, out, err, exc = None, None, None, None sys.stdin = io.StringIO(stdin) sys.stdout = io.StringIO() @@ -35,61 +40,78 @@ def run_exec(conn, code, expr=None, stdin=''): sys.stderr.close() conn.send((result, out, err, exc)) -# Call run_exec in a separate process for each input and return a list of -# results from those calls. If a call times out, 'timed out' is returned in -# place of exception traceback. -def run(code, inputs, timeout): - # Launch processes. - futures = [] - for expr, stdin in inputs: - conn_parent, conn_child = multiprocessing.Pipe() - p = multiprocessing.Process(target=run_exec, args=(conn_child, code, expr, stdin)) - p.start() - futures.append((p, conn_parent)) +class Python(object): + # Call run_exec in a separate process for each input and return a list of + # results from those calls. If a call times out, 'timed out' is returned in + # place of exception traceback. + def run(self, code, inputs, timeout): + # Launch processes. + futures = [] + for expr, stdin in inputs: + conn_parent, conn_child = multiprocessing.Pipe() + p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) + p.start() + futures.append((p, conn_parent)) - # Wait for results. - results = [] - start = time.monotonic() - for p, conn in futures: - now = time.monotonic() - real_timeout = max(0, timeout - (now - start)) - if conn.poll(real_timeout): - results.append(conn.recv()) - else: - results.append('timed out') - p.terminate() + # Wait for results. + results = [] + start = time.monotonic() + for p, conn in futures: + now = time.monotonic() + real_timeout = max(0, timeout - (now - start)) + if conn.poll(real_timeout): + results.append(conn.recv()) + else: + results.append('timed out') + p.terminate() + return results - return results + # Start and return a new Python interpreter process. + def create(self): + directory = os.path.dirname(os.path.realpath(__file__)) + script = os.path.join(directory, 'interpreter.py') + p = subprocess.Popen([script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Set the non-blocking flag for stdout. + flags = fcntl(p.stdout, F_GETFL) + fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK) -class RequestHandler(BaseHTTPRequestHandler): - def do_OPTIONS(self): - self.send_response(200) - self.end_headers() + interpreter_id = uuid.uuid4().hex + with module_access_lock: + interpreters[interpreter_id] = p + return interpreter_id - def do_GET(self): - reply = {'status': 'error'} - if self.path == '/run': - length = int(self.headers.get('Content-Length', 0)) - text = self.rfile.read(length) - request = json.loads(text.decode('utf-8')) - results = run(request.get('code', ''), - request.get('inputs', []), - request.get('timeout', 1.0)) - reply = {'status': 'ok', 'results': results} + # Read any available bytes from the interpreter's stdout. + def pull(self, interpreter_id): + with module_access_lock: + interpreter = interpreters[interpreter_id] + stdout = interpreter.stdout.read() + if stdout: + stdout = stdout.decode('utf-8') + return stdout - reply_text = json.dumps(reply).encode('utf-8') - self.send_response(200) - self.send_header('Content-Length', len(reply_text)) - self.end_headers() - self.wfile.write(reply_text) + # Push a string to the interpreter's stdin. + def push(self, interpreter_id, stdin): + with module_access_lock: + interpreter = interpreters[interpreter_id] + interpreter.stdin.write(stdin.encode('utf-8')) + interpreter.stdin.flush() -class ForkingHTTPServer(ForkingMixIn, HTTPServer): + # Kill an interpreter process. + def destroy(self, interpreter_id): + with module_access_lock: + interpreter = interpreters[interpreter_id] + del interpreters[interpreter_id] + interpreter.kill() + +class PythonManager(multiprocessing.managers.BaseManager): pass -server = ForkingHTTPServer(('localhost', 3031), RequestHandler) -print('Python engine started.') -try: - server.serve_forever() -except KeyboardInterrupt: - print('Keyboard interrupt received, exiting.') - server.server_close() +PythonManager.register('Python', callable=Python) + +if __name__ == '__main__': + print('Python engine started.') + m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python') + m.get_server().serve_forever() diff --git a/server/python_session.py b/server/python_session.py index 641c8ee..648a901 100644 --- a/server/python_session.py +++ b/server/python_session.py @@ -1,7 +1,7 @@ # coding=utf-8 +import multiprocessing.managers import threading -import python.engine from db.models import Problem from . import problems @@ -18,31 +18,45 @@ class PythonSession(object): self._access_lock = threading.Lock() self._interpreter = None + # Proxy for calling the Python runner. We use a separate connection for + # each session so the runner can be restarted without affecting the + # server. + _m = multiprocessing.managers.BaseManager(address=('localhost', 3031), + authkey=b'c0d3q3y-python') + _m.register('Python') + _m.connect() + self._python = _m.Python() + + def run(self, code=None, inputs=None, timeout=1.0): + with self._access_lock: + return self._python.run(code, inputs, timeout) + def create(self): with self._access_lock: - if not self._interpreter: - self._interpreter = python.engine.create() + if self._interpreter is None: + self._interpreter = self._python.create() def pull(self): with self._access_lock: if self._interpreter is None: return 'Python is not running' - return python.engine.pull(self._interpreter) + return self._python.pull(self._interpreter) def push(self, stdin): with self._access_lock: if self._interpreter is not None: - python.engine.push(self._interpreter, stdin) + self._python.push(self._interpreter, stdin) def destroy(self): with self._access_lock: if self._interpreter is not None: - python.engine.destroy(self._interpreter) + self._python.destroy(self._interpreter) self._interpreter = None def __del__(self): + # no locking needed if GC is removing us, as there cannot be any concurrent access by definition if hasattr(self, '_interpreter') and self._interpreter is not None: - python.engine.destroy(self._interpreter) + self._python.destroy(self._interpreter) self._interpreter = None def hint(self, user_id, problem_id, program): diff --git a/server/user_session.py b/server/user_session.py index 60289dc..f21102c 100644 --- a/server/user_session.py +++ b/server/user_session.py @@ -133,6 +133,9 @@ class UserSession(object): if hasattr(self, 'prolog_session') and (self.prolog_session is not None): self.prolog_session.end() self.prolog_session = None + if hasattr(self, 'python_session') and (self.python_session is not None): + self.python_session.destroy() + self.python_session = None # TODO: add any cleanups as features are added! def get_session_by_id(sid): -- cgit v1.2.1