From 401bef2dcb434c23eb783131fb36d952043b9f31 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Thu, 3 Sep 2015 17:07:27 +0200 Subject: Use multiprocessing.managers for the Python runner --- python/runner/main.py | 132 +++++++++++++++++++++++++++++--------------------- 1 file changed, 77 insertions(+), 55 deletions(-) (limited to 'python/runner') diff --git a/python/runner/main.py b/python/runner/main.py index b9233cd..4e1af53 100755 --- a/python/runner/main.py +++ b/python/runner/main.py @@ -1,18 +1,23 @@ #!/usr/bin/python3 -from http.server import BaseHTTPRequestHandler, HTTPServer -from socketserver import ForkingMixIn - +from fcntl import fcntl, F_GETFL, F_SETFL import io -import json import multiprocessing +import multiprocessing.managers +import os +import subprocess import sys +import threading import time +import uuid + +interpreters = {} +module_access_lock = threading.Lock() # Execute [code] and evaluate [expr]. Input is given by the string [stdin]. # Return result of evaluation, the contents of stdout and stderr, and the # exception traceback. -def run_exec(conn, code, expr=None, stdin=''): +def _run_exec(conn, code, expr=None, stdin=''): result, out, err, exc = None, None, None, None sys.stdin = io.StringIO(stdin) sys.stdout = io.StringIO() @@ -35,61 +40,78 @@ def run_exec(conn, code, expr=None, stdin=''): sys.stderr.close() conn.send((result, out, err, exc)) -# Call run_exec in a separate process for each input and return a list of -# results from those calls. If a call times out, 'timed out' is returned in -# place of exception traceback. -def run(code, inputs, timeout): - # Launch processes. - futures = [] - for expr, stdin in inputs: - conn_parent, conn_child = multiprocessing.Pipe() - p = multiprocessing.Process(target=run_exec, args=(conn_child, code, expr, stdin)) - p.start() - futures.append((p, conn_parent)) +class Python(object): + # Call run_exec in a separate process for each input and return a list of + # results from those calls. If a call times out, 'timed out' is returned in + # place of exception traceback. + def run(self, code, inputs, timeout): + # Launch processes. + futures = [] + for expr, stdin in inputs: + conn_parent, conn_child = multiprocessing.Pipe() + p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) + p.start() + futures.append((p, conn_parent)) - # Wait for results. - results = [] - start = time.monotonic() - for p, conn in futures: - now = time.monotonic() - real_timeout = max(0, timeout - (now - start)) - if conn.poll(real_timeout): - results.append(conn.recv()) - else: - results.append('timed out') - p.terminate() + # Wait for results. + results = [] + start = time.monotonic() + for p, conn in futures: + now = time.monotonic() + real_timeout = max(0, timeout - (now - start)) + if conn.poll(real_timeout): + results.append(conn.recv()) + else: + results.append('timed out') + p.terminate() + return results - return results + # Start and return a new Python interpreter process. + def create(self): + directory = os.path.dirname(os.path.realpath(__file__)) + script = os.path.join(directory, 'interpreter.py') + p = subprocess.Popen([script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Set the non-blocking flag for stdout. + flags = fcntl(p.stdout, F_GETFL) + fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK) -class RequestHandler(BaseHTTPRequestHandler): - def do_OPTIONS(self): - self.send_response(200) - self.end_headers() + interpreter_id = uuid.uuid4().hex + with module_access_lock: + interpreters[interpreter_id] = p + return interpreter_id - def do_GET(self): - reply = {'status': 'error'} - if self.path == '/run': - length = int(self.headers.get('Content-Length', 0)) - text = self.rfile.read(length) - request = json.loads(text.decode('utf-8')) - results = run(request.get('code', ''), - request.get('inputs', []), - request.get('timeout', 1.0)) - reply = {'status': 'ok', 'results': results} + # Read any available bytes from the interpreter's stdout. + def pull(self, interpreter_id): + with module_access_lock: + interpreter = interpreters[interpreter_id] + stdout = interpreter.stdout.read() + if stdout: + stdout = stdout.decode('utf-8') + return stdout - reply_text = json.dumps(reply).encode('utf-8') - self.send_response(200) - self.send_header('Content-Length', len(reply_text)) - self.end_headers() - self.wfile.write(reply_text) + # Push a string to the interpreter's stdin. + def push(self, interpreter_id, stdin): + with module_access_lock: + interpreter = interpreters[interpreter_id] + interpreter.stdin.write(stdin.encode('utf-8')) + interpreter.stdin.flush() -class ForkingHTTPServer(ForkingMixIn, HTTPServer): + # Kill an interpreter process. + def destroy(self, interpreter_id): + with module_access_lock: + interpreter = interpreters[interpreter_id] + del interpreters[interpreter_id] + interpreter.kill() + +class PythonManager(multiprocessing.managers.BaseManager): pass -server = ForkingHTTPServer(('localhost', 3031), RequestHandler) -print('Python engine started.') -try: - server.serve_forever() -except KeyboardInterrupt: - print('Keyboard interrupt received, exiting.') - server.server_close() +PythonManager.register('Python', callable=Python) + +if __name__ == '__main__': + print('Python engine started.') + m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python') + m.get_server().serve_forever() -- cgit v1.2.1