#!/usr/bin/python3 import re from prolog.core import * from prolog.util import * class Atom(object): __slots__ = 'ref' def __init__(self, src=None, ref=None): if ref != None: self.ref = ref else: self.ref = PL_new_atom(bytes(src, encoding=encoding)) class Term(object): __slots__ = 'ref' def __init__(self, src=None, ref=None): if ref != None: self.ref = ref else: self.ref = PL_new_term_ref() if isinstance(src, str): if not PL_chars_to_term(bytes(src, encoding=encoding), self.ref): raise ValueError("invalid compound term") elif isinstance(src, int): PL_put_integer(self.ref, src) elif isinstance(src, float): PL_put_float(self.ref, src) elif isinstance(src, list): PL_put_nil(self.ref) for t in src: PL_cons_list(self.ref, t.ref, self.ref) elif isinstance(src, tuple) and len(src) == 2: name = PL_new_atom(bytes(src[0], encoding=encoding)) args = src[1] PL_cons_functor_v(self.ref, PL_new_functor(name, len(args)), args.ref) elif isinstance(src, Atom): PL_put_atom(self.ref, src.ref) def __str__(self): ptr = c_char_p() if PL_get_chars(self.ref, byref(ptr), CVT_WRITE|BUF_RING): return str(ptr.value, encoding=encoding) class Termv(object): __slots__ = 'ref', 'size' def __init__(self, terms): self.size = len(terms) self.ref = PL_new_term_refs(self.size) for i in range(len(terms)): PL_put_term(self.ref+i, terms[i].ref) def __len__(self): return self.size def __getitem__(self, i): if not isinstance(i, int) or len(self) == 0 or i < 0 or i >= len(self): raise TypeError return Term(ref=self.ref+i) def __iter__(self): for i in range(len(self)): yield Term(ref=self.ref+i) return def __str__(self): return '[' + ','.join([str(term) for term in self]) + ']' class PrologEngine(object): def __init__(self): # initialize Prolog library args = ['./', '-q', '--nosignals'] if SWI_HOME_DIR is not None: args.append('--home={0}'.format(SWI_HOME_DIR)) s_plargs = len(args) plargs = (c_char_p*s_plargs)() for i in range(s_plargs): plargs[i] = bytes(args[i], encoding) if not PL_initialise(s_plargs, plargs): raise EnvironmentError('Could not initialize Prolog environment.' 'PL_initialise returned {0}'.format(result)) # construct some predicates self.p = { 'assertz': PL_predicate(b'assertz', 2, None), 'erase': PL_predicate(b'erase', 1, None), 'call_with_depth_limit': PL_predicate(b'call_with_depth_limit', 3, None), 'call_with_time_limit': PL_predicate(b'call_with_time_limit', 2, None), 'consult': PL_predicate(b'consult', 1, None), 'message_to_string': PL_predicate(b'message_to_string', 2, None), 'read_term_from_atom': PL_predicate(b'read_term_from_atom', 3, None), 'safe_goal': PL_predicate(b'safe_goal', 1, None), 'set_prolog_stack': PL_predicate(b'set_prolog_stack', 2, None) } self.err_flags = PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION # database of current asserts; each load gets a new index self.refs = {} self.max_index = 0 # increase memory limits self.call('set_prolog_stack', [Term('global'), Term('limit(2*10**9)')]) self.call('set_prolog_stack', [Term('local'), Term('limit(2*10**9)')]) # discard messages from swipl library self.load('message_hook(_, _, _). ') def call(self, name, args): return PL_call_predicate(None, self.err_flags, self.p[name], Termv(args).ref) == 1 def consult(self, filename): return self.call('consult', [Term(Atom(filename))]) def load(self, program, module=None): tokens = tokenize(program) refs = [] try: start = 0 for idx in range(len(tokens)): if tokens[idx] != ('PERIOD', '.') or idx - start <= 1: continue rule = stringify(tokens[start:idx]) start = idx + 1 if module != None: rule = module + ':(' + rule + ')' Ref = Term('Ref') if self.call('assertz', [Term(rule), Ref]): refs.append(Ref) except: # unload already loaded rules (hacky, mustfix, todo) for ref in refs: self.call('erase', [ref]) return None self.max_index += 1 self.refs[self.max_index] = refs return self.max_index def unload(self, index): for ref in self.refs[index]: self.call('erase', [ref]) del self.refs[index] def query(self, q, module=None): if module != None: q = module + ':(' + q + ')' fid = PL_open_foreign_frame() goal = Term() var_list = Term() options = Term([Term(('variable_names', Termv([var_list])))]) # parse term if not self.call('read_term_from_atom', [Term(Atom(q)), goal, options]): sys.stderr.write('Warning: Could not read term from {}\n'.format(q)) return None # check if goal is safe with currently loaded rules if not self.call('safe_goal', [goal]): sys.stderr.write('Warning: Unsafe goal: {}\n'.format(q)) return None solutions = [] #depth_limit = Term(('call_with_depth_limit', Termv([goal, Term(50), Term('_')]))) qid = PL_open_query(None, self.err_flags, self.p['call_with_time_limit'], Termv([Term(0.01), goal]).ref) while len(solutions) < 10: if PL_next_solution(qid): # get variable values in this solution as strings variables = {} head, tail = Term(), Term() while PL_get_list(var_list.ref, head.ref, tail.ref): name, value = Term(), Term() PL_get_arg(1, head.ref, name.ref) PL_get_arg(2, head.ref, value.ref) variables[str(name)] = str(value) var_list = tail solutions += [variables] else: ex = PL_exception(qid) if ex != None: ex = Term(ref=PL_exception(qid)) if ex != None: Msg = Term('Msg') if self.call('message_to_string', [ex, Msg]): sys.stderr.write('Error: ' + str(Msg) + '\n') else: sys.stderr.write('Unknown error\n') break PL_close_query(qid) PL_discard_foreign_frame(fid) return solutions