#!/usr/bin/python3 import collections import html import http.client import json from operator import itemgetter import re import socket import time import urllib def strip_html(text): return html.unescape(re.sub(r']*>', '', text)) # Create a new pengine, initialize it with [code] and return Prolog's reply. # The engine is destroyed after answering one query. If [query] is given, the # first answer is returned and the engine destroyed. def create(code='', query='', timeout=10): opts = {'format': 'json-html', 'destroy': True, 'src_text': code} if query: opts['ask'] = query reply, output = request('POST', '/pengine/create', body=json.dumps(opts), timeout=timeout) # If query was given, the actual reply is nested in create/destroy objects. if query: reply = reply['answer']['data'] return reply, output def abort(engine): params = urllib.parse.urlencode({ 'id': engine, 'format': 'json-html'}) try: # We don't care about the answer here, so don't wait for it. request('GET', '/pengine/abort?' + params, timeout=0.01) except: pass def ask(engine, query, timeout=10): return send(engine, 'ask(({}),[])'.format(query), timeout=timeout) def next(engine, n=1, timeout=10): return send(engine, 'next({})'.format(n), timeout=timeout) def stop(engine, timeout=10): return send(engine, 'stop', timeout=timeout) def destroy(engine): try: # We don't care about the answer here, so don't wait for it. send(engine, 'destroy([force(true)])', timeout=0.01) except: pass def send(engine, event, timeout=10): params = urllib.parse.urlencode({ 'id': engine, 'event': event, 'format': 'json-html'}) return request('GET', path='/pengine/send?' + params, timeout=timeout) # Return the main reply and pull potential output replies. address, port = 'localhost', 3030 # TODO put this somewhere sane def request(method, path, body=None, timeout=10): headers = {'Content-Type': 'application/json;charset=utf-8'} messages = [] try: conn = http.client.HTTPConnection(address, port, timeout=timeout) conn.request(method, path, body, headers=headers) while True: response = conn.getresponse() if response.status != http.client.OK: raise Exception('server returned {}'.format(response.status)) reply = json.loads(response.read().decode('utf-8')) if reply.get('event') == 'output': messages.append(_get_message(reply)) # Pull the next output. These requests should return instantly # as no additional processing needs to be done in the pengine. params = urllib.parse.urlencode({ 'id': reply['id'], 'format': 'json-html'}) conn.request('GET', '/pengine/pull_response?' + params, headers=headers) else: return reply, messages finally: conn.close() # Strip boilerplate from Prolog messages … ugly. def _get_message(reply): data = strip_html(reply['data']).strip() # Prepend the message with formatted location. # NOTE in the final version we probably want to return the location object where = '' if reply['message'] in ('error', 'warning'): if 'location' in reply: loc = reply['location'] where += 'near line ' + str(loc['line']) if 'ch' in loc: where += ', character ' + str(loc['ch']) where += ': ' # Strip HTML and pengine IDs from the messages. text = strip_html(data) text = re.sub(r"pengine://[0-9]*/src:[0-9]*: ", '', text) text = re.sub(r"'[0-9]{10,}':", '', text) return reply['message'], where + text # Return the value of variable [name] in the JSON object returned by Prolog. def get_var(data, name): for binding in data['variables']: if name in binding['variables']: return strip_html(binding['value']) return None # Return a string describing variable bindings and residuals in the JSON object # returned by Prolog. def pretty_vars(data): result = [] for binding in data['variables']: var_list = binding['variables'] value = binding['value'] result.append(' = '.join(var_list) + ' = ' + strip_html(value)) if 'residuals' in data: result += [strip_html(b) for b in data['residuals']] return ',\n'.join(result) if result else 'true' # Get all solutions to [query] given background knowledge [code] that are found # within [timeout] seconds. def run(code, query, timeout): # Returns a tuple ((bindings, constraints), error, more?) for one answer. def process_answer(reply): # When an engine is destroyed, a nested data object has the actual # query result. if reply['event'] == 'destroy': reply = reply['data'] if reply['event'] == 'success': # Return a dictionary {var: value} and a list of constraints (as # strings) from the JSON object returned by Prolog. data = reply['data'][0] bindings = {} for binding in data['variables']: value = strip_html(binding['value']) for name in binding['variables']: bindings[name] = value constraints = [strip_html(r) for r in data.get('residuals', [])] return (bindings, constraints), None, reply['more'] elif reply['event'] == 'failure': return None, None, False elif reply['event'] == 'error': # Remove potential module name (engine ID) from the error message. error = ('error', reply['data'].replace("'{}':".format(reply['id']), '')) return None, error, False start = time.monotonic() result, messages = [], [] engine = None try: # Create a new pengine. reply, output = create(code=code, timeout=timeout) messages += output if reply.get('event') != 'create': raise Exception('System error: creating pengine') engine = reply['id'] if 'error' in map(itemgetter(0), messages): return None, messages # Run the query. real_timeout = timeout - (time.monotonic()-start) if real_timeout <= 0: raise socket.timeout() reply, output = ask(engine, query, real_timeout) messages += output if 'error' in map(itemgetter(0), output): return None, messages bindings, error, more = process_answer(reply) if bindings: result.append(bindings) if error: messages.append(error) # Continue while there are more potential answers. while more: real_timeout = timeout - (time.monotonic()-start) if real_timeout <= 0: raise socket.timeout() reply, output = next(engine, timeout=real_timeout) messages += output bindings, error, more = process_answer(reply) if bindings: result.append(bindings) if error: messages.append(error) except socket.timeout as ex: result.append('timeout') abort(engine) finally: if engine: destroy(engine) return result, messages # Basic sanity check. if __name__ == '__main__': answers, messages = run('b(Y). a(X) :- {X > 3}, (X = 5 ; {X > 4}).', 'a(X)', timeout=1) print(messages) for bindings, constraints in answers: print('bindings: {}, constraints: {}'.format(bindings, constraints))