1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
|
#!/usr/bin/python3
import math
import time
import prolog.engine
from .edits import classify_edits
from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify
from .util import damerau_levenshtein, PQueue
# Starting from [code], find a sequence of edits that transforms it into a
# correct predicate for [name]. The [test] function is used to test generated
# programs.
# Return (solution, edits, time spent, #programs checked). If no solution is
# found within [timeout] seconds, return solution='' and edits=[].
def fix(name, code, edits, test, timeout=30, debug=False):
    """Search for a sequence of edits that turns [code] into a correct program.

    Runs a best-first search: starting from the annotated tokens of [code],
    candidate programs are generated by applying single edits (add a rule,
    change a part, remove a part, add a part) and are explored in order of
    their accumulated cost. Each candidate's text is passed to [test].

    Args:
        name: name(s) of the predicate being fixed; an inserted rule is only
            considered when its first NAME token's value is ``in name``.
        code: text of the program to fix.
        edits: edit data handed to classify_edits() to obtain the insert,
            remove and change edits with their costs.
        test: callable taking program text and returning a truthy value when
            the program is correct.
        timeout: search budget in seconds; checked once per explored program.
        debug: when true, print every explored candidate and the final,
            post-processed list of steps.

    Returns:
        A tuple (solution, steps, time spent, number of programs tested).
        steps is a list of (type, index, before tokens, after tokens) tuples.
        If no solution is found, solution is '' and steps is [].
    """
    # A dictionary of edits with costs for each edit type (insert, remove or
    # change a line). Edits are tuples (before, after), where before and after
    # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
    inserts, removes, changes = classify_edits(edits)

    # Generate states that can be reached from the given program with one edit.
    # The program is given as a list of tokens. Yields tuples
    # (new program, step, cost) where step is
    # (type, index, before, after, index from which later edits may start).
    def step(program, path=None):
        # Apply edits to program in order; skip tokens with index lower than
        # last step.
        start = path[-1][4] if path else 0

        for i, token in enumerate(program):
            # Get variable names in the current rule.
            if i == 0 or program[i-1].type == 'PERIOD':
                variables = []
                for t in program[i:]:
                    if t.type == 'VARIABLE' and not t.val.startswith('_'):
                        if t.val not in variables:
                            variables.append(t.val)
                    elif t.type == 'PERIOD':
                        break

            # Remember the first token in the current part.
            if i == 0 or program[i-1].stop:
                first = i

            # Don't edit already modified parts of the program.
            if i < start:
                continue

            # Add a new fact at beginning or after another rule.
            if i == 0 or token.type == 'PERIOD':
                new_start = i+1 if token.type == 'PERIOD' else 0
                n_rules = program.count(Token('PERIOD', '.'))
                rule = 0 if i == 0 else program[i-1].rule+1  # index of new rule

                for new, cost in inserts.items():
                    # New rule must start with correct predicate name.
                    if not (new[0].type == 'NAME' and new[0].val in name):
                        continue

                    # Here we always insert a fact, so replace trailing :- with ..
                    if new[-1].type == 'FROM':
                        new = new[:-1]

                    new_real = tuple([t.clone(rule=rule, part=0)
                                      for t in new + (Token('PERIOD', '.', stop=True),)])
                    # Rules following the inserted one move one index up.
                    new_after = tuple([t.clone(rule=t.rule+1) for t in program[new_start:]])

                    new_program = program[:new_start] + new_real + new_after
                    new_step = ('add_rule', new_start, (), new_real, new_start)
                    # Adding a rule gets less likely the more rules there are.
                    new_cost = cost * math.pow(0.3, n_rules)
                    yield (new_program, new_step, new_cost)

            if token.stop and i > first:
                # [last] is one past the stop token; [real_last] excludes the
                # stop token unless it is ':-' (which belongs to the head).
                real_last = last = i+1
                if token.type != 'FROM':
                    real_last -= 1
                part = program[first:real_last]
                part_whole = program[first:last]
                part_normal = tuple(rename_vars(part))

                # Apply each edit a→b where a matches this part.
                seen = False  # has there been such an edit?
                for (a, b), cost in changes.items():
                    if part_normal == a:
                        seen = True
                        if b[-1].type == 'FROM':
                            b = b[:-1] + (b[-1].clone(stop=True),)
                        b_real = tuple([t.clone(rule=program[first].rule,
                                                part=program[first].part)
                                        for t in map_vars(a, b, part, variables)])

                        new_program = program[:first] + b_real + program[real_last:]
                        new_step = ('change_part', first, part, b_real, first)
                        yield (new_program, new_step, cost)

                # Remove this part.
                if token.type in ('COMMA', 'SEMI'):
                    if part_normal in removes or not seen:
                        # Renumber the remaining parts of the current rule.
                        new_after = list(program[last:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(part=t.part-1)
                        new_program = program[:first] + tuple(new_after)
                        new_step = ('remove_part', first, part_whole, (), first-1)
                        new_cost = removes.get(part_normal, 1.0) * 0.99
                        yield (new_program, new_step, new_cost)

                # Remove this part at the end of the current rule.
                if token.type == 'PERIOD' and token.part > 0:
                    if part_normal in removes or not seen:
                        # NOTE(review): this branch cannot run because the
                        # enclosing condition requires token.part > 0; it looks
                        # like removing whole rules was disabled on purpose —
                        # confirm before deleting or re-enabling it.
                        if token.part == 0:  # part is fact, remove rule
                            new_after = list(program[last+1:])
                            for j, t in enumerate(new_after):
                                new_after[j] = t.clone(rule=t.rule-1)
                            new_program = program[:first] + tuple(new_after)
                            new_step = ('remove_rule', first, part, (), first)
                        else:  # part is subgoal, remove part
                            # Also drop the separator before this part.
                            new_after = list(program[last-1:])
                            for j, t in enumerate(new_after):
                                if t.rule > token.rule:
                                    break
                                new_after[j] = t.clone(part=t.part-1)
                            new_program = program[:first-1] + tuple(new_after)
                            new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1)

                        new_cost = removes.get(part_normal, 1.0) * 0.99
                        yield (new_program, new_step, new_cost)

                # Insert a new part (goal) after this part.
                if token.type in ('COMMA', 'FROM'):
                    for new, cost in inserts.items():
                        # Don't try to insert a head into the body.
                        if new[-1].type == 'FROM':
                            continue

                        new_real = tuple([t.clone(rule=program[first].rule,
                                                  part=program[first].part+1)
                                          for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)]])

                        # Renumber the following parts of the current rule.
                        new_after = list(program[last:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(rule=program[first].rule, part=t.part+1)

                        new_program = program[:last] + new_real + tuple(new_after)
                        new_step = ('add_part', last, (), new_real, last)
                        new_cost = cost * 0.4
                        yield (new_program, new_step, new_cost)

                # Insert a new part (goal) at the end of current rule.
                if token.type == 'PERIOD':
                    for new, cost in inserts.items():
                        # Don't try to insert a head into the body.
                        if new[-1].type == 'FROM':
                            continue

                        # A fact becomes a rule (':-'); a rule gets another ','.
                        prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',')
                        new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \
                                   tuple([t.clone(rule=token.rule, part=token.part+1)
                                          for t in map_vars([], new, [], variables)])

                        new_after = list(program[last-1:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(rule=t.rule, part=t.part+1)

                        new_program = program[:last-1] + new_real + tuple(new_after)
                        new_step = ('add_part', last-1, (), new_real, last)
                        new_cost = cost * 0.4
                        yield (new_program, new_step, new_cost)

    # Return a cleaned-up list of steps: consecutive steps touching the same
    # position are merged into a single step where possible.
    def postprocess(steps):
        new_steps = []
        for step in steps:
            # Remove the last field from each step as it is unnecessary after a
            # path has been found.
            step = step[:4]
            if new_steps:
                prev = new_steps[-1]

                # Two removals at the same index: one longer removal.
                if prev[0] == 'remove_part' and step[0] == 'remove_part' and \
                   prev[1] == step[1]:
                    new_steps[-1] = ('remove_part', prev[1], prev[2]+step[2], ())
                    continue

                # Removal followed by insertion at the same index: a change.
                if prev[0] == 'remove_part' and step[0] == 'add_part' and \
                   prev[1] == step[1]:
                    new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
                    continue

                # Chained changes a→b, b→c: a→c.
                if prev[0] == 'change_part' and step[0] == 'change_part' and \
                   prev[1] == step[1] and step[2] == prev[3]:
                    new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
                    continue

                # Change of everything but the last token of the previous result.
                if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
                   prev[1] == step[1] and step[2] == prev[3][:-1]:
                    new_steps[-1] = (prev[0], prev[1], prev[2], step[3]+(prev[3][-1],))
                    continue

                # Change of everything but the first token of the previous result.
                if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
                   step[1] == prev[1]+1 and step[2] == prev[3][1:]:
                    new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+step[3])
                    continue

            new_steps.append(step)

        # Only report the final steps when the caller asked for debug output.
        if debug:
            for step in new_steps:
                print('index {}: {} {} → {}'.format(
                    step[1], step[0], stringify(step[2]), stringify(step[3])))
        return new_steps

    # Main loop: best-first search through generated programs.
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # programs we have already visited

    # Each program gets a task with the sequence of edits that generated it and
    # the associated cost. First candidate with cost 1 is the initial program.
    program = tuple(annotate(code))
    todo.push((program, (), 1.0), -1.0)

    n_tested = 0
    start_time = time.monotonic()
    total_time = 0
    while total_time < timeout:
        # Get next candidate.
        task = todo.pop()
        if task is None:
            break
        program, path, path_cost = task

        # If we have already seen this code, skip it.
        code = compose(program)
        if code in done:
            continue
        done.add(code)

        # Print some info about the current task.
        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for step_type, idx, a, b, _ in path:
                print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b)))

        # If the code is correct, we are done.
        if test(code):
            path = postprocess(path)
            # Measure now so the reported time includes the successful test.
            total_time = time.monotonic() - start_time
            return code, path, total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        for new_program, new_step, new_cost in step(program, path):
            # Costs multiply along a path; prune paths that became too unlikely.
            new_path_cost = path_cost * new_cost
            if new_path_cost < 0.01:
                continue
            new_path = path + (new_step,)
            # Priority is the negated cost — presumably PQueue pops the lowest
            # priority first, so the highest-cost path is explored first
            # (TODO confirm against PQueue).
            todo.push((new_program, new_path, new_path_cost), -new_path_cost)

        total_time = time.monotonic() - start_time

    return '', [], total_time, n_tested
# Return tuples (type, start, end, message) describing edits in [path].
def fix_hints(code, path):
    """Yield (type, start, end, message) tuples describing edits in [path].

    The program is tokenized once with annotate() and then updated after each
    step, so that token indices in later steps refer to the program as it
    looks after the earlier steps were applied.

    Args:
        code: text of the original program.
        path: iterable of steps (step type, token index, before tokens,
            after tokens).

    Yields:
        (fix type, start, end, message), where fix type is 'insert', 'remove'
        or 'change' and start/end are computed from the tokens' pos values.

    Raises:
        ValueError: if a step has an unknown type.
    """
    program = list(annotate(code))

    for step_type, idx, a, b in path:
        # Position of the token the step is anchored at. An insertion may be
        # anchored one past the last token (a rule appended after the final
        # period), where there is no token to read the position from; use the
        # end of the last token instead.
        if idx < len(program):
            anchor = program[idx].pos
        elif program:
            anchor = program[-1].pos + len(program[-1].val)
        else:
            anchor = 0

        if step_type == 'add_rule':
            fix_type = 'insert'
            msg = 'Add another rule.'
            # Two-character range straddling the insertion point.
            start = anchor-1
            end = start+2
        elif step_type == 'add_part':
            fix_type = 'insert'
            msg = 'Add another goal to this rule.'
            start = anchor-1
            end = start+2
        elif step_type == 'remove_rule':
            fix_type = 'remove'
            msg = 'Remove this rule.'
            start = program[idx].pos
            end = program[idx + len(a) - 1].pos
        elif step_type == 'remove_part':
            fix_type = 'remove'
            msg = 'Remove this goal.'
            start = program[idx].pos
            end = idx + len(a) - 1
            # Don't include a trailing separator in the highlighted range.
            if program[end].type in ('COMMA', 'PERIOD', 'SEMI'):
                end -= 1
            end = program[end].pos + len(program[end].val)
        elif step_type == 'change_part':
            fix_type = 'change'
            msg = 'Check this part.'
            # Skip leading tokens that are the same before and after the edit.
            first = 0
            while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]:
                first += 1
            # Walk back from the end of [a] while the tokens at the same index
            # in [a] and [b] are equal.
            last = len(a)-1
            while last < len(b) and last >= first and a[last] == b[last]:
                last -= 1
            start = program[idx+first].pos
            end = program[idx+last].pos + len(program[idx+last].val)
        else:
            # Without this, an unknown step would silently repeat the previous
            # step's hint (or fail with an unrelated UnboundLocalError).
            raise ValueError('unknown step type: {}'.format(step_type))

        # Apply the step so that indices of the following steps stay valid; all
        # new tokens get the position of the place where they were inserted.
        program[idx:idx+len(a)] = [t.clone(pos=anchor) for t in b]
        yield fix_type, start, end, msg
# Check for typos in the code and suggest the nearest term uploaded by other users.
def check_typos(code, names):
    """Yield a 'typo' hint for each NAME token that resembles a more common name.

    For every NAME token in [code], the nearest name in [names] by
    damerau_levenshtein() distance is looked up among the names whose count is
    higher than the count of the token's own value. A hint is produced when
    the distance is at most a third of the suggested name's length.

    Args:
        code: program text; tokenized with annotate().
        names: mapping from name to its count.

    Yields:
        ('typo', start, end, message), where start/end delimit the token and
        message suggests the nearest name.
    """
    for token in annotate(code):
        if token.type != 'NAME':
            continue

        # How often the token's own spelling occurs; a suggestion must be more
        # frequent than that.
        own_count = names.get(token.val, 0)

        nearest_name = None
        nearest_dist = None
        for name, count in names.items():
            # Skip the token's own spelling, names that are not more frequent,
            # and empty names (nothing to suggest, and their zero length would
            # break the relative-distance check below).
            if not name or name == token.val or count <= own_count:
                continue
            distance = damerau_levenshtein(token.val, name)
            # Keep the first name with the strictly smallest distance.
            if distance > 0 and (nearest_dist is None or distance < nearest_dist):
                nearest_dist = distance
                nearest_name = name

        # Only suggest names that differ in at most a third of their length.
        if nearest_name is not None and nearest_dist/len(nearest_name) <= 1/3:
            yield 'typo', token.pos, token.pos + len(token.val), 'Did you mean "{}"?'.format(nearest_name)
|