1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
#!/usr/bin/python3
from fcntl import fcntl, F_GETFL, F_SETFL
import http.client
import json
import os
import subprocess
# Run [code] against every input using the Python runner daemon, which loads
# the code into a new environment for each input (so we can get structured
# results instead of parsing stdout).
#
# Arguments:
#   code    -- source text to load before each input is evaluated
#   inputs  -- list of pairs (expression, stdin)
#   timeout -- time limit forwarded to the daemon as-is (presumably seconds
#              per input -- confirm against the runner)
#
# Returns the daemon's JSON reply, decoded. The sanity check at the bottom of
# this file reads the answers from its 'results' key: one entry per input,
# described by the daemon's contract as (result, stdout, stderr, exception),
# or 'timed out' at the position of any input that exceeded the time limit.
#
# Raises Exception('server returned <status>') when the daemon answers with a
# non-OK HTTP status. Connection and JSON decoding errors propagate unchanged.
def run(code=None, inputs=None, timeout=1.0):
    address, port = 'localhost', 3031  # TODO put this somewhere sane

    def request(path, args=None):
        headers = {'Content-Type': 'application/json;charset=utf-8'}
        body = json.dumps(args)
        # Create the connection before entering the try block: if the
        # constructor itself raised, the finally clause would otherwise fail
        # with UnboundLocalError on `conn` and hide the real error.
        # NOTE(review): the socket timeout is a fixed 30 s and does not scale
        # with len(inputs) * timeout -- confirm this is long enough.
        conn = http.client.HTTPConnection(address, port, timeout=30)
        try:
            # The daemon is addressed with GET plus a JSON body; kept as-is
            # because the server side is not visible from here.
            conn.request('GET', path, body=body, headers=headers)
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))
            return json.loads(response.read().decode('utf-8'))
        finally:
            conn.close()

    return request('/run', {'code': code, 'inputs': inputs, 'timeout': timeout})
# Spawn a fresh Python interpreter process and hand it back to the caller.
# The child executes runner/interpreter.py, located relative to the real path
# of this file, with stdin and stdout connected through pipes and stderr
# merged into stdout.
def create():
    here = os.path.dirname(os.path.realpath(__file__))
    child = subprocess.Popen(
        [os.path.join(here, 'runner', 'interpreter.py')],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    # Add O_NONBLOCK to the status flags already set on the stdout pipe so
    # that reading from it does not wait for data.
    current = fcntl(child.stdout, F_GETFL)
    fcntl(child.stdout, F_SETFL, current | os.O_NONBLOCK)
    return child
# Fetch whatever the interpreter has written to its stdout. Non-empty output
# is returned as text decoded from UTF-8; an empty or missing read result is
# handed back unchanged.
def pull(interpreter):
    chunk = interpreter.stdout.read()
    return chunk.decode('utf-8') if chunk else chunk
# Send a string to the interpreter: the text is encoded as UTF-8, written to
# the interpreter's stdin and flushed right away.
def push(interpreter, stdin):
    pipe = interpreter.stdin
    pipe.write(stdin.encode('utf-8'))
    pipe.flush()
# Kill an interpreter process and release everything attached to it: the
# process is killed, our ends of its pipes are closed, and the child is
# reaped so it does not linger as a zombie. On return the interpreter's
# returncode is set.
def destroy(interpreter):
    interpreter.kill()
    # stderr is None when it was merged into stdout, so skip missing pipes.
    for pipe in (interpreter.stdin, interpreter.stdout, interpreter.stderr):
        if pipe is None:
            continue
        try:
            pipe.close()
        except OSError:
            # Closing stdin flushes pending data, which can fail (e.g. with a
            # broken pipe) now that the process is gone; nothing to recover.
            pass
    # Collect the exit status; without this the killed child stays unreaped.
    interpreter.wait()
# Basic sanity check: send a small program and three inputs to the runner
# daemon, print each answer and report the elapsed wall-clock time.
if __name__ == '__main__':
    import time

    # Sample program sent to the runner. foo() reads one integer from stdin,
    # prints the sum, writes 'bar' to stderr and returns the product; when the
    # sum is 6 it loops forever so that the timeout path gets exercised.
    # The program text must stay at column 0 and keep its own indentation,
    # otherwise it is not valid Python.
    code = '''\
import sys
def foo(x):
    y = int(input())
    print(x+y)
    sys.stderr.write('bar')
    if x+y == 6:
        while True:
            pass
    return x*y
'''
    # Pairs of (expression, stdin); the second pair makes foo() loop forever.
    inputs = [
        ('foo(1)', '2\n'),
        ('foo(2)', '4\n'),
        ('foo(3)', '6\n'),
    ]
    start = time.monotonic()
    result = run(code=code, inputs=inputs, timeout=1.0)
    end = time.monotonic()
    for r in result['results']:
        print(r)
    print('time taken: {:.3f}'.format(end-start))
|