summaryrefslogtreecommitdiff
path: root/monkey/prolog/engine.py
diff options
context:
space:
mode:
Diffstat (limited to 'monkey/prolog/engine.py')
-rw-r--r--monkey/prolog/engine.py135
1 files changed, 135 insertions, 0 deletions
diff --git a/monkey/prolog/engine.py b/monkey/prolog/engine.py
new file mode 100644
index 0000000..dff577c
--- /dev/null
+++ b/monkey/prolog/engine.py
@@ -0,0 +1,135 @@
+#!/usr/bin/python3
+
+import http.client
+import json
+import re
+import urllib
+
+address, port = 'localhost', 3030
+
+class PrologEngine(object):
+ def __init__(self, address=address, port=port, code='', destroy=False, id=None):
+ self.conn = http.client.HTTPConnection(address, port, timeout=10)
+
+ # If existing engine ID is given, use it.
+ if id:
+ self.id = id
+ return
+
+ # Otherwise, create a new engine.
+ hdrs = {'Content-Type': 'application/json;charset=utf-8'}
+ opts = json.dumps({'destroy': destroy, 'src_text': code, 'format': 'json-s'})
+ reply, outputs = self.request('POST', '/pengine/create', body=opts, headers=hdrs)
+
+ failed = (reply['event'] != 'create')
+ warnings = []
+ errors = []
+ for output in outputs:
+ print(output)
+ message = PrologEngine.parse_prolog_output(output)
+ if output['message'] == 'warning':
+ warnings.append(message)
+ elif output['message'] == 'error':
+ failed = True
+ errors.append(message)
+
+ if failed:
+ raise Exception('\n'.join(errors))
+
+ self.id = reply['id']
+
+ def send(self, event):
+ params = urllib.parse.urlencode({
+ 'id': self.id,
+ 'event': event,
+ 'format': 'json-s'})
+ reply, outputs = self.request('GET', '/pengine/send?' + params)
+ return reply
+
+ def ask(self, query):
+ event = 'ask(({}),[])'.format(query)
+ reply = self.send(event)
+ return reply
+
+ def next(self, n=1):
+ event = 'next({})'.format(n)
+ reply = self.send(event)
+ return reply
+
+ def stop(self):
+ return self.send('stop')
+
+ def destroy(self):
+ reply = self.send('destroy')
+ self.id = None
+ self.conn.close()
+ self.conn = None
+
+ # Return the main reply and possible output replies.
+ def request(self, method, path, body=None, headers={}):
+ self.conn.request(method, path, body, headers)
+ outputs = []
+ while True:
+ response = self.conn.getresponse()
+ if response.status != http.client.OK:
+ raise Exception('server returned {}'.format(response.status))
+ reply = json.loads(response.read().decode('utf-8'))
+ self.id = reply['id']
+ if reply['event'] == 'output':
+ outputs.append(reply)
+ params = urllib.parse.urlencode({
+ 'id': self.id,
+ 'format': 'json-s'})
+ self.conn.request('GET', '/pengine/pull_response?' + params)
+ else:
+ return reply, outputs
+
+ # Check if output is an error message and return a prettified version of it.
+ def parse_prolog_output(output):
+ match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*',
+ output['data'], flags=re.DOTALL)
+ data = match.group(1).strip()
+ message = ''
+ if output['message'] == 'error':
+ if 'location' in output:
+ loc = output['location']
+ message += 'near line ' + str(loc['line'])
+ if 'ch' in loc:
+ message += ', character ' + str(loc['ch'])
+ message += ': '
+
+ if output.get('code') == 'syntax_error':
+ match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL)
+ message += match.group(1)
+ elif output.get('code') == 'permission_error':
+ match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL)
+ message += match.group(1)
+ elif output.get('code') == 'type_error':
+ match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL)
+ message += match.group(1)
+ else:
+ message += data
+
+ # Replace anonymous variable names with _.
+ message = re.sub(r'_G[0-9]*', '_', message)
+ return message
+
+def test(name, code):
+ engine = PrologEngine(code=code)
+ reply = engine.ask("run_tests({}, '{}', Results)".format(name, engine.id))
+ engine.destroy()
+
+ if reply['event'] != 'success':
+ raise Exception('testing procedure failed')
+
+ results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results'])
+ n_total = len(results)
+ n_passed = len([r for r in results if r.startswith('success')])
+ return (n_passed, n_total)
+
+# Basic sanity check.
+if __name__ == '__main__':
+ engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
+ print('engine id is ' + engine.id)
+ print(engine.ask("run_tests({},'{}',Result)".format('dup/2', engine.id)))
+ engine.destroy()