summaryrefslogtreecommitdiff
path: root/monkey
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@araneo.org>2015-01-26 18:26:57 +0100
committerAleš Smodiš <aless@guru.si>2015-08-11 14:26:01 +0200
commit60dd33f144165771881f60acb44552fb37a7d47f (patch)
treedb4ef83ea6bacbc4825cbadaf18b511c45d2780d /monkey
parente155b4ba28bdcfe757c8aaab2673e84b49073132 (diff)
Reduce number of requests when testing
Allow running a query when creating the pengine. Also replace PrologEngine with a set of functions.
Diffstat (limited to 'monkey')
-rw-r--r--monkey/prolog/engine.py200
1 files changed, 94 insertions, 106 deletions
diff --git a/monkey/prolog/engine.py b/monkey/prolog/engine.py
index f11854a..5a640c0 100644
--- a/monkey/prolog/engine.py
+++ b/monkey/prolog/engine.py
@@ -1,122 +1,113 @@
#!/usr/bin/python3
+import collections
import http.client
import json
import re
import urllib
-class PrologEngine(object):
- def __init__(self, address='localhost', port=3030, code='', destroy=False, id=None):
- self.conn = http.client.HTTPConnection(address, port, timeout=10)
-
- # If existing engine ID is given, use it.
- if id:
- self.id = id
- return
-
- # Otherwise, create a new engine.
- hdrs = {'Content-Type': 'application/json;charset=utf-8'}
- opts = json.dumps({'destroy': destroy, 'src_text': code, 'format': 'json-s'})
- reply, outputs = self.request('POST', '/pengine/create', body=opts, headers=hdrs)
-
- failed = (reply['event'] != 'create')
- warnings = []
- errors = []
- for output in outputs:
- message = PrologEngine.parse_prolog_output(output)
- if output['message'] == 'warning':
- warnings.append(message)
- elif output['message'] == 'error':
- failed = True
- errors.append(message)
-
- if failed:
- raise Exception('\n'.join(errors))
-
- self.id = reply['id']
-
- def send(self, event):
- params = urllib.parse.urlencode({
- 'id': self.id,
- 'event': event,
- 'format': 'json-s'})
- reply, outputs = self.request('GET', '/pengine/send?' + params)
- return reply
-
- def ask(self, query):
- event = 'ask(({}),[])'.format(query)
- reply = self.send(event)
- return reply
-
- def next(self, n=1):
- event = 'next({})'.format(n)
- reply = self.send(event)
- return reply
-
- def stop(self):
- return self.send('stop')
-
- def destroy(self):
- reply = self.send('destroy')
- self.id = None
- self.conn.close()
- self.conn = None
-
- # Return the main reply and possible output replies.
- def request(self, method, path, body=None, headers={}):
- self.conn.request(method, path, body, headers)
- outputs = []
+# Create a new pengine. If query is given: run it, destroy the engine and
+# return the reply. Otherwise, return the ID of the new pengine.
+def create(code=''):
+ opts = {'format': 'json-s', 'destroy': True, 'src_text': code}
+ reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))
+
+ if reply['event'] != 'create' or 'error' in messages:
+ raise Exception('\n'.join(messages['error']))
+
+ return reply['id']
+
+def create_and_ask(code, query):
+ opts = {'format': 'json-s', 'destroy': True, 'src_text': code, 'ask': query}
+ reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))
+
+ if reply['event'] != 'create' or 'error' in messages:
+ raise Exception('\n'.join(messages['error']))
+
+ return reply['answer']['data']
+
+def ask(engine, query):
+ return send(engine, 'ask(({}),[])'.format(query))
+
+def next(engine, n=1):
+ return send(engine, 'next({})'.format(n))
+
+def stop(engine):
+ return send(engine, 'stop')
+
+def destroy(engine):
+ return send(engine, 'destroy')
+
+def send(engine, event):
+ params = urllib.parse.urlencode({
+ 'id': engine,
+ 'event': event,
+ 'format': 'json-s'})
+ reply, messages = request('GET', '/pengine/send?' + params)
+ return reply
+
+# Return the main reply and pull potential output replies.
+address, port = 'localhost', 3030 # TODO put this somewhere sane
+def request(method, path, body=None):
+ headers = {'Content-Type': 'application/json;charset=utf-8'}
+ messages = collections.defaultdict(list)
+ try:
+ conn = http.client.HTTPConnection(address, port, timeout=10)
+ conn.request(method, path, body, headers=headers)
while True:
- response = self.conn.getresponse()
+ response = conn.getresponse()
if response.status != http.client.OK:
raise Exception('server returned {}'.format(response.status))
+
reply = json.loads(response.read().decode('utf-8'))
- self.id = reply['id']
if reply['event'] == 'output':
- outputs.append(reply)
+ msg_type, msg_data = get_message(reply)
+ messages[msg_type].append(msg_data)
+
+ # Request next reply.
params = urllib.parse.urlencode({
- 'id': self.id,
+ 'id': reply['id'],
'format': 'json-s'})
- self.conn.request('GET', '/pengine/pull_response?' + params)
- else:
- return reply, outputs
-
- # Check if output is an error message and return a prettified version of it.
- def parse_prolog_output(output):
- match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*',
- output['data'], flags=re.DOTALL)
- data = match.group(1).strip()
- message = ''
- if output['message'] == 'error':
- if 'location' in output:
- loc = output['location']
- message += 'near line ' + str(loc['line'])
- if 'ch' in loc:
- message += ', character ' + str(loc['ch'])
- message += ': '
-
- if output.get('code') == 'syntax_error':
- match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL)
- message += match.group(1)
- elif output.get('code') == 'permission_error':
- match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL)
- message += match.group(1)
- elif output.get('code') == 'type_error':
- match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL)
- message += match.group(1)
+ conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
else:
- message += data
-
- # Replace anonymous variable names with _.
- message = re.sub(r'_G[0-9]*', '_', message)
- return message
+ return reply, messages
+ finally:
+ conn.close()
+
+# Strip boilerplate from Prolog messages … ugly.
+def get_message(reply):
+ match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*',
+ reply['data'], flags=re.DOTALL)
+ data = match.group(1).strip()
+ message = ''
+ if reply['message'] == 'error':
+ if 'location' in reply:
+ loc = reply['location']
+ message += 'near line ' + str(loc['line'])
+ if 'ch' in loc:
+ message += ', character ' + str(loc['ch'])
+ message += ': '
+
+ if reply.get('code') == 'syntax_error':
+ match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL)
+ message += match.group(1)
+ elif reply.get('code') == 'permission_error':
+ match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL)
+ message += match.group(1)
+ elif reply.get('code') == 'type_error':
+ match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL)
+ message += match.group(1)
+ else:
+ message += data
+
+ # Replace anonymous variable names with _.
+ message = re.sub(r'_G[0-9]*', '_', message)
+ return reply['message'], message
# Test whether [code] is a correct solution for problem [name]. Runs all tests
# and returns a list of results. Raises an exception on error.
def test_all(name, code):
- engine = PrologEngine(code=code)
- reply = engine.ask("run_tests({}, Results)".format(name))
- engine.destroy()
+ reply = create_and_ask(code=code, query="run_tests({}, Results)".format(name))
if reply['event'] != 'success':
raise Exception('testing procedure failed')
@@ -124,19 +115,16 @@ def test_all(name, code):
results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results'])
n_total = len(results)
n_passed = len([r for r in results if r.startswith('success')])
- return (n_passed, n_total)
+ return n_passed, n_total
# Test whether [code] is a correct solution for problem [name]. Returns a bool
# and stops on first failure.
def test(name, code):
- ret = False
try:
- engine = PrologEngine(code=code)
- ret = engine.ask("run_tests({})".format(name))['event'] == 'success'
- engine.destroy()
+ reply = create_and_ask(code=code, query='run_tests({})'.format(name))
+ return reply['event'] == 'success'
except Exception as ex:
pass
- return ret
# Basic sanity check.
if __name__ == '__main__':