summaryrefslogtreecommitdiff
path: root/prolog/engine.py
diff options
context:
space:
mode:
Diffstat (limited to 'prolog/engine.py')
-rw-r--r--prolog/engine.py384
1 files changed, 54 insertions, 330 deletions
diff --git a/prolog/engine.py b/prolog/engine.py
index 597ebc4..6627b9a 100644
--- a/prolog/engine.py
+++ b/prolog/engine.py
@@ -1,339 +1,63 @@
#!/usr/bin/python3
-from prolog.core import *
-import prolog.util
+import http.client
+import json
+import urllib
-class Atom(object):
- __slots__ = 'ref'
-
- def __init__(self, val=None, ref=None):
- if ref is not None:
- self.ref = ref
- return
- self.ref = PL_new_atom(bytes(val, encoding=encoding))
-
-class Term(object):
- __slots__ = 'ref'
-
- # Initialize from term reference [ref] if given, otherwise construct a new
- # term from [val] and possibly [args].
- def __init__(self, val=None, args=None, ref=None):
- if ref is not None:
- self.ref = ref
- return
- self.ref = PL_new_term_ref()
- if isinstance(val, str):
- if args is not None:
- # Explicitly constructed compound term with name [val] and arguments [args].
- name = PL_new_atom(bytes(val, encoding=encoding))
- PL_cons_functor_v(self.ref, PL_new_functor(name, len(args)), Termv(args).ref)
- else:
- # Parse term from [val].
- if not PL_chars_to_term(bytes(val, encoding=encoding), self.ref):
- raise ValueError('invalid compound term')
- elif isinstance(val, int):
- PL_put_integer(self.ref, val)
- elif isinstance(val, float):
- PL_put_float(self.ref, val)
- elif isinstance(val, list):
- PL_put_nil(self.ref)
- for t in val:
- PL_cons_list(self.ref, t.ref, self.ref)
- elif isinstance(val, Atom):
- PL_put_atom(self.ref, val.ref)
-
- def __iter__(self):
- if not PL_is_list(self.ref):
- raise TypeError('term is not a list')
- ref = self.ref
- while True:
- head, tail = Term(), Term()
- if not PL_get_list(ref, head.ref, tail.ref):
- break
- yield head
- ref = tail.ref
-
- def __str__(self):
- ptr = c_char_p()
- if PL_get_chars(self.ref, byref(ptr), CVT_WRITE|BUF_RING):
- return str(ptr.value, encoding=encoding)
-
-class Termv(object):
- __slots__ = 'ref'
-
- def __init__(self, terms):
- self.ref = PL_new_term_refs(len(terms))
- for i, term in enumerate(terms):
- PL_put_term(self.ref+i, term.ref)
-
-class Problem(object):
- def __init__(self, name, solution, facts, tests):
- self.name = name
- self.solution = solution
- self.facts = facts
- self.tests = {t: None for t in tests}
- self.answers = {}
+address, port = 'localhost', 3030
class PrologEngine(object):
- def __init__(self):
- # Initialize the swipl library.
- args = ['./', '-q', '--nosignals']
- if SWI_HOME_DIR is not None:
- args.append('--home={0}'.format(SWI_HOME_DIR))
- s_plargs = len(args)
- plargs = (c_char_p*s_plargs)()
- for i in range(s_plargs):
- plargs[i] = bytes(args[i], encoding)
- if not PL_initialise(s_plargs, plargs):
- raise EnvironmentError('Could not initialize Prolog environment.'
- 'PL_initialise returned {0}'.format(result))
-
- # Construct some predicates.
- self.p = {
- 'abolish/1': PL_predicate(b'abolish', 1, None),
- 'add_import_module/3': PL_predicate(b'add_import_module', 3, None),
- 'arg/3': PL_predicate(b'arg', 3, None),
- 'assertz/1': PL_predicate(b'assertz', 1, None),
- 'call_with_time_limit/2': PL_predicate(b'call_with_time_limit', 2, None),
- 'compile_predicates/1': PL_predicate(b'compile_predicates', 1, None),
- 'consult/1': PL_predicate(b'consult', 1, None),
- 'functor/3': PL_predicate(b'functor', 3, None),
- 'message_to_string/2': PL_predicate(b'message_to_string', 2, None),
- 'read_term_from_atom/3': PL_predicate(b'read_term_from_atom', 3, None),
- 'safe_goal/1': PL_predicate(b'safe_goal', 1, None),
- 'set_prolog_flag/2': PL_predicate(b'set_prolog_flag', 2, None),
- 'set_prolog_stack/2': PL_predicate(b'set_prolog_stack', 2, None),
- 'use_module/1': PL_predicate(b'use_module', 1, None)
- }
- self.err_flags = PL_Q_NODEBUG|PL_Q_CATCH_EXCEPTION
-
- # Load the sandbox and compatibility library.
- self.call('consult/1', [Term(Atom('prolog/lib.pl'))])
-
- # Load the time module (for call_with_time_limit) then disable autoload.
- self.call('use_module/1', [Term('library(random)')])
- self.call('use_module/1', [Term('library(time)')])
- self.call('set_prolog_flag/2', [Term('autoload'), Term('false')])
-
- # Increase memory limits.
- self.call('set_prolog_stack/2', [Term('global'), Term('limit(2*10**9)')])
- self.call('set_prolog_stack/2', [Term('local'), Term('limit(2*10**9)')])
-
- # Discard messages from the swipl library.
- self.call('assertz/1', [Term('message_hook(_, _, _)')])
-
- # Problem data loaded with load_problem.
- self.problems = {}
-
- # The set of already loaded facts.
- self.facts = set()
-
- # Load the [solution] for problem [pid] called [name] and find answers to
- # [tests]. Also load [facts] in the main module, and import modules for
- # problems in [depends] into this problem's module.
- def load_problem(self, pid, name, solution, depends, facts, tests):
- self.problems[pid] = Problem(name, solution, facts, tests)
-
- # Load the solution in 'solution<pid>' module.
- mod_problem = 'problem{}'.format(pid)
-
- fid = PL_open_foreign_frame()
- predicates = self.load(solution, mod_problem)
- if facts and facts not in self.facts:
- predicates |= self.load(facts)
- self.facts.add(facts)
- self.call('compile_predicates/1', [Term([Term(p) for p in predicates])])
-
- # Import solutions for dependency predicates.
- for i in depends:
- mod_dependency = 'problem{}'.format(i)
- self.call('add_import_module/3', [Term(mod_problem), Term(mod_dependency), Term('end')])
-
- # Find the correct test answers.
- for query in tests:
- result = self.query(query, mod_problem)
- if result is None or len(result) < 1 or 'X' not in result[0]:
- raise Exception('Error finding correct answer to query "{}"'.format(query))
- self.problems[pid].tests[query] = result[0]['X']
- PL_discard_foreign_frame(fid)
-
- # Import the correct solution for problem [pid] into module for user [uid].
- def mark_solved(self, uid, pid):
- mod_user = 'user{}'.format(uid)
- mod_problem = 'problem{}'.format(pid)
-
- fid = PL_open_foreign_frame()
- result = self.call('add_import_module/3', [Term(mod_user), Term(mod_problem), Term('end')])
- PL_discard_foreign_frame(fid)
- return result
-
- # Get up to [n] solutions to query [q]. If there are no solutions, return
- # an empty list. Raise an exception on error (either from self.call, or due
- # to malformed/unsafe query or a timeout).
- def query(self, q, module=None, n=1):
- if module is not None:
- q = '{}:({})'.format(module, q)
-
- fid = PL_open_foreign_frame()
- qid = None
- try:
- # Parse the query and store variable names.
- goal = Term()
- var_names = Term()
- options = Term([Term('variable_names', [var_names])])
- if not self.call('read_term_from_atom/3', [Term(Atom(q)), goal, options]):
- raise Exception('Warning: Could not read term from {}\n'.format(q))
-
- # Check if goal is safe with currently loaded rules.
- if not self.call('safe_goal/1', [goal]):
- raise Exception('Warning: Unsafe goal: {}\n'.format(goal))
-
- solutions = Term()
- goal_aux = Term('findnsols', [Term(n), goal, goal, solutions])
- qid = PL_open_query(None, self.err_flags, self.p['call_with_time_limit/2'],
- Termv([Term(0.01), goal_aux]).ref)
-
- result = []
- if PL_next_solution(qid):
- solutions = list(solutions)
- fid_solution = PL_open_foreign_frame()
- for solution in solutions:
- PL_unify(goal.ref, solution.ref)
- variables = {}
- for var in var_names:
- name, value = Term(), Term()
- PL_get_arg(1, var.ref, name.ref)
- PL_get_arg(2, var.ref, value.ref)
- variables[str(name)] = str(value)
- result.append(variables)
- PL_rewind_foreign_frame(fid_solution)
- PL_discard_foreign_frame(fid_solution)
- else:
- # Check for exceptions.
- error_msg = self.error(qid)
- if error_msg:
- raise Exception(error_msg)
- finally:
- if qid:
- PL_close_query(qid)
- PL_discard_foreign_frame(fid)
-
- return result
-
- # Test whether [code] gives the same answer to [query] as the correct
- # solution for problem [pid]. The solution should be loaded beforehand.
- def test(self, uid, pid, code):
- mod_user = 'user{}'.format(uid)
-
- fid = PL_open_foreign_frame()
- correct = True
- predicates = set()
- try:
- self.load(code, mod_user, predicates)
- for query, answer in sorted(self.problems[pid].tests.items()):
- result = self.query(query, mod_user, n=1)
- if len(result) != 1 or result[0]['X'] != answer:
- correct = False
- break
-
- # If a correct solution was found, see if another (incorrect)
- # solution is found in the first 10 answers.
- try:
- result = self.query(query, mod_user, n=10)
- unique = set([r['X'] for r in result])
- if len(unique) != 1:
- correct = False
- break
- except Exception as ex:
- # Only a timeout exception can occur here; in this case, we
- # consider [code] correct.
- pass
- except Exception as ex:
- correct = False
-
- self.unload(predicates)
- PL_discard_foreign_frame(fid)
-
- return correct
-
- # Call the Prolog predicate [name]. Raise an exception on error. Since this
- # creates a Termv object, it should be called within an open foreign frame.
- def call(self, name, args):
- qid = PL_open_query(None, self.err_flags, self.p[name], Termv(args).ref)
- try:
- if not PL_next_solution(qid):
- error_msg = self.error(qid)
- if error_msg:
- raise Exception(error_msg)
- return False
- finally:
- PL_cut_query(qid)
- return True
-
- # Load rules from [program] into [module] and return the corresponding
- # predicate names. Since this function might not return due to exception,
- # the [predicates] argument can be passed where the names will be stored.
- def load(self, program, module=None, predicates=None):
- if predicates is None:
- predicates = set()
- for rule in prolog.util.split(program):
- name = self.predicate_indicator(rule)
- if module:
- rule = '{}:({})'.format(module, rule)
- name = '{}:{}'.format(module, name)
- self.call('assertz/1', [Term(rule)])
- predicates.add(name)
- return predicates
-
- # Unload and remove the "dynamic" property for all rules implementing
- # [predicates].
- def unload(self, predicates):
- for predicate in predicates:
- self.call('abolish/1', [Term(predicate)])
-
- # Return a description of the last exception, or None if no error occurred.
- def error(self, qid):
- error_ref = PL_exception(qid)
- if not error_ref:
- return None
- PL_clear_exception()
-
- # Get the Prolog error message.
- fid = PL_open_foreign_frame()
- msg = Term()
- if PL_call_predicate(None, self.err_flags, self.p['message_to_string/2'],
- Termv([Term(ref=error_ref), msg]).ref):
- error_str = str(msg)
+ def __init__(self, address=address, port=port, code=''):
+ self.id = None
+ self.conn = http.client.HTTPConnection(address, port, timeout=10)
+
+ hdrs = {'Content-Type': 'application/json; charset=utf-8'}
+ opts = json.dumps({'destroy': False, 'src_text': code, 'format': 'json'})
+ self.conn.request('POST', '/pengine/create', body=opts, headers=hdrs)
+
+ response = self.conn.getresponse()
+ data = response.read()
+ reply = json.loads(str(data, encoding='utf-8'))
+ if reply['event'] == 'create':
+ self.id = reply['id']
else:
- error_str = 'Unknown error'
- PL_discard_foreign_frame(fid)
-
- return error_str
-
- # Return the main functor defined by [clause], e.g. dup/2.
- def predicate_indicator(self, clause):
- # Return the main functor for [term].
- def main_functor(term):
- name = Term()
- arity = Term()
- self.call('functor/3', [term, name, arity])
- return "'{}'/{}".format(name, arity)
-
- fid = PL_open_foreign_frame()
- clause = Term(clause)
- functor = main_functor(clause)
- # Check whether [clause] is a rule or a fact.
- if functor == "':-'/2":
- # [clause] is a rule, return the main functor for the head.
- head = Term()
- self.call('arg/3', [Term(1), clause, head])
- functor = main_functor(head)
- PL_discard_foreign_frame(fid)
- return functor
+ raise Exception('could not create engine: ' + reply['code'])
+
+ def send(self, event, fmt='json'):
+ params = urllib.parse.urlencode({
+ 'id': self.id,
+ 'event': event,
+ 'format': fmt})
+ self.conn.request('GET', '/pengine/send?' + params)
+
+ response = self.conn.getresponse()
+ data = str(response.read(), encoding='utf-8')
+ if response.status == http.client.OK:
+ return json.loads(data) if fmt == 'json' else data
+ return None
+
+ def ask(self, query, template='', fmt='json'):
+ event = 'ask(({}),[template({})])'.format(query, template)
+ reply = self.send(event, fmt=fmt)
+ return reply
+
+ def next(self, n=1):
+ event = 'next({})'.format(n)
+ reply = self.send(event)
+ return reply
+
+ def stop(self):
+ return self.send('stop')
+
+ def destroy(self):
+ reply = self.send('destroy')
+ self.id = None
+ self.conn.close()
+ self.conn = None
# Basic sanity check.
if __name__ == '__main__':
- engine = PrologEngine()
- engine.load_solution(0, 'a(2). a(2). a(3). ')
- result = engine.test(0, 0, 'a(2). ', ['a(X)', 'a(Y), Y=X'])
- print('{}: {}'.format(i, result))
+ engine = PrologEngine(code='dup([],[]). dup([H|T],[H,H|TT]) :- dup(T,TT).')
+ print('engine id is ' + engine.id)
+ print(engine.ask("run_tests({},'{}',Result)".format('dup/2', engine.id)))
+ engine.destroy()