From 401bef2dcb434c23eb783131fb36d952043b9f31 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Thu, 3 Sep 2015 17:07:27 +0200 Subject: Use multiprocessing.managers for the Python runner --- python/engine.py | 88 --------------------------------- python/runner/main.py | 132 +++++++++++++++++++++++++++++--------------------- 2 files changed, 77 insertions(+), 143 deletions(-) delete mode 100755 python/engine.py (limited to 'python') diff --git a/python/engine.py b/python/engine.py deleted file mode 100755 index 51c08b4..0000000 --- a/python/engine.py +++ /dev/null @@ -1,88 +0,0 @@ -#!/usr/bin/python3 - -from fcntl import fcntl, F_GETFL, F_SETFL -import http.client -import json -import os -import subprocess - -# For each input, load [code] into a new environment and run it using a Python -# runner daemon (so we can get structured results instead of parsing stdout). -# Inputs are given as a list of pairs (expression, stdin). Return a list of -# answers as tuples (result, stdout, stderr, exception). If the code times out -# on some input, 'timed out' is returned instead at the corresponding position -# in the answer list. -def run(code=None, inputs=None, timeout=1.0): - address, port = 'localhost', 3031 # TODO put this somewhere sane - def request(path, args=None): - headers = {'Content-Type': 'application/json;charset=utf-8'} - body = json.dumps(args) - try: - conn = http.client.HTTPConnection(address, port, timeout=30) - conn.request('GET', path, body=body, headers=headers) - response = conn.getresponse() - if response.status != http.client.OK: - raise Exception('server returned {}'.format(response.status)) - reply = json.loads(response.read().decode('utf-8')) - return reply - finally: - conn.close() - return request('/run', {'code': code, 'inputs': inputs, 'timeout': timeout}) - -# Start and return a new Python interpreter process. -def create(): - directory = os.path.dirname(os.path.realpath(__file__)) - script = os.path.join(directory, 'runner', 'interpreter.py') - p = subprocess.Popen([script], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - # Set the non-blocking flag for stdout. - flags = fcntl(p.stdout, F_GETFL) - fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK) - return p - -# Read any available bytes from the interpreter's stdout. -def pull(interpreter): - stdout = interpreter.stdout.read() - if stdout: - stdout = stdout.decode('utf-8') - return stdout - -# Push a string to the interpreter's stdin. -def push(interpreter, stdin): - interpreter.stdin.write(stdin.encode('utf-8')) - interpreter.stdin.flush() - -# Kill an interpreter process. -def destroy(interpreter): - interpreter.kill() - - -# Basic sanity check. -if __name__ == '__main__': - import time - - code = '''\ -import sys -def foo(x): - y = int(input()) - print(x+y) - sys.stderr.write('bar') - if x+y == 6: - while True: - pass - return x*y -''' - inputs = [ - ('foo(1)', '2\n'), - ('foo(2)', '4\n'), - ('foo(3)', '6\n'), - ] - - start = time.monotonic() - result = run(code=code, inputs=inputs, timeout=1.0) - end = time.monotonic() - for r in result['results']: - print(r) - print('time taken: {:.3f}'.format(end-start)) diff --git a/python/runner/main.py b/python/runner/main.py index b9233cd..4e1af53 100755 --- a/python/runner/main.py +++ b/python/runner/main.py @@ -1,18 +1,23 @@ #!/usr/bin/python3 -from http.server import BaseHTTPRequestHandler, HTTPServer -from socketserver import ForkingMixIn - +from fcntl import fcntl, F_GETFL, F_SETFL import io -import json import multiprocessing +import multiprocessing.managers +import os +import subprocess import sys +import threading import time +import uuid + +interpreters = {} +module_access_lock = threading.Lock() # Execute [code] and evaluate [expr]. Input is given by the string [stdin]. # Return result of evaluation, the contents of stdout and stderr, and the # exception traceback. -def run_exec(conn, code, expr=None, stdin=''): +def _run_exec(conn, code, expr=None, stdin=''): result, out, err, exc = None, None, None, None sys.stdin = io.StringIO(stdin) sys.stdout = io.StringIO() @@ -35,61 +40,78 @@ def run_exec(conn, code, expr=None, stdin=''): sys.stderr.close() conn.send((result, out, err, exc)) -# Call run_exec in a separate process for each input and return a list of -# results from those calls. If a call times out, 'timed out' is returned in -# place of exception traceback. -def run(code, inputs, timeout): - # Launch processes. - futures = [] - for expr, stdin in inputs: - conn_parent, conn_child = multiprocessing.Pipe() - p = multiprocessing.Process(target=run_exec, args=(conn_child, code, expr, stdin)) - p.start() - futures.append((p, conn_parent)) +class Python(object): + # Call run_exec in a separate process for each input and return a list of + # results from those calls. If a call times out, 'timed out' is returned in + # place of exception traceback. + def run(self, code, inputs, timeout): + # Launch processes. + futures = [] + for expr, stdin in inputs: + conn_parent, conn_child = multiprocessing.Pipe() + p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin)) + p.start() + futures.append((p, conn_parent)) - # Wait for results. - results = [] - start = time.monotonic() - for p, conn in futures: - now = time.monotonic() - real_timeout = max(0, timeout - (now - start)) - if conn.poll(real_timeout): - results.append(conn.recv()) - else: - results.append('timed out') - p.terminate() + # Wait for results. + results = [] + start = time.monotonic() + for p, conn in futures: + now = time.monotonic() + real_timeout = max(0, timeout - (now - start)) + if conn.poll(real_timeout): + results.append(conn.recv()) + else: + results.append('timed out') + p.terminate() + return results - return results + # Start and return a new Python interpreter process. + def create(self): + directory = os.path.dirname(os.path.realpath(__file__)) + script = os.path.join(directory, 'interpreter.py') + p = subprocess.Popen([script], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Set the non-blocking flag for stdout. + flags = fcntl(p.stdout, F_GETFL) + fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK) -class RequestHandler(BaseHTTPRequestHandler): - def do_OPTIONS(self): - self.send_response(200) - self.end_headers() + interpreter_id = uuid.uuid4().hex + with module_access_lock: + interpreters[interpreter_id] = p + return interpreter_id - def do_GET(self): - reply = {'status': 'error'} - if self.path == '/run': - length = int(self.headers.get('Content-Length', 0)) - text = self.rfile.read(length) - request = json.loads(text.decode('utf-8')) - results = run(request.get('code', ''), - request.get('inputs', []), - request.get('timeout', 1.0)) - reply = {'status': 'ok', 'results': results} + # Read any available bytes from the interpreter's stdout. + def pull(self, interpreter_id): + with module_access_lock: + interpreter = interpreters[interpreter_id] + stdout = interpreter.stdout.read() + if stdout: + stdout = stdout.decode('utf-8') + return stdout - reply_text = json.dumps(reply).encode('utf-8') - self.send_response(200) - self.send_header('Content-Length', len(reply_text)) - self.end_headers() - self.wfile.write(reply_text) + # Push a string to the interpreter's stdin. + def push(self, interpreter_id, stdin): + with module_access_lock: + interpreter = interpreters[interpreter_id] + interpreter.stdin.write(stdin.encode('utf-8')) + interpreter.stdin.flush() -class ForkingHTTPServer(ForkingMixIn, HTTPServer): + # Kill an interpreter process. + def destroy(self, interpreter_id): + with module_access_lock: + interpreter = interpreters[interpreter_id] + del interpreters[interpreter_id] + interpreter.kill() + +class PythonManager(multiprocessing.managers.BaseManager): pass -server = ForkingHTTPServer(('localhost', 3031), RequestHandler) -print('Python engine started.') -try: - server.serve_forever() -except KeyboardInterrupt: - print('Keyboard interrupt received, exiting.') - server.server_close() +PythonManager.register('Python', callable=Python) + +if __name__ == '__main__': + print('Python engine started.') + m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python') + m.get_server().serve_forever() -- cgit v1.2.1