1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
#!/usr/bin/python3
import collections
import http.client
import json
import re
import urllib
import xml.etree.ElementTree
def strip_html(html_str):
return ''.join(xml.etree.ElementTree.fromstring(html_str).itertext())
# Create a new pengine, initialize it with [code] and return Prolog's reply.
# The engine is destroyed after answering one query. If [query] is given, the
# first answer is returned and the engine destroyed.
def create(code='', query=''):
opts = {'format': 'json-html', 'destroy': True, 'src_text': code}
if query:
opts['ask'] = query
reply, output = request('POST', '/pengine/create', body=json.dumps(opts))
# If query was given, the actual reply is nested in create/destroy objects.
if query:
reply = reply['answer']['data']
return reply, output
def ask(engine, query):
return send(engine, 'ask(({}),[])'.format(query))
def next(engine, n=1):
return send(engine, 'next({})'.format(n))
def stop(engine):
return send(engine, 'stop')
def destroy(engine):
return send(engine, 'destroy')
def send(engine, event):
params = urllib.parse.urlencode({
'id': engine,
'event': event,
'format': 'json-html'})
return request('GET', '/pengine/send?' + params)
# Return the main reply and pull potential output replies.
address, port = 'localhost', 3030 # TODO put this somewhere sane
def request(method, path, body=None):
headers = {'Content-Type': 'application/json;charset=utf-8'}
messages = []
try:
conn = http.client.HTTPConnection(address, port, timeout=10)
conn.request(method, path, body, headers=headers)
while True:
response = conn.getresponse()
if response.status != http.client.OK:
raise Exception('server returned {}'.format(response.status))
reply = json.loads(response.read().decode('utf-8'))
if reply.get('event') == 'output':
messages.append(get_message(reply))
# Request next reply.
params = urllib.parse.urlencode({
'id': reply['id'],
'format': 'json-html'})
conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
else:
return reply, messages
finally:
conn.close()
# Strip boilerplate from Prolog messages … ugly.
def get_message(reply):
data = strip_html(reply['data']).strip()
message = ''
if reply['message'] in ('error', 'warning'):
message = reply['message'] + ': '
if 'location' in reply:
loc = reply['location']
message += 'near line ' + str(loc['line'])
if 'ch' in loc:
message += ', character ' + str(loc['ch'])
message += ': '
if reply.get('code') == 'syntax_error':
match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL)
data = match.group(1)
elif reply.get('code') == 'permission_error':
match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL)
data = match.group(1)
elif reply.get('code') == 'type_error':
match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL)
data = match.group(1)
message += data
# Replace anonymous variable names with _.
message = re.sub(r'_G[0-9]*', '_', message)
return reply['message'], message
# Return the value of variable [name] in the JSON object returned by Prolog.
def get_var(data, name):
for binding in data['variables']:
if name in binding['variables']:
return strip_html(binding['value'])
return None
# Return a string describing variable bindings and residuals in the JSON object
# returned by Prolog.
def pretty_vars(data):
result = []
for binding in data['variables']:
var_list = binding['variables']
value = binding['value']
result.append(' = '.join(var_list) + ' = ' + strip_html(value))
if 'residuals' in data:
result += [strip_html(b) for b in data['residuals']]
return ',\n'.join(result) if result else 'true'
# Basic sanity check.
if __name__ == '__main__':
engine = create(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')['id']
print('engine id is ' + engine)
print(ask(engine, "run_tests({}, Result)".format('dup/2')))
destroy(engine)
|