summaryrefslogtreecommitdiff
path: root/prolog/engine.py
blob: a25a07dcbe40fce4ea3918d20a0e3699556c9d0a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/python3

import http.client
import json
import re
import urllib

# Default host and port that PrologEngine connects to when none are given.
address, port = 'localhost', 3030


class PrologEngine(object):
    """Handle for one engine reached through the ``/pengine/*`` HTTP endpoints.

    Each instance owns one ``http.client.HTTPConnection`` (``self.conn``) and
    remembers the engine id reported by the server (``self.id``).
    """

    # (error code, regex) pairs used to cut the interesting part out of an
    # error text; group 1 of the regex is the part that is kept.
    _ERROR_PATTERNS = (
        ('syntax_error', r'^.*Syntax error: (.*)$'),
        ('permission_error', r'^.*(No permission [^\n]*)'),
        ('type_error', r'^.*(Type error: [^\n]*)'),
    )

    def __init__(self, address=address, port=port, code='', destroy=False, id=None):
        """Open a connection and attach to, or create, an engine.

        Args:
            address: Host name of the server.
            port: TCP port of the server.
            code: Text sent as ``src_text`` when a new engine is created.
            destroy: Value sent as the ``destroy`` option on creation.
            id: Id of an existing engine; when truthy, no engine is created.

        Raises:
            Exception: If the server does not answer with a ``create`` event
                or reports at least one error output for ``code``. The
                connection is closed before the exception propagates.
        """
        self.conn = http.client.HTTPConnection(address, port, timeout=10)

        # If existing engine ID is given, use it.
        if id:
            self.id = id
            return

        # Otherwise, create a new engine.
        hdrs = {'Content-Type': 'application/json;charset=utf-8'}
        opts = json.dumps({'destroy': destroy, 'src_text': code, 'format': 'json-s'})
        try:
            reply, outputs = self.request('POST', '/pengine/create', body=opts, headers=hdrs)
            # Only error outputs decide success; .get() tolerates outputs
            # that carry no 'message' key at all.
            errors = [PrologEngine.parse_prolog_output(output)
                      for output in outputs
                      if output.get('message') == 'error']
            if errors or reply['event'] != 'create':
                # Never raise with an empty message: fall back to the event name.
                raise Exception(
                    '\n'.join(errors)
                    or 'engine creation failed (event: {})'.format(reply['event']))
        except Exception:
            # Do not leave the socket open behind a half-constructed object.
            self.conn.close()
            raise

        self.id = reply['id']

    def send(self, event):
        """Send ``event`` to this engine and return the main reply (a dict).

        Output replies collected while waiting for the main reply are dropped.
        """
        params = urllib.parse.urlencode({
            'id': self.id,
            'event': event,
            'format': 'json-s'})
        reply, _outputs = self.request('GET', '/pengine/send?' + params)
        return reply

    def ask(self, query):
        """Send ``ask((query),[])`` and return the server's reply."""
        return self.send('ask(({}),[])'.format(query))

    def next(self, n=1):
        """Send ``next(n)`` and return the server's reply.

        NOTE(review): presumably requests up to ``n`` further solutions of the
        running query; confirm against the server documentation.
        """
        return self.send('next({})'.format(n))

    def stop(self):
        """Send ``stop`` and return the server's reply."""
        return self.send('stop')

    def destroy(self):
        """Send ``destroy`` and release the connection.

        The connection is closed and ``self.id`` / ``self.conn`` are reset to
        ``None`` even if the request fails. Calling this again afterwards is
        a no-op.
        """
        if self.conn is None:
            return
        try:
            self.send('destroy')
        finally:
            self.id = None
            self.conn.close()
            self.conn = None

    # Return the main reply and possible output replies.
    def request(self, method, path, body=None, headers=None):
        """Perform one HTTP request and gather its replies.

        While the server answers with ``output`` events, they are collected
        and the next reply is fetched from ``/pengine/pull_response``.

        Args:
            method: HTTP method, e.g. ``'GET'`` or ``'POST'``.
            path: Request path including any query string.
            body: Optional request body.
            headers: Optional dict of request headers.

        Returns:
            Tuple ``(reply, outputs)``: the first non-output reply and the
            list of output replies received before it.

        Raises:
            Exception: If any response status is not 200 OK.
        """
        # A fresh dict per call instead of a shared mutable default.
        self.conn.request(method, path, body, {} if headers is None else headers)
        outputs = []
        while True:
            response = self.conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))
            reply = json.loads(response.read().decode('utf-8'))
            self.id = reply['id']
            if reply['event'] != 'output':
                return reply, outputs

            outputs.append(reply)
            params = urllib.parse.urlencode({
                'id': self.id,
                'format': 'json-s'})
            self.conn.request('GET', '/pengine/pull_response?' + params)

    # Check if output is an error message and return a prettified version of it.
    @staticmethod
    def parse_prolog_output(output):
        """Return a prettified error message for ``output``.

        Args:
            output: Dict of an output reply. The keys read are ``data``,
                ``message``, ``code`` and ``location`` (with ``line`` and
                optionally ``ch``).

        Returns:
            The formatted message if ``output['message'] == 'error'``,
            otherwise the empty string.
        """
        raw = str(output.get('data', ''))
        # Text is normally wrapped in <pre class="...">; if it is not, use the
        # raw data rather than failing on a missing match.
        wrapped = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*', raw, flags=re.DOTALL)
        data = (wrapped.group(1) if wrapped else raw).strip()

        message = ''
        if output.get('message') == 'error':
            if 'location' in output:
                loc = output['location']
                message += 'near line ' + str(loc['line'])
                if 'ch' in loc:
                    message += ', character ' + str(loc['ch'])
                message += ': '

            code = output.get('code')
            pattern = next(
                (regex for name, regex in PrologEngine._ERROR_PATTERNS if name == code),
                None)
            detail = re.match(pattern, data, flags=re.DOTALL) if pattern else None
            # Unknown code or unexpected wording: keep the whole text.
            message += detail.group(1) if detail else data

        # Replace anonymous variable names with _.
        message = re.sub(r'_G[0-9]*', '_', message)
        return message

# Basic sanity check: create an engine with a small program, print its id,
# run one query and print the reply.
if __name__ == '__main__':
    engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
    try:
        print('engine id is ' + engine.id)
        print(engine.ask("run_tests({},'{}',Result)".format('dup/2', engine.id)))
    finally:
        # Always release the engine and its connection, even if the query fails.
        engine.destroy()