summaryrefslogtreecommitdiff
path: root/monkey/monkey.py
blob: f5b69a938476f01cb6256e4417080c08bc41a904 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/python3

import math
import time

from .edits import classify_edits
from prolog.engine import test
from prolog.util import Token, compose, decompose, map_vars, normalized, rename_vars, stringify
from .util import PQueue

# Starting from [code], find a sequence of edits that transforms it into a
# correct predicate for [name]. Append [aux_code] when testing (available facts
# and predicates).
# Return (solution, edits, time spent, #programs checked). If no solution is
# found within [timeout] seconds, solution='' and edits=[].
def fix(name, code, edits, aux_code='', timeout=30, debug=False):
    # A dictionary of edits with costs for each edit type (insert, remove or
    # change a line). Edits are tuples (before, after), where before and after
    # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
    inserts, removes, changes = classify_edits(edits)

    # Generate states that can be reached from the given program with one edit.
    # Program code is given as a list of [lines], where each line is a list of
    # tokens. Rule ranges are given in [rules] (see prolog.util.decompose).
    def step(lines, rules, path=None):
        # Apply edits in order from top to bottom; skip lines with index lower
        # than last step.
        start_line = path[-1][1] if path else 0

        for start, end in rules:
            rule_lines = lines[start:end]
            rule_tokens = [t for line in rule_lines for t in line]

            # Prepend a new rule (fact) before this rule (only if no line in
            # the current rule has been modified yet).
            if start_line == 0 or start > start_line:
                for after, cost in inserts.items():
                    new_lines = lines[:start] + (after,) + lines[start:]
                    new_rules = []
                    for old_start, old_end in rules:
                        if old_start == start:
                            new_rules.append((start, start+1))
                        new_rules.append((old_start + (0 if old_start < start else 1),
                                          old_end + (0 if old_end < start else 1)))
                    new_step = ('add_rule', start, (tuple(), after))
                    # Decrease probability as we add more rules.
                    new_cost = cost * math.pow(0.3, len(rules))

                    yield (new_lines, new_rules, new_step, new_cost)

            # Apply some edits for each line in this rule.
            for line_idx in range(start, end):
                if line_idx < start_line:
                    continue
                line = lines[line_idx]
                line_normal = tuple(rename_vars(line))

                # Apply edits whose left-hand side matches this line.
                seen = False  # has there been such an edit?
                for (before, after), cost in changes.items():
                    if line_normal == before:
                        seen = True
                        mapping = map_vars(before, after, line, rule_tokens)
                        after_real = tuple([t if t.type != 'VARIABLE'
                                              else Token('VARIABLE', mapping[t.val])
                                              for t in after])
                        new_lines = lines[:line_idx] + (after_real,) + lines[line_idx+1:]
                        new_step = ('change_line', line_idx, (tuple(line), after_real))

                        yield (new_lines, rules, new_step, cost)

                # Remove the current line.
                if line_normal in removes.keys() or not seen:
                    new_lines = lines[:line_idx] + lines[line_idx+1:]
                    new_rules = []
                    for old_start, old_end in rules:
                        new_start, new_end = (old_start - (0 if old_start <= line_idx else 1),
                                              old_end - (0 if old_end <= line_idx else 1))
                        if new_end > new_start:
                            new_rules.append((new_start, new_end))
                    new_step = ('remove_line', line_idx, (tuple(line), ()))
                    new_cost = removes.get(line_normal, 1.0) * 0.99

                    yield (new_lines, new_rules, new_step, new_cost)

                # Add a line after this one.
                for after, cost in inserts.items():
                    # Don't try to insert a head into the body.
                    if after[-1].type == 'FROM':
                        continue
                    mapping = map_vars([], after, [], rule_tokens)
                    after_real = tuple([t if t.type != 'VARIABLE'
                                          else Token('VARIABLE', mapping[t.val])
                                          for t in after])

                    idx = line_idx+1
                    new_lines = lines[:idx] + (after_real,) + lines[idx:]
                    new_rules = []
                    for old_start, old_end in rules:
                        new_rules.append((old_start + (0 if old_start < idx else 1),
                                          old_end + (0 if old_end < idx else 1)))
                    new_step = ('add_subgoal', idx, ((), after_real))
                    new_cost = cost * 0.4

                    yield (new_lines, new_rules, new_step, new_cost)

    # Return a cleaned-up list of steps:
    #   - multiple line edits in a single line are merged into one
    #   - check if any lines can be removed from the program
    def postprocess(steps):
        new_steps = []
        for step in steps:
            if new_steps:
                prev = new_steps[-1]

                if prev[1] == step[1] and \
                   prev[0] in ('add_rule', 'add_subgoal', 'change_line') and \
                   step[0] == 'change_line' and \
                   normalized(prev[2][1]) == normalized(step[2][0]):
                    new_steps[-1] = (prev[0], prev[1], (prev[2][0], step[2][1]))
                    continue

                if prev[0] == 'add_subgoal' and step[0] == 'remove_line' and \
                   prev[1]+1 == step[1]:
                    new_steps[-1] = ('change_line', prev[1], (step[2][0], prev[2][1]))
                    continue

            new_steps.append(step)
        return new_steps

    # Main loop: best-first search through generated programs.
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # programs we have already visited

    # Each program gets a task with the sequence of edits that generated it and
    # the associated cost. First candidate with cost 1 is the initial program.
    lines, rules = decompose(code)
    todo.push(((tuple(lines), tuple(rules)), (), 1.0), -1.0)

    n_tested = 0
    start_time = time.monotonic()
    total_time = 0
    while total_time < timeout:
        # Get next candidate.
        task = todo.pop()
        if task == None:
            break
        (lines, rules), path, path_cost = task

        # If we have already seen this code, skip it.
        code = compose(lines, rules)
        if code in done:
            continue
        done.add(code)

        # Print some info about the current task.
        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for step_type, line, (before, after) in path:
                print('line {}: {} {} → {}'.format(line, step_type, stringify(before), stringify(after)))

        # If the code is correct, we are done.
        if test(name, code + '\n' + aux_code):
            path = postprocess(path)
            return code, path, total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        for new_lines, new_rules, new_step, new_cost in step(lines, rules, path):
            new_path_cost = path_cost * new_cost
            if new_path_cost < 0.01:
                continue
            new_path = path + (new_step,)
            todo.push(((tuple(new_lines), tuple(new_rules)), new_path, new_path_cost), -new_path_cost)

        total_time = time.monotonic() - start_time

    return '', [], total_time, n_tested