#!/usr/bin/python3
# Python execution engine.
#
# Exposes a `Python` object through a multiprocessing manager.  The object can
# (a) run a snippet of code against several inputs, each in its own process
# with a shared deadline, and (b) manage long-lived interactive interpreter
# subprocesses that clients push input to and pull output from.
#
# SECURITY NOTE: this module exec()s / eval()s code supplied by the caller.
# It must only be reachable by trusted clients or run inside a sandbox.

import codecs
from fcntl import fcntl, F_GETFL, F_SETFL
import io
import multiprocessing
import multiprocessing.managers
import os
import signal
import subprocess
import sys
import threading
import time
import traceback
import uuid

# Interactive interpreter processes (subprocess.Popen), keyed by the id
# returned from Python.create().
interpreters = {}
# Incremental UTF-8 decoders for interpreter output, keyed by interpreter id.
# Created lazily by Python.pull() and dropped by Python.destroy().
_decoders = {}
# Guards `interpreters` and `_decoders`.
module_access_lock = threading.Lock()

# Seconds to wait for a worker to die after SIGTERM before sending SIGKILL.
_REAP_TIMEOUT = 1.0


# Return a fresh incremental UTF-8 decoder.  Undecodable bytes become U+FFFD
# instead of raising, so one bad byte cannot make an interpreter's output
# unreadable.
def _new_decoder():
    return codecs.getincrementaldecoder('utf-8')(errors='replace')


# Make sure a started worker process is dead and reaped so that it does not
# linger as a zombie.  A worker that is still alive is terminated; if it
# ignores SIGTERM it is killed.
def _reap(process):
    if process.is_alive():
        process.terminate()
        process.join(_REAP_TIMEOUT)
        if process.is_alive():
            os.kill(process.pid, signal.SIGKILL)
    process.join()


# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
# Send a tuple (result of evaluation, contents of stdout, contents of stderr,
# exception traceback or None) over [conn].
#
# Intended to run as the target of a worker process: the standard streams are
# swapped for in-memory buffers while the code runs and restored afterwards.
def _run_exec(conn, code, expr=None, stdin=''):
    result, exc = None, None
    original_streams = sys.stdin, sys.stdout, sys.stderr
    # Keep our own references to the buffers so the captured output can still
    # be collected if the executed code rebinds sys.stdout / sys.stderr.
    fake_in, fake_out, fake_err = io.StringIO(stdin), io.StringIO(), io.StringIO()
    sys.stdin, sys.stdout, sys.stderr = fake_in, fake_out, fake_err
    try:
        env = {}
        if code:
            exec(code, env)
        if expr:
            result = eval(expr, env)
    except (Exception, SystemExit):
        # Exception is not JSON serializable, so return traceback as string.
        # SystemExit is caught as well: otherwise code calling sys.exit()
        # would end the worker before any result is sent and the caller would
        # misreport it as a timeout.
        exc = traceback.format_exc()
    finally:
        out = fake_out.getvalue()
        err = fake_err.getvalue()
        fake_in.close()
        fake_out.close()
        fake_err.close()
        sys.stdin, sys.stdout, sys.stderr = original_streams

    try:
        conn.send((result, out, err, exc))
    except Exception:
        # The evaluation result could not be pickled (e.g. a lambda or a
        # module).  Nothing has been written to the pipe yet, so report the
        # failure instead of leaving the caller to wait for the timeout.
        conn.send((None, out, err, traceback.format_exc()))


class Python(object):
    # Call _run_exec in a separate process for each (expr, stdin) pair in
    # [inputs] and return a list with one entry per input, in input order.
    # Each entry is the (result, stdout, stderr, traceback) tuple produced by
    # _run_exec.  [timeout] is a single deadline in seconds shared by all
    # calls, measured from the moment the last process has been started; a
    # call that has not answered by then is terminated and contributes the
    # string 'timed out' instead of a tuple.
    def run(self, code, inputs, timeout):
        futures = []
        results = []
        try:
            # Launch processes.
            for expr, stdin in inputs:
                conn_parent, conn_child = multiprocessing.Pipe()
                p = multiprocessing.Process(
                    target=_run_exec, args=(conn_child, code, expr, stdin))
                # Recorded before start() so the pipe is closed even if the
                # process fails to start.
                futures.append((p, conn_parent, conn_child))
                p.start()

            # Wait for results.
            start = time.monotonic()
            for p, conn, _ in futures:
                now = time.monotonic()
                real_timeout = max(0, timeout - (now - start))
                if conn.poll(real_timeout):
                    results.append(conn.recv())
                else:
                    results.append('timed out')
                    p.terminate()
        finally:
            # Reap every worker and release both pipe ends; this object lives
            # in a long-running server, so leaked processes and descriptors
            # would accumulate.
            for p, conn_parent, conn_child in futures:
                if p.pid is not None:  # None means start() never succeeded.
                    _reap(p)
                conn_parent.close()
                conn_child.close()
        return results

    # Start a new Python interpreter process (the 'interpreter.py' script
    # located next to this file) and return the id under which it is
    # registered.  The interpreter's stderr is merged into its stdout.
    def create(self):
        directory = os.path.dirname(os.path.realpath(__file__))
        script = os.path.join(directory, 'interpreter.py')
        p = subprocess.Popen([script],
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # Set the non-blocking flag for stdout so pull() returns immediately
        # when no output is available.
        flags = fcntl(p.stdout, F_GETFL)
        fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)

        interpreter_id = uuid.uuid4().hex
        with module_access_lock:
            interpreters[interpreter_id] = p
        return interpreter_id

    # Read any available bytes from the interpreter's stdout and return them
    # decoded as UTF-8.  When nothing could be read, the falsy value returned
    # by the underlying read is passed through unchanged.  Raises KeyError for
    # an unknown id.
    def pull(self, interpreter_id):
        with module_access_lock:
            interpreter = interpreters[interpreter_id]
            decoder = _decoders.get(interpreter_id)
            if decoder is None:
                decoder = _decoders[interpreter_id] = _new_decoder()
        stdout = interpreter.stdout.read()
        if stdout:
            # Reads end at arbitrary byte boundaries; the incremental decoder
            # holds back a trailing partial character until the rest arrives
            # instead of raising UnicodeDecodeError.
            stdout = decoder.decode(stdout)
        return stdout

    # Push a string to the interpreter's stdin.  Raises KeyError for an
    # unknown id.
    def push(self, interpreter_id, stdin):
        with module_access_lock:
            interpreter = interpreters[interpreter_id]
        interpreter.stdin.write(stdin.encode('utf-8'))
        interpreter.stdin.flush()

    # Kill an interpreter process and forget it.  Raises KeyError for an
    # unknown id.
    def destroy(self, interpreter_id):
        with module_access_lock:
            interpreter = interpreters.pop(interpreter_id)
            _decoders.pop(interpreter_id, None)
        interpreter.kill()
        for pipe in (interpreter.stdin, interpreter.stdout):
            try:
                pipe.close()
            except OSError:
                # Closing stdin flushes it, which can fail with a broken pipe
                # once the process is dead; there is nothing left to deliver.
                pass
        # Collect the exit status so the killed process is not left a zombie.
        interpreter.wait()


class PythonManager(multiprocessing.managers.BaseManager):
    pass


PythonManager.register('Python', callable=Python)

if __name__ == '__main__':
    print('Python engine started.')
    # NOTE(review): the authkey is hard-coded in source; anyone who can read
    # this file and reach the port can execute arbitrary code.
    m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python')
    m.get_server().serve_forever()