summaryrefslogtreecommitdiff
path: root/prolog/engine.py
blob: e64f72855c4f31291d0c78d783d22e6bd9fac8d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/python3

import collections
import http.client
import json
import re
import urllib

# Create a new pengine, initialize it with [code] and return its ID. The engine
# is created with the 'destroy' option set, so it will be destroyed after
# completing one query.
#
# Raises Exception if the server's reply is not a 'create' event or if any
# 'error' messages were collected while handling the request. The exception
# text is the newline-joined error messages, or a description of the
# unexpected reply when no error messages are available.
def create(code=''):
    opts = {'format': 'json-s', 'destroy': True, 'src_text': code}
    reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))

    if reply.get('event') != 'create' or 'error' in messages:
        # Use .get() so a missing 'error' entry neither adds a key to
        # [messages] nor produces an exception with an empty message.
        errors = messages.get('error')
        if errors:
            raise Exception('\n'.join(errors))
        raise Exception('unexpected reply from server: {!r}'.format(reply))

    return reply['id']

# Create a new pengine with [code], run [query] and destroy the engine. Return
# the reply from engine, i.e. the 'data' field of the 'answer' object embedded
# in the server's 'create' reply.
#
# Raises Exception if the server's reply is not a 'create' event or if any
# 'error' messages were collected while handling the request. The exception
# text is the newline-joined error messages, or a description of the
# unexpected reply when no error messages are available.
def create_and_ask(code, query):
    opts = {'format': 'json-s', 'destroy': True, 'src_text': code, 'ask': query}
    reply, messages = request('POST', '/pengine/create', body=json.dumps(opts))

    if reply.get('event') != 'create' or 'error' in messages:
        # Use .get() so a missing 'error' entry neither adds a key to
        # [messages] nor produces an exception with an empty message.
        errors = messages.get('error')
        if errors:
            raise Exception('\n'.join(errors))
        raise Exception('unexpected reply from server: {!r}'.format(reply))

    return reply['answer']['data']

# Send an 'ask' event for [query] to [engine] and return the reply. The query
# is wrapped in parentheses and sent with an empty option list.
def ask(engine, query):
    event = 'ask(({}),[])'.format(query)
    return send(engine, event)

# Send a 'next' event with argument [n] (default 1) to [engine] and return
# the reply.
def next(engine, n=1):
    event = 'next({})'.format(n)
    return send(engine, event)

# Send a 'stop' event to [engine] and return the reply.
def stop(engine):
    return send(engine, event='stop')

# Send a 'destroy' event to [engine] and return the reply.
def destroy(engine):
    return send(engine, event='destroy')

# Deliver [event] to the pengine identified by [engine] through a GET on
# /pengine/send and return the main reply. Output messages collected by
# request() along the way are discarded.
def send(engine, event):
    fields = {'id': engine, 'event': event, 'format': 'json-s'}
    query_string = urllib.parse.urlencode(fields)
    reply, _messages = request('GET', '/pengine/send?' + query_string)
    return reply

address, port = 'localhost', 3030  # TODO put this somewhere sane

# Send an HTTP request to the pengine server at [address]:[port]. Return the
# main reply and pull potential output replies.
#
# [method] and [path] are passed to the HTTP connection as given; [body], if
# any, is sent with a JSON content type.
#
# Returns a pair (reply, messages): [reply] is the first decoded JSON reply
# whose 'event' is not 'output', and [messages] maps each message type to the
# list of message texts extracted from the 'output' replies seen before it.
#
# Raises Exception if the server answers with a status other than 200 OK.
def request(method, path, body=None):
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    messages = collections.defaultdict(list)
    # Create the connection object before entering the try block: if the
    # constructor raised inside it, the finally clause would fail on the
    # unbound [conn] and mask the original error.
    conn = http.client.HTTPConnection(address, port, timeout=10)
    try:
        conn.request(method, path, body, headers=headers)
        while True:
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))

            reply = json.loads(response.read().decode('utf-8'))
            if reply.get('event') == 'output':
                msg_type, msg_data = get_message(reply)
                messages[msg_type].append(msg_data)

                # Request next reply.
                params = urllib.parse.urlencode({
                    'id': reply['id'],
                    'format': 'json-s'})
                conn.request('GET', '/pengine/pull_response?' + params, headers=headers)
            else:
                return reply, messages
    finally:
        conn.close()

# Strip boilerplate from Prolog messages … ugly.
#
# [reply] is a decoded 'output' reply. This function reads reply['data'] (the
# message text, normally wrapped in a <pre class="..."> element),
# reply['message'] (the message type) and, when present, reply['location']
# and reply['code'].
#
# Returns a pair (type, text). The text is only filled in for messages of
# type 'error'; for every other type it is the empty string.
def get_message(reply):
    raw = reply['data']
    match = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*', raw, flags=re.DOTALL)
    # Fall back to the raw text when the <pre> wrapper is missing instead of
    # failing on a None match.
    data = (match.group(1) if match else raw).strip()

    message = ''
    if reply['message'] == 'error':
        if 'location' in reply:
            loc = reply['location']
            message += 'near line ' + str(loc['line'])
            if 'ch' in loc:
                message += ', character ' + str(loc['ch'])
            message += ': '

        # For known error codes keep only the interesting part of the text.
        patterns = {
            'syntax_error': r'^.*Syntax error: (.*)$',
            'permission_error': r'^.*(No permission [^\n]*)',
            'type_error': r'^.*(Type error: [^\n]*)',
        }
        code = reply.get('code')
        pattern = patterns.get(code) if isinstance(code, str) else None
        if pattern is not None:
            match = re.match(pattern, data, flags=re.DOTALL)
            # Keep the full text if the expected phrase is not there.
            if match:
                data = match.group(1)
        message += data

    # Replace anonymous variable names with _. At least one digit is required
    # so that user variables such as _Goal are left intact.
    message = re.sub(r'_G[0-9]+', '_', message)
    return reply['message'], message

# Run the whole test suite of problem [name] against the solution [code].
# Returns a pair (n_passed, n_total) counting the success(...) terms and all
# success(...)/failure(...) terms found in the Results binding. Raises an
# exception when the testing query does not succeed.
def test_all(name, code):
    query = "run_tests({}, Results)".format(name)
    answer = create_and_ask(code=code, query=query)
    if answer.get('event') != 'success':
        raise Exception('testing procedure failed')

    outcomes = re.findall(r'(?:success|failure)\([^)]*\)', answer['data'][0]['Results'])
    passed = [outcome for outcome in outcomes if outcome.startswith('success')]
    return len(passed), len(outcomes)

# Check whether [code] solves problem [name] by running the run_tests/1 query.
# Returns True only when the engine answers with a 'success' event; any
# exception raised along the way is treated as a failed test and yields False.
def test(name, code):
    try:
        query = 'run_tests({})'.format(name)
        answer = create_and_ask(code=code, query=query)
        passed = answer.get('event') == 'success'
    except Exception:
        passed = False
    return passed

# Basic sanity check: create an engine holding a small dup/2 program, run its
# tests, print the reply and destroy the engine.
if __name__ == '__main__':
    program = 'dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).'
    query = "run_tests({}, Result)".format('dup/2')

    engine = create(code=program)
    print('engine id is ' + engine)
    answer = ask(engine, query)
    print(answer)
    destroy(engine)