summaryrefslogtreecommitdiff
path: root/monkey/prolog/engine.py
blob: 5a640c0f4cae052caa6a5a667f75f6499802b119 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#!/usr/bin/python3

import collections
import http.client
import json
import re
import urllib

def create(code=''):
    """Create a new pengine loaded with `code` and return its ID.

    Posts to /pengine/create with the 'destroy' option set to True and
    `code` as 'src_text'.

    Raises:
        Exception: if the server reported any error messages (they are
            joined with newlines into the exception text), or if the main
            reply is not a 'create' event.
    """
    opts = {'format': 'json-s', 'destroy': True, 'src_text': code}
    reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))

    if 'error' in messages:
        raise Exception('\n'.join(messages['error']))
    if reply['event'] != 'create':
        # No error output was collected, so say what we got instead of
        # raising an exception with an empty message.
        raise Exception('unexpected reply event: {}'.format(reply['event']))

    return reply['id']

def create_and_ask(code, query):
    """Create a pengine loaded with `code`, run `query`, return the answer.

    Posts to /pengine/create with `query` as the 'ask' option and the
    'destroy' option set to True, then returns the 'data' member of the
    reply's 'answer'.

    Raises:
        Exception: if the server reported any error messages (they are
            joined with newlines into the exception text), or if the main
            reply is not a 'create' event.
    """
    opts = {'format': 'json-s', 'destroy': True, 'src_text': code, 'ask': query}
    reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))

    if 'error' in messages:
        raise Exception('\n'.join(messages['error']))
    if reply['event'] != 'create':
        # No error output was collected, so say what we got instead of
        # raising an exception with an empty message.
        raise Exception('unexpected reply event: {}'.format(reply['event']))

    return reply['answer']['data']

def ask(engine, query):
    """Send an 'ask' event carrying `query` to pengine `engine`.

    The query is wrapped in parentheses and sent with an empty options list.
    Returns the reply from send().
    """
    event = 'ask(({}),[])'.format(query)
    return send(engine, event)

def next(engine, n=1):
    """Send a 'next(n)' event to pengine `engine` and return the reply.

    NOTE: this name shadows the builtin next() inside this module; it is kept
    because it is part of the module's interface.
    """
    event = 'next({})'.format(n)
    return send(engine, event)

def stop(engine):
    """Send a 'stop' event to pengine `engine` and return the reply."""
    reply = send(engine, 'stop')
    return reply

def destroy(engine):
    """Send a 'destroy' event to pengine `engine` and return the reply."""
    reply = send(engine, 'destroy')
    return reply

def send(engine, event):
    """Deliver `event` to pengine `engine` via GET /pengine/send.

    Returns only the main reply; any output messages collected by request()
    are discarded.
    """
    fields = {'id': engine, 'event': event, 'format': 'json-s'}
    query_string = urllib.parse.urlencode(fields)
    reply, _ = request('GET', '/pengine/send?' + query_string)
    return reply

# Location of the pengine server used by request().
address, port = 'localhost', 3030  # TODO put this somewhere sane
def request(method, path, body=None):
    """Perform an HTTP request and return the main reply plus output messages.

    While the server answers with 'output' events, each one is decoded with
    get_message() and stored, and the next reply is fetched from
    /pengine/pull_response on the same connection. The first reply that is
    not an 'output' event ends the loop.

    Returns:
        A (reply, messages) tuple: `reply` is the decoded JSON of the main
        reply, `messages` is a defaultdict mapping message type to the list
        of message texts received before it.

    Raises:
        Exception: if the server answers with a status other than 200 OK.
    """
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    messages = collections.defaultdict(list)
    # Create the connection before entering the try block: if construction
    # fails, the finally clause must not trip over an unbound `conn` and
    # mask the real error.
    conn = http.client.HTTPConnection(address, port, timeout=10)
    try:
        conn.request(method, path, body, headers=headers)
        while True:
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))

            reply = json.loads(response.read().decode('utf-8'))
            if reply['event'] != 'output':
                return reply, messages

            msg_type, msg_data = get_message(reply)
            messages[msg_type].append(msg_data)

            # Request next reply.
            params = urllib.parse.urlencode({
                'id': reply['id'],
                'format': 'json-s'})
            conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
    finally:
        conn.close()

# Patterns that pick the interesting part out of known error kinds, keyed by
# the reply's 'code' field. Group 1 is the text kept in the final message.
_ERROR_PATTERNS = {
    'syntax_error': r'^.*Syntax error: (.*)$',
    'permission_error': r'^.*(No permission [^\n]*)',
    'type_error': r'^.*(Type error: [^\n]*)',
}

def get_message(reply):
    """Strip boilerplate from a Prolog output message.

    `reply` is a decoded 'output' event with at least the keys 'data' (the
    message text, normally wrapped in a <pre class="..."> element) and
    'message' (the message type). For errors, the optional 'location' and
    'code' keys are used to prefix the position and to trim the text.

    Returns:
        A (message_type, text) tuple. `text` is the cleaned-up message for
        type 'error' and the empty string for every other message type.

    Text that does not match the expected markup or error pattern is kept
    as-is rather than raising.
    """
    raw = reply['data']
    wrapped = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*', raw, flags=re.DOTALL)
    # Fall back to the raw text when the <pre> wrapper is missing.
    data = (wrapped.group(1) if wrapped else raw).strip()

    kind = reply['message']
    if kind != 'error':
        return kind, ''

    message = ''
    if 'location' in reply:
        loc = reply['location']
        message += 'near line ' + str(loc['line'])
        if 'ch' in loc:
            message += ', character ' + str(loc['ch'])
        message += ': '

    pattern = _ERROR_PATTERNS.get(reply.get('code'))
    found = re.match(pattern, data, flags=re.DOTALL) if pattern else None
    # Unknown code, or known code with unexpected wording: keep the full text.
    message += found.group(1) if found else data

    # Replace anonymous variable names (_G123) with _. At least one digit and
    # word boundaries are required so identifiers such as _Goal are not mangled.
    message = re.sub(r'\b_G[0-9]+\b', '_', message)
    return kind, message

def test_all(name, code):
    """Run all tests for problem `name` against `code` and count the outcomes.

    Asks the server to evaluate run_tests(name, Results) with `code` loaded,
    then counts the success(...) and failure(...) terms in the Results
    binding of the first solution.

    Returns:
        A (n_passed, n_total) tuple.

    Raises:
        Exception: if the answer event is not 'success', or on any error
            raised by create_and_ask().
    """
    query = "run_tests({}, Results)".format(name)
    reply = create_and_ask(code=code, query=query)

    if reply['event'] != 'success':
        raise Exception('testing procedure failed')

    results_text = reply['data'][0]['Results']
    outcomes = re.findall(r'(?:success|failure)\([^)]*\)', results_text)
    n_passed = sum(1 for outcome in outcomes if outcome.startswith('success'))
    return n_passed, len(outcomes)

def test(name, code):
    """Return True if `code` passes run_tests(name), False otherwise.

    Asks the server to evaluate run_tests(name) with `code` loaded and
    reports whether the answer event is 'success'. This is a best-effort
    check: any exception (server error, bad code, unexpected reply shape)
    counts as a failed test and yields False instead of propagating.

    NOTE(review): the original comment says testing stops on the first
    failure; that depends on the server-side run_tests/1 — confirm.
    """
    try:
        reply = create_and_ask(code=code, query='run_tests({})'.format(name))
        return reply['event'] == 'success'
    except Exception:
        # Always honour the documented bool return; previously this path
        # fell through and returned None.
        return False

# Basic sanity check. Uses the module-level functions; this file defines no
# PrologEngine class, so the previous version failed with NameError.
if __name__ == '__main__':
    engine = create(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
    print('engine id is ' + engine)
    try:
        print(ask(engine, "run_tests({}, Result)".format('dup/2')))
    finally:
        # NOTE(review): create() sets the 'destroy' option, so the server may
        # already have discarded the engine once the query finished — confirm
        # that an extra destroy event is harmless.
        destroy(engine)