#!/usr/bin/python3

# CodeQ: an online programming tutor.
# Copyright (C) 2015 UL FRI
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import html
import http.client
import json
from operator import itemgetter
import re
import socket  # for socket.timeout exception
import time

import urllib3


def strip_html(text):
    """Return [text] with HTML tags removed and character entities decoded."""
    # Tags are removed first, so entities such as "&lt;" that decode to angle
    # brackets are never mistaken for markup.
    return html.unescape(re.sub(r'</?[a-z]+[^>]*>', '', text))


def create(code='', timeout=10):
    """Create a new pengine and initialize it with [code].

    Return the engine ID (None if the reply carries no 'id') and a list of
    messages from Prolog.
    """
    opts = json.dumps({'format': 'json-html', 'destroy': False, 'src_text': code})
    reply, output = request('POST', '/pengine/create', params=opts, timeout=timeout)
    return reply.get('id'), output


def ask(engine, query, timeout=10):
    """Run [query] in pengine [engine]; return the reply and output messages."""
    # Strip trailing whitespace and periods from the query.
    query = query.rstrip().rstrip('.')
    return send(engine, 'ask(({}),[])'.format(query), timeout=timeout)


def next(engine, n=1, timeout=10):
    """Request the next [n] solutions of the query running in [engine]."""
    return send(engine, 'next({})'.format(n), timeout=timeout)


def stop(engine, timeout=10):
    """Send the 'stop' event to [engine]."""
    return send(engine, 'stop', timeout=timeout)


def destroy(engine):
    """Ask the server to destroy [engine], ignoring the answer and any error."""
    try:
        # We don't care about the answer here, so don't wait for it.
        params = {'ids': engine}
        request('GET', '/pengine/destroy_all', params=params, timeout=0.01)
    except Exception:
        # Best effort: the tiny timeout makes a timeout error the expected
        # outcome. Unlike a bare "except", this lets KeyboardInterrupt and
        # SystemExit through.
        pass


def send(engine, event, timeout=10):
    """Send [event] to pengine [engine]; return the reply and output messages."""
    params = {'id': engine, 'event': event, 'format': 'json-html'}
    return request('GET', path='/pengine/send', params=params, timeout=timeout)


def request(method, path, params=None, timeout=10):
    """Perform an HTTP request against the pengine server.

    For GET, [params] is a dictionary encoded into the URL; for any other
    method it is sent as the request body. Return the main reply (decoded
    JSON) and a list of messages collected from intermediate 'output' replies.

    Raises socket.timeout when urllib3 reports a read timeout, and Exception
    when the server answers with a status other than 200.
    """
    try:
        if method == 'GET':
            response = conn.request_encode_url(method, path, fields=params, timeout=timeout)
        else:
            response = conn.urlopen(method, path, body=params, timeout=timeout)

        messages = []
        while True:
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))

            reply = json.loads(response.data.decode('utf-8'))
            if isinstance(reply, dict) and reply.get('event') == 'output':
                messages.append(_get_message(reply))

                # Pull the next output. These requests should return instantly
                # as no additional processing needs to be done in the pengine.
                params = {'id': reply['id'], 'format': 'json-html'}
                response = conn.request_encode_url('GET', '/pengine/pull_response', fields=params)
            else:
                return reply, messages
    except urllib3.exceptions.ReadTimeoutError as err:
        # Callers only know about socket.timeout; keep the urllib3 error as
        # the cause for debugging.
        raise socket.timeout from err


def _get_message(reply):
    """Strip boilerplate from a Prolog 'output' reply.

    Return a (kind, text) pair, where kind is reply['message'] and text is the
    cleaned-up message, prefixed with its location for errors and warnings.
    """
    # Strip HTML exactly once: a second pass would decode entities twice and
    # could delete literal "<...>" text that only appeared after unescaping.
    text = strip_html(reply['data']).strip()

    # Prepend the message with formatted location.
    # NOTE in the final version we probably want to return the location object
    where = ''
    if reply['message'] in ('error', 'warning'):
        if 'location' in reply:
            loc = reply['location']
            where += 'near line ' + str(loc['line'])
            if 'ch' in loc:
                where += ', character ' + str(loc['ch'])
            where += ': '

    # Strip pengine IDs from the messages.
    text = re.sub(r"pengine://[0-9a-z-]*/src:([0-9]*:)* ", '', text)
    text = re.sub(r"'[0-9]{10,}':", '', text)
    return reply['message'], where + text


# Return the value of variable [name] in the JSON object returned by Prolog.
def get_var(data, name):
    """Return the HTML-stripped value bound to variable [name], or None."""
    for binding in data['variables']:
        if name in binding['variables']:
            return strip_html(binding['value'])
    return None


# Return a string describing variable bindings and residuals in the JSON object
# returned by Prolog.
def pretty_vars(data):
    """Format bindings as "A = B = value" lines, followed by any residuals.

    Lines are joined with ",\\n"; 'true' is returned when there is nothing to
    show.
    """
    result = [
        ' = '.join(binding['variables']) + ' = ' + strip_html(binding['value'])
        for binding in data['variables']
    ]
    if 'residuals' in data:
        result += [strip_html(b) for b in data['residuals']]
    return ',\n'.join(result) if result else 'true'


# Run [query] in pengine with ID [engine] and check whether all solutions
# specified by [answers] (a list of binding dictionaries) are returned. This
# function succeeds if [query] finds each solution in [answers] at least once
# within [timeout] seconds, and fails when it finds any other solution.
def check_answers(engine, query, answers, timeout=10):
    """Return True iff [query] yields every expected answer and nothing else.

    Any error while talking to the engine (including a timeout) ends the
    search; the result then depends on the answers seen so far.
    """
    # Expected answers are dictionaries (unhashable), hence a list.
    seen = []

    # Return false (only) if there is no expected answer where values of all
    # specified variables match values in [answer].
    def check_answer(answer):
        if not answer:
            return True

        bindings, constraints = answer
        for expected in answers:
            if all(bindings.get(var) == val for var, val in expected.items()):
                if expected not in seen:
                    seen.append(expected)
                return True
        return False

    start = time.monotonic()
    try:
        # Limit inferences for each solution to curb unbounded recursion.
        # The backslash is escaped: a bare "\=" is an invalid escape sequence
        # in a Python string literal.
        limited = ('(call_with_inference_limit(({}), 500000, _ILR), '
                   '_ILR \\= inference_limit_exceeded)').format(query)

        # Run the query.
        reply, output = ask(engine, limited, timeout)
        answer, error, more = process_answer(reply)
        if not check_answer(answer):
            return False

        # Continue while there are more potential answers and time remaining.
        while more:
            real_timeout = timeout - (time.monotonic() - start)
            if real_timeout <= 0:
                break
            reply, output = next(engine, timeout=real_timeout)
            answer, error, more = process_answer(reply)
            if not check_answer(answer):
                return False
    except Exception:
        # Best effort: a timeout or server error simply ends the search.
        # KeyboardInterrupt and SystemExit are deliberately not swallowed.
        pass

    return len(seen) == len(answers)


# Run [query] in the pengine with id [engine] and return the first answer only
# found within [timeout] seconds.
# used for quick hint debugging by Sasha
def ask_one(engine, query, timeout=1):
    """Return an error message, 'true', 'false', or the first bindings dict.

    A timeout is "returned" as an unhandled exception -- deliberately so.
    """
    # Run the query.
    reply, output = ask(engine, query, timeout)
    if 'error' in map(itemgetter(0), output):
        # TODO: Tim, this doesn't really work. It just proceeds and returns as
        # a runtime error.
        return 'query syntax error'

    answer, error, more = process_answer(reply)
    if error:
        # Query caused a runtime error; error is an ('error', message) pair.
        return 'runtime error: ' + error[-1]
    if answer:
        # No error, query satisfied. For now return just the bindings without
        # constraints, or 'true' when there are no bindings.
        return answer[0] if answer[0] else 'true'
    # No error, query failed.
    return 'false'


def ask_truth(engine, query, timeout=1):
    """Return True if [query] succeeds, False if it fails or raises an error.

    A (runtime) error counts as False. A timeout is an unhandled exception --
    deliberately so.
    """
    # Run the query.
    reply, output = ask(engine, query, timeout)
    if 'error' in map(itemgetter(0), output):
        # TODO: Tim, this doesn't really work. It just proceeds and returns as
        # a runtime error.
        return False

    answer, error, more = process_answer(reply)
    if error:
        # Query caused a runtime error.
        return False
    # True when the query was satisfied, False when it failed.
    return bool(answer)


def ask_truthTO(engine, query, timeout=1):
    """Like ask_truth, but a timeout *is* handled: it is returned as False."""
    try:
        return ask_truth(engine, query, timeout)
    except socket.timeout:
        return False


# reformats prolog's answer into a neater form
def process_answer(reply):
    """Return an (answer, error, more) triple for a pengine reply.

    answer is a (bindings, constraints) pair on success and None otherwise;
    error is an ('error', message) pair or None; more tells whether further
    solutions may exist.
    """
    # When an engine is destroyed, a nested data object has the actual
    # query result.
    if reply.get('event') == 'destroy':
        reply = reply.get('data', reply)

    event = reply.get('event')
    if event == 'success':
        # Return a dictionary {var: value} and a list of constraints (as
        # strings) from the JSON object returned by Prolog.
        data = reply['data'][0]
        bindings = {}
        for binding in data['variables']:
            value = strip_html(binding['value'])
            for name in binding['variables']:
                bindings[name] = value
        constraints = [strip_html(r) for r in data.get('residuals', [])]
        return (bindings, constraints), None, reply['more']
    if event == 'failure':
        return None, None, False
    if event == 'error':
        # Remove potential module name (engine ID) from the error message.
        error = ('error', reply['data'].replace("'{}':".format(reply['id']), ''))
        return None, error, False

    # Any other event: report it as an error rather than returning None,
    # which callers cannot unpack into three values.
    return None, ('error', 'unexpected reply event: {}'.format(event)), False


address, port = '127.0.0.1', 3030  # TODO put this somewhere sane
conn = urllib3.connectionpool.HTTPConnectionPool(
    host=address, port=port,
    timeout=10,
    maxsize=100,
    headers={'Content-Type': 'application/json;charset=utf-8'},
    retries=False)

# Basic sanity check.
if __name__ == '__main__':
    engine, messages = create('b(Y). a(X) :- {X > 3}, (X = 5 ; {X > 4}).')
    print(engine, messages)
    if engine is not None:
        reply = ask(engine, 'a(X)', timeout=1.0)
        print(reply)
        destroy(engine)