summaryrefslogtreecommitdiff
path: root/server/prolog_session.py
blob: ff44a569db18bc062bf6266057ce16ac4305721e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# coding=utf-8

import operator
import threading
import prolog.engine
from db.models import CodeqUser, Problem
from . import problems

__all__ = ['PrologSession']

def format_prolog_output(reply, output):
    messages = [text for text in map(operator.itemgetter(1), output)]
    # When an engine is destroyed, a nested data object has the actual query result.
    event = reply['event']
    if event == 'destroy':
        reply = reply['data']
        event = reply['event']

    if event == 'success':
        messages.append(prolog.engine.pretty_vars(reply['data'][0]))
        return messages, 'ok', True and reply['more']
    if event == 'failure':
        messages.append('false')
        return messages, 'ok', False
    if event == 'error':
        # Remove potential module name (engine ID) from the error message.
        messages.append('error: ' + reply['data'].replace("'{}':".format(reply['id']), ''))
        return messages, 'error', False

    return messages, 'ok', False  # TODO: is it possible to reach this return statement?


class PrologSession(object):
    """Abstracts a Prolog session.
    Only public methods are available to the outside world due to the use of multiprocessing managers.
    Therefore prefix any private methods with an underscore (_).
    No properties are accessible; use getters and setters instead.
    Values are passed by value instead of by reference (deep copy!).
    """
    def __init__(self):
        self._access_lock = threading.Lock()
        self._engine_id = None
        self._problem_id = -1

    def run(self, code):
        with self._access_lock:
            if self._engine_id is not None:
                prolog.engine.stop(self._engine_id)
                self._engine_id = None
            engine_id, output = prolog.engine.create(code=code)
            if not engine_id:
                raise Exception('System error: could not create a prolog engine')
            self._engine_id = engine_id
            messages = [text for text in map(operator.itemgetter(1), output)]
            status = 'error' if 'error' in map(operator.itemgetter(0), output) else 'ok'
            return messages, status, False

    def query(self, query):
        with self._access_lock:
            if self._engine_id is None:
                return ['Prolog is not running'], 'error', False
            try:
                return format_prolog_output(*prolog.engine.ask(self._engine_id, query))
            except Exception as e:
                return [str(e)], 'error', False

    def step(self):
        with self._access_lock:
            if self._engine_id is None:
                return ['Prolog is not running'], 'error', False
            try:
                return format_prolog_output(*prolog.engine.next(self._engine_id))
            except Exception as e:
                return [str(e)], 'error', False

    def end(self):
        """Stops the Prolog engine."""
        with self._access_lock:
            if self._engine_id is not None:
                prolog.engine.destroy(self._engine_id)
                self._engine_id = None
                self._problem_id = -1
            return [], 'ok', False

    def __del__(self):
        #  no locking needed if GC is removing us, as there cannot be any concurrent access by definition
        if hasattr(self, '_engine_id') and (self._engine_id is not None):
            prolog.engine.destroy(self._engine_id)
            self._engine_id = None

    def test(self, user_id, problem_id, program):
        language, problem_group, problem = Problem.identifier(problem_id)
        problem_module = problems.load_problem(language, problem_group, problem, 'common')

        solved_problems = [p for p in CodeqUser.solved_problems(user_id, language)
                             if p != (problem_group, problem)]
        other_solutions = problems.solutions_for_problems(language, solved_problems)
        code = program + '\n' + other_solutions

        try:
            n_correct, n_all = problem_module.test(code)
            return ['Passed {} / {} tests.'.format(n_correct, n_all)]
        except AttributeError as ex:
            return ['No test is defined for this problem']

    def run_for_user(self, user_id, problem_id, program, query):
        """A "shorthand" method to start a Prolog session, load correct solutions of all user's solved
        problems and the given program, and ask a query.
        """
        language, problem_group, problem = Problem.identifier(problem_id)
        problem_module = problems.load_problem(language, problem_group, problem, 'common')

        solved_problems = [p for p in CodeqUser.solved_problems(user_id, language)
                                   if p != (problem_group, problem)]
        other_solutions = problems.solutions_for_problems(language, solved_problems)
        problem_facts = problems.get_facts(language, problem_module) or ''
        code = program + '\n' + other_solutions + '\n' + problem_facts

        messages, status, have_more = self.run(code)
        if status == 'ok':
            more_messages, status, have_more = self.query(query)
            messages.extend(more_messages)
        self._problem_id = problem_id
        return messages, status, have_more

    def get_problem_id(self):
        return self._problem_id