#!/usr/bin/python3

from fcntl import fcntl, F_GETFL, F_SETFL
import http.client
import json
import os
import subprocess


# For each input, load [code] into a new environment and run it using a Python
# runner daemon (so we can get structured results instead of parsing stdout).
# Inputs are given as a list of pairs (expression, stdin). Return a list of
# answers as tuples (result, stdout, stderr, exception). If the code times out
# on some input, 'timed out' is returned instead at the corresponding position
# in the answer list.
#
# The daemon's JSON reply is decoded and returned unchanged, so the exact shape
# of the return value is whatever the daemon sends back.
# Raises Exception if the daemon answers with a status other than 200 OK;
# connection errors from http.client propagate to the caller.
def run(code=None, inputs=None, timeout=1.0):
    address, port = 'localhost', 3031  # TODO put this somewhere sane

    # Send [args] as a JSON body to [path] on the daemon and return the
    # decoded JSON reply.
    def request(path, args=None):
        headers = {'Content-Type': 'application/json;charset=utf-8'}
        body = json.dumps(args)
        # Create the connection before entering the try block: if the
        # constructor fails there is nothing to close, and the original error
        # must not be masked by an unbound name in the finally clause.
        conn = http.client.HTTPConnection(address, port, timeout=30)
        try:
            # NOTE(review): the payload is sent as the body of a GET request;
            # presumably this is what the daemon expects -- confirm.
            conn.request('GET', path, body=body, headers=headers)
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))
            reply = json.loads(response.read().decode('utf-8'))
            return reply
        finally:
            conn.close()

    return request('/run', {'code': code, 'inputs': inputs, 'timeout': timeout})


# Start and return a new Python interpreter process.
# The process runs runner/interpreter.py (located next to this file) with
# pipes for stdin and stdout; stderr is merged into stdout.
def create():
    directory = os.path.dirname(os.path.realpath(__file__))
    script = os.path.join(directory, 'runner', 'interpreter.py')
    # The script is launched directly, so it presumably has to be executable
    # on its own (shebang + exec bit) -- confirm.
    p = subprocess.Popen([script],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    # Set the non-blocking flag for stdout.
    flags = fcntl(p.stdout, F_GETFL)
    fcntl(p.stdout, F_SETFL, flags | os.O_NONBLOCK)
    return p


# Read any available bytes from the interpreter's stdout.
# Returns the data decoded as UTF-8 when something was read; otherwise the
# falsy value produced by read() is returned as-is.
def pull(interpreter):
    stdout = interpreter.stdout.read()
    if stdout:
        stdout = stdout.decode('utf-8')
    return stdout


# Push a string to the interpreter's stdin.
# The string is encoded as UTF-8 and flushed immediately.
def push(interpreter, stdin):
    interpreter.stdin.write(stdin.encode('utf-8'))
    interpreter.stdin.flush()


# Kill an interpreter process.
# [interpreter] only needs to expose kill(); presumably it is the process
# handle returned by create(). The process is not waited on here.
def destroy(interpreter):
    interpreter.kill()


# Basic sanity check.
# Sends a small program and three inputs to the runner daemon via run() and
# prints each entry of the reply's 'results' list plus the elapsed time.
if __name__ == '__main__':
    import time

    # The second input makes x+y == 6, which sends foo() into an endless loop
    # and so exercises the timeout handling.
    code = '''\
import sys
def foo(x):
    y = int(input())
    print(x+y)
    sys.stderr.write('bar')
    if x+y == 6:
        while True:
            pass
    return x*y
'''
    inputs = [
        ('foo(1)', '2\n'),
        ('foo(2)', '4\n'),
        ('foo(3)', '6\n'),
    ]
    start = time.monotonic()
    result = run(code=code, inputs=inputs, timeout=1.0)
    end = time.monotonic()
    for r in result['results']:
        print(r)
    print('time taken: {:.3f}'.format(end-start))