#!/usr/bin/python3 import collections import http.client import json import re import urllib # Create a new pengine, initialize it with [code] and return its ID. The engine # will be destroyed after completing one query. def create(code=''): opts = {'format': 'json-s', 'destroy': True, 'src_text': code} reply, messages = request('POST', '/pengine/create', body=json.dumps(opts)) if reply.get('event') != 'create' or 'error' in messages: raise Exception('\n'.join(messages['error'])) return reply['id'] # Create a new pengine with [code], run [query] and destroy the engine. Return # the reply from engine. def create_and_ask(code, query): opts = {'format': 'json-s', 'destroy': True, 'src_text': code, 'ask': query} reply, messages = request('POST', '/pengine/create', body=json.dumps(opts)) if reply.get('event') != 'create' or 'error' in messages: raise Exception('\n'.join(messages['error'])) return reply['answer']['data'] def ask(engine, query): return send(engine, 'ask(({}),[])'.format(query)) def next(engine, n=1): return send(engine, 'next({})'.format(n)) def stop(engine): return send(engine, 'stop') def destroy(engine): return send(engine, 'destroy') def send(engine, event): params = urllib.parse.urlencode({ 'id': engine, 'event': event, 'format': 'json-s'}) reply, messages = request('GET', '/pengine/send?' + params) return reply # Return the main reply and pull potential output replies. address, port = 'localhost', 3030 # TODO put this somewhere sane def request(method, path, body=None): headers = {'Content-Type': 'application/json;charset=utf-8'} messages = collections.defaultdict(list) try: conn = http.client.HTTPConnection(address, port, timeout=10) conn.request(method, path, body, headers=headers) while True: response = conn.getresponse() if response.status != http.client.OK: raise Exception('server returned {}'.format(response.status)) reply = json.loads(response.read().decode('utf-8')) if reply.get('event') == 'output': msg_type, msg_data = get_message(reply) messages[msg_type].append(msg_data) # Request next reply. params = urllib.parse.urlencode({ 'id': reply['id'], 'format': 'json-s'}) conn.request('GET', '/pengine/pull_response?' + params, headers=headers) else: return reply, messages finally: conn.close() # Strip boilerplate from Prolog messages … ugly. def get_message(reply): match = re.match(r'.*
(.*)
.*', reply['data'], flags=re.DOTALL) data = match.group(1).strip() message = '' if reply['message'] == 'error': if 'location' in reply: loc = reply['location'] message += 'near line ' + str(loc['line']) if 'ch' in loc: message += ', character ' + str(loc['ch']) message += ': ' if reply.get('code') == 'syntax_error': match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL) data = match.group(1) elif reply.get('code') == 'permission_error': match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL) data = match.group(1) elif reply.get('code') == 'type_error': match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL) data = match.group(1) message += data # Replace anonymous variable names with _. message = re.sub(r'_G[0-9]*', '_', message) return reply['message'], message # Test whether [code] is a correct solution for problem [name]. Runs all tests # and returns a list of results. Raises an exception on error. def test_all(name, code): reply = create_and_ask(code=code, query="run_tests({}, Results)".format(name)) if reply.get('event') != 'success': raise Exception('testing procedure failed') results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results']) n_total = len(results) n_passed = len([r for r in results if r.startswith('success')]) return n_passed, n_total # Test whether [code] is a correct solution for problem [name]. Returns a bool # and stops on first failure. def test(name, code): try: reply = create_and_ask(code=code, query='run_tests({})'.format(name)) return reply.get('event') == 'success' except Exception as ex: return False # Try to generate a random test case for problem [name] with [solution] that # fails for [code]. Give up after [tries] attempts. def create_failing_test(name, solution, code, tries=20): try: for i in range(tries): reply = create_and_ask(code=solution, query='create_test({}, Test)'.format(name)) testcase = reply['data'][0]['Test'] reply = create_and_ask(code=code, query='run_test({}, Result)'.format(testcase)) result = reply['data'][0]['Result'] if not result.startswith('success'): return testcase except Exception as ex: pass return None # Basic sanity check. if __name__ == '__main__': engine = create(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).') print('engine id is ' + engine) print(ask(engine, "run_tests({}, Result)".format('dup/2'))) destroy(engine)