#!/usr/bin/python3

import sys

from prolog.core import *
from prolog.util import *


class Atom(object):
    """Thin wrapper around a Prolog atom handle.

    The handle is stored in ``ref``. Pass ``ref`` to wrap an existing handle,
    or ``val`` (a ``str``) to create a new atom via ``PL_new_atom``.
    """

    __slots__ = ('ref',)

    def __init__(self, val=None, ref=None):
        if ref is not None:
            self.ref = ref
            return
        self.ref = PL_new_atom(bytes(val, encoding=encoding))


class Term(object):
    """Wrapper around a Prolog term reference, stored in ``ref``."""

    __slots__ = ('ref',)

    # Initialize from term reference [ref] if given, otherwise construct a new
    # term from [val] and possibly [args].
    def __init__(self, val=None, args=None, ref=None):
        """Wrap an existing term reference or build a new term.

        val may be:
          * str with args  -- compound term named [val] with arguments [args];
          * str alone      -- text that is parsed into a term;
          * int / float    -- a number;
          * list of Term   -- a Prolog list with the same element order;
          * Atom           -- that atom.
        Any other value (including the default None) leaves the freshly
        created term reference untouched.

        Raises ValueError if [val] is a string that cannot be parsed.
        """
        if ref is not None:
            self.ref = ref
            return

        self.ref = PL_new_term_ref()
        if isinstance(val, str):
            if args is not None:
                # Explicitly constructed compound term with name [val] and
                # arguments [args].
                name = PL_new_atom(bytes(val, encoding=encoding))
                PL_cons_functor_v(self.ref,
                                  PL_new_functor(name, len(args)),
                                  Termv(args).ref)
            else:
                # Parse term from [val].
                if not PL_chars_to_term(bytes(val, encoding=encoding), self.ref):
                    raise ValueError('invalid compound term')
        elif isinstance(val, int):
            PL_put_integer(self.ref, val)
        elif isinstance(val, float):
            PL_put_float(self.ref, val)
        elif isinstance(val, list):
            # PL_cons_list prepends a head to the list built so far, so the
            # elements must be added last-to-first to keep the caller's order.
            PL_put_nil(self.ref)
            for t in reversed(val):
                PL_cons_list(self.ref, t.ref, self.ref)
        elif isinstance(val, Atom):
            PL_put_atom(self.ref, val.ref)

    def __iter__(self):
        """Iterate over the elements of a Prolog list.

        NOTE: the same Term object is yielded on every step and is overwritten
        by the next one, so consume each element before advancing.

        Raises TypeError if the term is not a list.
        """
        if not PL_is_list(self.ref):
            raise TypeError('term is not a list')
        ref = self.ref
        head, tail = Term(), Term()
        while PL_get_list(ref, head.ref, tail.ref):
            yield head
            ref = tail.ref

    def __str__(self):
        """Return the textual form of the term (CVT_WRITE conversion)."""
        ptr = c_char_p()
        if PL_get_chars(self.ref, byref(ptr), CVT_WRITE | BUF_RING):
            return str(ptr.value, encoding=encoding)
        # Falling through would return None, which makes str() fail with an
        # opaque TypeError; raise the same type with a useful message instead.
        raise TypeError('term cannot be converted to text')


class Termv(object):
    """A block of consecutive term references holding copies of [terms].

    ``ref`` is the first reference of the block; element i is ``ref + i``.
    """

    __slots__ = ('ref',)

    def __init__(self, terms):
        self.ref = PL_new_term_refs(len(terms))
        for i, term in enumerate(terms):
            PL_put_term(self.ref + i, term.ref)


class PrologEngine(object):
    """Embedded Prolog engine: load/unload rule sets and run bounded queries."""

    def __init__(self):
        """Initialize the Prolog library and prepare frequently used predicates.

        Raises EnvironmentError if PL_initialise reports failure.
        """
        # initialize Prolog library
        args = ['./', '-q', '--nosignals']
        if SWI_HOME_DIR is not None:
            args.append('--home={0}'.format(SWI_HOME_DIR))
        s_plargs = len(args)
        plargs = (c_char_p * s_plargs)(*[bytes(arg, encoding) for arg in args])
        # Keep the argument vector alive for the lifetime of the engine: the
        # SWI-Prolog documentation states Prolog may refer to it at any time.
        self._plargs = plargs
        result = PL_initialise(s_plargs, plargs)
        if not result:
            raise EnvironmentError(
                'Could not initialize Prolog environment. '
                'PL_initialise returned {0}'.format(result))

        # construct some predicates
        self.p = {
            'assertz': PL_predicate(b'assertz', 2, None),
            'erase': PL_predicate(b'erase', 1, None),
            'call_with_depth_limit': PL_predicate(b'call_with_depth_limit', 3, None),
            'call_with_time_limit': PL_predicate(b'call_with_time_limit', 2, None),
            'consult': PL_predicate(b'consult', 1, None),
            'findnsols': PL_predicate(b'findnsols', 4, None),
            'message_to_string': PL_predicate(b'message_to_string', 2, None),
            'read_term_from_atom': PL_predicate(b'read_term_from_atom', 3, None),
            'safe_goal': PL_predicate(b'safe_goal', 1, None),
            'set_prolog_stack': PL_predicate(b'set_prolog_stack', 2, None)
        }
        self.err_flags = PL_Q_NODEBUG | PL_Q_CATCH_EXCEPTION

        # database of current asserts; each load gets a new index
        self.refs = {}
        self.max_index = 0

        # increase memory limits
        self.call('set_prolog_stack', [Term('global'), Term('limit(2*10**9)')])
        self.call('set_prolog_stack', [Term('local'), Term('limit(2*10**9)')])

        # discard messages from swipl library
        self.load('message_hook(_, _, _). ')

    def exception(self, qid):
        """Raise the pending Prolog exception of query [qid], if there is one.

        Returns False when no exception is pending. Otherwise raises Exception
        carrying the text produced by message_to_string/2, or 'Unknown error'
        if that conversion fails. The pending exception is cleared either way.
        """
        ref = PL_exception(qid)
        # "No exception" may surface as None or as a zero handle, depending on
        # how the binding declares the return type; treat both the same.
        if not ref:
            return False
        try:
            ex = Term(ref=ref)
            msg = Term()
            if PL_call_predicate(None, self.err_flags,
                                 self.p['message_to_string'],
                                 Termv([ex, msg]).ref):
                raise Exception(str(msg))
            raise Exception('Unknown error')
        finally:
            PL_clear_exception()

    def call(self, name, args):
        """Call predicate [name] (a key of self.p) once with Term list [args].

        Returns True if a solution was found and False if the goal failed.
        Raises Exception if the goal raised a Prolog exception.
        """
        qid = PL_open_query(None, self.err_flags, self.p[name], Termv(args).ref)
        try:
            if not PL_next_solution(qid):
                if not self.exception(qid):
                    return False
        finally:
            PL_cut_query(qid)
        return True

    def consult(self, filename):
        """Consult the Prolog file [filename]; see call() for the result."""
        return self.call('consult', [Term(Atom(filename))])

    def load(self, program, module=None):
        """Assert every rule of [program], optionally inside [module].

        Returns an index that can later be passed to unload(). If any rule
        fails to load, the rules asserted so far are erased again and an
        Exception naming the offending rule is raised.
        """
        tokens = tokenize(program)
        refs = []
        # Last rule text handed to the engine; stays None if the failure
        # happens before the first rule has been extracted.
        orig_rule = None
        try:
            start = 0
            for idx, token in enumerate(tokens):
                if token != ('PERIOD', '.') or idx - start <= 1:
                    continue
                rule = stringify(tokens[start:idx])
                orig_rule = rule
                start = idx + 1
                if module is not None:
                    rule = module + ':(' + rule + ')'
                ref = Term()
                if self.call('assertz', [Term(rule), ref]):
                    refs.append(ref)
        except Exception as ex:
            # unload already loaded rules (hacky, mustfix, todo)
            for ref in refs:
                self.call('erase', [ref])
            raise Exception('Error loading program: {}\nBad rule: {}'.format(
                ex, orig_rule)) from ex

        self.max_index += 1
        self.refs[self.max_index] = refs
        return self.max_index

    def unload(self, index):
        """Erase all rules stored under [index] by an earlier load().

        Errors are reported on stderr rather than raised.
        """
        try:
            for ref in self.refs[index]:
                self.call('erase', [ref])
            del self.refs[index]
        except Exception as ex:
            # This should never happen.
            sys.stderr.write(str(ex) + '\n')

    # Get up to [n] solutions to query [q].
    def query(self, q, module=None, n=1):
        """Return up to [n] solutions of the goal text [q].

        Each solution is a dict mapping variable names to the textual form of
        their bindings. The goal is checked with safe_goal/1 first and is run
        under call_with_time_limit/2.

        Raises Exception if the goal cannot be read, is unsafe, or raises a
        Prolog exception (including the time limit being exceeded).
        """
        if module is not None:
            q = '{}:({})'.format(module, q)

        fid = PL_open_foreign_frame()
        try:
            var_names = Term()
            options = Term([Term('variable_names', [var_names])])
            goal = Term()

            # Parse term and store variable names.
            if not self.call('read_term_from_atom', [Term(Atom(q)), goal, options]):
                raise Exception('Warning: Could not read term from {}\n'.format(q))
            # Check if goal is safe with currently loaded rules.
            if not self.call('safe_goal', [goal]):
                raise Exception('Warning: Unsafe goal: {}\n'.format(goal))

            solutions = Term()
            goal_aux = Term('findnsols', [Term(n), goal, goal, solutions])
            qid = PL_open_query(None, self.err_flags,
                                self.p['call_with_time_limit'],
                                Termv([Term(0.01), goal_aux]).ref)
            result = []
            try:
                if PL_next_solution(qid):
                    for solution in solutions:
                        # Bind the goal's variables to this solution, read
                        # them out as text, then undo the bindings again.
                        fid_solution = PL_open_foreign_frame()
                        PL_unify(goal.ref, solution.ref)
                        variables = {}
                        for var in var_names:
                            name, value = Term(), Term()
                            PL_get_arg(1, var.ref, name.ref)
                            PL_get_arg(2, var.ref, value.ref)
                            variables[str(name)] = str(value)
                        result.append(variables)
                        PL_rewind_foreign_frame(fid_solution)
                else:
                    self.exception(qid)
            finally:
                # Always release the query, even if exception() raised.
                PL_close_query(qid)
            return result
        finally:
            # Always drop the frame so failed queries do not leak term refs.
            PL_discard_foreign_frame(fid)


# Basic sanity check.
if __name__ == '__main__':
    prolog = PrologEngine()
    prolog.load('a(1). a(2). a(3).', 'test')
    n = 1
    while True:
        # Raw string: '\=' is not a valid Python escape sequence.
        solution = prolog.query(r'a(X), a(Y), X\=Y', 'test', n=n)
        if solution is None or len(solution) != n:
            print('{} solutions found!'.format(n - 1))
            break
        print(solution[-1])
        n += 1