From 819ab10281c9bd6c000364c3a243959edd18abf7 Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Thu, 15 Jan 2015 12:10:22 +0100 Subject: Move pymonkey stuff to monkey/ Importing pymonkey into webmonkey, let's see how this works. --- monkey/prolog/engine.py | 135 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 monkey/prolog/engine.py (limited to 'monkey/prolog/engine.py') diff --git a/monkey/prolog/engine.py b/monkey/prolog/engine.py new file mode 100644 index 0000000..dff577c --- /dev/null +++ b/monkey/prolog/engine.py @@ -0,0 +1,135 @@ +#!/usr/bin/python3 + +import http.client +import json +import re +import urllib + +address, port = 'localhost', 3030 + +class PrologEngine(object): + def __init__(self, address=address, port=port, code='', destroy=False, id=None): + self.conn = http.client.HTTPConnection(address, port, timeout=10) + + # If existing engine ID is given, use it. + if id: + self.id = id + return + + # Otherwise, create a new engine. + hdrs = {'Content-Type': 'application/json;charset=utf-8'} + opts = json.dumps({'destroy': destroy, 'src_text': code, 'format': 'json-s'}) + reply, outputs = self.request('POST', '/pengine/create', body=opts, headers=hdrs) + + failed = (reply['event'] != 'create') + warnings = [] + errors = [] + for output in outputs: + print(output) + message = PrologEngine.parse_prolog_output(output) + if output['message'] == 'warning': + warnings.append(message) + elif output['message'] == 'error': + failed = True + errors.append(message) + + if failed: + raise Exception('\n'.join(errors)) + + self.id = reply['id'] + + def send(self, event): + params = urllib.parse.urlencode({ + 'id': self.id, + 'event': event, + 'format': 'json-s'}) + reply, outputs = self.request('GET', '/pengine/send?' + params) + return reply + + def ask(self, query): + event = 'ask(({}),[])'.format(query) + reply = self.send(event) + return reply + + def next(self, n=1): + event = 'next({})'.format(n) + reply = self.send(event) + return reply + + def stop(self): + return self.send('stop') + + def destroy(self): + reply = self.send('destroy') + self.id = None + self.conn.close() + self.conn = None + + # Return the main reply and possible output replies. + def request(self, method, path, body=None, headers={}): + self.conn.request(method, path, body, headers) + outputs = [] + while True: + response = self.conn.getresponse() + if response.status != http.client.OK: + raise Exception('server returned {}'.format(response.status)) + reply = json.loads(response.read().decode('utf-8')) + self.id = reply['id'] + if reply['event'] == 'output': + outputs.append(reply) + params = urllib.parse.urlencode({ + 'id': self.id, + 'format': 'json-s'}) + self.conn.request('GET', '/pengine/pull_response?' + params) + else: + return reply, outputs + + # Check if output is an error message and return a prettified version of it. + def parse_prolog_output(output): + match = re.match(r'.*
(.*)
.*', + output['data'], flags=re.DOTALL) + data = match.group(1).strip() + message = '' + if output['message'] == 'error': + if 'location' in output: + loc = output['location'] + message += 'near line ' + str(loc['line']) + if 'ch' in loc: + message += ', character ' + str(loc['ch']) + message += ': ' + + if output.get('code') == 'syntax_error': + match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL) + message += match.group(1) + elif output.get('code') == 'permission_error': + match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL) + message += match.group(1) + elif output.get('code') == 'type_error': + match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL) + message += match.group(1) + else: + message += data + + # Replace anonymous variable names with _. + message = re.sub(r'_G[0-9]*', '_', message) + return message + +def test(name, code): + engine = PrologEngine(code=code) + reply = engine.ask("run_tests({}, '{}', Results)".format(name, engine.id)) + engine.destroy() + + if reply['event'] != 'success': + raise Exception('testing procedure failed') + + results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results']) + n_total = len(results) + n_passed = len([r for r in results if r.startswith('success')]) + return (n_passed, n_total) + +# Basic sanity check. +if __name__ == '__main__': + engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).') + print('engine id is ' + engine.id) + print(engine.ask("run_tests({},'{}',Result)".format('dup/2', engine.id))) + engine.destroy() -- cgit v1.2.1