summaryrefslogtreecommitdiff
path: root/monkey/__init__.py
blob: 09f77b0c0b9c47a3e7047d951e409e13f299fc78 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# CodeQ: an online programming tutor.
# Copyright (C) 2015 UL FRI
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import math
import time
import prolog.engine

from .edits import classify_edits
from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify
from .util import damerau_levenshtein, PQueue

# Starting from [code], find a sequence of edits that transforms it into a
# correct predicate for [name]. The [test] function is used to test generated
# programs.
# Return (solution, edits, time spent, #programs checked). If no solution is
# found within [timeout] seconds, return solution='' and edits=[].
def fix(name, code, edits, test, timeout=30, debug=False):
    # A dictionary of edits with costs for each edit type (insert, remove or
    # change a line). Edits are tuples (before, after), where before and after
    # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
    inserts, removes, changes = classify_edits(edits)

    # Generate states that can be reached from the given program with one edit.
    # The program is given as a list of tokens.
    def step(program, path=None):
        # Apply edits to program in order; skip tokens with index lower than
        # last step.
        start = path[-1][4] if path else 0

        for i, token in enumerate(program):
            # Get variable names in the current rule.
            if i == 0 or program[i-1].type == 'PERIOD':
                variables = []
                for t in program[i:]:
                    if t.type == 'VARIABLE' and not t.val.startswith('_'):
                        if t.val not in variables:
                            variables.append(t.val)
                    elif t.type == 'PERIOD':
                        break

            # Remember the first token in the current part.
            if i == 0 or program[i-1].stop:
                first = i

            # Don't edit already modified parts of the program.
            if i < start:
                continue

            # Add a new fact at beginning or after another rule.
            if i == 0 or token.type == 'PERIOD':
                new_start = i+1 if token.type == 'PERIOD' else 0
                n_rules = program.count(Token('PERIOD', '.'))
                rule = 0 if i == 0 else program[i-1].rule+1  # index of new rule

                for new, cost in inserts.items():
                    # New rule must start with correct predicate name.
                    if not (new[0].type == 'NAME' and new[0].val in name):
                        continue

                    # Here we always insert a fact, so replace trailing :- with ..
                    if new[-1].type == 'FROM':
                        new = new[:-1]

                    new_real = tuple([t.clone(rule=rule, part=0) for t in new + (Token('PERIOD', '.', stop=True),)])
                    new_after = tuple([t.clone(rule=t.rule+1) for t in program[new_start:]])

                    new_program = program[:new_start] + new_real + new_after
                    new_step = ('add_rule', new_start, (), new_real, new_start)
                    new_cost = cost * math.pow(0.3, n_rules)
                    yield (new_program, new_step, new_cost)

            if token.stop and i > first:
                real_last = last = i+1
                if token.type != 'FROM':
                    real_last -= 1
                part = program[first:real_last]
                part_whole = program[first:last]
                part_normal = tuple(rename_vars(part))

                # Apply each edit a→b where a matches this part.
                seen = False  # has there been such an edit?
                for (a, b), cost in changes.items():
                    if part_normal == a:
                        seen = True
                        if b[-1].type == 'FROM':
                            b = b[:-1] + (b[-1].clone(stop=True),)
                        b_real = tuple([t.clone(rule=program[first].rule,
                                                part=program[first].part)
                                            for t in map_vars(a, b, part, variables)])

                        new_program = program[:first] + b_real + program[real_last:]
                        new_step = ('change_part', first, part, b_real, first)
                        yield (new_program, new_step, cost)


                # Remove this part.
                if token.type in ('COMMA', 'SEMI'):
                    if part_normal in removes.keys() or not seen:
                        new_after = list(program[last:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(part=t.part-1)
                        new_program = program[:first] + tuple(new_after)
                        new_step = ('remove_part', first, part_whole, (), first-1)
                        new_cost = removes.get(part_normal, 1.0) * 0.99
                        yield (new_program, new_step, new_cost)

                # Remove this part at the end of the current rule.
                if token.type == 'PERIOD' and token.part > 0:
                    if part_normal in removes.keys() or not seen:
                        if token.part == 0:  # part is fact, remove rule
                            new_after = list(program[last+1:])
                            for j, t in enumerate(new_after):
                                new_after[j] = t.clone(rule=t.rule-1)
                            new_program = program[:first] + tuple(new_after)
                            new_step = ('remove_rule', first, part, (), first)
                        else:  # part is subgoal, remove part
                            new_after = list(program[last-1:])
                            for j, t in enumerate(new_after):
                                if t.rule > token.rule:
                                    break
                                new_after[j] = t.clone(part=t.part-1)
                            new_program = program[:first-1] + tuple(new_after)
                            new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1)

                        new_cost = removes.get(part_normal, 1.0) * 0.99
                        yield (new_program, new_step, new_cost)


                # Insert a new part (goal) after this part.
                if token.type in ('COMMA', 'FROM'):
                    for new, cost in inserts.items():
                        # Don't try to insert a head into the body.
                        if new[-1].type == 'FROM':
                            continue

                        new_real = tuple([t.clone(rule=program[first].rule,
                                                  part=program[first].part+1)
                                            for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)]])

                        new_after = list(program[last:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(rule=program[first].rule, part=t.part+1)

                        new_program = program[:last] + new_real + tuple(new_after)
                        new_step = ('add_part', last, (), new_real, last)
                        new_cost = cost * 0.4
                        yield (new_program, new_step, new_cost)

                # Insert a new part (goal) at the end of current rule.
                if token.type == 'PERIOD':
                    for new, cost in inserts.items():
                        # Don't try to insert a head into the body.
                        if new[-1].type == 'FROM':
                            continue

                        prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',')
                        new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \
                                   tuple([t.clone(rule=token.rule, part=token.part+1)
                                            for t in map_vars([], new, [], variables)])

                        new_after = list(program[last-1:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(rule=t.rule, part=t.part+1)

                        new_program = program[:last-1] + new_real + tuple(new_after)
                        new_step = ('add_part', last-1, (), new_real, last)
                        new_cost = cost * 0.4
                        yield (new_program, new_step, new_cost)

    # Return a cleaned-up list of steps.
    def postprocess(steps):
        new_steps = []
        for step in steps:
            # Remove the last field from each step as it is unnecessary after a
            # path has been found.
            step = step[:4]
            if new_steps:
                prev = new_steps[-1]
                if prev[0] == 'remove_part' and step[0] == 'remove_part' and \
                   prev[1] == step[1]:
                    new_steps[-1] = ('remove_part', prev[1], prev[2]+step[2], ())
                    continue

                if prev[0] == 'remove_part' and step[0] == 'add_part' and \
                   prev[1] == step[1]:
                    new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
                    continue

                if prev[0] == 'change_part' and step[0] == 'change_part' and \
                   prev[1] == step[1] and step[2] == prev[3]:
                    new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
                    continue

                if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
                   prev[1] == step[1] and step[2] == prev[3][:-1]:
                    new_steps[-1] = (prev[0], prev[1], prev[2], step[3]+(prev[3][-1],))
                    continue

                if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
                   step[1] == prev[1]+1 and step[2] == prev[3][1:]:
                    new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+step[3])
                    continue
            new_steps.append(step)
        for step in new_steps:
            print('index {}: {} {} → {}'.format(
                step[1], step[0], stringify(step[2]), stringify(step[3])))
        return new_steps

    # Main loop: best-first search through generated programs.
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # programs we have already visited

    # Each program gets a task with the sequence of edits that generated it and
    # the associated cost. First candidate with cost 1 is the initial program.
    program = tuple(annotate(code))
    todo.push((program, (), 1.0), -1.0)

    n_tested = 0
    start_time = time.monotonic()
    total_time = 0
    while total_time < timeout:
        # Get next candidate.
        task = todo.pop()
        if task == None:
            break
        program, path, path_cost = task

        # If we have already seen this code, skip it.
        code = compose(program)
        if code in done:
            continue
        done.add(code)

        # Print some info about the current task.
        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for step_type, idx, a, b, _ in path:
                print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b)))

        # If the code is correct, we are done.
        if test(code):
            path = postprocess(path)
            return code, path, total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        for new_program, new_step, new_cost in step(program, path):
            new_path_cost = path_cost * new_cost
            if new_path_cost < 0.01:
                continue
            new_path = path + (new_step,)
            todo.push((new_program, new_path, new_path_cost), -new_path_cost)

        total_time = time.monotonic() - start_time

    return '', [], total_time, n_tested

# Return tuples (type, start, end, message) describing edits in [path].
def fix_hints(code, path):
    program = list(annotate(code))

    for step_type, idx, a, b in path:
        if step_type == 'add_rule':
            fix_type = 'insert'
            msg = 'Add another rule.'
            start = program[idx].pos-1
            end = start+2
        elif step_type == 'add_part':
            fix_type = 'insert'
            msg = 'Add another goal to this rule.'
            start = program[idx].pos-1
            end = start+2
        elif step_type == 'remove_rule':
            fix_type = 'remove'
            msg = 'Remove this rule.'
            start = program[idx].pos
            end = program[idx + len(a) - 1].pos
        elif step_type == 'remove_part':
            fix_type = 'remove'
            msg = 'Remove this goal.'
            start = program[idx].pos
            end = idx + len(a) - 1
            if program[end].type in ('COMMA', 'PERIOD', 'SEMI'):
                end -= 1
            end = program[end].pos + len(program[end].val)
        elif step_type == 'change_part':
            fix_type = 'change'
            msg = 'Check this part.'
            first = 0
            while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]:
                first += 1
            last = len(a)-1
            while last < len(b) and last >= first and a[last] == b[last]:
                last -= 1
            start = program[idx+first].pos
            end = program[idx+last].pos + len(program[idx+last].val)

        program[idx:idx+len(a)] = [t.clone(pos=program[idx].pos) for t in b]
        yield fix_type, start, end, msg


# Checks for typos in the code and suggest the nearst uploaded term by other users.
def check_typos(code, names):
    for token in annotate(code):
        if token.type == 'NAME':
            nearest_name = ' '
            nearest_dist = 1000
            own_count = names.get(token.val, 0) # count of the token.val which is compared with the
                                                # each name in the names
            for name in names.items():
                if name[0] == token.val:        # If the names are the skip the code
                    continue

                distance = damerau_levenshtein(token.val, name[0])

                if distance < nearest_dist and distance > 0  and own_count < name[1]:
                    nearest_dist = distance     # Set best_dist and best_name if the less one is found
                    nearest_name = name[0]
            if nearest_dist > 0 and nearest_dist/len(nearest_name) <= 1/3:
                yield  'typo', token.pos, token.pos + len(token.val) , 'Did you mean "{}"?'.format(nearest_name)