#!/usr/bin/python3 import collections import html import http.client import json from operator import itemgetter import re import time import urllib def strip_html(text): return html.unescape(re.sub(r']*>', '', text)) # Create a new pengine and initialize it with [code]. Return engine ID and a # list of messages from Prolog. def create(code='', timeout=10): opts = {'format': 'json-html', 'destroy': False, 'src_text': code} reply, output = request('POST', '/pengine/create', body=json.dumps(opts), timeout=timeout) return reply.get('id'), output def ask(engine, query, timeout=10): # Strip trailing whitespace and periods from the query. query = query.rstrip().rstrip('.') return send(engine, 'ask(({}),[])'.format(query), timeout=timeout) def next(engine, n=1, timeout=10): return send(engine, 'next({})'.format(n), timeout=timeout) def stop(engine, timeout=10): return send(engine, 'stop', timeout=timeout) def destroy(engine): params = urllib.parse.urlencode({'ids': engine}) try: # We don't care about the answer here, so don't wait for it. request('GET', '/pengine/destroy_all?' + params, timeout=0.01) except: pass def send(engine, event, timeout=10): params = urllib.parse.urlencode({ 'id': engine, 'event': event, 'format': 'json-html'}) return request('GET', path='/pengine/send?' + params, timeout=timeout) # Return the main reply and pull potential output replies. address, port = '127.0.0.1', 3030 # TODO put this somewhere sane def request(method, path, body=None, timeout=10): headers = {'Content-Type': 'application/json;charset=utf-8'} messages = [] try: conn = http.client.HTTPConnection(address, port, timeout=timeout) conn.request(method, path, body, headers=headers) while True: response = conn.getresponse() if response.status != http.client.OK: raise Exception('server returned {}'.format(response.status)) reply = json.loads(response.read().decode('utf-8')) if reply.get('event') == 'output': messages.append(_get_message(reply)) # Pull the next output. These requests should return instantly # as no additional processing needs to be done in the pengine. params = urllib.parse.urlencode({ 'id': reply['id'], 'format': 'json-html'}) conn.request('GET', '/pengine/pull_response?' + params, headers=headers) else: return reply, messages finally: conn.close() # Strip boilerplate from Prolog messages … ugly. def _get_message(reply): data = strip_html(reply['data']).strip() # Prepend the message with formatted location. # NOTE in the final version we probably want to return the location object where = '' if reply['message'] in ('error', 'warning'): if 'location' in reply: loc = reply['location'] where += 'near line ' + str(loc['line']) if 'ch' in loc: where += ', character ' + str(loc['ch']) where += ': ' # Strip HTML and pengine IDs from the messages. text = strip_html(data) text = re.sub(r"pengine://[0-9]*/src:([0-9]*:)* ", '', text) text = re.sub(r"'[0-9]{10,}':", '', text) return reply['message'], where + text # Return the value of variable [name] in the JSON object returned by Prolog. def get_var(data, name): for binding in data['variables']: if name in binding['variables']: return strip_html(binding['value']) return None # Return a string describing variable bindings and residuals in the JSON object # returned by Prolog. def pretty_vars(data): result = [] for binding in data['variables']: var_list = binding['variables'] value = binding['value'] result.append(' = '.join(var_list) + ' = ' + strip_html(value)) if 'residuals' in data: result += [strip_html(b) for b in data['residuals']] return ',\n'.join(result) if result else 'true' # Run [query] in pengine with ID [engine] and check whether all solutions # specified by [answers] (a list of binding dictionaries) are returned. This # function succeeds if [query] finds each solution in [answers] at least once # within [timeout] seconds, and fails when it finds any other solution. def check_answers(engine, query, answers, timeout=10): seen = [] start = time.monotonic() try: # Run the query. reply, output = ask(engine, query, timeout) answer, error, more = process_answer(reply) if answer: bindings, constraints = answer if bindings not in answers: return False if bindings not in seen: seen.append(bindings) # Continue while there are more potential answers and time remaining. while more: real_timeout = timeout - (time.monotonic()-start) if real_timeout <= 0: break reply, output = next(engine, timeout=real_timeout) answer, error, more = process_answer(reply) if answer: bindings, constraints = answer if bindings not in answers: return False if bindings not in seen: seen.append(bindings) except: pass return len(seen) == len(answers) # Run [query] in the pengine with id [engine] and return the first answer only # found within [timeout] seconds. # used for quick hint debugging by Sasha def ask_one(engine, query, timeout=1): # Returns either an error message, true, false, timeout (see below), or bindings # Timeout is "returned" as an unhandled exception -- deliberately so # Run the query. reply, output = ask(engine, query, timeout) if 'error' in map(itemgetter(0), output): return 'query syntax error' # TODO: Tim, this doesn't really work. It just proceeds and returns as a runtime error. answer, error, more = process_answer(reply) if error: # query caused a runtime error return 'runtime error: ' + error[-1] elif answer: # no error, query satisfied if not answer[0]: # empty bindings return 'true' else: return answer[0] # for now return just bindings without constraints else: # no error, query failed return 'false' def ask_truth(engine, query, timeout=1): # Returns either True or False # (Runtime) error is False! # Timeout is an unhandled exception -- deliberately so # Run the query. reply, output = ask(engine, query, timeout) if 'error' in map(itemgetter(0), output): return False # TODO: Tim, this doesn't really work. It just proceeds and returns as a runtime error. answer, error, more = process_answer(reply) if error: # query caused a runtime error return False elif answer: return True else: # no error, query failed return False # reformats prolog's answer into a neater form def process_answer(reply): # When an engine is destroyed, a nested data object has the actual # query result. if reply['event'] == 'destroy': reply = reply['data'] if reply['event'] == 'success': # Return a dictionary {var: value} and a list of constraints (as # strings) from the JSON object returned by Prolog. data = reply['data'][0] bindings = {} for binding in data['variables']: value = strip_html(binding['value']) for name in binding['variables']: bindings[name] = value constraints = [strip_html(r) for r in data.get('residuals', [])] return (bindings, constraints), None, reply['more'] elif reply['event'] == 'failure': return None, None, False elif reply['event'] == 'error': # Remove potential module name (engine ID) from the error message. error = ('error', reply['data'].replace("'{}':".format(reply['id']), '')) return None, error, False # Basic sanity check. if __name__ == '__main__': answers, messages = run('b(Y). a(X) :- {X > 3}, (X = 5 ; {X > 4}).', 'a(X)', timeout=1) print(messages) for bindings, constraints in answers: print('bindings: {}, constraints: {}'.format(bindings, constraints))