diff options
Diffstat (limited to 'prolog/engine.py')
-rw-r--r-- | prolog/engine.py | 134 |
1 files changed, 134 insertions, 0 deletions
diff --git a/prolog/engine.py b/prolog/engine.py new file mode 100644 index 0000000..af79535 --- /dev/null +++ b/prolog/engine.py @@ -0,0 +1,134 @@ +#!/usr/bin/python3 + +import collections +import http.client +import json +import re +import urllib + +# Create a new pengine. If query is given: run it, destroy the engine and +# return the reply. Otherwise, return the ID of the new pengine. +def create(code=''): + opts = {'format': 'json-s', 'destroy': True, 'src_text': code} + reply, messages = request('POST', '/pengine/create', body=json.dumps(opts)) + + if reply['event'] != 'create' or 'error' in messages: + raise Exception('\n'.join(messages['error'])) + + return reply['id'] + +def create_and_ask(code, query): + opts = {'format': 'json-s', 'destroy': True, 'src_text': code, 'ask': query} + reply, messages = request('POST', '/pengine/create', body=json.dumps(opts)) + + if reply['event'] != 'create' or 'error' in messages: + raise Exception('\n'.join(messages['error'])) + + return reply['answer']['data'] + +def ask(engine, query): + return send(engine, 'ask(({}),[])'.format(query)) + +def next(engine, n=1): + return send(engine, 'next({})'.format(n)) + +def stop(engine): + return send(engine, 'stop') + +def destroy(engine): + return send(engine, 'destroy') + +def send(engine, event): + params = urllib.parse.urlencode({ + 'id': engine, + 'event': event, + 'format': 'json-s'}) + reply, messages = request('GET', '/pengine/send?' + params) + return reply + +# Return the main reply and pull potential output replies. +address, port = 'localhost', 3030 # TODO put this somewhere sane +def request(method, path, body=None): + headers = {'Content-Type': 'application/json;charset=utf-8'} + messages = collections.defaultdict(list) + try: + conn = http.client.HTTPConnection(address, port, timeout=10) + conn.request(method, path, body, headers=headers) + while True: + response = conn.getresponse() + if response.status != http.client.OK: + raise Exception('server returned {}'.format(response.status)) + + reply = json.loads(response.read().decode('utf-8')) + if reply['event'] == 'output': + msg_type, msg_data = get_message(reply) + messages[msg_type].append(msg_data) + + # Request next reply. + params = urllib.parse.urlencode({ + 'id': reply['id'], + 'format': 'json-s'}) + conn.request('GET', '/pengine/pull_response?' + params, headers=headers) + else: + return reply, messages + finally: + conn.close() + +# Strip boilerplate from Prolog messages … ugly. +def get_message(reply): + match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*', + reply['data'], flags=re.DOTALL) + data = match.group(1).strip() + message = '' + if reply['message'] == 'error': + if 'location' in reply: + loc = reply['location'] + message += 'near line ' + str(loc['line']) + if 'ch' in loc: + message += ', character ' + str(loc['ch']) + message += ': ' + + if reply.get('code') == 'syntax_error': + match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL) + message += match.group(1) + elif reply.get('code') == 'permission_error': + match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL) + message += match.group(1) + elif reply.get('code') == 'type_error': + match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL) + message += match.group(1) + else: + message += data + + # Replace anonymous variable names with _. + message = re.sub(r'_G[0-9]*', '_', message) + return reply['message'], message + +# Test whether [code] is a correct solution for problem [name]. Runs all tests +# and returns a list of results. Raises an exception on error. +def test_all(name, code): + reply = create_and_ask(code=code, query="run_tests({}, Results)".format(name)) + + if reply['event'] != 'success': + raise Exception('testing procedure failed') + + results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results']) + n_total = len(results) + n_passed = len([r for r in results if r.startswith('success')]) + return n_passed, n_total + +# Test whether [code] is a correct solution for problem [name]. Returns a bool +# and stops on first failure. +def test(name, code): + try: + reply = create_and_ask(code=code, query='run_tests({})'.format(name)) + return reply['event'] == 'success' + except Exception as ex: + return False + +# Basic sanity check. +if __name__ == '__main__': + engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).') + print('engine id is ' + engine.id) + print(engine.ask("run_tests({}, Result)".format('dup/2'))) + engine.destroy() |