#!/usr/bin/python3 import collections import http.client import json import re import urllib import xml.etree.ElementTree def strip_html(html_str): return ''.join(xml.etree.ElementTree.fromstring(html_str).itertext()) # Create a new pengine, initialize it with [code] and return Prolog's reply. # The engine is destroyed after answering one query. If [query] is given, the # first answer is returned and the engine destroyed. def create(code='', query=''): opts = {'format': 'json-html', 'destroy': True, 'src_text': code} if query: opts['ask'] = query reply, output = request('POST', '/pengine/create', body=json.dumps(opts)) # If query was given, the actual reply is nested in create/destroy objects. if query: reply = reply['answer']['data'] return reply, output def ask(engine, query): return send(engine, 'ask(({}),[])'.format(query)) def next(engine, n=1): return send(engine, 'next({})'.format(n)) def stop(engine): return send(engine, 'stop') def destroy(engine): return send(engine, 'destroy') def send(engine, event): params = urllib.parse.urlencode({ 'id': engine, 'event': event, 'format': 'json-html'}) return request('GET', '/pengine/send?' + params) # Return the main reply and pull potential output replies. address, port = 'localhost', 3030 # TODO put this somewhere sane def request(method, path, body=None): headers = {'Content-Type': 'application/json;charset=utf-8'} messages = [] try: conn = http.client.HTTPConnection(address, port, timeout=10) conn.request(method, path, body, headers=headers) while True: response = conn.getresponse() if response.status != http.client.OK: raise Exception('server returned {}'.format(response.status)) reply = json.loads(response.read().decode('utf-8')) if reply.get('event') == 'output': messages.append(get_message(reply)) # Request next reply. params = urllib.parse.urlencode({ 'id': reply['id'], 'format': 'json-html'}) conn.request('GET', '/pengine/pull_response?' + params, headers=headers) else: return reply, messages finally: conn.close() # Strip boilerplate from Prolog messages … ugly. def get_message(reply): data = strip_html(reply['data']).strip() message = '' if reply['message'] in ('error', 'warning'): message = reply['message'] + ': ' if 'location' in reply: loc = reply['location'] message += 'near line ' + str(loc['line']) if 'ch' in loc: message += ', character ' + str(loc['ch']) message += ': ' if reply.get('code') == 'syntax_error': match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL) data = match.group(1) elif reply.get('code') == 'permission_error': match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL) data = match.group(1) elif reply.get('code') == 'type_error': match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL) data = match.group(1) message += data # Replace anonymous variable names with _. message = re.sub(r'_G[0-9]*', '_', message) return reply['message'], message # Return the value of variable [name] in the JSON object returned by Prolog. def get_var(data, name): for binding in data['variables']: if name in binding['variables']: return strip_html(binding['value']) return None # Return a string describing variable bindings and residuals in the JSON object # returned by Prolog. def pretty_vars(data): result = [] for binding in data['variables']: var_list = binding['variables'] value = binding['value'] result.append(' = '.join(var_list) + ' = ' + strip_html(value)) if 'residuals' in data: result += [strip_html(b) for b in data['residuals']] return ',\n'.join(result) if result else 'true' # Basic sanity check. if __name__ == '__main__': engine = create(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')['id'] print('engine id is ' + engine) print(ask(engine, "run_tests({}, Result)".format('dup/2'))) destroy(engine)