summaryrefslogtreecommitdiff
path: root/prolog/engine.py
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@araneo.org>2014-12-31 16:44:46 +0100
committerAleš Smodiš <aless@guru.si>2015-08-11 14:26:01 +0200
commit79527522a5c53e57fd15589d7cc12946a8372afb (patch)
tree77cdd0a118740c6c84ebd0a9212a5d09158f85be /prolog/engine.py
parent164cf022747b06d1f9058f4ff3729098e4412310 (diff)
Delegate testing to a Prolog server
... which obsoletes *a lot* of effort in making the testing procedure more robust in the past two years. Oh well. It seems to be the sanest way of coping with more than one simultaneous user (who could have predicted this use case?). The new way involves a PEngine server, and it seems to work quite well. Remember Knuth: premature optimization (as in ignoring possible solutions because they _might_ be to slow) is stupid. TODO: - library loading (again) - use of previous solution (again) - fix issues when converting non-ground terms to json Side note, constructivism works: in the past few days I have reached a much better but fundamentally ineffable intuition about Prolog, more so than in the past two years teaching it. So, fuck ITS and rather fix the schools by giving students something meaningful to do. Sigh.
Diffstat (limited to 'prolog/engine.py')
-rw-r--r--prolog/engine.py384
1 files changed, 54 insertions, 330 deletions
diff --git a/prolog/engine.py b/prolog/engine.py
index 597ebc4..6627b9a 100644
--- a/prolog/engine.py
+++ b/prolog/engine.py
@@ -1,339 +1,63 @@
#!/usr/bin/python3
-from prolog.core import *
-import prolog.util
+import http.client
+import json
+import urllib
-class Atom(object):
- __slots__ = 'ref'
-
- def __init__(self, val=None, ref=None):
- if ref is not None:
- self.ref = ref
- return
- self.ref = PL_new_atom(bytes(val, encoding=encoding))
-
-class Term(object):
- __slots__ = 'ref'
-
- # Initialize from term reference [ref] if given, otherwise construct a new
- # term from [val] and possibly [args].
- def __init__(self, val=None, args=None, ref=None):
- if ref is not None:
- self.ref = ref
- return
- self.ref = PL_new_term_ref()
- if isinstance(val, str):
- if args is not None:
- # Explicitly constructed compound term with name [val] and arguments [args].
- name = PL_new_atom(bytes(val, encoding=encoding))
- PL_cons_functor_v(self.ref, PL_new_functor(name, len(args)), Termv(args).ref)
- else:
- # Parse term from [val].
- if not PL_chars_to_term(bytes(val, encoding=encoding), self.ref):
- raise ValueError('invalid compound term')
- elif isinstance(val, int):
- PL_put_integer(self.ref, val)
- elif isinstance(val, float):
- PL_put_float(self.ref, val)
- elif isinstance(val, list):
- PL_put_nil(self.ref)
- for t in val:
- PL_cons_list(self.ref, t.ref, self.ref)
- elif isinstance(val, Atom):
- PL_put_atom(self.ref, val.ref)
-
- def __iter__(self):
- if not PL_is_list(self.ref):
- raise TypeError('term is not a list')
- ref = self.ref
- while True:
- head, tail = Term(), Term()
- if not PL_get_list(ref, head.ref, tail.ref):
- break
- yield head
- ref = tail.ref
-
- def __str__(self):
- ptr = c_char_p()
- if PL_get_chars(self.ref, byref(ptr), CVT_WRITE|BUF_RING):
- return str(ptr.value, encoding=encoding)
-
-class Termv(object):
- __slots__ = 'ref'
-
- def __init__(self, terms):
- self.ref = PL_new_term_refs(len(terms))
- for i, term in enumerate(terms):
- PL_put_term(self.ref+i, term.ref)
-
-class Problem(object):
- def __init__(self, name, solution, facts, tests):
- self.name = name
- self.solution = solution
- self.facts = facts
- self.tests = {t: None for t in tests}
- self.answers = {}
+address, port = 'localhost', 3030
class PrologEngine(object):
- def __init__(self):
- # Initialize the swipl library.
- args = ['./', '-q', '--nosignals']
- if SWI_HOME_DIR is not None:
- args.append('--home={0}'.format(SWI_HOME_DIR))
- s_plargs = len(args)
- plargs = (c_char_p*s_plargs)()
- for i in range(s_plargs):
- plargs[i] = bytes(args[i], encoding)
- if not PL_initialise(s_plargs, plargs):
- raise EnvironmentError('Could not initialize Prolog environment.'
- 'PL_initialise returned {0}'.format(result))
-
- # Construct some predicates.
- self.p = {
- 'abolish/1': PL_predicate(b'abolish', 1, None),
- 'add_import_module/3': PL_predicate(b'add_import_module', 3, None),
- 'arg/3': PL_predicate(b'arg', 3, None),
- 'assertz/1': PL_predicate(b'assertz', 1, None),
- 'call_with_time_limit/2': PL_predicate(b'call_with_time_limit', 2, None),
- 'compile_predicates/1': PL_predicate(b'compile_predicates', 1, None),
- 'consult/1': PL_predicate(b'consult', 1, None),
- 'functor/3': PL_predicate(b'functor', 3, None),
- 'message_to_string/2': PL_predicate(b'message_to_string', 2, None),
- 'read_term_from_atom/3': PL_predicate(b'read_term_from_atom', 3, None),
- 'safe_goal/1': PL_predicate(b'safe_goal', 1, None),
- 'set_prolog_flag/2': PL_predicate(b'set_prolog_flag', 2, None),
- 'set_prolog_stack/2': PL_predicate(b'set_prolog_stack', 2, None),
- 'use_module/1': PL_predicate(b'use_module', 1, None)
- }
- self.err_flags = PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION
-
- # Load the sandbox and compatibility library.
- self.call('consult/1', [Term(Atom('prolog/lib.pl'))])
-
- # Load the time module (for call_with_time_limit) then disable autoload.
- self.call('use_module/1', [Term('library(random)')])
- self.call('use_module/1', [Term('library(time)')])
- self.call('set_prolog_flag/2', [Term('autoload'), Term('false')])
-
- # Increase memory limits.
- self.call('set_prolog_stack/2', [Term('global'), Term('limit(2*10**9)')])
- self.call('set_prolog_stack/2', [Term('local'), Term('limit(2*10**9)')])
-
- # Discard messages from the swipl library.
- self.call('assertz/1', [Term('message_hook(_, _, _)')])
-
- # Problem data loaded with load_problem.
- self.problems = {}
-
- # The set of already loaded facts.
- self.facts = set()
-
- # Load the [solution] for problem [pid] called [name] and find answers to
- # [tests]. Also load [facts] in the main module, and import modules for
- # problems in [depends] into this problem's module.
- def load_problem(self, pid, name, solution, depends, facts, tests):
- self.problems[pid] = Problem(name, solution, facts, tests)
-
- # Load the solution in 'solution<pid>' module.
- mod_problem = 'problem{}'.format(pid)
-
- fid = PL_open_foreign_frame()
- predicates = self.load(solution, mod_problem)
- if facts and facts not in self.facts:
- predicates |= self.load(facts)
- self.facts.add(facts)
- self.call('compile_predicates/1', [Term([Term(p) for p in predicates])])
-
- # Import solutions for dependency predicates.
- for i in depends:
- mod_dependency = 'problem{}'.format(i)
- self.call('add_import_module/3', [Term(mod_problem), Term(mod_dependency), Term('end')])
-
- # Find the correct test answers.
- for query in tests:
- result = self.query(query, mod_problem)
- if result is None or len(result) < 1 or 'X' not in result[0]:
- raise Exception('Error finding correct answer to query "{}"'.format(query))
- self.problems[pid].tests[query] = result[0]['X']
- PL_discard_foreign_frame(fid)
-
- # Import the correct solution for problem [pid] into module for user [uid].
- def mark_solved(self, uid, pid):
- mod_user = 'user{}'.format(uid)
- mod_problem = 'problem{}'.format(pid)
-
- fid = PL_open_foreign_frame()
- result = self.call('add_import_module/3', [Term(mod_user), Term(mod_problem), Term('end')])
- PL_discard_foreign_frame(fid)
- return result
-
- # Get up to [n] solutions to query [q]. If there are no solutions, return
- # an empty list. Raise an exception on error (either from self.call, or due
- # to malformed/unsafe query or a timeout).
- def query(self, q, module=None, n=1):
- if module is not None:
- q = '{}:({})'.format(module, q)
-
- fid = PL_open_foreign_frame()
- qid = None
- try:
- # Parse the query and store variable names.
- goal = Term()
- var_names = Term()
- options = Term([Term('variable_names', [var_names])])
- if not self.call('read_term_from_atom/3', [Term(Atom(q)), goal, options]):
- raise Exception('Warning: Could not read term from {}\n'.format(q))
-
- # Check if goal is safe with currently loaded rules.
- if not self.call('safe_goal/1', [goal]):
- raise Exception('Warning: Unsafe goal: {}\n'.format(goal))
-
- solutions = Term()
- goal_aux = Term('findnsols', [Term(n), goal, goal, solutions])
- qid = PL_open_query(None, self.err_flags, self.p['call_with_time_limit/2'],
- Termv([Term(0.01), goal_aux]).ref)
-
- result = []
- if PL_next_solution(qid):
- solutions = list(solutions)
- fid_solution = PL_open_foreign_frame()
- for solution in solutions:
- PL_unify(goal.ref, solution.ref)
- variables = {}
- for var in var_names:
- name, value = Term(), Term()
- PL_get_arg(1, var.ref, name.ref)
- PL_get_arg(2, var.ref, value.ref)
- variables[str(name)] = str(value)
- result.append(variables)
- PL_rewind_foreign_frame(fid_solution)
- PL_discard_foreign_frame(fid_solution)
- else:
- # Check for exceptions.
- error_msg = self.error(qid)
- if error_msg:
- raise Exception(error_msg)
- finally:
- if qid:
- PL_close_query(qid)
- PL_discard_foreign_frame(fid)
-
- return result
-
- # Test whether [code] gives the same answer to [query] as the correct
- # solution for problem [pid]. The solution should be loaded beforehand.
- def test(self, uid, pid, code):
- mod_user = 'user{}'.format(uid)
-
- fid = PL_open_foreign_frame()
- correct = True
- predicates = set()
- try:
- self.load(code, mod_user, predicates)
- for query, answer in sorted(self.problems[pid].tests.items()):
- result = self.query(query, mod_user, n=1)
- if len(result) != 1 or result[0]['X'] != answer:
- correct = False
- break
-
- # If a correct solution was found, see if another (incorrect)
- # solution is found in the first 10 answers.
- try:
- result = self.query(query, mod_user, n=10)
- unique = set([r['X'] for r in result])
- if len(unique) != 1:
- correct = False
- break
- except Exception as ex:
- # Only a timeout exception can occur here; in this case, we
- # consider [code] correct.
- pass
- except Exception as ex:
- correct = False
-
- self.unload(predicates)
- PL_discard_foreign_frame(fid)
-
- return correct
-
- # Call the Prolog predicate [name]. Raise an exception on error. Since this
- # creates a Termv object, it should be called within an open foreign frame.
- def call(self, name, args):
- qid = PL_open_query(None, self.err_flags, self.p[name], Termv(args).ref)
- try:
- if not PL_next_solution(qid):
- error_msg = self.error(qid)
- if error_msg:
- raise Exception(error_msg)
- return False
- finally:
- PL_cut_query(qid)
- return True
-
- # Load rules from [program] into [module] and return the corresponding
- # predicate names. Since this function might not return due to exception,
- # the [predicates] argument can be passed where the names will be stored.
- def load(self, program, module=None, predicates=None):
- if predicates is None:
- predicates = set()
- for rule in prolog.util.split(program):
- name = self.predicate_indicator(rule)
- if module:
- rule = '{}:({})'.format(module, rule)
- name = '{}:{}'.format(module, name)
- self.call('assertz/1', [Term(rule)])
- predicates.add(name)
- return predicates
-
- # Unload and remove the "dynamic" property for all rules implementing
- # [predicates].
- def unload(self, predicates):
- for predicate in predicates:
- self.call('abolish/1', [Term(predicate)])
-
- # Return a description of the last exception, or None if no error occurred.
- def error(self, qid):
- error_ref = PL_exception(qid)
- if not error_ref:
- return None
- PL_clear_exception()
-
- # Get the Prolog error message.
- fid = PL_open_foreign_frame()
- msg = Term()
- if PL_call_predicate(None, self.err_flags, self.p['message_to_string/2'],
- Termv([Term(ref=error_ref), msg]).ref):
- error_str = str(msg)
+ def __init__(self, address=address, port=port, code=''):
+ self.id = None
+ self.conn = http.client.HTTPConnection(address, port, timeout=10)
+
+ hdrs = {'Content-Type': 'application/json; charset=utf-8'}
+ opts = json.dumps({'destroy': False, 'src_text': code, 'format': 'json'})
+ self.conn.request('POST', '/pengine/create', body=opts, headers=hdrs)
+
+ response = self.conn.getresponse()
+ data = response.read()
+ reply = json.loads(str(data, encoding='utf-8'))
+ if reply['event'] == 'create':
+ self.id = reply['id']
else:
- error_str = 'Unknown error'
- PL_discard_foreign_frame(fid)
-
- return error_str
-
- # Return the main functor defined by [clause], e.g. dup/2.
- def predicate_indicator(self, clause):
- # Return the main functor for [term].
- def main_functor(term):
- name = Term()
- arity = Term()
- self.call('functor/3', [term, name, arity])
- return "'{}'/{}".format(name, arity)
-
- fid = PL_open_foreign_frame()
- clause = Term(clause)
- functor = main_functor(clause)
- # Check whether [clause] is a rule or a fact.
- if functor == "':-'/2":
- # [clause] is a rule, return the main functor for the head.
- head = Term()
- self.call('arg/3', [Term(1), clause, head])
- functor = main_functor(head)
- PL_discard_foreign_frame(fid)
- return functor
+ raise Exception('could not create engine: ' + reply['code'])
+
+ def send(self, event, fmt='json'):
+ params = urllib.parse.urlencode({
+ 'id': self.id,
+ 'event': event,
+ 'format': fmt})
+ self.conn.request('GET', '/pengine/send?' + params)
+
+ response = self.conn.getresponse()
+ data = str(response.read(), encoding='utf-8')
+ if response.status == http.client.OK:
+ return json.loads(data) if fmt == 'json' else data
+ return None
+
+ def ask(self, query, template='', fmt='json'):
+ event = 'ask(({}),[template({})])'.format(query, template)
+ reply = self.send(event, fmt=fmt)
+ return reply
+
+ def next(self, n=1):
+ event = 'next({})'.format(n)
+ reply = self.send(event)
+ return reply
+
+ def stop(self):
+ return self.send('stop')
+
+ def destroy(self):
+ reply = self.send('destroy')
+ self.id = None
+ self.conn.close()
+ self.conn = None
# Basic sanity check.
if __name__ == '__main__':
- engine = PrologEngine()
- engine.load_solution(0, 'a(2). a(2). a(3). ')
- result = engine.test(0, 0, 'a(2). ', ['a(X)', 'a(Y), Y=X'])
- print('{}: {}'.format(i, result))
+ engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
+ print('engine id is ' + engine.id)
+ print(engine.ask("run_tests({},'{}',Result)".format('dup/2', engine.id)))
+ engine.destroy()