diff options
Diffstat (limited to 'prolog/engine.py')
-rw-r--r-- | prolog/engine.py | 384 |
1 files changed, 54 insertions, 330 deletions
diff --git a/prolog/engine.py b/prolog/engine.py index 597ebc4..6627b9a 100644 --- a/prolog/engine.py +++ b/prolog/engine.py @@ -1,339 +1,63 @@ #!/usr/bin/python3 -from prolog.core import * -import prolog.util +import http.client +import json +import urllib -class Atom(object): - __slots__ = 'ref' - - def __init__(self, val=None, ref=None): - if ref is not None: - self.ref = ref - return - self.ref = PL_new_atom(bytes(val, encoding=encoding)) - -class Term(object): - __slots__ = 'ref' - - # Initialize from term reference [ref] if given, otherwise construct a new - # term from [val] and possibly [args]. - def __init__(self, val=None, args=None, ref=None): - if ref is not None: - self.ref = ref - return - self.ref = PL_new_term_ref() - if isinstance(val, str): - if args is not None: - # Explicitly constructed compound term with name [val] and arguments [args]. - name = PL_new_atom(bytes(val, encoding=encoding)) - PL_cons_functor_v(self.ref, PL_new_functor(name, len(args)), Termv(args).ref) - else: - # Parse term from [val]. - if not PL_chars_to_term(bytes(val, encoding=encoding), self.ref): - raise ValueError('invalid compound term') - elif isinstance(val, int): - PL_put_integer(self.ref, val) - elif isinstance(val, float): - PL_put_float(self.ref, val) - elif isinstance(val, list): - PL_put_nil(self.ref) - for t in val: - PL_cons_list(self.ref, t.ref, self.ref) - elif isinstance(val, Atom): - PL_put_atom(self.ref, val.ref) - - def __iter__(self): - if not PL_is_list(self.ref): - raise TypeError('term is not a list') - ref = self.ref - while True: - head, tail = Term(), Term() - if not PL_get_list(ref, head.ref, tail.ref): - break - yield head - ref = tail.ref - - def __str__(self): - ptr = c_char_p() - if PL_get_chars(self.ref, byref(ptr), CVT_WRITE|BUF_RING): - return str(ptr.value, encoding=encoding) - -class Termv(object): - __slots__ = 'ref' - - def __init__(self, terms): - self.ref = PL_new_term_refs(len(terms)) - for i, term in enumerate(terms): - PL_put_term(self.ref+i, term.ref) - -class Problem(object): - def __init__(self, name, solution, facts, tests): - self.name = name - self.solution = solution - self.facts = facts - self.tests = {t: None for t in tests} - self.answers = {} +address, port = 'localhost', 3030 class PrologEngine(object): - def __init__(self): - # Initialize the swipl library. - args = ['./', '-q', '--nosignals'] - if SWI_HOME_DIR is not None: - args.append('--home={0}'.format(SWI_HOME_DIR)) - s_plargs = len(args) - plargs = (c_char_p*s_plargs)() - for i in range(s_plargs): - plargs[i] = bytes(args[i], encoding) - if not PL_initialise(s_plargs, plargs): - raise EnvironmentError('Could not initialize Prolog environment.' - 'PL_initialise returned {0}'.format(result)) - - # Construct some predicates. - self.p = { - 'abolish/1': PL_predicate(b'abolish', 1, None), - 'add_import_module/3': PL_predicate(b'add_import_module', 3, None), - 'arg/3': PL_predicate(b'arg', 3, None), - 'assertz/1': PL_predicate(b'assertz', 1, None), - 'call_with_time_limit/2': PL_predicate(b'call_with_time_limit', 2, None), - 'compile_predicates/1': PL_predicate(b'compile_predicates', 1, None), - 'consult/1': PL_predicate(b'consult', 1, None), - 'functor/3': PL_predicate(b'functor', 3, None), - 'message_to_string/2': PL_predicate(b'message_to_string', 2, None), - 'read_term_from_atom/3': PL_predicate(b'read_term_from_atom', 3, None), - 'safe_goal/1': PL_predicate(b'safe_goal', 1, None), - 'set_prolog_flag/2': PL_predicate(b'set_prolog_flag', 2, None), - 'set_prolog_stack/2': PL_predicate(b'set_prolog_stack', 2, None), - 'use_module/1': PL_predicate(b'use_module', 1, None) - } - self.err_flags = PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION - - # Load the sandbox and compatibility library. - self.call('consult/1', [Term(Atom('prolog/lib.pl'))]) - - # Load the time module (for call_with_time_limit) then disable autoload. - self.call('use_module/1', [Term('library(random)')]) - self.call('use_module/1', [Term('library(time)')]) - self.call('set_prolog_flag/2', [Term('autoload'), Term('false')]) - - # Increase memory limits. - self.call('set_prolog_stack/2', [Term('global'), Term('limit(2*10**9)')]) - self.call('set_prolog_stack/2', [Term('local'), Term('limit(2*10**9)')]) - - # Discard messages from the swipl library. - self.call('assertz/1', [Term('message_hook(_, _, _)')]) - - # Problem data loaded with load_problem. - self.problems = {} - - # The set of already loaded facts. - self.facts = set() - - # Load the [solution] for problem [pid] called [name] and find answers to - # [tests]. Also load [facts] in the main module, and import modules for - # problems in [depends] into this problem's module. - def load_problem(self, pid, name, solution, depends, facts, tests): - self.problems[pid] = Problem(name, solution, facts, tests) - - # Load the solution in 'solution<pid>' module. - mod_problem = 'problem{}'.format(pid) - - fid = PL_open_foreign_frame() - predicates = self.load(solution, mod_problem) - if facts and facts not in self.facts: - predicates |= self.load(facts) - self.facts.add(facts) - self.call('compile_predicates/1', [Term([Term(p) for p in predicates])]) - - # Import solutions for dependency predicates. - for i in depends: - mod_dependency = 'problem{}'.format(i) - self.call('add_import_module/3', [Term(mod_problem), Term(mod_dependency), Term('end')]) - - # Find the correct test answers. - for query in tests: - result = self.query(query, mod_problem) - if result is None or len(result) < 1 or 'X' not in result[0]: - raise Exception('Error finding correct answer to query "{}"'.format(query)) - self.problems[pid].tests[query] = result[0]['X'] - PL_discard_foreign_frame(fid) - - # Import the correct solution for problem [pid] into module for user [uid]. - def mark_solved(self, uid, pid): - mod_user = 'user{}'.format(uid) - mod_problem = 'problem{}'.format(pid) - - fid = PL_open_foreign_frame() - result = self.call('add_import_module/3', [Term(mod_user), Term(mod_problem), Term('end')]) - PL_discard_foreign_frame(fid) - return result - - # Get up to [n] solutions to query [q]. If there are no solutions, return - # an empty list. Raise an exception on error (either from self.call, or due - # to malformed/unsafe query or a timeout). - def query(self, q, module=None, n=1): - if module is not None: - q = '{}:({})'.format(module, q) - - fid = PL_open_foreign_frame() - qid = None - try: - # Parse the query and store variable names. - goal = Term() - var_names = Term() - options = Term([Term('variable_names', [var_names])]) - if not self.call('read_term_from_atom/3', [Term(Atom(q)), goal, options]): - raise Exception('Warning: Could not read term from {}\n'.format(q)) - - # Check if goal is safe with currently loaded rules. - if not self.call('safe_goal/1', [goal]): - raise Exception('Warning: Unsafe goal: {}\n'.format(goal)) - - solutions = Term() - goal_aux = Term('findnsols', [Term(n), goal, goal, solutions]) - qid = PL_open_query(None, self.err_flags, self.p['call_with_time_limit/2'], - Termv([Term(0.01), goal_aux]).ref) - - result = [] - if PL_next_solution(qid): - solutions = list(solutions) - fid_solution = PL_open_foreign_frame() - for solution in solutions: - PL_unify(goal.ref, solution.ref) - variables = {} - for var in var_names: - name, value = Term(), Term() - PL_get_arg(1, var.ref, name.ref) - PL_get_arg(2, var.ref, value.ref) - variables[str(name)] = str(value) - result.append(variables) - PL_rewind_foreign_frame(fid_solution) - PL_discard_foreign_frame(fid_solution) - else: - # Check for exceptions. - error_msg = self.error(qid) - if error_msg: - raise Exception(error_msg) - finally: - if qid: - PL_close_query(qid) - PL_discard_foreign_frame(fid) - - return result - - # Test whether [code] gives the same answer to [query] as the correct - # solution for problem [pid]. The solution should be loaded beforehand. - def test(self, uid, pid, code): - mod_user = 'user{}'.format(uid) - - fid = PL_open_foreign_frame() - correct = True - predicates = set() - try: - self.load(code, mod_user, predicates) - for query, answer in sorted(self.problems[pid].tests.items()): - result = self.query(query, mod_user, n=1) - if len(result) != 1 or result[0]['X'] != answer: - correct = False - break - - # If a correct solution was found, see if another (incorrect) - # solution is found in the first 10 answers. - try: - result = self.query(query, mod_user, n=10) - unique = set([r['X'] for r in result]) - if len(unique) != 1: - correct = False - break - except Exception as ex: - # Only a timeout exception can occur here; in this case, we - # consider [code] correct. - pass - except Exception as ex: - correct = False - - self.unload(predicates) - PL_discard_foreign_frame(fid) - - return correct - - # Call the Prolog predicate [name]. Raise an exception on error. Since this - # creates a Termv object, it should be called within an open foreign frame. - def call(self, name, args): - qid = PL_open_query(None, self.err_flags, self.p[name], Termv(args).ref) - try: - if not PL_next_solution(qid): - error_msg = self.error(qid) - if error_msg: - raise Exception(error_msg) - return False - finally: - PL_cut_query(qid) - return True - - # Load rules from [program] into [module] and return the corresponding - # predicate names. Since this function might not return due to exception, - # the [predicates] argument can be passed where the names will be stored. - def load(self, program, module=None, predicates=None): - if predicates is None: - predicates = set() - for rule in prolog.util.split(program): - name = self.predicate_indicator(rule) - if module: - rule = '{}:({})'.format(module, rule) - name = '{}:{}'.format(module, name) - self.call('assertz/1', [Term(rule)]) - predicates.add(name) - return predicates - - # Unload and remove the "dynamic" property for all rules implementing - # [predicates]. - def unload(self, predicates): - for predicate in predicates: - self.call('abolish/1', [Term(predicate)]) - - # Return a description of the last exception, or None if no error occurred. - def error(self, qid): - error_ref = PL_exception(qid) - if not error_ref: - return None - PL_clear_exception() - - # Get the Prolog error message. - fid = PL_open_foreign_frame() - msg = Term() - if PL_call_predicate(None, self.err_flags, self.p['message_to_string/2'], - Termv([Term(ref=error_ref), msg]).ref): - error_str = str(msg) + def __init__(self, address=address, port=port, code=''): + self.id = None + self.conn = http.client.HTTPConnection(address, port, timeout=10) + + hdrs = {'Content-Type': 'application/json; charset=utf-8'} + opts = json.dumps({'destroy': False, 'src_text': code, 'format': 'json'}) + self.conn.request('POST', '/pengine/create', body=opts, headers=hdrs) + + response = self.conn.getresponse() + data = response.read() + reply = json.loads(str(data, encoding='utf-8')) + if reply['event'] == 'create': + self.id = reply['id'] else: - error_str = 'Unknown error' - PL_discard_foreign_frame(fid) - - return error_str - - # Return the main functor defined by [clause], e.g. dup/2. - def predicate_indicator(self, clause): - # Return the main functor for [term]. - def main_functor(term): - name = Term() - arity = Term() - self.call('functor/3', [term, name, arity]) - return "'{}'/{}".format(name, arity) - - fid = PL_open_foreign_frame() - clause = Term(clause) - functor = main_functor(clause) - # Check whether [clause] is a rule or a fact. - if functor == "':-'/2": - # [clause] is a rule, return the main functor for the head. - head = Term() - self.call('arg/3', [Term(1), clause, head]) - functor = main_functor(head) - PL_discard_foreign_frame(fid) - return functor + raise Exception('could not create engine: ' + reply['code']) + + def send(self, event, fmt='json'): + params = urllib.parse.urlencode({ + 'id': self.id, + 'event': event, + 'format': fmt}) + self.conn.request('GET', '/pengine/send?' + params) + + response = self.conn.getresponse() + data = str(response.read(), encoding='utf-8') + if response.status == http.client.OK: + return json.loads(data) if fmt == 'json' else data + return None + + def ask(self, query, template='', fmt='json'): + event = 'ask(({}),[template({})])'.format(query, template) + reply = self.send(event, fmt=fmt) + return reply + + def next(self, n=1): + event = 'next({})'.format(n) + reply = self.send(event) + return reply + + def stop(self): + return self.send('stop') + + def destroy(self): + reply = self.send('destroy') + self.id = None + self.conn.close() + self.conn = None # Basic sanity check. if __name__ == '__main__': - engine = PrologEngine() - engine.load_solution(0, 'a(2). a(2). a(3). ') - result = engine.test(0, 0, 'a(2). ', ['a(X)', 'a(Y), Y=X']) - print('{}: {}'.format(i, result)) + engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).') + print('engine id is ' + engine.id) + print(engine.ask("run_tests({},'{}',Result)".format('dup/2', engine.id))) + engine.destroy() |