summaryrefslogtreecommitdiff
path: root/monkey/prolog/engine.py
blob: e73ea5476e808a9a7495b4492e2c1e16f0159d90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/python3

import http.client
import json
import re
import urllib

class PrologEngine(object):
    def __init__(self, address='localhost', port=3030, code='', destroy=False, id=None):
        self.conn = http.client.HTTPConnection(address, port, timeout=10)

        # If existing engine ID is given, use it.
        if id:
            self.id = id
            return

        # Otherwise, create a new engine.
        hdrs = {'Content-Type': 'application/json;charset=utf-8'}
        opts = json.dumps({'destroy': destroy, 'src_text': code, 'format': 'json-s'})
        reply, outputs = self.request('POST', '/pengine/create', body=opts, headers=hdrs)

        failed = (reply['event'] != 'create')
        warnings = []
        errors = []
        for output in outputs:
            message = PrologEngine.parse_prolog_output(output)
            if output['message'] == 'warning':
                warnings.append(message)
            elif output['message'] == 'error':
                failed = True
                errors.append(message)

        if failed:
            raise Exception('\n'.join(errors))

        self.id = reply['id']

    def send(self, event):
        params = urllib.parse.urlencode({
            'id': self.id,
            'event': event,
            'format': 'json-s'})
        reply, outputs = self.request('GET', '/pengine/send?' + params)
        return reply

    def ask(self, query):
        event = 'ask(({}),[])'.format(query)
        reply = self.send(event)
        return reply

    def next(self, n=1):
        event = 'next({})'.format(n)
        reply = self.send(event)
        return reply

    def stop(self):
        return self.send('stop')

    def destroy(self):
        reply = self.send('destroy')
        self.id = None
        self.conn.close()
        self.conn = None

    # Return the main reply and possible output replies.
    def request(self, method, path, body=None, headers={}):
        self.conn.request(method, path, body, headers)
        outputs = []
        while True:
            response = self.conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))
            reply = json.loads(response.read().decode('utf-8'))
            self.id = reply['id']
            if reply['event'] == 'output':
                outputs.append(reply)
                params = urllib.parse.urlencode({
                    'id': self.id,
                    'format': 'json-s'})
                self.conn.request('GET', '/pengine/pull_response?' + params)
            else:
                return reply, outputs

    # Check if output is an error message and return a prettified version of it.
    def parse_prolog_output(output):
        match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*',
                         output['data'], flags=re.DOTALL)
        data = match.group(1).strip()
        message = ''
        if output['message'] == 'error':
            if 'location' in output:
                loc = output['location']
                message += 'near line ' + str(loc['line'])
                if 'ch' in loc:
                    message += ', character ' + str(loc['ch'])
                message += ': '

            if output.get('code') == 'syntax_error':
                match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL)
                message += match.group(1)
            elif output.get('code') == 'permission_error':
                match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL)
                message += match.group(1)
            elif output.get('code') == 'type_error':
                match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL)
                message += match.group(1)
            else:
                message += data

        # Replace anonymous variable names with _.
        message = re.sub(r'_G[0-9]*', '_', message)
        return message

# Test whether [code] is a correct solution for problem [name]. Runs all tests
# and returns a list of results. Raises an exception on error.
def test_all(name, code):
    engine = PrologEngine(code=code)
    reply = engine.ask("run_tests({}, '{}', Results)".format(name, engine.id))
    engine.destroy()

    if reply['event'] != 'success':
        raise Exception('testing procedure failed')

    results = re.findall(r'(?:success|failure)\([^)]*\)', reply['data'][0]['Results'])
    n_total = len(results)
    n_passed = len([r for r in results if r.startswith('success')])
    return (n_passed, n_total)

# Test whether [code] is a correct solution for problem [name]. Returns a bool
# and stops on first failure.
def test(name, code):
    ret = False
    try:
        engine = PrologEngine(code=code)
        ret = engine.ask("run_tests({}, '{}')".format(name, engine.id))['event'] == 'success'
        engine.destroy()
    except Exception as ex:
        pass
    return ret

# Basic sanity check.
if __name__ == '__main__':
    engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
    print('engine id is ' + engine.id)
    print(engine.ask("run_tests({},'{}',Result)".format('dup/2', engine.id)))
    engine.destroy()