#!/usr/bin/python3
"""Small HTTP/JSON client for a server exposing ``/pengine/*`` endpoints.

The module-level functions create engines, send them events and collect the
textual messages the server emits while answering.
"""

import collections
import http.client
import json
import re
import urllib.parse

# Host and port every request is sent to.
address, port = 'localhost', 3030  # TODO put this somewhere sane

# NOTE(review): in the original source this raw-string pattern was split by a
# physical line break between '.*' and '(.*)'. It is reproduced here as '\n';
# some markup may have been lost at that position -- confirm against the
# upstream file before relying on it.
_MESSAGE_BODY = re.compile(r'.*\n(.*).*', flags=re.DOTALL)

# Per-error-code patterns that pick the interesting part out of a message.
_ERROR_PATTERNS = {
    'syntax_error': re.compile(r'^.*Syntax error: (.*)$', flags=re.DOTALL),
    'permission_error': re.compile(r'^.*(No permission [^\n]*)',
                                   flags=re.DOTALL),
    'type_error': re.compile(r'^.*(Type error: [^\n]*)', flags=re.DOTALL),
}

# Generated variable names of the form _G123.
_ANONYMOUS_VARIABLE = re.compile(r'_G[0-9]*')


def _create(opts):
    """POST *opts* to /pengine/create and return the main reply.

    Raises Exception when the reply event is not 'create' or when any
    'error' message was collected while waiting for it.
    """
    reply, messages = request('POST', '/pengine/create',
                              body=json.dumps(opts))
    event = reply.get('event')
    if event != 'create' or 'error' in messages:
        errors = messages.get('error')
        if not errors:
            # Without this the exception would carry an empty message.
            errors = ["pengine creation failed with event '{}'".format(event)]
        raise Exception('\n'.join(errors))
    return reply


def create(code=''):
    """Create a new pengine with *code* as its source text; return its ID.

    Raises Exception if creation fails or the server reports errors.
    """
    opts = {'format': 'json-s', 'destroy': True, 'src_text': code}
    return _create(opts)['id']


def create_and_ask(code, query):
    """Create a pengine with *code*, ask *query* in the same request.

    Returns the 'data' member of the answer embedded in the create reply.
    Raises Exception if creation fails or the server reports errors.
    """
    opts = {'format': 'json-s', 'destroy': True, 'src_text': code,
            'ask': query}
    return _create(opts)['answer']['data']


def ask(engine, query):
    """Send an ask event for *query* to *engine* and return the reply."""
    return send(engine, 'ask(({}),[])'.format(query))


# NOTE: this shadows the builtin next() inside this module; the name is kept
# because it is part of the module's public interface.
def next(engine, n=1):
    """Send a next(n) event to *engine* and return the reply."""
    return send(engine, 'next({})'.format(n))


def stop(engine):
    """Send a stop event to *engine* and return the reply."""
    return send(engine, 'stop')


def destroy(engine):
    """Send a destroy event to *engine* and return the reply."""
    return send(engine, 'destroy')


def send(engine, event):
    """Send *event* to the pengine with ID *engine*; return the main reply.

    Messages collected while waiting for the reply are discarded.
    """
    params = urllib.parse.urlencode({
        'id': engine,
        'event': event,
        'format': 'json-s'})
    reply, _messages = request('GET', '/pengine/send?' + params)
    return reply


def request(method, path, body=None):
    """Perform one request; return the main reply and pulled output messages.

    Replies whose event is 'output' are converted with get_message() and
    grouped by message type; after each one the next reply is pulled from
    /pengine/pull_response. The first reply with any other event ends the
    loop.

    Returns a (reply, messages) tuple where messages is a defaultdict
    mapping message type to a list of message strings.
    Raises Exception if the server answers with a status other than 200.
    """
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    messages = collections.defaultdict(list)
    # Created outside the try block so that the finally clause never sees an
    # unbound name if construction itself fails.
    conn = http.client.HTTPConnection(address, port, timeout=10)
    try:
        conn.request(method, path, body, headers=headers)
        while True:
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception(
                    'server returned {}'.format(response.status))

            reply = json.loads(response.read().decode('utf-8'))
            if reply['event'] != 'output':
                return reply, messages

            msg_type, msg_data = get_message(reply)
            messages[msg_type].append(msg_data)

            # Request next reply.
            params = urllib.parse.urlencode({
                'id': reply['id'],
                'format': 'json-s'})
            conn.request('GET', '/pengine/pull_response?' + params,
                         headers=headers)
    finally:
        conn.close()


def get_message(reply):
    """Strip boilerplate from a Prolog output message ... ugly.

    *reply* is an output reply with at least 'data' and 'message' keys and
    optionally 'location' and 'code'.

    Returns a (message type, text) tuple. For 'error' messages the text is
    an optional "near line N[, character M]: " prefix followed by the part
    selected by the pattern for the reply's error code, or the whole message
    body when there is no such pattern or it does not match.

    NOTE(review): the indentation of the original was lost; the trailing
    else is read as part of the error-code dispatch, which leaves the text
    empty for non-error messages -- confirm against the upstream file.
    """
    raw = reply['data']
    body_match = _MESSAGE_BODY.match(raw)
    # Fall back to the raw text instead of failing on an unexpected layout.
    data = (body_match.group(1) if body_match else raw).strip()

    message = ''
    if reply['message'] == 'error':
        if 'location' in reply:
            loc = reply['location']
            message += 'near line ' + str(loc['line'])
            if 'ch' in loc:
                message += ', character ' + str(loc['ch'])
            message += ': '

        pattern = _ERROR_PATTERNS.get(reply.get('code'))
        detail = pattern.match(data) if pattern is not None else None
        message += detail.group(1) if detail else data

    # Replace anonymous variable names with _.
    message = _ANONYMOUS_VARIABLE.sub('_', message)
    return reply['message'], message


def test_all(name, code):
    """Test whether *code* is a correct solution for problem *name*.

    Runs all tests and returns an (n_passed, n_total) tuple.
    Raises Exception on error.
    """
    reply = create_and_ask(
        code=code, query="run_tests({}, Results)".format(name))
    if reply['event'] != 'success':
        raise Exception('testing procedure failed')

    results = re.findall(r'(?:success|failure)\([^)]*\)',
                         reply['data'][0]['Results'])
    n_total = len(results)
    n_passed = sum(1 for result in results if result.startswith('success'))
    return n_passed, n_total


def test(name, code):
    """Test whether *code* is a correct solution for problem *name*.

    Returns a bool; any exception raised along the way counts as a failure.
    """
    try:
        reply = create_and_ask(
            code=code, query='run_tests({})'.format(name))
        return reply['event'] == 'success'
    except Exception:
        # Deliberately broad: this function only ever answers True or False.
        return False


# The names above start with "test"; this attribute keeps pytest from
# collecting them as test cases when the module is star-imported.
test_all.__test__ = False
test.__test__ = False


# Basic sanity check.
if __name__ == '__main__':
    engine = create(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
    print('engine id is ' + engine)
    print(ask(engine, "run_tests({}, Result)".format('dup/2')))
    destroy(engine)