summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@araneo.org>2015-09-03 17:07:27 +0200
committerTimotej Lazar <timotej.lazar@araneo.org>2015-09-03 17:07:27 +0200
commit401bef2dcb434c23eb783131fb36d952043b9f31 (patch)
tree589a317e0c4d71318bb6b1f37ef465a4296973f5 /python
parent3a17e047b53760c787c050432372170e745a318e (diff)
Use multiprocessing.managers for the Python runner
Diffstat (limited to 'python')
-rwxr-xr-xpython/engine.py88
-rwxr-xr-xpython/runner/main.py132
2 files changed, 77 insertions, 143 deletions
diff --git a/python/engine.py b/python/engine.py
deleted file mode 100755
index 51c08b4..0000000
--- a/python/engine.py
+++ /dev/null
@@ -1,88 +0,0 @@
-#!/usr/bin/python3
-
-from fcntl import fcntl, F_GETFL, F_SETFL
-import http.client
-import json
-import os
-import subprocess
-
-# For each input, load [code] into a new environment and run it using a Python
-# runner daemon (so we can get structured results instead of parsing stdout).
-# Inputs are given as a list of pairs (expression, stdin). Return a list of
-# answers as tuples (result, stdout, stderr, exception). If the code times out
-# on some input, 'timed out' is returned instead at the corresponding position
-# in the answer list.
-def run(code=None, inputs=None, timeout=1.0):
- address, port = 'localhost', 3031 # TODO put this somewhere sane
- def request(path, args=None):
- headers = {'Content-Type': 'application/json;charset=utf-8'}
- body = json.dumps(args)
- try:
- conn = http.client.HTTPConnection(address, port, timeout=30)
- conn.request('GET', path, body=body, headers=headers)
- response = conn.getresponse()
- if response.status != http.client.OK:
- raise Exception('server returned {}'.format(response.status))
- reply = json.loads(response.read().decode('utf-8'))
- return reply
- finally:
- conn.close()
- return request('/run', {'code': code, 'inputs': inputs, 'timeout': timeout})
-
-# Start and return a new Python interpreter process.
-def create():
- directory = os.path.dirname(os.path.realpath(__file__))
- script = os.path.join(directory, 'runner', 'interpreter.py')
- p = subprocess.Popen([script],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- # Set the non-blocking flag for stdout.
- flags = fcntl(p.stdout, F_GETFL)
- fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)
- return p
-
-# Read any available bytes from the interpreter's stdout.
-def pull(interpreter):
- stdout = interpreter.stdout.read()
- if stdout:
- stdout = stdout.decode('utf-8')
- return stdout
-
-# Push a string to the interpreter's stdin.
-def push(interpreter, stdin):
- interpreter.stdin.write(stdin.encode('utf-8'))
- interpreter.stdin.flush()
-
-# Kill an interpreter process.
-def destroy(interpreter):
- interpreter.kill()
-
-
-# Basic sanity check.
-if __name__ == '__main__':
- import time
-
- code = '''\
-import sys
-def foo(x):
- y = int(input())
- print(x+y)
- sys.stderr.write('bar')
- if x+y == 6:
- while True:
- pass
- return x*y
-'''
- inputs = [
- ('foo(1)', '2\n'),
- ('foo(2)', '4\n'),
- ('foo(3)', '6\n'),
- ]
-
- start = time.monotonic()
- result = run(code=code, inputs=inputs, timeout=1.0)
- end = time.monotonic()
- for r in result['results']:
- print(r)
- print('time taken: {:.3f}'.format(end-start))
diff --git a/python/runner/main.py b/python/runner/main.py
index b9233cd..4e1af53 100755
--- a/python/runner/main.py
+++ b/python/runner/main.py
@@ -1,18 +1,23 @@
#!/usr/bin/python3
-from http.server import BaseHTTPRequestHandler, HTTPServer
-from socketserver import ForkingMixIn
-
+from fcntl import fcntl, F_GETFL, F_SETFL
import io
-import json
import multiprocessing
+import multiprocessing.managers
+import os
+import subprocess
import sys
+import threading
import time
+import uuid
+
+interpreters = {}
+module_access_lock = threading.Lock()
# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
# Return result of evaluation, the contents of stdout and stderr, and the
# exception traceback.
-def run_exec(conn, code, expr=None, stdin=''):
+def _run_exec(conn, code, expr=None, stdin=''):
result, out, err, exc = None, None, None, None
sys.stdin = io.StringIO(stdin)
sys.stdout = io.StringIO()
@@ -35,61 +40,78 @@ def run_exec(conn, code, expr=None, stdin=''):
sys.stderr.close()
conn.send((result, out, err, exc))
-# Call run_exec in a separate process for each input and return a list of
-# results from those calls. If a call times out, 'timed out' is returned in
-# place of exception traceback.
-def run(code, inputs, timeout):
- # Launch processes.
- futures = []
- for expr, stdin in inputs:
- conn_parent, conn_child = multiprocessing.Pipe()
- p = multiprocessing.Process(target=run_exec, args=(conn_child, code, expr, stdin))
- p.start()
- futures.append((p, conn_parent))
+class Python(object):
+ # Call run_exec in a separate process for each input and return a list of
+ # results from those calls. If a call times out, 'timed out' is returned in
+ # place of exception traceback.
+ def run(self, code, inputs, timeout):
+ # Launch processes.
+ futures = []
+ for expr, stdin in inputs:
+ conn_parent, conn_child = multiprocessing.Pipe()
+ p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin))
+ p.start()
+ futures.append((p, conn_parent))
- # Wait for results.
- results = []
- start = time.monotonic()
- for p, conn in futures:
- now = time.monotonic()
- real_timeout = max(0, timeout - (now - start))
- if conn.poll(real_timeout):
- results.append(conn.recv())
- else:
- results.append('timed out')
- p.terminate()
+ # Wait for results.
+ results = []
+ start = time.monotonic()
+ for p, conn in futures:
+ now = time.monotonic()
+ real_timeout = max(0, timeout - (now - start))
+ if conn.poll(real_timeout):
+ results.append(conn.recv())
+ else:
+ results.append('timed out')
+ p.terminate()
+ return results
- return results
+ # Start and return a new Python interpreter process.
+ def create(self):
+ directory = os.path.dirname(os.path.realpath(__file__))
+ script = os.path.join(directory, 'interpreter.py')
+ p = subprocess.Popen([script],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ # Set the non-blocking flag for stdout.
+ flags = fcntl(p.stdout, F_GETFL)
+ fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)
-class RequestHandler(BaseHTTPRequestHandler):
- def do_OPTIONS(self):
- self.send_response(200)
- self.end_headers()
+ interpreter_id = uuid.uuid4().hex
+ with module_access_lock:
+ interpreters[interpreter_id] = p
+ return interpreter_id
- def do_GET(self):
- reply = {'status': 'error'}
- if self.path == '/run':
- length = int(self.headers.get('Content-Length', 0))
- text = self.rfile.read(length)
- request = json.loads(text.decode('utf-8'))
- results = run(request.get('code', ''),
- request.get('inputs', []),
- request.get('timeout', 1.0))
- reply = {'status': 'ok', 'results': results}
+ # Read any available bytes from the interpreter's stdout.
+ def pull(self, interpreter_id):
+ with module_access_lock:
+ interpreter = interpreters[interpreter_id]
+ stdout = interpreter.stdout.read()
+ if stdout:
+ stdout = stdout.decode('utf-8')
+ return stdout
- reply_text = json.dumps(reply).encode('utf-8')
- self.send_response(200)
- self.send_header('Content-Length', len(reply_text))
- self.end_headers()
- self.wfile.write(reply_text)
+ # Push a string to the interpreter's stdin.
+ def push(self, interpreter_id, stdin):
+ with module_access_lock:
+ interpreter = interpreters[interpreter_id]
+ interpreter.stdin.write(stdin.encode('utf-8'))
+ interpreter.stdin.flush()
-class ForkingHTTPServer(ForkingMixIn, HTTPServer):
+ # Kill an interpreter process.
+ def destroy(self, interpreter_id):
+ with module_access_lock:
+ interpreter = interpreters[interpreter_id]
+ del interpreters[interpreter_id]
+ interpreter.kill()
+
+class PythonManager(multiprocessing.managers.BaseManager):
pass
-server = ForkingHTTPServer(('localhost', 3031), RequestHandler)
-print('Python engine started.')
-try:
- server.serve_forever()
-except KeyboardInterrupt:
- print('Keyboard interrupt received, exiting.')
- server.server_close()
+PythonManager.register('Python', callable=Python)
+
+if __name__ == '__main__':
+ print('Python engine started.')
+ m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python')
+ m.get_server().serve_forever()