#!/usr/bin/python3

from prolog.core import *
import prolog.util


# Thin wrapper around a Prolog atom handle.
class Atom(object):
    __slots__ = 'ref'

    # Wrap the existing atom handle [ref] if given, otherwise create a new atom
    # from the string [val].
    def __init__(self, val=None, ref=None):
        if ref is not None:
            self.ref = ref
            return
        self.ref = PL_new_atom(bytes(val, encoding=encoding))


# Thin wrapper around a Prolog term reference.
class Term(object):
    __slots__ = 'ref'

    # Initialize from term reference [ref] if given, otherwise construct a new
    # term from [val] and possibly [args].
    #
    # [val] may be:
    #   - str with [args]:    compound term with name [val] and arguments [args],
    #   - str without [args]: term parsed from the text [val]
    #                         (raises ValueError if parsing fails),
    #   - int / float:        number,
    #   - list of Term:       Prolog list with the same element order,
    #   - Atom:               that atom.
    # For any other [val] (including None) the fresh term reference is left
    # untouched.
    def __init__(self, val=None, args=None, ref=None):
        if ref is not None:
            self.ref = ref
            return
        self.ref = PL_new_term_ref()
        if isinstance(val, str):
            if args is not None:
                # Explicitly constructed compound term with name [val] and
                # arguments [args].
                name = PL_new_atom(bytes(val, encoding=encoding))
                PL_cons_functor_v(self.ref, PL_new_functor(name, len(args)),
                                  Termv(args).ref)
            else:
                # Parse term from [val].
                if not PL_chars_to_term(bytes(val, encoding=encoding), self.ref):
                    raise ValueError('invalid compound term')
        elif isinstance(val, int):
            PL_put_integer(self.ref, val)
        elif isinstance(val, float):
            PL_put_float(self.ref, val)
        elif isinstance(val, list):
            # PL_cons_list prepends a head to the list built so far, so the
            # elements must be added last-to-first to preserve their order.
            PL_put_nil(self.ref)
            for t in reversed(val):
                PL_cons_list(self.ref, t.ref, self.ref)
        elif isinstance(val, Atom):
            PL_put_atom(self.ref, val.ref)

    # Iterate over the elements of a list term. Raises TypeError if the term is
    # not a list. NOTE: the same Term object is yielded on every iteration (its
    # reference is overwritten with the next element), so consume each element
    # before advancing the iterator.
    def __iter__(self):
        if not PL_is_list(self.ref):
            raise TypeError('term is not a list')
        ref = self.ref
        head, tail = Term(), Term()
        while PL_get_list(ref, head.ref, tail.ref):
            yield head
            ref = tail.ref

    # Return the textual form of the term as produced by PL_get_chars with
    # CVT_WRITE. If PL_get_chars fails, nothing is returned (i.e. None).
    def __str__(self):
        ptr = c_char_p()
        if PL_get_chars(self.ref, byref(ptr), CVT_WRITE|BUF_RING):
            return str(ptr.value, encoding=encoding)


# A block of len(terms) term references obtained from PL_new_term_refs, filled
# with the given terms; [ref] is the first reference of the block.
class Termv(object):
    __slots__ = 'ref'

    def __init__(self, terms):
        self.ref = PL_new_term_refs(len(terms))
        for i, term in enumerate(terms):
            PL_put_term(self.ref+i, term.ref)


class PrologEngine(object):
    # Initialize the Prolog runtime, look up the predicates used by this class
    # and set up the environment (helper library, flags, stack limits).
    # Raises EnvironmentError if the runtime cannot be initialized, and
    # Exception if any of the setup calls raises a Prolog error.
    def __init__(self):
        # Initialize the swipl library.
        args = ['./', '-q', '--nosignals']
        if SWI_HOME_DIR is not None:
            args.append('--home={0}'.format(SWI_HOME_DIR))
        s_plargs = len(args)
        plargs = (c_char_p*s_plargs)()
        for i, arg in enumerate(args):
            plargs[i] = bytes(arg, encoding)
        # Keep the return value so that it can be reported on failure.
        result = PL_initialise(s_plargs, plargs)
        if not result:
            raise EnvironmentError('Could not initialize Prolog environment. '
                                   'PL_initialise returned {0}'.format(result))

        # Construct some predicates.
        self.p = {
            'add_import_module/3': PL_predicate(b'add_import_module', 3, None),
            'assertz/1': PL_predicate(b'assertz', 1, None),
            'assertz/2': PL_predicate(b'assertz', 2, None),
            'erase/1': PL_predicate(b'erase', 1, None),
            'call_with_time_limit/2': PL_predicate(b'call_with_time_limit', 2, None),
            'consult/1': PL_predicate(b'consult', 1, None),
            'message_to_string/2': PL_predicate(b'message_to_string', 2, None),
            'read_term_from_atom/3': PL_predicate(b'read_term_from_atom', 3, None),
            'safe_goal/1': PL_predicate(b'safe_goal', 1, None),
            'set_prolog_flag/2': PL_predicate(b'set_prolog_flag', 2, None),
            'set_prolog_stack/2': PL_predicate(b'set_prolog_stack', 2, None),
            'use_module/1': PL_predicate(b'use_module', 1, None)
        }
        self.err_flags = PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION

        # Load the sandbox and compatibility library.
        self.call('consult/1', [Term(Atom('prolog/lib.pl'))])

        # Load the time module (for call_with_time_limit) then disable autoload.
        self.call('use_module/1', [Term('library(time)')])
        self.call('set_prolog_flag/2', [Term('autoload'), Term('false')])

        # Increase memory limits.
        self.call('set_prolog_stack/2', [Term('global'), Term('limit(2*10**9)')])
        self.call('set_prolog_stack/2', [Term('local'), Term('limit(2*10**9)')])

        # Discard messages from the swipl library.
        self.call('assertz/1', [Term('message_hook(_, _, _)')])

        # Dictionary of correct answers; keys are (pid, query).
        self.answers = {}

    # Return a description of the last exception, or None if no error occurred.
    # The pending exception for query [qid] is cleared as a side effect.
    def error(self, qid):
        error_ref = PL_exception(qid)
        if not error_ref:
            return None
        PL_clear_exception()

        # Get the Prolog error message.
        fid = PL_open_foreign_frame()
        msg = Term()
        if PL_call_predicate(None, self.err_flags, self.p['message_to_string/2'],
                             Termv([Term(ref=error_ref), msg]).ref):
            error_str = str(msg)
        else:
            error_str = 'Unknown error'
        PL_discard_foreign_frame(fid)
        return error_str

    # Call the Prolog predicate [name] (a key of self.p) with the list of Terms
    # [args]. Return True if the call succeeds and False if it fails. Raise an
    # exception on error.
    def call(self, name, args):
        qid = PL_open_query(None, self.err_flags, self.p[name], Termv(args).ref)
        try:
            if not PL_next_solution(qid):
                error_msg = self.error(qid)
                if error_msg:
                    raise Exception(error_msg)
                return False
        finally:
            PL_cut_query(qid)
        return True

    # Get up to [n] solutions to query [q]. If there are no solutions, return
    # an empty list. Raise an exception on error (either from self.call, or due
    # to malformed/unsafe query or a timeout).
    #
    # If [module] is given, the query is run as module:(q). Each solution is a
    # dictionary mapping str(variable name) to str(bound value).
    def query(self, q, module=None, n=1):
        if module is not None:
            q = '{}:({})'.format(module, q)

        fid = PL_open_foreign_frame()
        qid = None
        try:
            # Parse the query and store variable names.
            goal = Term()
            var_names = Term()
            options = Term([Term('variable_names', [var_names])])
            if not self.call('read_term_from_atom/3', [Term(Atom(q)), goal, options]):
                raise Exception('Warning: Could not read term from {}\n'.format(q))

            # Check if goal is safe with currently loaded rules.
            if not self.call('safe_goal/1', [goal]):
                raise Exception('Warning: Unsafe goal: {}\n'.format(goal))

            # Collect up to [n] instantiated copies of the goal in [solutions].
            solutions = Term()
            goal_aux = Term('findnsols', [Term(n), goal, goal, solutions])
            qid = PL_open_query(None, self.err_flags,
                                self.p['call_with_time_limit/2'],
                                Termv([Term(0.01), goal_aux]).ref)

            result = []
            if PL_next_solution(qid):
                fid_solution = PL_open_foreign_frame()
                for solution in solutions:
                    # Unify the goal with this solution so that the variables
                    # listed in [var_names] can be read, then rewind the frame
                    # before handling the next solution.
                    PL_unify(goal.ref, solution.ref)
                    variables = {}
                    for var in var_names:
                        name, value = Term(), Term()
                        PL_get_arg(1, var.ref, name.ref)
                        PL_get_arg(2, var.ref, value.ref)
                        variables[str(name)] = str(value)
                    result.append(variables)
                    PL_rewind_foreign_frame(fid_solution)
                PL_discard_foreign_frame(fid_solution)
            else:
                # Check for exceptions.
                error_msg = self.error(qid)
                if error_msg:
                    raise Exception(error_msg)
        finally:
            if qid:
                PL_close_query(qid)
            PL_discard_foreign_frame(fid)

        return result

    # Loads the correct solution [code] to problem [pid].
    # TODO handle library loading.
    def load_solution(self, pid, code):
        module = 'solution{}'.format(pid)
        for rule in prolog.util.split(code):
            self.call('assertz/1', [Term('{}:({})'.format(module, rule))])

    # Import the correct solution for problem [pid] into module for user [uid].
    def mark_solved(self, uid, pid):
        m_user = 'user{}'.format(uid)
        m_solution = 'solution{}'.format(pid)
        return self.call('add_import_module/3',
                         [Term(m_user), Term(m_solution), Term('end')])

    # Test whether [code] gives the same answer to [query] as the correct
    # solution. The solution for problem [pid] should be loaded beforehand.
    #
    # Return True if, for every query in [queries], the user program yields
    # exactly one solution whose value of X equals the stored correct answer.
    # Any exception while loading or querying the user program counts as an
    # incorrect answer. Raise an exception if the correct answer to a query
    # cannot be determined.
    def test(self, uid, pid, code, queries):
        # Module names for user code and the correct solution.
        m_user = 'user{}'.format(uid)
        m_solution = 'solution{}'.format(pid)

        # Find the correct answers if not already known.
        for query in queries:
            if (pid, query) not in self.answers:
                result = self.query(query, m_solution)
                if result is None or len(result) < 1 or 'X' not in result[0]:
                    raise Exception('Error finding correct answer to query "{}"'.format(query))
                self.answers[(pid, query)] = result[0]['X']  # TODO maybe we could check all vars?

        correct = True
        refs = []
        try:
            # Load the user program [code].
            for rule in prolog.util.split(code):
                ref = Term()
                if self.call('assertz/2', [Term('{}:({})'.format(m_user, rule)), ref]):
                    refs.append(ref)
            for query in queries:
                result = self.query(query, m_user)
                if not (len(result) == 1 and
                        result[0]['X'] == self.answers[(pid, query)]):
                    correct = False
        except Exception:
            # Deliberate: a failing user program is simply a wrong answer.
            correct = False
        finally:
            # Unload all loaded rules, no matter how the check ended.
            for ref in refs:
                self.call('erase/1', [ref])

        return correct


# Basic sanity check.
if __name__ == '__main__':
    engine = PrologEngine()
    engine.load_solution(0, 'a(2). a(2). a(3). ')
    result = engine.test(0, 0, 'a(2). ', ['a(X)', 'a(Y), Y=X'])
    print('test: {}'.format(result))