summary | refs | log | tree | commit | diff
path: root/prolog
diff options
context:
space:
mode:
Diffstat (limited to 'prolog')
-rw-r--r-- prolog/engine.py 134
-rw-r--r-- prolog/lexer.py 90
-rw-r--r-- prolog/util.py 179
3 files changed, 403 insertions, 0 deletions
diff --git a/prolog/engine.py b/prolog/engine.py
new file mode 100644
index 0000000..af79535
--- /dev/null
+++ b/prolog/engine.py
@@ -0,0 +1,134 @@
+#!/usr/bin/python3
+
+import collections
+import http.client
+import json
+import re
+import urllib
+
# Create a new pengine preloaded with [code] and return its ID. Raises an
# exception carrying the collected error messages if the server does not
# answer with a 'create' event or reports any error.
def create(code=''):
    payload = json.dumps({'format': 'json-s', 'destroy': True, 'src_text': code})
    reply, messages = request('POST', '/pengine/create', body=payload)

    created = reply['event'] == 'create'
    if not created or 'error' in messages:
        raise Exception('\n'.join(messages['error']))
    return reply['id']
+
# Create a new pengine preloaded with [code], immediately run [query] on it
# and return the data of the answer. Raises an exception carrying the
# collected error messages if the server does not answer with a 'create'
# event or reports any error.
def create_and_ask(code, query):
    payload = json.dumps({
        'format': 'json-s',
        'destroy': True,
        'src_text': code,
        'ask': query,
    })
    reply, messages = request('POST', '/pengine/create', body=payload)

    created = reply['event'] == 'create'
    if not created or 'error' in messages:
        raise Exception('\n'.join(messages['error']))
    return reply['answer']['data']
+
# Send an 'ask' event with [query] to the pengine [engine]; return the reply.
def ask(engine, query):
    event = 'ask(({}),[])'.format(query)
    return send(engine, event)
+
# Send a 'next' event asking the pengine [engine] for [n] more solutions;
# return the reply. (Shadows the builtin next() inside this module.)
def next(engine, n=1):
    event = 'next({})'.format(n)
    return send(engine, event)
+
# Send a 'stop' event to the pengine [engine]; return the reply.
def stop(engine):
    reply = send(engine, 'stop')
    return reply
+
# Send a 'destroy' event to the pengine [engine]; return the reply.
def destroy(engine):
    reply = send(engine, 'destroy')
    return reply
+
# Deliver [event] to the pengine [engine] and return the main reply. Output
# messages gathered while waiting for that reply are discarded.
def send(engine, event):
    fields = {'id': engine, 'event': event, 'format': 'json-s'}
    query_string = urllib.parse.urlencode(fields)
    reply, _messages = request('GET', '/pengine/send?' + query_string)
    return reply
+
# Location of the pengine server.
# TODO put this somewhere sane
address, port = 'localhost', 3030

# Perform an HTTP request against the pengine server and return the pair
# (reply, messages). [reply] is the first decoded JSON reply whose event is
# not 'output'; [messages] maps each message type to the list of message
# texts extracted from the 'output' replies pulled before it.
# Raises an exception if the server answers with a status other than 200.
def request(method, path, body=None):
    headers = {'Content-Type': 'application/json;charset=utf-8'}
    messages = collections.defaultdict(list)
    # Create the connection before entering the try block, so that the
    # finally clause can never refer to an unbound name.
    conn = http.client.HTTPConnection(address, port, timeout=10)
    try:
        conn.request(method, path, body, headers=headers)
        while True:
            response = conn.getresponse()
            if response.status != http.client.OK:
                raise Exception('server returned {}'.format(response.status))

            reply = json.loads(response.read().decode('utf-8'))
            if reply['event'] != 'output':
                return reply, messages

            msg_type, msg_data = get_message(reply)
            messages[msg_type].append(msg_data)

            # Request next reply.
            params = urllib.parse.urlencode({
                'id': reply['id'],
                'format': 'json-s'})
            conn.request('GET', '/pengine/pull_response?' + params,
                         headers=headers)
    finally:
        conn.close()
+
# Strip boilerplate from a Prolog 'output' reply. Returns the pair
# (message type, message text).
#
# The text is taken from inside the <pre class="..."> wrapper of
# reply['data']; if the wrapper is missing the whole payload is used instead.
# For errors the text is prefixed with the source location (when present) and,
# for syntax/permission/type errors, reduced to the relevant part. Whenever no
# specific part can be extracted, the full text is used rather than failing.
# NOTE(review): this also yields the full text for non-error messages and for
# errors with an unrecognised code -- confirm this matches what callers expect.
def get_message(reply):
    raw = reply['data']
    wrapped = re.match(r'.*<pre class="[^"]*">(.*)</pre>.*',
                       raw, flags=re.DOTALL)
    data = (wrapped.group(1) if wrapped else raw).strip()

    # Patterns extracting the interesting part of known error kinds.
    error_patterns = {
        'syntax_error': r'^.*Syntax error: (.*)$',
        'permission_error': r'^.*(No permission [^\n]*)',
        'type_error': r'^.*(Type error: [^\n]*)',
    }

    message = ''
    body = data
    if reply['message'] == 'error':
        if 'location' in reply:
            loc = reply['location']
            message += 'near line ' + str(loc['line'])
            if 'ch' in loc:
                message += ', character ' + str(loc['ch'])
            message += ': '

        pattern = error_patterns.get(reply.get('code'))
        if pattern is not None:
            found = re.match(pattern, data, flags=re.DOTALL)
            # Keep the unreduced text if the server's wording is unexpected.
            if found:
                body = found.group(1)
    message += body

    # Replace anonymous variable names (_G123) with _. At least one digit and
    # word boundaries are required, so identifiers such as _Goal are kept.
    message = re.sub(r'\b_G[0-9]+\b', '_', message)
    return reply['message'], message
+
# Test whether [code] is a correct solution for problem [name]. Runs all
# tests and returns the pair (number of passed tests, total number of tests).
# Raises an exception on error.
def test_all(name, code):
    query = "run_tests({}, Results)".format(name)
    reply = create_and_ask(code=code, query=query)
    if reply['event'] != 'success':
        raise Exception('testing procedure failed')

    # Each test is reported as a success(...) or failure(...) term.
    outcomes = re.findall(r'(?:success|failure)\([^)]*\)',
                          reply['data'][0]['Results'])
    passed = sum(1 for outcome in outcomes if outcome.startswith('success'))
    return passed, len(outcomes)
+
# Test whether [code] is a correct solution for problem [name]. Returns a bool
# and stops on first failure. Any exception raised while running the tests is
# treated as a failed test.
def test(name, code):
    query = 'run_tests({})'.format(name)
    try:
        reply = create_and_ask(code=code, query=query)
        passed = reply['event'] == 'success'
    except Exception:
        passed = False
    return passed
+
# Basic sanity check: create a pengine with a small program, run the tests
# for dup/2 on it and destroy it again. Uses the module-level functions; no
# PrologEngine class is defined in this module.
if __name__ == '__main__':
    engine = create(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
    print('engine id is ' + engine)
    print(ask(engine, "run_tests({}, Result)".format('dup/2')))
    destroy(engine)
diff --git a/prolog/lexer.py b/prolog/lexer.py
new file mode 100644
index 0000000..971e8a6
--- /dev/null
+++ b/prolog/lexer.py
@@ -0,0 +1,90 @@
+#!/usr/bin/python3
+
+import ply.lex as lex
+
+# LEXER
+
+#states = (
+# ('comment', 'exclusive'),
+#)
+
# tokens; treat operators as names if followed by (
# Maps the text of each operator to its token type. Two spellings may share a
# type (both \+ and not are NOT).
operators = {
    r':-': 'FROM',
    r'->': 'IMPLIES',
    r'\+': 'NOT',
    r'not': 'NOT',
    r'=': 'EQU',
    r'\=': 'NEQU',
    r'==': 'EQ',
    r'\==': 'NEQ',
    r'=..': 'UNIV',
    r'is': 'IS',
    r'=:=': 'EQA',
    r'=\=': 'NEQA',
    r'<': 'LT',
    r'=<': 'LE',
    r'>': 'GT',
    r'>=': 'GE',
    r'@<': 'LTL',
    r'@=<': 'LEL',
    r'@>': 'GTL',
    r'@>=': 'GEL',
    r'+': 'PLUS',
    r'-': 'MINUS',
    r'*': 'STAR',
    r'/': 'DIV',
    r'//': 'IDIV',
    r'mod': 'MOD',
    r'**': 'POW',
    r'.': 'PERIOD',
    r',': 'COMMA',
    r';': 'SEMI'
}

# Token types known to the lexer. Operator types are listed once each, in
# order of first appearance, because a repeated name in the token list is
# reported by PLY as multiply defined.
tokens = []
for _operator_type in operators.values():
    if _operator_type not in tokens:
        tokens.append(_operator_type)
tokens += [
    'UINTEGER', 'UREAL',
    'NAME', 'VARIABLE', 'STRING',
    'LBRACKET', 'RBRACKET', 'LPAREN', 'RPAREN', 'PIPE', 'LBRACE', 'RBRACE',
    'INVALID'
]
+
# Patterns of the simple tokens, given as plain regular expressions.

# numbers
t_UREAL = r'[0-9]+\.[0-9]+([eE][-+]?[0-9]+)?|inf|nan'
t_UINTEGER = r'[0-9]+'

# variables start with an underscore or an uppercase letter
t_VARIABLE = r'(_|[A-Z])[a-zA-Z0-9_]*'

# double-quoted strings; "" and backslash escapes may appear inside
t_STRING = r'"(""|\\.|[^\"])*"'

# punctuation
t_LPAREN = r'\('
t_RPAREN = r'\)'
t_LBRACKET = r'\['
t_RBRACKET = r'\]'
t_LBRACE = r'{'
t_RBRACE = r'}'
t_PIPE = r'\|'
+
# Skip /* block */ comments and % line comments; nothing is returned, so no
# token is produced. No support for nested comments yet.
# (The string below is the token's regular expression, not a docstring.)
def t_comment(t):
    r'(/\*(.|\n)*?\*/)|(%.*)'
    # A block comment may span several lines; count them so the line number
    # stays correct for whatever follows the comment.
    t.lexer.lineno += t.value.count('\n')
+
# Match quoted atoms, plain atoms, symbol atoms and the solo characters
# ! ; and ,. A match directly followed by ( keeps the type NAME; any other
# match is looked up in the operator table and falls back to NAME.
# (The string below is the token's regular expression, not a docstring.)
def t_NAME(t):
    r"'(''|\\.|[^\\'])*'|[a-z][a-zA-Z0-9_]*|[-+*/\\^<>=~:.?@#$&]+|!|;|,"
    source = t.lexer.lexdata
    after = t.lexer.lexpos
    followed_by_paren = after < len(source) and source[after] == '('
    if not followed_by_paren:
        t.type = operators.get(t.value, 'NAME')
    return t
+
# Blanks and tabs between tokens are skipped.
t_ignore = ' \t'

# Consume a run of newlines, advancing the line counter once per newline.
# (The string below is the token's regular expression, not a docstring.)
def t_newline(t):
    r'\n+'
    consumed = t.value.count('\n')
    t.lexer.lineno = t.lexer.lineno + consumed
+
# Turn a character that matches no rule into a one-character INVALID token
# and move the lexer past it.
def t_error(t):
    # TODO send this to stderr
    bad_char = t.value[0]
    t.lexer.skip(1)
    t.type, t.value = 'INVALID', bad_char
    return t
+
# Build the module-level lexer from the token rules defined above.
lexer = lex.lex()
diff --git a/prolog/util.py b/prolog/util.py
new file mode 100644
index 0000000..7fb81e3
--- /dev/null
+++ b/prolog/util.py
@@ -0,0 +1,179 @@
+#!/usr/bin/python3
+
+from collections import namedtuple
+
+from .lexer import lexer, operators
+
# Stores a token's type and value, and optionally the position of the first
# character in the lexed stream.
#
# Comparison and hashing ignore the position: two tokens are equal when their
# types and values are equal. Comparing against something that has no
# (type, value) prefix yields NotImplemented instead of raising, so that e.g.
# `token == None` is simply False.
class Token(namedtuple('Token', ['type', 'val', 'pos'])):
    __slots__ = ()

    # Custom constructor to support default parameters.
    def __new__(cls, type, val='', pos=None):
        return super(Token, cls).__new__(cls, type, val, pos)

    def __str__(self):
        return self.val

    def __eq__(self, other):
        try:
            other_type, other_val = other[0], other[1]
        except (TypeError, IndexError, KeyError):
            return NotImplemented
        return self[0] == other_type and self[1] == other_val

    def __ne__(self, other):
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    # Apply the tuple comparison [op] to the (type, val) prefixes of both
    # operands.
    def _compare(self, other, op):
        try:
            other_key = other[0:2]
        except (TypeError, KeyError):
            return NotImplemented
        return op(self[0:2], other_key)

    def __lt__(self, other):
        return self._compare(other, tuple.__lt__)

    def __le__(self, other):
        return self._compare(other, tuple.__le__)

    def __ge__(self, other):
        return self._compare(other, tuple.__ge__)

    def __gt__(self, other):
        return self._compare(other, tuple.__gt__)

    # Only hash token's value (we don't care about position, and types are
    # determined by values).
    def __hash__(self):
        return hash(self[1])
+
# Run the lexer over [text] and return the resulting tokens as a list of
# Token objects carrying type, value and position.
def tokenize(text):
    lexer.input(text)
    result = []
    for raw in lexer:
        result.append(Token(raw.type, raw.value, raw.lexpos))
    return result
+
# Return a one-line string representation of [tokens]: periods and commas
# are followed by a space, other operators are surrounded by spaces, and all
# remaining tokens are written as they are.
def stringify(tokens):
    operator_types = tuple(operators.values())
    pieces = []
    for tok in tokens:
        text = str(tok)
        if tok.type in ('PERIOD', 'COMMA'):
            pieces.append(text + ' ')
        elif tok.type in operator_types:
            pieces.append(' ' + text + ' ')
        else:
            pieces.append(text)
    return ''.join(pieces)
+
# Yield the sequence of rules in [code], each as a one-line string without
# its final period. A period ends a rule only if more than one token has been
# seen since the previous rule ended.
def split(code):
    tokens = tokenize(code)
    rule_begin = 0
    for pos, tok in enumerate(tokens):
        if tok.type != 'PERIOD' or pos - rule_begin <= 1:
            continue
        yield stringify(tokens[rule_begin:pos])
        rule_begin = pos + 1
+
# Return a list of lines in [code] and a list of rule ranges.
#
# Each line is a tuple of tokens. Outside of brackets a line ends at a
# period, a :-, a comma or the end of input (these separators are not kept);
# a semicolon always ends the current line and becomes a line of its own.
# Each rule range is a pair (start, end) of indices into the list of lines,
# with [end] exclusive.
def decompose(code):
    closing_to_opening = {
        'RPAREN': 'LPAREN',
        'RBRACKET': 'LBRACKET',
        'RBRACE': 'LBRACE',
    }
    opening = tuple(closing_to_opening.values())
    separators = ('PERIOD', 'FROM', 'COMMA', 'EOF')

    lines, rules = [], []
    current = []    # tokens of the line being collected
    nesting = []    # types of the brackets that are currently open
    rule_start = 0

    # Store the line being collected, if it has any tokens, and start anew.
    def flush():
        if current:
            lines.append(tuple(current))
            del current[:]

    for tok in tokenize(code) + [Token('EOF')]:
        kind = tok.type
        if kind == 'SEMI':
            flush()
            lines.append((tok,))
            continue
        if not nesting and kind in separators:
            flush()
            if kind in ('PERIOD', 'EOF') and rule_start < len(lines):
                rules.append((rule_start, len(lines)))
                rule_start = len(lines)
            continue
        if kind in opening:
            nesting.append(kind)
        elif nesting and closing_to_opening.get(kind) == nesting[-1]:
            nesting.pop()
        current.append(tok)
    return lines, rules
+
# Format a list of [lines] according to [rules] (as returned by decompose).
#
# Every rule is written with one line of [lines] per output line: the first
# line is followed by :-, the last one by a period, and the lines in between
# are joined with commas unless a semicolon line is involved. Lines after the
# first one are indented.
def compose(lines, rules):
    chunks = []
    for first, stop in rules:
        last = stop - 1
        for idx in range(first, stop):
            current = lines[idx]
            if idx > first:
                chunks.append(' ')
            chunks.append(stringify(current))
            if idx == last:
                chunks.append('.\n')
            elif idx == first:
                chunks.append(' :-\n')
            else:
                # No comma next to a semicolon, neither after it nor before.
                ends_with_semi = not current or current[-1].type == 'SEMI'
                if not ends_with_semi and lines[idx+1][-1].type != 'SEMI':
                    chunks.append(',')
                chunks.append('\n')
    return ''.join(chunks).strip()
+
# Rename variables in [tokens] to A0, A1, A2,… in order of appearance and
# return the result as a new list.
#
# [names] maps original variable names to their new names; it is updated in
# place, and numbering continues from its size. Each anonymous variable _
# gets a fresh name that is not recorded in [names]. A period clears [names]
# and restarts the numbering. The new variable tokens carry no position.
def rename_vars(tokens, names=None):
    if names is None:
        names = {}
    counter = len(names)
    anonymous = Token('VARIABLE', '_')

    renamed = []
    for tok in tokens:
        if tok.type == 'PERIOD':
            names.clear()
            counter = 0
        elif tok == anonymous:
            tok = Token('VARIABLE', 'A{}'.format(counter))
            counter += 1
        elif tok.type == 'VARIABLE':
            original = tok.val
            if original not in names:
                names[original] = 'A{}'.format(counter)
                counter += 1
            tok = Token('VARIABLE', names[original])
        renamed.append(tok)
    return renamed
+
# transformation = before → after; applied on line which is part of rule
# return mapping from formal vars in before+after to actual vars in rule
# line and rule should of course not be normalized
#
# Formal variables of [before] are bound to the variables found at the same
# positions in [line]; an anonymous _ in [line] is bound to a fresh name
# New0, New1,… instead. Formal variables that occur only in [after] are then
# bound, in order of first appearance, to the variables of [rule] that are
# not bound yet, and to further fresh names once those run out. Every such
# formal variable gets exactly one actual variable and no actual variable is
# handed out twice in this step, even if it occurs repeatedly in [rule].
# Tokens of [before] beyond the length of [line] are ignored.
def map_vars(before, after, line, rule):
    # Return the items of [values] without repetitions, in original order.
    def unique(values):
        seen = set()
        result = []
        for value in values:
            if value not in seen:
                seen.add(value)
                result.append(value)
        return result

    mapping = {}
    new_index = 0
    for formal, actual in zip(before, line):
        if actual.type != 'VARIABLE':
            continue
        if actual.val != '_':
            mapping[formal.val] = actual.val
        else:
            mapping[formal.val] = 'New' + str(new_index)
            new_index += 1

    bound_actual = set(mapping.values())
    remaining_formal = unique(
        t.val for t in after
        if t.type == 'VARIABLE' and t.val not in mapping)
    remaining_actual = unique(
        t.val for t in rule
        if t.type == 'VARIABLE' and t.val != '_' and t.val not in bound_actual)

    # Invent new variables if the rule does not have enough unused ones.
    while len(remaining_actual) < len(remaining_formal):
        remaining_actual.append('New' + str(new_index))
        new_index += 1

    mapping.update(zip(remaining_formal, remaining_actual))
    return mapping
+
# Basic sanity check: reformat a small rule, then print the variable mapping
# for a sample transformation applied to its first line.
if __name__ == '__main__':
    code = 'dup([H|T], [H1|T1]) :- dup(T1, T2). '
    lines, rules = decompose(code)
    print(compose(lines, rules))

    # Both sides of the transformation share one renaming table.
    var_names = {}
    before = rename_vars(tokenize("dup([A0|A1], [A2|A3])"), var_names)
    after = rename_vars(tokenize("dup([A0|A1], [A5, A4|A3])"), var_names)

    print(map_vars(before, after, lines[0], tokenize(code)))