#!/usr/bin/python3
import collections
import html
import http.client
import json
from operator import itemgetter
import re
import socket
import time
import urllib
def strip_html(text):
    """Return [text] with HTML tags removed and HTML entities unescaped.

    Only tags whose name starts with lowercase ASCII letters are removed
    (opening, closing and self-closing), so a bare '<' or '>' in ordinary
    text is left alone. Entities such as '&lt;' are decoded after the tags
    have been removed.
    """
    # The pattern must start with a literal '<' (optionally followed by '/');
    # a pattern beginning with a bare '?' is not a valid regular expression.
    return html.unescape(re.sub(r'</?[a-z]+[^>]*>', '', text))
def create(code='', query='', timeout=10):
    """Create a new pengine initialized with [code] and return Prolog's reply.

    The engine is created with the 'destroy' option set, so it is destroyed
    after answering one query. If [query] is given, it is asked as part of
    the create request and the first answer is returned instead of the
    create reply.

    Returns a tuple (reply, output) where output is the list of messages
    collected by request().
    """
    payload = {'format': 'json-html', 'destroy': True, 'src_text': code}
    has_query = bool(query)
    if has_query:
        payload['ask'] = query

    reply, output = request(
        'POST', '/pengine/create', body=json.dumps(payload), timeout=timeout)

    if not has_query:
        return reply, output
    # With a query, the actual reply is nested in create/destroy objects.
    return reply['answer']['data'], output
def abort(engine):
    """Ask the server to abort the query running on pengine [engine].

    This is fire-and-forget: the request uses a very short timeout and any
    failure (including the expected timeout) is ignored. Returns None.
    """
    params = urllib.parse.urlencode({
        'id': engine,
        'format': 'json-html'})
    try:
        # We don't care about the answer here, so don't wait for it.
        request('GET', '/pengine/abort?' + params, timeout=0.01)
    except Exception:
        # Best effort only. Catch Exception rather than using a bare except
        # so that KeyboardInterrupt and SystemExit still propagate.
        pass
def ask(engine, query, timeout=10):
    """Send an ask event for [query] to pengine [engine].

    Returns whatever send() returns for the event.
    """
    event = 'ask(({0}),[])'.format(query)
    return send(engine, event, timeout=timeout)
def next(engine, n=1, timeout=10):
    """Send a next([n]) event to pengine [engine].

    Returns whatever send() returns for the event.
    """
    event = 'next({0})'.format(n)
    return send(engine, event, timeout=timeout)
def stop(engine, timeout=10):
    """Send a stop event to pengine [engine].

    Returns whatever send() returns for the event.
    """
    event = 'stop'
    return send(engine, event, timeout=timeout)
def destroy(engine):
    """Ask the server to forcibly destroy pengine [engine].

    This is fire-and-forget: the event is sent with a very short timeout and
    any failure (including the expected timeout) is ignored. Returns None.
    """
    try:
        # We don't care about the answer here, so don't wait for it.
        send(engine, 'destroy([force(true)])', timeout=0.01)
    except Exception:
        # Best effort only. Catch Exception rather than using a bare except
        # so that KeyboardInterrupt and SystemExit still propagate.
        pass
def send(engine, event, timeout=10):
    """Send [event] to pengine [engine] and return the result of request().

    The engine id, the event and the reply format are passed as URL query
    parameters of a GET request to /pengine/send.
    """
    query_string = urllib.parse.urlencode([
        ('id', engine),
        ('event', event),
        ('format', 'json-html'),
    ])
    target = '/pengine/send?' + query_string
    return request('GET', path=target, timeout=timeout)
address, port = 'localhost', 3030  # TODO put this somewhere sane


def request(method, path, body=None, timeout=10):
    """Return the main reply and pull potential output replies.

    Sends one HTTP request to the pengine server at (address, port). While
    the server answers with 'output' events, each one is converted with
    _get_message() and the next response is pulled over the same connection.

    Returns a tuple (reply, messages): the first decoded JSON reply that is
    not an output event, and the list of messages collected before it.

    Raises Exception if the server answers with a status other than 200 OK;
    connection and timeout errors from http.client propagate unchanged.
    """
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    messages = []

    # Create the connection before entering the try block: if the
    # constructor fails there is nothing to close, and the finally clause
    # must not hide that error behind an unbound-name error.
    conn = http.client.HTTPConnection(address, port, timeout=timeout)
    try:
        conn.request(method, path, body, headers=headers)
        while True:
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))

            reply = json.loads(response.read().decode('utf-8'))
            if reply.get('event') != 'output':
                return reply, messages

            messages.append(_get_message(reply))
            # Pull the next output. These requests should return instantly
            # as no additional processing needs to be done in the pengine.
            params = urllib.parse.urlencode({
                'id': reply['id'],
                'format': 'json-html'})
            conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
    finally:
        conn.close()
def _get_message(reply):
    """Strip boilerplate from a Prolog output message.

    [reply] is an output event: reply['message'] is the message kind,
    reply['data'] the HTML-formatted text, and reply['location'] (optional)
    a dict with 'line' and optionally 'ch'.

    Returns a tuple (kind, text). For errors and warnings that carry a
    location, text is prefixed with 'near line N[, character C]: '.
    """
    kind = reply['message']

    # Prepend the message with formatted location.
    # NOTE in the final version we probably want to return the location object
    where = ''
    if kind in ('error', 'warning') and 'location' in reply:
        loc = reply['location']
        where = 'near line ' + str(loc['line'])
        if 'ch' in loc:
            where += ', character ' + str(loc['ch'])
        where += ': '

    # Strip HTML exactly once. A second pass would treat text that was
    # entity-escaped in the source (e.g. "a&lt;b, c&gt;d") as a tag and
    # silently delete it.
    text = strip_html(reply['data']).strip()
    # Remove pengine source locations and engine IDs used as module names.
    text = re.sub(r"pengine://[0-9]*/src:[0-9]*: ", '', text)
    text = re.sub(r"'[0-9]{10,}':", '', text)
    return kind, where + text
def get_var(data, name):
    """Return the value of variable [name] in the JSON object returned by Prolog.

    Scans data['variables'] in order and returns the HTML-stripped value of
    the first binding whose variable list contains [name]; returns None when
    no binding mentions it.
    """
    for entry in data['variables']:
        if name not in entry['variables']:
            continue
        return strip_html(entry['value'])
    return None
def pretty_vars(data):
    """Describe variable bindings and residuals in Prolog's JSON answer.

    Each binding becomes 'A = B = value' and each residual is appended
    as-is (HTML stripped). The parts are joined with ',\\n'; when there are
    neither bindings nor residuals the string 'true' is returned.
    """
    lines = [
        ' = '.join(entry['variables']) + ' = ' + strip_html(entry['value'])
        for entry in data['variables']
    ]
    if 'residuals' in data:
        lines.extend(strip_html(residual) for residual in data['residuals'])
    if not lines:
        return 'true'
    return ',\n'.join(lines)
def run(code, query, timeout):
    """Get all solutions to [query] given background knowledge [code] that
    are found within [timeout] seconds.

    Returns a tuple (result, messages). result is a list with one
    (bindings, constraints) tuple per answer, where bindings is a dict
    {var: value} and constraints a list of strings; if the time budget runs
    out, the string 'timed out' is appended as the last element. result is
    None when Prolog printed an error while loading [code] or starting the
    query. messages is a list of (kind, text) tuples.

    Raises Exception if the pengine cannot be created or replies with an
    unexpected event. The engine, once created, is always destroyed.
    """
    def process_answer(reply):
        """Return ((bindings, constraints), error, more?) for one answer."""
        # When an engine is destroyed, a nested data object has the actual
        # query result.
        if reply['event'] == 'destroy':
            reply = reply['data']

        event = reply['event']
        if event == 'success':
            # Build a dictionary {var: value} and a list of constraints (as
            # strings) from the JSON object returned by Prolog.
            data = reply['data'][0]
            bindings = {}
            for binding in data['variables']:
                value = strip_html(binding['value'])
                for name in binding['variables']:
                    bindings[name] = value
            constraints = [strip_html(r) for r in data.get('residuals', [])]
            return (bindings, constraints), None, reply['more']
        if event == 'failure':
            return None, None, False
        if event == 'error':
            # Remove potential module name (engine ID) from the error message.
            error = ('error', reply['data'].replace("'{}':".format(reply['id']), ''))
            return None, error, False
        # Fail with a clear message instead of returning None, which the
        # caller would trip over when unpacking the result.
        raise Exception('System error: unexpected pengine event {!r}'.format(event))

    start = time.monotonic()
    result, messages = [], []
    engine = None

    def remaining():
        """Return the time left in the budget; raise socket.timeout if none."""
        left = timeout - (time.monotonic() - start)
        if left <= 0:
            raise socket.timeout()
        return left

    def record(reply):
        """Store one answer (or its error) and return the more? flag."""
        answer, error, more = process_answer(reply)
        if answer:
            result.append(answer)
        if error:
            messages.append(error)
        return more

    try:
        # Create a new pengine.
        reply, output = create(code=code, timeout=timeout)
        messages += output
        if reply.get('event') != 'create':
            raise Exception('System error: creating pengine')
        engine = reply['id']
        if 'error' in map(itemgetter(0), messages):
            return None, messages

        # Run the query.
        reply, output = ask(engine, query, remaining())
        messages += output
        if 'error' in map(itemgetter(0), output):
            return None, messages
        more = record(reply)

        # Continue while there are more potential answers.
        while more:
            reply, output = next(engine, timeout=remaining())
            messages += output
            more = record(reply)
    except socket.timeout:
        result.append('timed out')
        # The timeout may hit before the engine exists; there is nothing to
        # abort in that case.
        if engine:
            abort(engine)
    finally:
        if engine:
            destroy(engine)

    return result, messages
# Basic sanity check.
if __name__ == '__main__':
    answers, messages = run('b(Y). a(X) :- {X > 3}, (X = 5 ; {X > 4}).', 'a(X)', timeout=1)
    print(messages)
    # run() returns None instead of a list when Prolog reported an error, and
    # appends the plain string 'timed out' when the time budget is exhausted;
    # neither can be unpacked as a (bindings, constraints) pair.
    for answer in answers or []:
        if answer == 'timed out':
            print(answer)
            continue
        bindings, constraints = answer
        print('bindings: {}, constraints: {}'.format(bindings, constraints))