#!/usr/bin/python3
import collections
import html
import http.client
import json
from operator import itemgetter
import re
import socket
import time
import urllib
def strip_html(text):
return html.unescape(re.sub(r'?[a-z]+[^>]*>', '', text))
# Create a new pengine and initialize it with [code]. Return engine ID and a
# list of messages from Prolog.
def create(code='', timeout=10):
opts = {'format': 'json-html', 'destroy': False, 'src_text': code}
reply, output = request('POST', '/pengine/create', body=json.dumps(opts), timeout=timeout)
return reply.get('id'), output
def ask(engine, query, timeout=10):
return send(engine, 'ask(({}),[])'.format(query), timeout=timeout)
def next(engine, n=1, timeout=10):
return send(engine, 'next({})'.format(n), timeout=timeout)
def stop(engine, timeout=10):
return send(engine, 'stop', timeout=timeout)
def destroy(engine):
params = urllib.parse.urlencode({'ids': engine})
try:
# We don't care about the answer here, so don't wait for it.
request('GET', '/pengine/destroy_all?' + params, timeout=0.01)
except:
pass
def send(engine, event, timeout=10):
params = urllib.parse.urlencode({
'id': engine,
'event': event,
'format': 'json-html'})
return request('GET', path='/pengine/send?' + params, timeout=timeout)
# Return the main reply and pull potential output replies.
address, port = 'localhost', 3030 # TODO put this somewhere sane
def request(method, path, body=None, timeout=10):
headers = {'Content-Type': 'application/json;charset=utf-8'}
messages = []
try:
conn = http.client.HTTPConnection(address, port, timeout=timeout)
conn.request(method, path, body, headers=headers)
while True:
response = conn.getresponse()
if response.status != http.client.OK:
raise Exception('server returned {}'.format(response.status))
reply = json.loads(response.read().decode('utf-8'))
if reply.get('event') == 'output':
messages.append(_get_message(reply))
# Pull the next output. These requests should return instantly
# as no additional processing needs to be done in the pengine.
params = urllib.parse.urlencode({
'id': reply['id'],
'format': 'json-html'})
conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
else:
return reply, messages
finally:
conn.close()
# Strip boilerplate from Prolog messages … ugly.
def _get_message(reply):
data = strip_html(reply['data']).strip()
# Prepend the message with formatted location.
# NOTE in the final version we probably want to return the location object
where = ''
if reply['message'] in ('error', 'warning'):
if 'location' in reply:
loc = reply['location']
where += 'near line ' + str(loc['line'])
if 'ch' in loc:
where += ', character ' + str(loc['ch'])
where += ': '
# Strip HTML and pengine IDs from the messages.
text = strip_html(data)
text = re.sub(r"pengine://[0-9]*/src:[0-9]*: ", '', text)
text = re.sub(r"'[0-9]{10,}':", '', text)
return reply['message'], where + text
# Return the value of variable [name] in the JSON object returned by Prolog.
def get_var(data, name):
for binding in data['variables']:
if name in binding['variables']:
return strip_html(binding['value'])
return None
# Return a string describing variable bindings and residuals in the JSON object
# returned by Prolog.
def pretty_vars(data):
result = []
for binding in data['variables']:
var_list = binding['variables']
value = binding['value']
result.append(' = '.join(var_list) + ' = ' + strip_html(value))
if 'residuals' in data:
result += [strip_html(b) for b in data['residuals']]
return ',\n'.join(result) if result else 'true'
# Run [query] in the pengine with id [engine] and return the list of answers
# found within [timeout] seconds. If a timeout occurs before the query is done,
# 'timed out' is appended as the last answer.
def ask_all(engine, query, timeout):
# Returns a tuple ((bindings, constraints), error, more?) for one answer.
def process_answer(reply):
# When an engine is destroyed, a nested data object has the actual
# query result.
if reply['event'] == 'destroy':
reply = reply['data']
if reply['event'] == 'success':
# Return a dictionary {var: value} and a list of constraints (as
# strings) from the JSON object returned by Prolog.
data = reply['data'][0]
bindings = {}
for binding in data['variables']:
value = strip_html(binding['value'])
for name in binding['variables']:
bindings[name] = value
constraints = [strip_html(r) for r in data.get('residuals', [])]
return (bindings, constraints), None, reply['more']
elif reply['event'] == 'failure':
return None, None, False
elif reply['event'] == 'error':
# Remove potential module name (engine ID) from the error message.
error = ('error', reply['data'].replace("'{}':".format(reply['id']), ''))
return None, error, False
start = time.monotonic()
answers, messages = [], []
try:
# Run the query.
reply, output = ask(engine, query, timeout)
messages += output
if 'error' in map(itemgetter(0), output):
return answers, messages
answer, error, more = process_answer(reply)
if answer:
answers.append(answer)
if error:
messages.append(error)
# Continue while there are more potential answers and time remaining.
while more:
real_timeout = timeout - (time.monotonic()-start)
if real_timeout <= 0:
raise socket.timeout()
reply, output = next(engine, timeout=real_timeout)
messages += output
answer, error, more = process_answer(reply)
if answer:
answers.append(answer)
if error:
messages.append(error)
except socket.timeout as ex:
answers.append('timed out')
return answers, messages
# Basic sanity check.
if __name__ == '__main__':
answers, messages = run('b(Y). a(X) :- {X > 3}, (X = 5 ; {X > 4}).', 'a(X)', timeout=1)
print(messages)
for bindings, constraints in answers:
print('bindings: {}, constraints: {}'.format(bindings, constraints))