#!/usr/bin/python3

"""Object wrappers around the PL_* foreign-interface functions exported by
prolog.core, plus a PrologEngine that checks submitted Prolog programs
against stored reference solutions.
"""

from prolog.core import *
import prolog.util


class Atom(object):
    """Wrapper holding a Prolog atom handle in [ref]."""

    __slots__ = ('ref',)

    def __init__(self, val=None, ref=None):
        """Wrap the existing atom handle [ref] if given, otherwise create a
        new atom from the string [val].
        """
        if ref is not None:
            self.ref = ref
            return
        self.ref = PL_new_atom(bytes(val, encoding=encoding))


class Term(object):
    """Wrapper holding a Prolog term reference in [ref]."""

    __slots__ = ('ref',)

    def __init__(self, val=None, args=None, ref=None):
        """Initialize from term reference [ref] if given, otherwise construct
        a new term from [val] and possibly [args].

        [val] may be:
          - a str with [args]: compound term named [val] with arguments [args];
          - a str without [args]: the text is parsed as a Prolog term;
          - an int or float: a number;
          - a list of Term objects: a Prolog list with the same element order;
          - an Atom: that atom.
        Any other [val] (including None) leaves the freshly created term
        reference untouched.

        Raises ValueError if a string [val] cannot be parsed.
        """
        if ref is not None:
            self.ref = ref
            return

        self.ref = PL_new_term_ref()
        if isinstance(val, str):
            if args is not None:
                # Explicitly constructed compound term with name [val] and
                # arguments [args].
                name = PL_new_atom(bytes(val, encoding=encoding))
                PL_cons_functor_v(self.ref, PL_new_functor(name, len(args)),
                                  Termv(args).ref)
            else:
                # Parse term from [val].
                if not PL_chars_to_term(bytes(val, encoding=encoding), self.ref):
                    raise ValueError('invalid compound term')
        elif isinstance(val, int):
            PL_put_integer(self.ref, val)
        elif isinstance(val, float):
            PL_put_float(self.ref, val)
        elif isinstance(val, list):
            # Each PL_cons_list call prepends one cell to the list built so
            # far, so the elements must be consed back to front to keep the
            # Prolog list in the same order as [val].
            PL_put_nil(self.ref)
            for t in reversed(val):
                PL_cons_list(self.ref, t.ref, self.ref)
        elif isinstance(val, Atom):
            PL_put_atom(self.ref, val.ref)

    def __iter__(self):
        """Yield the elements of this term as Term objects.

        Raises TypeError if the term is not a list.
        """
        if not PL_is_list(self.ref):
            raise TypeError('term is not a list')
        ref = self.ref
        while True:
            head, tail = Term(), Term()
            # PL_get_list fails once [ref] is no longer a list cell, which
            # ends the iteration.
            if not PL_get_list(ref, head.ref, tail.ref):
                break
            yield head
            ref = tail.ref

    def __str__(self):
        """Return the text PL_get_chars produces for this term (CVT_WRITE)."""
        ptr = c_char_p()
        if not PL_get_chars(self.ref, byref(ptr), CVT_WRITE|BUF_RING):
            # NOTE(review): behavior kept from the original -- returning None
            # makes str(term) raise TypeError when the conversion fails.
            return None
        return str(ptr.value, encoding=encoding)


class Termv(object):
    """Block of consecutive term references; [ref] is the first one."""

    __slots__ = ('ref',)

    def __init__(self, terms):
        """Allocate len(terms) term references and copy each term into them."""
        self.ref = PL_new_term_refs(len(terms))
        for i, term in enumerate(terms):
            PL_put_term(self.ref+i, term.ref)


class PrologEngine(object):
    """Initializes the swipl library and runs queries and tests against it."""

    def __init__(self):
        """Initialize the swipl library and set up the environment.

        Raises EnvironmentError if PL_initialise fails, and Exception if one
        of the setup goals raises a Prolog error.
        """
        # Initialize the swipl library.
        args = ['./', '-q', '--nosignals']
        if SWI_HOME_DIR is not None:
            args.append('--home={0}'.format(SWI_HOME_DIR))
        s_plargs = len(args)
        plargs = (c_char_p*s_plargs)(*[bytes(arg, encoding) for arg in args])
        # Keep the argument vector referenced for the lifetime of the engine
        # so that it is not garbage-collected while Prolog may still refer
        # to it.
        self._plargs = plargs
        result = PL_initialise(s_plargs, plargs)
        if not result:
            raise EnvironmentError('Could not initialize Prolog environment. '
                                   'PL_initialise returned {0}'.format(result))

        # Construct some predicates.
        self.p = {
            'abolish/1': PL_predicate(b'abolish', 1, None),
            'add_import_module/3': PL_predicate(b'add_import_module', 3, None),
            'arg/3': PL_predicate(b'arg', 3, None),
            'assertz/1': PL_predicate(b'assertz', 1, None),
            'call_with_time_limit/2': PL_predicate(b'call_with_time_limit', 2, None),
            'compile_predicates/1': PL_predicate(b'compile_predicates', 1, None),
            'consult/1': PL_predicate(b'consult', 1, None),
            'functor/3': PL_predicate(b'functor', 3, None),
            'message_to_string/2': PL_predicate(b'message_to_string', 2, None),
            'read_term_from_atom/3': PL_predicate(b'read_term_from_atom', 3, None),
            'safe_goal/1': PL_predicate(b'safe_goal', 1, None),
            'set_prolog_flag/2': PL_predicate(b'set_prolog_flag', 2, None),
            'set_prolog_stack/2': PL_predicate(b'set_prolog_stack', 2, None),
            'use_module/1': PL_predicate(b'use_module', 1, None)
        }
        self.err_flags = PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION

        # Load the sandbox and compatibility library.
        self.call('consult/1', [Term(Atom('prolog/lib.pl'))])

        # Load the time module (for call_with_time_limit) then disable autoload.
        self.call('use_module/1', [Term('library(time)')])
        self.call('set_prolog_flag/2', [Term('autoload'), Term('false')])

        # Increase memory limits.
        self.call('set_prolog_stack/2', [Term('global'), Term('limit(2*10**9)')])
        self.call('set_prolog_stack/2', [Term('local'), Term('limit(2*10**9)')])

        # Discard messages from the swipl library.
        self.call('assertz/1', [Term('message_hook(_, _, _)')])

        # Dictionary of correct answers; keys are (pid, query).
        self.answers = {}

    def load_solution(self, pid, code, facts=None):
        """Load the correct solution [code] to problem [pid].

        Rules in [code] are asserted into module 'solution<pid>'; rules in
        the optional [facts] are asserted unqualified. All touched predicates
        are then passed to compile_predicates/1.

        Raises Exception if any of the Prolog calls raises an error.
        """
        module = 'solution{}'.format(pid)

        fid = PL_open_foreign_frame()
        try:
            predicates = set()
            for rule in prolog.util.split(code):
                self.call('assertz/1', [Term('{}:({})'.format(module, rule))])
                predicates.add('{}:{}'.format(module, self.predicate_indicator(rule)))
            if facts:
                for rule in prolog.util.split(facts):
                    self.call('assertz/1', [Term(rule)])
                    predicates.add(self.predicate_indicator(rule))
            self.call('compile_predicates/1', [Term([Term(p) for p in predicates])])
        finally:
            # Release the frame even if one of the calls above raised.
            PL_discard_foreign_frame(fid)

    def mark_solved(self, uid, pid):
        """Import the correct solution for problem [pid] into the module for
        user [uid]. Returns the result of the add_import_module/3 call.
        """
        m_user = 'user{}'.format(uid)
        m_solution = 'solution{}'.format(pid)

        fid = PL_open_foreign_frame()
        try:
            return self.call('add_import_module/3',
                             [Term(m_user), Term(m_solution), Term('end')])
        finally:
            PL_discard_foreign_frame(fid)

    def query(self, q, module=None, n=1):
        """Get up to [n] solutions to query [q], optionally run in [module].

        Returns a list of dicts mapping variable names to the text of their
        values; the list is empty if there are no solutions. Raises an
        exception on error (either from self.call, or due to a
        malformed/unsafe query or a timeout).
        """
        if module is not None:
            q = '{}:({})'.format(module, q)

        result = []
        fid = PL_open_foreign_frame()
        qid = None
        try:
            # Parse the query and store variable names.
            goal = Term()
            var_names = Term()
            options = Term([Term('variable_names', [var_names])])
            if not self.call('read_term_from_atom/3', [Term(Atom(q)), goal, options]):
                raise Exception('Warning: Could not read term from {}\n'.format(q))

            # Check if goal is safe with currently loaded rules.
            if not self.call('safe_goal/1', [goal]):
                raise Exception('Warning: Unsafe goal: {}\n'.format(goal))

            # Collect up to [n] instances of the goal, under the time limit
            # passed to call_with_time_limit/2 (presumably seconds -- confirm).
            solutions = Term()
            goal_aux = Term('findnsols', [Term(n), goal, goal, solutions])
            qid = PL_open_query(None, self.err_flags,
                                self.p['call_with_time_limit/2'],
                                Termv([Term(0.01), goal_aux]).ref)

            if PL_next_solution(qid):
                solutions = list(solutions)
                fid_solution = PL_open_foreign_frame()
                for solution in solutions:
                    # Unify the goal with this solution so that the terms in
                    # [var_names] show the solution's values.
                    PL_unify(goal.ref, solution.ref)
                    variables = {}
                    for var in var_names:
                        name, value = Term(), Term()
                        PL_get_arg(1, var.ref, name.ref)
                        PL_get_arg(2, var.ref, value.ref)
                        variables[str(name)] = str(value)
                    result.append(variables)
                    # Rewind the frame before handling the next solution.
                    PL_rewind_foreign_frame(fid_solution)
                PL_discard_foreign_frame(fid_solution)
            else:
                # Check for exceptions.
                error_msg = self.error(qid)
                if error_msg:
                    raise Exception(error_msg)
        finally:
            if qid:
                PL_close_query(qid)
            PL_discard_foreign_frame(fid)

        return result

    def test(self, uid, pid, code, queries):
        """Test whether [code] gives the same answer to each of [queries] as
        the correct solution. The solution for problem [pid] should be loaded
        beforehand.

        Returns True if [code] is considered correct, False otherwise.
        Raises Exception if the correct answer to a query cannot be found.
        """
        # Module names for user code and the correct solution.
        m_user = 'user{}'.format(uid)
        m_solution = 'solution{}'.format(pid)

        fid = PL_open_foreign_frame()
        try:
            # Find the correct answers if not already known.
            for query in queries:
                if (pid, query) not in self.answers:
                    result = self.query(query, m_solution)
                    if not result or 'X' not in result[0]:
                        raise Exception('Error finding correct answer to query "{}"'.format(query))
                    self.answers[(pid, query)] = result[0]['X'] # TODO maybe we could check all vars?

            correct = True
            predicates = set()
            try:
                # Load the user program [code].
                for rule in prolog.util.split(code):
                    self.call('assertz/1', [Term('{}:({})'.format(m_user, rule))])
                    predicates.add(self.predicate_indicator(rule))

                for query in queries:
                    result = self.query(query, m_user, n=1)
                    if len(result) != 1 or result[0]['X'] != self.answers[(pid, query)]:
                        correct = False
                        break

                    # If a correct solution was found, see if another (incorrect)
                    # solution is found in the first 10 answers.
                    try:
                        result = self.query(query, m_user, n=10)
                        unique = {r['X'] for r in result}
                        if len(unique) != 1:
                            correct = False
                            break
                    except Exception:
                        # Only a timeout exception can occur here; in this case, we
                        # consider [code] correct.
                        pass
            except Exception:
                # Any error while loading or running the user program means
                # the program is not correct.
                correct = False

            # Unload all loaded rules.
            for predicate in predicates:
                self.call('abolish/1', [Term(m_user + ':' + predicate)])

            return correct
        finally:
            # Release the frame on every exit path, including the "Error
            # finding correct answer" exception raised above.
            PL_discard_foreign_frame(fid)

    def call(self, name, args):
        """Call the Prolog predicate [name] (a key of self.p) with [args].

        Returns True if the call succeeds and False if it fails. Raises an
        exception on error. Since this creates a Termv object, it should be
        called within an open foreign frame.
        """
        qid = PL_open_query(None, self.err_flags, self.p[name], Termv(args).ref)
        try:
            if not PL_next_solution(qid):
                error_msg = self.error(qid)
                if error_msg:
                    raise Exception(error_msg)
                return False
        finally:
            PL_cut_query(qid)
        return True

    def error(self, qid):
        """Return a description of the last exception for query [qid], or
        None if no error occurred.
        """
        error_ref = PL_exception(qid)
        if not error_ref:
            return None
        PL_clear_exception()

        # Get the Prolog error message.
        fid = PL_open_foreign_frame()
        try:
            msg = Term()
            if PL_call_predicate(None, self.err_flags,
                                 self.p['message_to_string/2'],
                                 Termv([Term(ref=error_ref), msg]).ref):
                return str(msg)
            return 'Unknown error'
        finally:
            PL_discard_foreign_frame(fid)

    def predicate_indicator(self, clause):
        """Return the main functor defined by [clause], e.g. 'dup'/2.

        [clause] is the text of a single clause. The result is a string of
        the form "'name'/arity".
        """
        # Return the main functor for [term].
        def main_functor(term):
            name = Term()
            arity = Term()
            self.call('functor/3', [term, name, arity])
            return "'{}'/{}".format(name, arity)

        fid = PL_open_foreign_frame()
        try:
            clause = Term(clause)
            functor = main_functor(clause)
            # Check whether [clause] is a rule or a fact.
            if functor == "':-'/2":
                # [clause] is a rule, return the main functor for the head.
                head = Term()
                self.call('arg/3', [Term(1), clause, head])
                functor = main_functor(head)
            return functor
        finally:
            PL_discard_foreign_frame(fid)


# Basic sanity check.
if __name__ == '__main__':
    engine = PrologEngine()
    engine.load_solution(0, 'a(2). a(2). a(3). ')
    result = engine.test(0, 0, 'a(2). ', ['a(X)', 'a(Y), Y=X'])
    print('result: {}'.format(result))