summaryrefslogtreecommitdiff
path: root/prolog/engine.py
blob: b3cd80539bbc559fabee3d71e7cba43f435e1d70 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/python3

import collections
import http.client
import json
import re
import urllib
import xml.etree.ElementTree

def strip_html(html_str):
    return ''.join(xml.etree.ElementTree.fromstring(html_str).itertext())

# Create a new pengine, initialize it with [code] and return Prolog's reply.
# The engine is destroyed after answering one query. If [query] is given, the
# first answer is returned and the engine destroyed.
def create(code='', query=''):
    opts = {'format': 'json-html', 'destroy': True, 'src_text': code}
    if query:
        opts['ask'] = query
    reply, output = request('POST', '/pengine/create', body=json.dumps(opts))

    # If query was given, the actual reply is nested in create/destroy objects.
    if query:
        reply = reply['answer']['data']
    return reply, output

def ask(engine, query):
    return send(engine, 'ask(({}),[])'.format(query))

def next(engine, n=1):
    return send(engine, 'next({})'.format(n))

def stop(engine):
    return send(engine, 'stop')

def destroy(engine):
    return send(engine, 'destroy')

def send(engine, event):
    params = urllib.parse.urlencode({
        'id': engine,
        'event': event,
        'format': 'json-html'})
    return request('GET', '/pengine/send?' + params)

# Return the main reply and pull potential output replies.
address, port = 'localhost', 3030  # TODO put this somewhere sane
def request(method, path, body=None):
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    messages = []
    try:
        conn = http.client.HTTPConnection(address, port, timeout=10)
        conn.request(method, path, body, headers=headers)
        while True:
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))

            reply = json.loads(response.read().decode('utf-8'))
            if reply.get('event') == 'output':
                messages.append(get_message(reply))

                # Request next reply.
                params = urllib.parse.urlencode({
                    'id': reply['id'],
                    'format': 'json-html'})
                conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
            else:
                return reply, messages
    finally:
        conn.close()

# Strip boilerplate from Prolog messages … ugly.
def get_message(reply):
    data = strip_html(reply['data']).strip()
    message = ''
    if reply['message'] in ('error', 'warning'):
        message = reply['message'] + ': '
        if 'location' in reply:
            loc = reply['location']
            message += 'near line ' + str(loc['line'])
            if 'ch' in loc:
                message += ', character ' + str(loc['ch'])
            message += ': '

        if reply.get('code') == 'syntax_error':
            match = re.match(r'^.*Syntax error: (.*)$', data, flags=re.DOTALL)
            data = match.group(1)
        elif reply.get('code') == 'permission_error':
            match = re.match(r'^.*(No permission [^\n]*)', data, flags=re.DOTALL)
            data = match.group(1)
        elif reply.get('code') == 'type_error':
            match = re.match(r'^.*(Type error: [^\n]*)', data, flags=re.DOTALL)
            data = match.group(1)
    message += data

    # Replace anonymous variable names with _.
    message = re.sub(r'_G[0-9]*', '_', message)
    return reply['message'], message

# Return the value of variable [name] in the JSON object returned by Prolog.
def get_var(data, name):
    for binding in data['variables']:
        if name in binding['variables']:
            return strip_html(binding['value'])
    return None

# Return a string describing variable bindings and residuals in the JSON object
# returned by Prolog.
def pretty_vars(data):
    result = []
    for binding in data['variables']:
        var_list = binding['variables']
        value = binding['value']
        result.append(' = '.join(var_list) + ' = ' + strip_html(value))
    if 'residuals' in data:
        result += [strip_html(b) for b in data['residuals']]
    return ',\n'.join(result)

# Basic sanity check.
if __name__ == '__main__':
    engine = create(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')['id']
    print('engine id is ' + engine)
    print(ask(engine, "run_tests({}, Result)".format('dup/2')))
    destroy(engine)