1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
#!/usr/bin/python3
from fcntl import fcntl, F_GETFL, F_SETFL
import io
import multiprocessing
import multiprocessing.managers
import os
import subprocess
import sys
import threading
import time
import uuid
interpreters = {}
module_access_lock = threading.Lock()
# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
# Return result of evaluation, the contents of stdout and stderr, and the
# exception traceback.
def _run_exec(conn, code, expr=None, stdin=''):
result, out, err, exc = None, None, None, None
sys.stdin = io.StringIO(stdin)
sys.stdout = io.StringIO()
sys.stderr = io.StringIO()
try:
env = {}
if code:
exec(code, env)
if expr:
result = eval(expr, env)
except Exception as ex:
# Exception is not JSON serializable, so return traceback as string.
import traceback
exc = traceback.format_exc()
finally:
out = sys.stdout.getvalue()
err = sys.stderr.getvalue()
sys.stdin.close()
sys.stdout.close()
sys.stderr.close()
conn.send((result, out, err, exc))
class Python(object):
# Call run_exec in a separate process for each input and return a list of
# results from those calls. If a call times out, 'timed out' is returned in
# place of exception traceback.
def run(self, code, inputs, timeout):
# Launch processes.
futures = []
for expr, stdin in inputs:
conn_parent, conn_child = multiprocessing.Pipe()
p = multiprocessing.Process(target=_run_exec, args=(conn_child, code, expr, stdin))
p.start()
futures.append((p, conn_parent))
# Wait for results.
results = []
start = time.monotonic()
for p, conn in futures:
now = time.monotonic()
real_timeout = max(0, timeout - (now - start))
if conn.poll(real_timeout):
results.append(conn.recv())
else:
results.append('timed out')
p.terminate()
return results
# Start and return a new Python interpreter process.
def create(self):
directory = os.path.dirname(os.path.realpath(__file__))
script = os.path.join(directory, 'interpreter.py')
p = subprocess.Popen([script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# Set the non-blocking flag for stdout.
flags = fcntl(p.stdout, F_GETFL)
fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)
interpreter_id = uuid.uuid4().hex
with module_access_lock:
interpreters[interpreter_id] = p
return interpreter_id
# Read any available bytes from the interpreter's stdout.
def pull(self, interpreter_id):
with module_access_lock:
interpreter = interpreters[interpreter_id]
stdout = interpreter.stdout.read()
if stdout:
stdout = stdout.decode('utf-8')
return stdout
# Push a string to the interpreter's stdin.
def push(self, interpreter_id, stdin):
with module_access_lock:
interpreter = interpreters[interpreter_id]
interpreter.stdin.write(stdin.encode('utf-8'))
interpreter.stdin.flush()
# Kill an interpreter process.
def destroy(self, interpreter_id):
with module_access_lock:
interpreter = interpreters[interpreter_id]
del interpreters[interpreter_id]
interpreter.kill()
class PythonManager(multiprocessing.managers.BaseManager):
pass
PythonManager.register('Python', callable=Python)
if __name__ == '__main__':
print('Python engine started.')
m = PythonManager(address=('localhost', 3031), authkey=b'c0d3q3y-python')
m.get_server().serve_forever()
|