diff options
author | Timotej Lazar <timotej.lazar@araneo.org> | 2015-01-26 18:26:57 +0100 |
---|---|---|
committer | Aleš Smodiš <aless@guru.si> | 2015-08-11 14:26:01 +0200 |
commit | 60dd33f144165771881f60acb44552fb37a7d47f (patch) | |
tree | db4ef83ea6bacbc4825cbadaf18b511c45d2780d | |
parent | e155b4ba28bdcfe757c8aaab2673e84b49073132 (diff) |
Reduce number of requests when testing
Allow running a query when creating the pengine. Also replace
PrologEngine with a set of functions.
-rw-r--r-- | monkey/prolog/engine.py | 200 |
1 files changed, 94 insertions, 106 deletions
diff --git a/monkey/prolog/engine.py b/monkey/prolog/engine.py index f11854a..5a640c0 100644 --- a/monkey/prolog/engine.py +++ b/monkey/prolog/engine.py @@ -1,122 +1,113 @@ #!/usr/bin/python3 +import collections import http.client import json import re import urllib -class PrologEngine(object): - def __init__(self, address='localhost', port=3030, code='', destroy=False, id=None): - self.conn = http.client.HTTPConnection(address, port, timeout=10) - - # If existing engine ID is given, use it. - if id: - self.id = id - return - - # Otherwise, create a new engine. - hdrs = {'Content-Type': 'application/json;charset=utf-8'} - opts = json.dumps({'destroy': destroy, 'src_text': code, 'format': 'json-s'}) - reply, outputs = self.request('POST', '/pengine/create', body=opts, headers=hdrs) - - failed = (reply['event'] != 'create') - warnings = [] - errors = [] - for output in outputs: - message = PrologEngine.parse_prolog_output(output) - if output['message'] == 'warning': - warnings.append(message) - elif output['message'] == 'error': - failed = True - errors.append(message) - - if failed: - raise Exception('\n'.join(errors)) - - self.id = reply['id'] - - def send(self, event): - params = urllib.parse.urlencode({ - 'id': self.id, - 'event': event, - 'format': 'json-s'}) - reply, outputs = self.request('GET', '/pengine/send?' + params) - return reply - - def ask(self, query): - event = 'ask(({}),[])'.format(query) - reply = self.send(event) - return reply - - def next(self, n=1): - event = 'next({})'.format(n) - reply = self.send(event) - return reply - - def stop(self): - return self.send('stop') - - def destroy(self): - reply = self.send('destroy') - self.id = None - self.conn.close() - self.conn = None - - # Return the main reply and possible output replies. - def request(self, method, path, body=None, headers={}): - self.conn.request(method, path, body, headers) - outputs = [] +# Create a new pengine. If query is given: run it, destroy the engine and +# return the reply. Otherwise, return the ID of the new pengine. +def create(code=''): + opts = {'format': 'json-s', 'destroy': True, 'src_text': code} + reply, messages = request('POST', '/pengine/create', body=json.dumps(opts)) + + if reply['event'] != 'create' or 'error' in messages: + raise Exception('\n'.join(messages['error'])) + + return reply['id'] + +def create_and_ask(code, query): + opts = {'format': 'json-s', 'destroy': True, 'src_text': code, 'ask': query} + reply, messages = request('POST', '/pengine/create', body=json.dumps(opts)) + + if reply['event'] != 'create' or 'error' in messages: + raise Exception('\n'.join(messages['error'])) + + return reply['answer']['data'] + +def ask(engine, query): + return send(engine, 'ask(({}),[])'.format(query)) + +def next(engine, n=1): + return send(engine, 'next({})'.format(n)) + +def stop(engine): + return send(engine, 'stop') + +def destroy(engine): + return send(engine, 'destroy') + +def send(engine, event): + params = urllib.parse.urlencode({ + 'id': engine, + 'event': event, + 'format': 'json-s'}) + reply, messages = request('GET', '/pengine/send?' + params) + return reply + +# Return the main reply and pull potential output replies. +address, port = 'localhost', 3030 # TODO put this somewhere sane +def request(method, path, body=None): + headers = {'Content-Type': 'application/json;charset=utf-8'} + messages = collections.defaultdict(list) + try: + conn = http.client.HTTPConnection(address, port, timeout=10) + conn.request(method, path, body, headers=headers) while True: - response = self.conn.getresponse() + response = conn.getresponse() if response.status != http.client.OK: raise Exception('server returned {}'.format(response.status)) + reply = json.loads(response.read().decode('utf-8')) - self.id = reply['id'] if reply['event'] == 'output': - outputs.append(reply) + msg_type, msg_data = get_message(reply) + messages[msg_type].append(msg_data) + + # Request next reply. params = urllib.parse.urlencode({ - 'id': self.id, + 'id': reply['id'], 'format': 'json-s'}) - self.conn.request('GET', '/pengine/pull_response?' + params) - else: - return reply, outputs - - # Check if output is an error message and return a prettified version of it. - def parse_prolog_output(output): - match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*', - output['data'], flags=re.DOTALL) - data = match.group(1).strip() - message = '' - if output['message'] == 'error': - if 'location' in output: - loc = output['location'] - message += 'near line ' + str(loc['line']) - if 'ch' in loc: - message += ', character ' + str(loc['ch']) - message += ': ' - - if output.get('code') == 'syntax_error': - match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL) - message += match.group(1) - elif output.get('code') == 'permission_error': - match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL) - message += match.group(1) - elif output.get('code') == 'type_error': - match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL) - message += match.group(1) + conn.request('GET', '/pengine/pull_response?' + params, headers=headers) else: - message += data - - # Replace anonymous variable names with _. - message = re.sub(r'_G[0-9]*', '_', message) - return message + return reply, messages + finally: + conn.close() + +# Strip boilerplate from Prolog messages … ugly. +def get_message(reply): + match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*', + reply['data'], flags=re.DOTALL) + data = match.group(1).strip() + message = '' + if reply['message'] == 'error': + if 'location' in reply: + loc = reply['location'] + message += 'near line ' + str(loc['line']) + if 'ch' in loc: + message += ', character ' + str(loc['ch']) + message += ': ' + + if reply.get('code') == 'syntax_error': + match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL) + message += match.group(1) + elif reply.get('code') == 'permission_error': + match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL) + message += match.group(1) + elif reply.get('code') == 'type_error': + match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL) + message += match.group(1) + else: + message += data + + # Replace anonymous variable names with _. + message = re.sub(r'_G[0-9]*', '_', message) + return reply['message'], message # Test whether [code] is a correct solution for problem [name]. Runs all tests # and returns a list of results. Raises an exception on error. def test_all(name, code): - engine = PrologEngine(code=code) - reply = engine.ask("run_tests({}, Results)".format(name)) - engine.destroy() + reply = create_and_ask(code=code, query="run_tests({}, Results)".format(name)) if reply['event'] != 'success': raise Exception('testing procedure failed') @@ -124,19 +115,16 @@ def test_all(name, code): results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results']) n_total = len(results) n_passed = len([r for r in results if r.startswith('success')]) - return (n_passed, n_total) + return n_passed, n_total # Test whether [code] is a correct solution for problem [name]. Returns a bool # and stops on first failure. def test(name, code): - ret = False try: - engine = PrologEngine(code=code) - ret = engine.ask("run_tests({})".format(name))['event'] == 'success' - engine.destroy() + reply = create_and_ask(code=code, query='run_tests({})'.format(name)) + return reply['event'] == 'success' except Exception as ex: pass - return ret # Basic sanity check. if __name__ == '__main__': |