summaryrefslogtreecommitdiff
path: root/monkey/monkey.py
blob: 5d26c6ed42dc83fc7636c7f840545c565cf552ee (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/python3

import math
import time

from .edits import classify_edits
from .prolog.engine import test
from .prolog.util import compose, decompose, map_vars, rename_vars, stringify
from .util import PQueue, Token

# score a program (a list of lines) according to lines distribution
def score(program, lines):
    result = 1
    for line in program:
        line_normal = list(line)
        rename_vars(line_normal)
        line_normal = tuple(line_normal)
        result *= lines.get(line_normal, 0.01)

    if len(program) == 0 or result == 0:
        return 0.01
    return math.pow(result, 1/len(program))

# find a sequence of edits that fixes [code]
def fix(name, code, edits, aux_code='', timeout=30, debug=False):
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # set of already-analyzed solutions

    # Add a new candidate solution ([lines]+[rules]) to the priority queue.
    # This solution is generated by applying [step] with [cost] to [prev] task.
    def add_task(lines, rules, prev=None, step=None, cost=None):
        if prev is None:
            path = ()
            path_cost = 1.0
        else:
            path = tuple(list(prev[1]) + [step])
            path_cost = prev[2] * cost
        todo.push(((tuple(lines), tuple(rules)), path, path_cost), -path_cost)

    lines, rules = decompose(code)
    add_task(lines, rules)

    inserts, removes, changes = classify_edits(edits)
    start_time = time.monotonic()
    n_tested = 0
    while True:
        total_time = time.monotonic() - start_time
        if total_time > timeout:
            break

        task = todo.pop()
        if task == None:
            break

        (lines, rules), path, path_cost = task
        code = compose(lines, rules)
        if code in done:
            continue
        done.add(code)

        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for line, (before, after) in path:
                print('line ' + str(line) + ':\t' + stringify(before) + ' → ' + stringify(after))

        # If the code is correct, we are done.
        if test(name, code + '\n' + aux_code):
            return code, path, total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        rule_no = 0
        for start, end in rules:
            rule = lines[start:end]
            rule_tokens = [t for line in rule for t in line]

            for line_idx in range(start, end):
                line = lines[line_idx]

                line_normal = list(line)
                rename_vars(line_normal)
                line_normal = tuple(line_normal)

                seen = False
                for (before, after), cost in changes.items():
                    if line_normal == before:
                        seen = True
                        mapping = map_vars(before, after, line, rule_tokens)
                        after_real = tuple([t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after])
                        new_lines = lines[:line_idx] + (after_real,) + lines[line_idx+1:]
                        new_step = ((rule_no, line_idx-start), (tuple(line), after_real))

                        add_task(new_lines, rules, prev=task, step=new_step, cost=cost)

                # if nothing could be done with this line, try removing it
                # (maybe try removing in any case?)
                if line_normal in removes.keys() or not seen:
                    new_lines = lines[:line_idx] + lines[line_idx+1:]
                    new_rules = []
                    for old_start, old_end in rules:
                        new_start, new_end = (old_start - (0 if old_start <= line_idx else 1),
                                              old_end - (0 if old_end <= line_idx else 1))
                        if new_end > new_start:
                            new_rules.append((new_start, new_end))
                    new_step = ((rule_no, line_idx-start), (tuple(line), ()))
                    new_cost = removes[line_normal] if line_normal in removes.keys() else 0.9

                    add_task(new_lines, new_rules, prev=task, step=new_step, cost=new_cost)

            # try adding a line to this rule… would need to distinguish between
            # head/body lines in transforms
            for after, cost in inserts.items():
                mapping = map_vars([], after, [], rule_tokens)
                after_real = [t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after]
                after_real = tuple(after_real)
                new_lines = lines[:end] + (after_real,) + lines[end:]
                new_rules = []
                for old_start, old_end in rules:
                    new_rules.append((old_start + (0 if old_start < end else 1),
                                      old_end + (0 if old_end < end else 1)))
                new_step = ((rule_no, end-start), ((), after_real))

                add_task(new_lines, new_rules, prev=task, step=new_step, cost=cost)
            rule_no += 1

        # try adding a new fact
        if len(rules) < 2:
            for after, cost in inserts.items():
                new_lines = lines + (after,)
                new_rules = rules + (((len(lines), len(lines)+1)),)
                new_step = ((len(new_rules)-1, 0), (tuple(), tuple(after)))

                add_task(new_lines, new_rules, prev=task, step=new_step, cost=cost)

    return '', [], total_time, n_tested