summaryrefslogtreecommitdiff
path: root/python/runner
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@araneo.org>2015-09-03 17:07:27 +0200
committerTimotej Lazar <timotej.lazar@araneo.org>2015-09-03 17:07:27 +0200
commit401bef2dcb434c23eb783131fb36d952043b9f31 (patch)
tree589a317e0c4d71318bb6b1f37ef465a4296973f5 /python/runner
parent3a17e047b53760c787c050432372170e745a318e (diff)
Use multiprocessing.managers for the Python runner
Diffstat (limited to 'python/runner')
-rwxr-xr-xpython/runner/main.py132
1 files changed, 77 insertions, 55 deletions
diff --git a/python/runner/main.py b/python/runner/main.py
index b9233cd..4e1af53 100755
--- a/python/runner/main.py
+++ b/python/runner/main.py
@@ -1,18 +1,23 @@
#!/usr/bin/python3
-from http.server import BaseHTTPRequestHandler, HTTPServer
-from socketserver import ForkingMixIn
-
+from fcntl import fcntl, F_GETFL, F_SETFL
import io
-import json
import multiprocessing
+import multiprocessing.managers
+import os
+import subprocess
import sys
+import threading
import time
+import uuid
+
+interpreters = {}
+module_access_lock = threading.Lock()
# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
# Return result of evaluation, the contents of stdout and stderr, and the
# exception traceback.
-def run_exec(conn, code, expr=None, stdin=''):
+def _run_exec(conn, code, expr=None, stdin=''):
result, out, err, exc = None, None, None, None
sys.stdin = io.StringIO(stdin)
sys.stdout = io.StringIO()
@@ -35,61 +40,78 @@ def run_exec(conn, code, expr=None, stdin=''):
sys.stderr.close()
conn.send((result, out, err, exc))
-# Call run_exec in a separate process for each input and return a list of
-# results from those calls. If a call times out, 'timed out' is returned in
-# place of exception traceback.
-def run(code, inputs, timeout):
- # Launch processes.
- futures = []
- for expr, stdin in inputs:
- conn_parent, conn_child = multiprocessing.Pipe()
- p = multiprocessing.Process(target=run_exec, args=(conn_child, code, expr, stdin))
- p.start()
- futures.append((p, conn_parent))
+class Python(object):
+ # Call run_exec in a separate process for each input and return a list of
+ # results from those calls. If a call times out, 'timed out' is returned in
+ # place of exception traceback.
+ def run(self, code, inputs, timeout):
+ # Launch processes.
+ futures = []
+ for expr, stdin in inputs:
+ conn_parent, conn_child = multiprocessing.Pipe()
+ p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin))
+ p.start()
+ futures.append((p, conn_parent))
- # Wait for results.
- results = []
- start = time.monotonic()
- for p, conn in futures:
- now = time.monotonic()
- real_timeout = max(0, timeout - (now - start))
- if conn.poll(real_timeout):
- results.append(conn.recv())
- else:
- results.append('timed out')
- p.terminate()
+ # Wait for results.
+ results = []
+ start = time.monotonic()
+ for p, conn in futures:
+ now = time.monotonic()
+ real_timeout = max(0, timeout - (now - start))
+ if conn.poll(real_timeout):
+ results.append(conn.recv())
+ else:
+ results.append('timed out')
+ p.terminate()
+ return results
- return results
+ # Start and return a new Python interpreter process.
+ def create(self):
+ directory = os.path.dirname(os.path.realpath(__file__))
+ script = os.path.join(directory, 'interpreter.py')
+ p = subprocess.Popen([script],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ # Set the non-blocking flag for stdout.
+ flags = fcntl(p.stdout, F_GETFL)
+ fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)
-class RequestHandler(BaseHTTPRequestHandler):
- def do_OPTIONS(self):
- self.send_response(200)
- self.end_headers()
+ interpreter_id = uuid.uuid4().hex
+ with module_access_lock:
+ interpreters[interpreter_id] = p
+ return interpreter_id
- def do_GET(self):
- reply = {'status': 'error'}
- if self.path == '/run':
- length = int(self.headers.get('Content-Length', 0))
- text = self.rfile.read(length)
- request = json.loads(text.decode('utf-8'))
- results = run(request.get('code', ''),
- request.get('inputs', []),
- request.get('timeout', 1.0))
- reply = {'status': 'ok', 'results': results}
+ # Read any available bytes from the interpreter's stdout.
+ def pull(self, interpreter_id):
+ with module_access_lock:
+ interpreter = interpreters[interpreter_id]
+ stdout = interpreter.stdout.read()
+ if stdout:
+ stdout = stdout.decode('utf-8')
+ return stdout
- reply_text = json.dumps(reply).encode('utf-8')
- self.send_response(200)
- self.send_header('Content-Length', len(reply_text))
- self.end_headers()
- self.wfile.write(reply_text)
+ # Push a string to the interpreter's stdin.
+ def push(self, interpreter_id, stdin):
+ with module_access_lock:
+ interpreter = interpreters[interpreter_id]
+ interpreter.stdin.write(stdin.encode('utf-8'))
+ interpreter.stdin.flush()
-class ForkingHTTPServer(ForkingMixIn, HTTPServer):
+ # Kill an interpreter process.
+ def destroy(self, interpreter_id):
+ with module_access_lock:
+ interpreter = interpreters[interpreter_id]
+ del interpreters[interpreter_id]
+ interpreter.kill()
+
+class PythonManager(multiprocessing.managers.BaseManager):
pass
-server = ForkingHTTPServer(('localhost', 3031), RequestHandler)
-print('Python engine started.')
-try:
- server.serve_forever()
-except KeyboardInterrupt:
- print('Keyboard interrupt received, exiting.')
- server.server_close()
+PythonManager.register('Python', callable=Python)
+
+if __name__ == '__main__':
+ print('Python engine started.')
+ m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python')
+ m.get_server().serve_forever()