summaryrefslogtreecommitdiff
path: root/monkey/monkey.py
blob: e18563043753be411756e0299a4bd2096c1bb8d1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#!/usr/bin/python3

import math
import time

from .edits import classify_edits
from prolog.engine import test
from prolog.util import Token, compose, decompose, map_vars, rename_vars, stringify
from .util import PQueue

# Starting from [code], find a sequence of edits that transforms it into a
# correct predicate for [name]. Append [aux_code] when testing (available facts
# and predicates).
#
# Arguments:
#   name          - predicate name, passed on to prolog.engine.test
#   code          - source of the initial program; split up with decompose()
#   edits         - known edits; classify_edits() turns them into cost tables
#   program_lines - not referenced by this function
#   aux_code      - appended (after a newline) to every candidate when testing
#   timeout       - time budget in seconds; checked once per expanded candidate
#   debug         - if true, print the cost and edit path of every candidate
#
# Return (solution, edits, time spent, #programs checked), where the last
# value counts the candidates that were tested and rejected. If no solution is
# found within [timeout] seconds (or the queue runs dry), solution='' and
# edits=[].
def fix(name, code, edits, program_lines, aux_code='', timeout=30, debug=False):
    # A dictionary of edits with costs for each edit type (insert, remove or
    # change a line). Edits are tuples (before, after), where before and after
    # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
    inserts, removes, changes = classify_edits(edits)

    # Generate states that can be reached from the given program with one edit.
    # Program code is given as a list of [lines], where each line is a list of
    # tokens. Rule ranges are given in [rules] (see prolog.util.decompose).
    # Yields tuples (new_lines, new_rules, step, cost), where step is
    # (step type, line index, (tokens before, tokens after)).
    def step(lines, rules, prev=None):
        # Apply edits in order from top to bottom; skip lines with index lower
        # than last step.
        start_line = prev[1] if prev else 0

        for start, end in rules:
            rule_lines = lines[start:end]
            rule_tokens = [t for line in rule_lines for t in line]

            for line_idx in range(start, end):
                if line_idx < start_line:
                    continue
                line = lines[line_idx]
                line_normal = tuple(rename_vars(line))

                seen = False
                # Apply each edit that matches this line.
                for (before, after), cost in changes.items():
                    if line_normal == before:
                        seen = True
                        # Translate the normalized variable names in [after]
                        # back to names used in this rule.
                        mapping = map_vars(before, after, line, rule_tokens)
                        after_real = tuple([t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after])
                        new_lines = lines[:line_idx] + (after_real,) + lines[line_idx+1:]
                        new_step = ('change_line', line_idx, (tuple(line), after_real))

                        yield (new_lines, rules, new_step, cost)

                # Remove the current line. Lines matching no known change may
                # always be removed, at a default cost of 0.9.
                if line_normal in removes or not seen:
                    new_lines = lines[:line_idx] + lines[line_idx+1:]
                    new_rules = []
                    for old_start, old_end in rules:
                        # Shift every boundary that lies after the removed line.
                        new_start, new_end = (old_start - (0 if old_start <= line_idx else 1),
                                              old_end - (0 if old_end <= line_idx else 1))
                        # Drop rules that became empty.
                        if new_end > new_start:
                            new_rules.append((new_start, new_end))
                    new_step = ('remove_line', line_idx, (tuple(line), ()))
                    new_cost = removes.get(line_normal, 0.9)

                    yield (new_lines, new_rules, new_step, new_cost)

            # Add a line at the end of the current rule.
            for after, cost in inserts.items():
                mapping = map_vars([], after, [], rule_tokens)
                after_real = [t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after]
                after_real = tuple(after_real)
                new_lines = lines[:end] + (after_real,) + lines[end:]
                new_rules = []
                for old_start, old_end in rules:
                    # Shift every boundary at or after the insertion point; the
                    # current rule grows by one line.
                    new_rules.append((old_start + (0 if old_start < end else 1),
                                      old_end + (0 if old_end < end else 1)))
                new_step = ('add_subgoal', end, ((), after_real))

                yield (new_lines, new_rules, new_step, cost)

        # Add a new fact at the end (only for programs with fewer than two
        # rules). The inserted tokens keep their normalized variable names.
        if len(rules) < 2:
            for after, cost in inserts.items():
                new_lines = lines + (after,)
                new_rules = rules + (((len(lines), len(lines)+1)),)
                new_step = ('add_rule', len(new_lines)-1, (tuple(), tuple(after)))

                yield (new_lines, new_rules, new_step, cost)

    # Return a cleaned-up list of steps: every run of consecutive 'change_line'
    # steps on the same line, where each step starts from the result of the
    # previous one, is merged into a single step going from the first "before"
    # to the last "after". Other steps are kept unchanged.
    # NOTE(review): no check for removable lines is performed here.
    def postprocess(steps):
        new_steps = []
        for current in steps:
            step_type, line_idx, (before, after) = current
            if new_steps and step_type == 'change_line':
                prev_type, prev_idx, (prev_before, prev_after) = new_steps[-1]
                if prev_type == 'change_line' and prev_idx == line_idx and \
                   prev_after == before:
                    # Extend the previous (possibly already merged) change.
                    new_steps[-1] = ('change_line', line_idx, (prev_before, after))
                    continue
            new_steps.append(current)
        return new_steps

    # Prevents useless edits with cost=1.0.
    step_cost = 0.99

    # Main loop: best-first search through generated programs.
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # programs we have already visited

    # Each program gets a task with the sequence of edits that generated it and
    # the associated cost. First candidate with cost 1 is the initial program.
    lines, rules = decompose(code)
    todo.push(((tuple(lines), tuple(rules)), (), 1.0), -1.0)

    n_tested = 0
    start_time = time.monotonic()
    total_time = 0
    while total_time < timeout:
        # Get next candidate; stop when the queue is exhausted.
        task = todo.pop()
        if task is None:
            break
        (lines, rules), path, path_cost = task

        # If we have already seen this code, skip it.
        code = compose(lines, rules)
        if code in done:
            continue
        done.add(code)

        # Print some info about the current task.
        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for step_type, line, (before, after) in path:
                print('line {}: {} {} → {}'.format(line, step_type,  stringify(before), stringify(after)))

        # If the code is correct, we are done. Refresh the elapsed time so the
        # reported value includes the test that succeeded.
        if test(name, code + '\n' + aux_code):
            total_time = time.monotonic() - start_time
            return code, postprocess(path), total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        prev_step = path[-1] if path else None
        for new_lines, new_rules, new_step, new_cost in step(lines, rules, prev_step):
            # Path cost is the product of its edit costs, each discounted by
            # step_cost; paths falling below 0.005 are pruned.
            new_path_cost = path_cost * new_cost * step_cost
            if new_path_cost < 0.005:
                continue
            new_path = path + (new_step,)
            # NOTE(review): negated cost is used as priority, presumably so
            # that PQueue pops the highest-cost candidate first; confirm.
            todo.push(((tuple(new_lines), tuple(new_rules)), new_path, new_path_cost), -new_path_cost)

        total_time = time.monotonic() - start_time

    return '', [], total_time, n_tested

# Translate the sequence [edits] into a list of marks. Each edit is a tuple
# (step type, line index, (tokens before, tokens after)).
#   - 'change_line' / 'remove_line' give ('change' / 'remove', (start, end)),
#     the character range from the first replaced token's position to the end
#     of the last replaced token
#   - 'add_subgoal' / 'add_rule' give (step type, line index)
#   - any other step type gives no mark
def fix_ranges(edits):
    range_labels = {'change_line': 'change', 'remove_line': 'remove'}
    insertion_types = ('add_subgoal', 'add_rule')

    result = []
    for kind, line_no, (old_tokens, _new_tokens) in edits:
        label = range_labels.get(kind)
        if label is not None:
            first, last = old_tokens[0], old_tokens[-1]
            result.append((label, (first.pos, last.pos + len(last.val))))
        elif kind in insertion_types:
            result.append((kind, line_no))
    return result