# coding=utf-8

import codecs
from fcntl import fcntl, F_GETFL, F_SETFL
import io
import multiprocessing
import os
import selectors
import signal
import subprocess
import sys
import threading
import time

import server
from db.models import Problem

__all__ = ['PythonSession']


class PythonSession(server.LanguageSession):
    """Abstracts a Python session.

    Only public methods are available to the outside world due to the use of
    multiprocessing managers. Therefore prefix any private methods with an
    underscore (_). No properties are accessible; use getters and setters
    instead. Values are passed by value instead of by reference (deep copy!).
    """

    def __init__(self, output_cb=None):
        """Start the thread that drives the interactive interpreter.

        output_cb, if given, is called with every chunk of text produced by
        the interpreter process.
        """
        self._access_lock = threading.Lock()
        self._sent_hints = []
        self._notifier, receiver = multiprocessing.Pipe()
        self._interpreter = threading.Thread(
            target=_interpreter,
            kwargs={'control': receiver, 'callback': output_cb})
        self._interpreter.start()

    def run(self, code=None, inputs=None, timeout=1.0):
        """Execute [code] once for every (expr, stdin) pair in [inputs].

        Each pair is handled by a separate process running _run_exec. The
        returned list has one (result, stdout, stderr, traceback) tuple per
        pair, in input order. [timeout] is a shared budget in seconds measured
        from the moment all processes have been started; a process that has
        not answered in time yields (None, None, None, 'timed out').
        """
        # Launch processes.
        futures = []
        for expr, stdin in inputs or ():
            conn_parent, conn_child = multiprocessing.Pipe()
            p = multiprocessing.Process(target=_run_exec,
                                        args=(conn_child, code, expr, stdin))
            p.start()
            # Only the child writes to this end. Closing our copy releases the
            # descriptor and lets us see EOF if the child dies without
            # sending anything.
            conn_child.close()
            futures.append((p, conn_parent))

        # Wait for results.
        results = []
        start = time.monotonic()
        for p, conn in futures:
            real_timeout = max(0, timeout - (time.monotonic() - start))
            try:
                if conn.poll(real_timeout):
                    results.append(conn.recv())
                else:
                    results.append((None, None, None, 'timed out'))
            except (EOFError, OSError):
                # The worker went away before delivering a result.
                results.append(
                    (None, None, None, 'process exited without a result'))
            finally:
                p.terminate()
                conn.close()
        return results

    def exec(self, program):
        """Ask the interpreter thread to execute [program]."""
        self._notifier.send(('exec', program))

    def push(self, stdin):
        """Forward the text [stdin] to the interpreter's standard input."""
        self._notifier.send(('push', stdin))

    def stop(self):
        """Ask the interpreter thread to interrupt the running program."""
        self._notifier.send(('stop', None))

    def destroy(self):
        """Ask the interpreter thread to shut down."""
        self._notifier.send(('done', None))

    def __del__(self):
        # This may run on a half-constructed object (no _notifier yet) or
        # after the pipe has been closed; a finalizer must not raise.
        try:
            self.destroy()
        except Exception:
            pass

    def hint(self, sid, problem_id, program):
        """Return a list of hint dicts for [program].

        Sources are tried in order until one produces hints: the problem's
        own test (program already correct), the language module's hint
        function, the problem module's hint function. If none applies, a
        single 'no_hint' entry is returned. The hints are instantiated and
        recorded as sent before being returned.
        """
        language, problem_group, problem = Problem.get_identifier(problem_id)
        language_module = server.problems.load_language(language, 'common')
        problem_module = server.problems.load_problem(
            language, problem_group, problem, 'common')

        hints = []
        # check if the program is already correct
        passed, _ = problem_module.test(self.run, program)
        if passed:
            hints = [{'id': 'program_already_correct'}]
        if not hints and hasattr(language_module, 'hint'):
            hints = language_module.hint(self.run, program)
        if not hints and hasattr(problem_module, 'hint'):
            hints = problem_module.hint(self.run, program)
        if not hints:
            hints = [{'id': 'no_hint'}]

        self._instantiate_and_save_hints(language_module, problem_module, hints)
        return hints

    def test(self, sid, problem_id, program):
        """Run the problem's test on [program] and return the resulting hints.

        An AttributeError raised by the call is reported as a 'system_error'
        hint saying that the test function does not exist.
        """
        language, problem_group, problem = Problem.get_identifier(problem_id)
        language_module = server.problems.load_language(language, 'common')
        problem_module = server.problems.load_problem(
            language, problem_group, problem, 'common')

        try:
            _, hints = problem_module.test(self.run, program)
        except AttributeError:
            hints = [{'id': 'system_error',
                      'args': {'message': 'test function does not exist'}}]

        self._instantiate_and_save_hints(language_module, problem_module, hints)
        return hints

    # Add hint parameters (such as message index) based on hint class. Append
    # the finalized hints to the list of sent hints.
    def _instantiate_and_save_hints(self, language_mod, problem_mod, hints):
        with self._access_lock:
            for hint in hints:
                for mod in [language_mod, problem_mod]:
                    if hasattr(mod, 'hint_type') and hint['id'] in mod.hint_type:
                        hint_type = mod.hint_type[hint['id']]
                        hint_type.instantiate(hint, self._sent_hints)
            self._sent_hints.extend(hints)


def _interpreter(control, callback):
    """Drive an interactive interpreter process until told to stop.

    [control] is the receiving end of a pipe carrying (command, data) tuples:
    'exec' runs a program, 'push' writes to the interpreter's stdin, 'stop'
    sends SIGINT and 'done' ends this function. [callback] is called with
    every chunk of text the interpreter prints; None discards the output.
    The interpreter process is started on demand and restarted whenever it
    exits or floods the output.
    """
    basedir = os.path.split(os.path.dirname(os.path.realpath(__file__)))[0]
    script = os.path.join(basedir, 'python', 'runner', 'interpreter.py')

    # If the sandbox wrapper exists, use it to switch to user "nobody" and
    # enforce additional limits. Unless the daemon is running as root we are
    # not able to signal nobody's PIDs, so switch user again for the killing.
    sandbox = os.path.join(basedir, 'python', 'runner', 'sandbox')
    terminator = os.path.join(basedir, 'python', 'runner', 'terminator')
    if os.path.exists(sandbox) and os.path.exists(terminator):
        newuser = 'nobody'  # TODO make this configurable
        args = [sandbox, newuser, script]

        def kill(proc, sig):
            return subprocess.call(
                [terminator, newuser, str(proc.pid), str(sig)])
    else:
        args = [script]

        def kill(proc, sig):
            return proc.send_signal(sig)

    # Without a callback the output is simply discarded.
    emit = callback if callback is not None else (lambda text: None)

    done = False
    proc = None

    # A read may stop in the middle of a multi-byte UTF-8 sequence; the
    # incremental decoder keeps the partial bytes until the rest arrives.
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')

    # Remember how much text and how many newlines we received this second; if
    # it is too much, kill the interpreter.
    # TODO this is a hack to prevent the JS console from becoming unresponsive,
    # it should be fixed there.
    now = 0
    length = newlines = 0

    selector = selectors.DefaultSelector()

    def release(child):
        # Close our pipe ends and reap the child so it does not linger as a
        # zombie. The wait is bounded in case the kill did not succeed.
        for stream in (child.stdin, child.stdout):
            try:
                stream.close()
            except OSError:
                pass
        try:
            child.wait(timeout=1.0)
        except subprocess.TimeoutExpired:
            pass

    def command(conn):
        nonlocal done
        # Get a control command.
        try:
            message = conn.recv()
        except (EOFError, OSError):
            # The other end is gone, so no 'done' can ever arrive. Stop here
            # instead of spinning on a pipe that is permanently readable.
            done = True
            return

        try:
            cmd, data = message
            if cmd == 'exec':
                # The program is embedded in a triple-quoted string literal:
                # backslashes must be escaped as well as quotes, otherwise
                # escape sequences in the program are interpreted twice.
                escaped = data.replace('\\', '\\\\').replace('"', '\\"')
                exec_str = 'exec("""\\\n{}\n""")\n'.format(escaped)
                proc.stdin.write(exec_str.encode('utf-8'))
                proc.stdin.flush()
            elif cmd == 'push':
                proc.stdin.write(data.encode('utf-8'))
                proc.stdin.flush()
            elif cmd == 'stop':
                kill(proc, signal.SIGINT)
            elif cmd == 'done':
                done = True
        except Exception:
            # Best effort: a malformed command or a write to an interpreter
            # that has just died must not take down this thread.
            pass

    def communicate(conn):
        nonlocal proc, now, length, newlines
        if time.monotonic() - now > 1.0:
            length = newlines = 0
            now = time.monotonic()

        # Communicate with child process.
        data = proc.stdout.read1(1024)
        if data and length < 100000 and newlines < 1000:
            text = decoder.decode(data)
            if text:
                emit(text)
                length += len(text)
                newlines += text.count('\n')
        else:
            # End of output or too much output: drop this interpreter; the
            # main loop starts a fresh one.
            selector.unregister(conn)
            if proc.poll() is None:
                # Process has not terminated yet, make sure it does.
                kill(proc, signal.SIGKILL)
            release(proc)
            proc = None
            decoder.reset()
            length = newlines = 0
            emit('Interpreter restarted.\n')

    selector.register(control, selectors.EVENT_READ, command)
    while not done:
        # Ensure the interpreter process is running.
        if proc is None:
            proc = subprocess.Popen(args,
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
            # Set the non-blocking flag for stdout.
            flags = fcntl(proc.stdout.fileno(), F_GETFL)
            fcntl(proc.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK)
            selector.register(proc.stdout, selectors.EVENT_READ, communicate)

        events = selector.select()
        for key, mask in events:
            if mask & selectors.EVENT_READ:
                try:
                    key.data(key.fileobj)
                except Exception:
                    # Keep serving the session even if one handler fails.
                    pass

    # We are done, kill the child.
    selector.close()
    if proc is not None:
        kill(proc, signal.SIGKILL)
        release(proc)


# Execute [code] and evaluate [expr]. Input is given by the string [stdin].
# Return result of evaluation, the contents of stdout and stderr, and the
# exception traceback.
# TODO sandbox this
def _run_exec(conn, code, expr=None, stdin=''):
    """Run [code], evaluate [expr] and send the outcome over [conn].

    The standard streams are replaced while the code runs: stdin reads from
    the string [stdin], stdout and stderr are captured. Exactly one tuple
    (result, stdout, stderr, traceback) is sent over [conn]; result is the
    value of [expr] (None if there is no expression or it failed) and
    traceback is None unless an exception was raised.

    NOTE: [code] and [expr] are passed to exec/eval as-is, so this must only
    be called in a disposable process.
    """
    import traceback

    result, out, err, exc = None, None, None, None
    saved_streams = sys.stdin, sys.stdout, sys.stderr
    # Keep our own references: the executed code may rebind sys.stdout.
    fake_in, fake_out, fake_err = io.StringIO(stdin), io.StringIO(), io.StringIO()
    sys.stdin, sys.stdout, sys.stderr = fake_in, fake_out, fake_err
    try:
        env = {}
        if code:
            exec(code, env)
        if expr:
            result = eval(expr, env)
    except (Exception, SystemExit):
        # SystemExit is caught too, so that a program calling exit() still
        # produces a result instead of leaving the caller waiting.
        # Exception is not JSON serializable, so return traceback as string
        # (without the first entry, which is this function).
        e_type, e_value, e_tb = sys.exc_info()
        stack = traceback.extract_tb(e_tb)
        exc = ''.join(
            ['Traceback (most recent call last):\n'] +
            [' line {}, in {}\n'.format(lineno, name) + (line+'\n' if line else '')
             for _, lineno, name, line in stack[1:]] +
            traceback.format_exception_only(e_type, e_value)
        ).rstrip()
    finally:
        out = fake_out.getvalue()
        err = fake_err.getvalue()
        fake_in.close()
        fake_out.close()
        fake_err.close()
        sys.stdin, sys.stdout, sys.stderr = saved_streams

    try:
        conn.send((result, out, err, exc))
    except Exception:
        # The result could not be pickled (e.g. a generator or a lambda).
        # Nothing has been written yet, so report the failure instead of
        # leaving the caller without an answer.
        e_type, e_value = sys.exc_info()[:2]
        reason = ''.join(traceback.format_exception_only(e_type, e_value)).rstrip()
        conn.send((None, out, err, reason))


server.language_session_handlers['python'] = (
    lambda user_session, problem_id, language_identifier, group_identifier, problem_identifier:
        PythonSession(lambda text: user_session.send({'event': 'terminal_output', 'text': text}))
)