summaryrefslogtreecommitdiff
path: root/monkey/monkey.py
blob: cd44baca91012aa898811699e1177d3de849b370 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/python3

import math
import time

from .edits import classify_edits
from .prolog.engine import test
from .prolog.util import compose, decompose, map_vars, rename_vars, stringify
from .util import PQueue, Token

# Starting from [code], find a sequence of [edits] that transforms it into a
# correct predicate for [name]. Append [aux_code] when testing (available facts
# and predicates).
#
# Candidates are explored through a priority queue. Each candidate carries a
# path score that starts at 1.0 and is multiplied by the cost of every applied
# edit and by 0.99 per step; candidates scoring below 0.005 are dropped.
#
# Return (solution, edits, time spent, #programs checked). If no solution is
# found within [timeout] seconds, or the queue runs empty, solution='' and
# edits=[]. The reported time is sampled at the start of the last iteration.
def fix(name, code, edits, aux_code='', timeout=30, debug=False):
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # programs we have already visited

    inserts, removes, changes = classify_edits(edits)

    # Generate states that can be reached from the given program with one edit.
    # Program code is given as a list of [lines], where each line is a list of
    # tokens. Rule ranges are given in [rules] (see prolog.util.decompose).
    # [prev] is the step that produced this program, if any.
    # Yields (new_lines, new_rules, new_step, cost) tuples, where new_step is
    # (step type, line index, (tokens before, tokens after)).
    def step(lines, rules, prev=None):
        # Apply edits in order from top to bottom; skip lines with index lower
        # than last step.
        start_line = prev[1] if prev else 0

        for start, end in rules:
            rule_lines = lines[start:end]
            rule_tokens = [t for line in rule_lines for t in line]

            for line_idx in range(max(start, start_line), end):
                line = lines[line_idx]

                # Work on a copy: rename_vars modifies its argument in place.
                # NOTE(review): presumably this normalizes variable names so
                # the line can be compared against edit keys - confirm.
                line_normal = list(line)
                rename_vars(line_normal)
                line_normal = tuple(line_normal)

                seen = False
                # Apply each edit that matches this line.
                for (before, after), cost in changes.items():
                    if line_normal == before:
                        seen = True
                        # Replace variables in the edit with the names given
                        # by map_vars for this line and rule.
                        mapping = map_vars(before, after, line, rule_tokens)
                        after_real = tuple([t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after])
                        new_lines = lines[:line_idx] + (after_real,) + lines[line_idx+1:]
                        new_step = ('change_line', line_idx, (tuple(line), after_real))

                        yield (new_lines, rules, new_step, cost)

                # Remove the current line, either because a known edit removes
                # it or because no change edit matched it at all.
                if line_normal in removes or not seen:
                    new_lines = lines[:line_idx] + lines[line_idx+1:]
                    new_rules = []
                    # Shift rule boundaries lying after the removed line and
                    # drop rules that become empty.
                    for old_start, old_end in rules:
                        new_start, new_end = (old_start - (0 if old_start <= line_idx else 1),
                                              old_end - (0 if old_end <= line_idx else 1))
                        if new_end > new_start:
                            new_rules.append((new_start, new_end))
                    new_step = ('remove_line', line_idx, (tuple(line), ()))
                    # Removals not backed by a known edit get a fixed cost.
                    new_cost = removes[line_normal] if line_normal in removes else 0.9

                    yield (new_lines, new_rules, new_step, new_cost)

            # Add a line to the current rule.
            for after, cost in inserts.items():
                mapping = map_vars([], after, [], rule_tokens)
                after_real = [t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after]
                after_real = tuple(after_real)
                new_lines = lines[:end] + (after_real,) + lines[end:]
                new_rules = []
                # Shift every rule boundary at or after the insertion point.
                for old_start, old_end in rules:
                    new_rules.append((old_start + (0 if old_start < end else 1),
                                      old_end + (0 if old_end < end else 1)))
                new_step = ('add_subgoal', end, ((), after_real))

                yield (new_lines, new_rules, new_step, cost)

        # Add a new fact at the end.
        if len(rules) < 2:
            for after, cost in inserts.items():
                new_lines = lines + (after,)
                new_rules = rules + ((len(lines), len(lines)+1),)
                new_step = ('add_rule', len(new_lines)-1, (tuple(), tuple(after)))

                yield (new_lines, new_rules, new_step, cost)

    # Return a cleaned-up list of steps:
    #   - consecutive changes to the same line, where each change starts from
    #     the result of the previous one, are merged into a single change
    #     (chains of any length, so the merged step always keeps the tokens of
    #     the line as it appeared before the first change)
    def postprocess(steps):
        new_steps = []
        for cur in steps:
            if new_steps and cur[0] == 'change_line':
                last = new_steps[-1]
                if last[0] == 'change_line' and last[1] == cur[1] and \
                   last[2][1] == cur[2][0]:
                    new_steps[-1] = ('change_line', last[1], (last[2][0], cur[2][1]))
                    continue
            new_steps.append(cur)
        return new_steps

    # Add the initial program to the queue.
    # NOTE(review): priorities are negated scores, so PQueue presumably pops
    # the smallest key (= highest score) first - confirm against util.PQueue.
    lines, rules = decompose(code)
    todo.push(((tuple(lines), tuple(rules)), (), 1.0), -1.0)

    start_time = time.monotonic()
    n_tested = 0
    while True:
        # Have we been at it for too long?
        total_time = time.monotonic() - start_time
        if total_time > timeout:
            break

        # Get next candidate; None means the queue is exhausted.
        task = todo.pop()
        if task is None:
            break
        (lines, rules), path, path_cost = task

        # If we have already seen this code, skip it.
        code = compose(lines, rules)
        if code in done:
            continue
        done.add(code)

        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for step_type, line, (before, after) in path:
                print('line {}: {} {} → {}'.format(line, step_type,  stringify(before), stringify(after)))

        # If the code is correct, we are done.
        if test(name, code + '\n' + aux_code):
            path = postprocess(path)
            return code, path, total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        path = list(path)
        prev_step = path[-1] if path else None
        for new_lines, new_rules, new_step, new_cost in step(lines, rules, prev_step):
            # Every step is additionally penalized by a factor of 0.99.
            new_path_cost = path_cost * new_cost * 0.99
            if new_path_cost < 0.005:
                continue
            new_path = tuple(path + [new_step])
            todo.push(((tuple(new_lines), tuple(new_rules)), new_path, new_path_cost), -new_path_cost)

    return '', [], total_time, n_tested

# Convert the sequence [edits] into a list of marks. Changed and removed lines
# yield ('change' | 'remove', (start, end)), where the character range runs
# from the position of the first token of the old line to the end of its last
# token. Added subgoals and rules yield (step type, line index). Steps of any
# other type are ignored.
def fix_ranges(edits):
    # Character range covered by a non-empty sequence of tokens.
    def span(tokens):
        first, last = tokens[0], tokens[-1]
        return (first.pos, last.pos + len(last.val))

    range_labels = {'change_line': 'change', 'remove_line': 'remove'}
    insert_types = ('add_subgoal', 'add_rule')

    marks = []
    for kind, line_no, (old_tokens, _new_tokens) in edits:
        if kind in range_labels:
            marks.append((range_labels[kind], span(old_tokens)))
        elif kind in insert_types:
            marks.append((kind, line_no))
    return marks