summaryrefslogtreecommitdiff
path: root/prolog/engine.py
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@araneo.org>2015-02-04 18:47:07 +0100
committerAleš Smodiš <aless@guru.si>2015-08-11 14:26:01 +0200
commit001739a6a93cceeb29f81ea2281ade0bef1a8645 (patch)
tree814a0890841ab55799329c76452ba25ce693caca /prolog/engine.py
parent6a104bf8e2baea162d7f9f1d439dd8f671ddd413 (diff)
Move monkey.prolog to root module
Diffstat (limited to 'prolog/engine.py')
-rw-r--r--prolog/engine.py134
1 files changed, 134 insertions, 0 deletions
diff --git a/prolog/engine.py b/prolog/engine.py
new file mode 100644
index 0000000..af79535
--- /dev/null
+++ b/prolog/engine.py
@@ -0,0 +1,134 @@
+#!/usr/bin/python3
+
+import collections
+import http.client
+import json
+import re
+import urllib
+
+# Create a new pengine. If query is given: run it, destroy the engine and
+# return the reply. Otherwise, return the ID of the new pengine.
+def create(code=''):
+ opts = {'format': 'json-s', 'destroy': True, 'src_text': code}
+ reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))
+
+ if reply['event'] != 'create' or 'error' in messages:
+ raise Exception('\n'.join(messages['error']))
+
+ return reply['id']
+
+def create_and_ask(code, query):
+ opts = {'format': 'json-s', 'destroy': True, 'src_text': code, 'ask': query}
+ reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))
+
+ if reply['event'] != 'create' or 'error' in messages:
+ raise Exception('\n'.join(messages['error']))
+
+ return reply['answer']['data']
+
+def ask(engine, query):
+ return send(engine, 'ask(({}),[])'.format(query))
+
+def next(engine, n=1):
+ return send(engine, 'next({})'.format(n))
+
+def stop(engine):
+ return send(engine, 'stop')
+
+def destroy(engine):
+ return send(engine, 'destroy')
+
+def send(engine, event):
+ params = urllib.parse.urlencode({
+ 'id': engine,
+ 'event': event,
+ 'format': 'json-s'})
+ reply, messages = request('GET', '/pengine/send?' + params)
+ return reply
+
+# Return the main reply and pull potential output replies.
+address, port = 'localhost', 3030 # TODO put this somewhere sane
+def request(method, path, body=None):
+ headers = {'Content-Type': 'application/json;charset=utf-8'}
+ messages = collections.defaultdict(list)
+ try:
+ conn = http.client.HTTPConnection(address, port, timeout=10)
+ conn.request(method, path, body, headers=headers)
+ while True:
+ response = conn.getresponse()
+ if response.status != http.client.OK:
+ raise Exception('server returned {}'.format(response.status))
+
+ reply = json.loads(response.read().decode('utf-8'))
+ if reply['event'] == 'output':
+ msg_type, msg_data = get_message(reply)
+ messages[msg_type].append(msg_data)
+
+ # Request next reply.
+ params = urllib.parse.urlencode({
+ 'id': reply['id'],
+ 'format': 'json-s'})
+ conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
+ else:
+ return reply, messages
+ finally:
+ conn.close()
+
+# Strip boilerplate from Prolog messages … ugly.
+def get_message(reply):
+ match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*',
+ reply['data'], flags=re.DOTALL)
+ data = match.group(1).strip()
+ message = ''
+ if reply['message'] == 'error':
+ if 'location' in reply:
+ loc = reply['location']
+ message += 'near line ' + str(loc['line'])
+ if 'ch' in loc:
+ message += ', character ' + str(loc['ch'])
+ message += ': '
+
+ if reply.get('code') == 'syntax_error':
+ match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL)
+ message += match.group(1)
+ elif reply.get('code') == 'permission_error':
+ match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL)
+ message += match.group(1)
+ elif reply.get('code') == 'type_error':
+ match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL)
+ message += match.group(1)
+ else:
+ message += data
+
+ # Replace anonymous variable names with _.
+ message = re.sub(r'_G[0-9]*', '_', message)
+ return reply['message'], message
+
+# Test whether [code] is a correct solution for problem [name]. Runs all tests
+# and returns a list of results. Raises an exception on error.
+def test_all(name, code):
+ reply = create_and_ask(code=code, query="run_tests({}, Results)".format(name))
+
+ if reply['event'] != 'success':
+ raise Exception('testing procedure failed')
+
+ results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results'])
+ n_total = len(results)
+ n_passed = len([r for r in results if r.startswith('success')])
+ return n_passed, n_total
+
+# Test whether [code] is a correct solution for problem [name]. Returns a bool
+# and stops on first failure.
+def test(name, code):
+ try:
+ reply = create_and_ask(code=code, query='run_tests({})'.format(name))
+ return reply['event'] == 'success'
+ except Exception as ex:
+ return False
+
+# Basic sanity check.
+if __name__ == '__main__':
+ engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
+ print('engine id is ' + engine.id)
+ print(engine.ask("run_tests({}, Result)".format('dup/2')))
+ engine.destroy()