summaryrefslogtreecommitdiff
path: root/python/runner/main.py
blob: 4e1af5303277d18e3adbc821ded093a3328f287f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/python3

from fcntl import fcntl, F_GETFL, F_SETFL
import io
import multiprocessing
import multiprocessing.managers
import os
import subprocess
import sys
import threading
import time
import uuid

interpreters = {}
module_access_lock = threading.Lock()

# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
# Return result of evaluation, the contents of stdout and stderr, and the
# exception traceback.
def _run_exec(conn, code, expr=None, stdin=''):
    result, out, err, exc = None, None, None, None
    sys.stdin = io.StringIO(stdin)
    sys.stdout = io.StringIO()
    sys.stderr = io.StringIO()
    try:
        env = {}
        if code:
            exec(code, env)
        if expr:
            result = eval(expr, env)
    except Exception as ex:
        # Exception is not JSON serializable, so return traceback as string.
        import traceback
        exc = traceback.format_exc()
    finally:
        out = sys.stdout.getvalue()
        err = sys.stderr.getvalue()
        sys.stdin.close()
        sys.stdout.close()
        sys.stderr.close()
        conn.send((result, out, err, exc))

class Python(object):
    # Call run_exec in a separate process for each input and return a list of
    # results from those calls. If a call times out, 'timed out' is returned in
    # place of exception traceback.
    def run(self, code, inputs, timeout):
        # Launch processes.
        futures = []
        for expr, stdin in inputs:
            conn_parent, conn_child = multiprocessing.Pipe()
            p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin))
            p.start()
            futures.append((p, conn_parent))

        # Wait for results.
        results = []
        start = time.monotonic()
        for p, conn in futures:
            now = time.monotonic()
            real_timeout = max(0, timeout - (now - start))
            if conn.poll(real_timeout):
                results.append(conn.recv())
            else:
                results.append('timed out')
            p.terminate()
        return results

    # Start and return a new Python interpreter process.
    def create(self):
        directory = os.path.dirname(os.path.realpath(__file__))
        script = os.path.join(directory, 'interpreter.py')
        p = subprocess.Popen([script],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT)
        # Set the non-blocking flag for stdout.
        flags = fcntl(p.stdout, F_GETFL)
        fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)

        interpreter_id = uuid.uuid4().hex
        with module_access_lock:
            interpreters[interpreter_id] = p
        return interpreter_id

    # Read any available bytes from the interpreter's stdout.
    def pull(self, interpreter_id):
        with module_access_lock:
            interpreter = interpreters[interpreter_id]
        stdout = interpreter.stdout.read()
        if stdout:
            stdout = stdout.decode('utf-8')
        return stdout

    # Push a string to the interpreter's stdin.
    def push(self, interpreter_id, stdin):
        with module_access_lock:
            interpreter = interpreters[interpreter_id]
        interpreter.stdin.write(stdin.encode('utf-8'))
        interpreter.stdin.flush()

    # Kill an interpreter process.
    def destroy(self, interpreter_id):
        with module_access_lock:
            interpreter = interpreters[interpreter_id]
            del interpreters[interpreter_id]
        interpreter.kill()

class PythonManager(multiprocessing.managers.BaseManager):
    pass

PythonManager.register('Python', callable=Python)

if __name__ == '__main__':
    print('Python engine started.')
    m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python')
    m.get_server().serve_forever()