diff options
author | Timotej Lazar <timotej.lazar@fri.uni-lj.si> | 2015-10-10 17:21:42 +0200 |
---|---|---|
committer | Timotej Lazar <timotej.lazar@fri.uni-lj.si> | 2015-10-10 17:21:42 +0200 |
commit | 2f9a24b250bb8f457765a3b2ac621a4176953f74 (patch) | |
tree | c85d34e454e7718b34476b03952627d7ae2e139d /server | |
parent | 0c45b02bdbb1e4be10aa82422f419d3cc4bbe7c9 (diff) |
Use a selector instead of polling Python interpreter
Diffstat (limited to 'server')
-rw-r--r-- | server/python_session.py | 103 |
1 files changed, 62 insertions, 41 deletions
diff --git a/server/python_session.py b/server/python_session.py index f4c482c..e1be4ba 100644 --- a/server/python_session.py +++ b/server/python_session.py @@ -1,11 +1,10 @@ # coding=utf-8 -import ast from fcntl import fcntl, F_GETFL, F_SETFL import io import multiprocessing import os -import queue +import selectors import signal import subprocess import sys @@ -28,9 +27,9 @@ class PythonSession(server.LanguageSession): self._access_lock = threading.Lock() self._sent_hints = [] - self._control = queue.Queue() + self._notifier, receiver = multiprocessing.Pipe() self._interpreter = threading.Thread(target=_interpreter, - kwargs={'control': self._control, 'callback': output_cb}) + kwargs={'control': receiver, 'callback': output_cb}) self._interpreter.start() def run(self, code=None, inputs=None, timeout=1.0): @@ -56,16 +55,16 @@ class PythonSession(server.LanguageSession): return results def exec(self, program): - self._control.put_nowait(('exec', program)) + self._notifier.send(('exec', program)) def push(self, stdin): - self._control.put_nowait(('push', stdin)) + self._notifier.send(('push', stdin)) def stop(self): - self._control.put_nowait(('stop', None)) + self._notifier.send(('stop', None)) def destroy(self): - self._control.put_nowait(('done', None)) + self._notifier.send(('done', None)) def __del__(self): self.destroy() @@ -132,21 +131,23 @@ def _interpreter(control, callback): args = [script] kill = lambda proc, sig: proc.send_signal(sig) + done = False proc = None - while True: - # Ensure the interpreter process is running. - if proc is None: - proc = subprocess.Popen(args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - # Set the non-blocking flag for stdout. - flags = fcntl(proc.stdout.fileno(), F_GETFL) - fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK) + # Remember how much text and how many newlines we received this second; if + # it is too much, kill the interpreter. + # TODO this is a hack to prevent the JS console from becoming unresponsive, + # it should be fixed there. + now = 0 + length = newlines = 0 + + selector = selectors.DefaultSelector() + + def command(conn): + nonlocal proc, done # Get a control command. try: - cmd, data = control.get_nowait() + cmd, data = conn.recv() if cmd == 'exec': exec_str = 'exec("""\\\n{}\n""")\n'.format(data.replace('"', '\\"')) proc.stdin.write(exec_str.encode('utf-8')) @@ -157,37 +158,57 @@ def _interpreter(control, callback): elif cmd == 'stop': kill(proc, signal.SIGINT) elif cmd == 'done': - break + done = True except: pass + def communicate(conn): + nonlocal proc, callback, now, length, newlines + if time.monotonic() - now > 1.0: + length = newlines = 0 + now = time.monotonic() + # Communicate with child process. - retcode = proc.poll() - if retcode is None: - data = proc.stdout.read() - if data: - if len(data) > 20000: - kill(proc, signal.SIGKILL) - proc = None - callback('Child killed for talking too much.\n') - else: - # NOTE this might fail if read() stops in the middle of utf8 sequence - text = data.decode('utf-8') - if text: - callback(text) + data = proc.stdout.read1(1024) + if data and length < 100000 and newlines < 1000: + # NOTE this might fail if read() stops in the middle of utf8 sequence + text = data.decode('utf-8') + if text: + callback(text) + length += len(text) + newlines += text.count('\n') else: - if retcode == -9: # killed by ulimit - callback('Child killed due to overconsumption.\n') - elif retcode == -31: # killed by seccomp - callback('Child killed due to sandbox misbehavior.\n') - else: - callback('Child exited with status {}.\n'.format(retcode)) + selector.unregister(conn) + if proc.poll() is None: + # Process has not terminated yet, make sure it does. + kill(proc, signal.SIGKILL) proc = None + length = newlines = 0 + callback('Interpreter restarted.\n') + + selector.register(control, selectors.EVENT_READ, command) + while not done: + # Ensure the interpreter process is running. + if proc is None: + proc = subprocess.Popen(args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + # Set the non-blocking flag for stdout. + flags = fcntl(proc.stdout.fileno(), F_GETFL) + fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK) + selector.register(proc.stdout, selectors.EVENT_READ, communicate) - # TODO we should select() on control and proc.stdout instead of polling - time.sleep(0.1) + events = selector.select() + for key, mask in events: + if mask & selectors.EVENT_READ: + try: + key.data(key.fileobj) + except: + pass # We are done, kill the child. + selector.close() if proc is not None: kill(proc, signal.SIGKILL) |