summaryrefslogtreecommitdiff
path: root/monkey/__init__.py
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@fri.uni-lj.si>2015-11-18 13:20:00 +0100
committerTimotej Lazar <timotej.lazar@fri.uni-lj.si>2015-12-11 16:11:15 +0100
commitfa39fe7bfedd0b2e615d369adb5b510ceb9b857f (patch)
treeabf381b007d1816247cab534ac8e2152695ad596 /monkey/__init__.py
parentdd723bd01634fa5ffc85402ea10947e472b257af (diff)
Use a more general method for extracting edits
This is a large overhaul of monkey code. Before, only edits within individual lines were tracked, which required a Prolog-specific method for splitting a program into a list of lines for every rule. In this version, modifications can be tracked within arbitrary code ranges. Ranges to be tracked are determined by selecting "interesting" subtrees in the AST of the starting code version. The new method is simpler, less language-dependent and easier to extend. The downside is that a program must be syntactically correct before we can attempt to fix it (the previous approach could handle programs with syntax errors in some cases). This commit also integrates a call to monkey.fix in prolog_session.hint, by running it if no other hint is found.
Diffstat (limited to 'monkey/__init__.py')
-rw-r--r--[-rwxr-xr-x]monkey/__init__.py333
1 files changed, 63 insertions, 270 deletions
diff --git a/monkey/__init__.py b/monkey/__init__.py
index 09f77b0..db00493 100755..100644
--- a/monkey/__init__.py
+++ b/monkey/__init__.py
@@ -18,8 +18,7 @@ import math
import time
import prolog.engine
-from .edits import classify_edits
-from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify
+from prolog.util import Token, tokenize, rename_vars, stringify, parse, interesting_ranges, rename_vars_list
from .util import damerau_levenshtein, PQueue
# Starting from [code], find a sequence of edits that transforms it into a
@@ -27,216 +26,45 @@ from .util import damerau_levenshtein, PQueue
# programs.
# Return (solution, edits, time spent, #programs checked). If no solution is
# found within [timeout] seconds, return solution='' and edits=[].
-def fix(name, code, edits, test, timeout=30, debug=False):
- # A dictionary of edits with costs for each edit type (insert, remove or
- # change a line). Edits are tuples (before, after), where before and after
- # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
- inserts, removes, changes = classify_edits(edits)
-
- # Generate states that can be reached from the given program with one edit.
- # The program is given as a list of tokens.
+def fix(code, edits, test, timeout=30, debug=False):
def step(program, path=None):
- # Apply edits to program in order; skip tokens with index lower than
- # last step.
- start = path[-1][4] if path else 0
-
- for i, token in enumerate(program):
- # Get variable names in the current rule.
- if i == 0 or program[i-1].type == 'PERIOD':
- variables = []
- for t in program[i:]:
- if t.type == 'VARIABLE' and not t.val.startswith('_'):
- if t.val not in variables:
- variables.append(t.val)
- elif t.type == 'PERIOD':
- break
-
- # Remember the first token in the current part.
- if i == 0 or program[i-1].stop:
- first = i
-
- # Don't edit already modified parts of the program.
- if i < start:
- continue
-
- # Add a new fact at beginning or after another rule.
- if i == 0 or token.type == 'PERIOD':
- new_start = i+1 if token.type == 'PERIOD' else 0
- n_rules = program.count(Token('PERIOD', '.'))
- rule = 0 if i == 0 else program[i-1].rule+1 # index of new rule
-
- for new, cost in inserts.items():
- # New rule must start with correct predicate name.
- if not (new[0].type == 'NAME' and new[0].val in name):
- continue
-
- # Here we always insert a fact, so replace trailing :- with ..
- if new[-1].type == 'FROM':
- new = new[:-1]
-
- new_real = tuple([t.clone(rule=rule, part=0) for t in new + (Token('PERIOD', '.', stop=True),)])
- new_after = tuple([t.clone(rule=t.rule+1) for t in program[new_start:]])
-
- new_program = program[:new_start] + new_real + new_after
- new_step = ('add_rule', new_start, (), new_real, new_start)
- new_cost = cost * math.pow(0.3, n_rules)
- yield (new_program, new_step, new_cost)
-
- if token.stop and i > first:
- real_last = last = i+1
- if token.type != 'FROM':
- real_last -= 1
- part = program[first:real_last]
- part_whole = program[first:last]
- part_normal = tuple(rename_vars(part))
-
- # Apply each edit a→b where a matches this part.
- seen = False # has there been such an edit?
- for (a, b), cost in changes.items():
- if part_normal == a:
- seen = True
- if b[-1].type == 'FROM':
- b = b[:-1] + (b[-1].clone(stop=True),)
- b_real = tuple([t.clone(rule=program[first].rule,
- part=program[first].part)
- for t in map_vars(a, b, part, variables)])
-
- new_program = program[:first] + b_real + program[real_last:]
- new_step = ('change_part', first, part, b_real, first)
- yield (new_program, new_step, cost)
-
-
- # Remove this part.
- if token.type in ('COMMA', 'SEMI'):
- if part_normal in removes.keys() or not seen:
- new_after = list(program[last:])
- for j, t in enumerate(new_after):
- if t.rule > token.rule:
- break
- new_after[j] = t.clone(part=t.part-1)
- new_program = program[:first] + tuple(new_after)
- new_step = ('remove_part', first, part_whole, (), first-1)
- new_cost = removes.get(part_normal, 1.0) * 0.99
- yield (new_program, new_step, new_cost)
-
- # Remove this part at the end of the current rule.
- if token.type == 'PERIOD' and token.part > 0:
- if part_normal in removes.keys() or not seen:
- if token.part == 0: # part is fact, remove rule
- new_after = list(program[last+1:])
- for j, t in enumerate(new_after):
- new_after[j] = t.clone(rule=t.rule-1)
- new_program = program[:first] + tuple(new_after)
- new_step = ('remove_rule', first, part, (), first)
- else: # part is subgoal, remove part
- new_after = list(program[last-1:])
- for j, t in enumerate(new_after):
- if t.rule > token.rule:
- break
- new_after[j] = t.clone(part=t.part-1)
- new_program = program[:first-1] + tuple(new_after)
- new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1)
-
- new_cost = removes.get(part_normal, 1.0) * 0.99
- yield (new_program, new_step, new_cost)
-
-
- # Insert a new part (goal) after this part.
- if token.type in ('COMMA', 'FROM'):
- for new, cost in inserts.items():
- # Don't try to insert a head into the body.
- if new[-1].type == 'FROM':
- continue
-
- new_real = tuple([t.clone(rule=program[first].rule,
- part=program[first].part+1)
- for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)]])
-
- new_after = list(program[last:])
- for j, t in enumerate(new_after):
- if t.rule > token.rule:
- break
- new_after[j] = t.clone(rule=program[first].rule, part=t.part+1)
-
- new_program = program[:last] + new_real + tuple(new_after)
- new_step = ('add_part', last, (), new_real, last)
- new_cost = cost * 0.4
- yield (new_program, new_step, new_cost)
-
- # Insert a new part (goal) at the end of current rule.
- if token.type == 'PERIOD':
- for new, cost in inserts.items():
- # Don't try to insert a head into the body.
- if new[-1].type == 'FROM':
- continue
-
- prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',')
- new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \
- tuple([t.clone(rule=token.rule, part=token.part+1)
- for t in map_vars([], new, [], variables)])
-
- new_after = list(program[last-1:])
- for j, t in enumerate(new_after):
- if t.rule > token.rule:
- break
- new_after[j] = t.clone(rule=t.rule, part=t.part+1)
-
- new_program = program[:last-1] + new_real + tuple(new_after)
- new_step = ('add_part', last-1, (), new_real, last)
- new_cost = cost * 0.4
- yield (new_program, new_step, new_cost)
-
- # Return a cleaned-up list of steps.
- def postprocess(steps):
- new_steps = []
- for step in steps:
- # Remove the last field from each step as it is unnecessary after a
- # path has been found.
- step = step[:4]
- if new_steps:
- prev = new_steps[-1]
- if prev[0] == 'remove_part' and step[0] == 'remove_part' and \
- prev[1] == step[1]:
- new_steps[-1] = ('remove_part', prev[1], prev[2]+step[2], ())
- continue
-
- if prev[0] == 'remove_part' and step[0] == 'add_part' and \
- prev[1] == step[1]:
- new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
- continue
-
- if prev[0] == 'change_part' and step[0] == 'change_part' and \
- prev[1] == step[1] and step[2] == prev[3]:
- new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
- continue
-
- if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
- prev[1] == step[1] and step[2] == prev[3][:-1]:
- new_steps[-1] = (prev[0], prev[1], prev[2], step[3]+(prev[3][-1],))
- continue
-
- if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
- step[1] == prev[1]+1 and step[2] == prev[3][1:]:
- new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+step[3])
- continue
- new_steps.append(step)
- for step in new_steps:
- print('index {}: {} {} → {}'.format(
- step[1], step[0], stringify(step[2]), stringify(step[3])))
- return new_steps
+ tokens = program.leaves()
+ for part, range_path in interesting_ranges(program):
+ names = {}
+ part_normal = tuple(rename_vars_list(part, names))
+ for (path, a, b), p in edits.items():
+ if path == range_path and a == part_normal:
+ reverse_names = {v: k for k, v in names.items()}
+ b_real = tuple(rename_vars(b, reverse_names))
+ new_tokens = []
+ idx = None
+ for i, t in enumerate(tokens):
+ if t.pos >= part[0].pos and t.pos+len(t.val) <= part[-1].pos+len(part[-1].val):
+ if idx is None:
+ idx = i
+ new_tokens.extend(b_real)
+ else:
+ new_tokens.append(t)
+ new_code = stringify(new_tokens)
+ new_program = parse(new_code)
+ new_step = (idx, tuple(part), b_real)
+ if new_program is not None:
+ yield (new_program.freeze(), new_step, p)
# Main loop: best-first search through generated programs.
todo = PQueue() # priority queue of candidate solutions
done = set() # programs we have already visited
+ n_tested = 0
+ start_time = time.monotonic()
+ total_time = 0
# Each program gets a task with the sequence of edits that generated it and
# the associated cost. First candidate with cost 1 is the initial program.
- program = tuple(annotate(code))
- todo.push((program, (), 1.0), -1.0)
+ program = parse(code)
+ if program is None:
+ return '', [], total_time, n_tested
+ todo.push((program.freeze(), (), 1.0), -1.0)
- n_tested = 0
- start_time = time.monotonic()
- total_time = 0
while total_time < timeout:
# Get next candidate.
task = todo.pop()
@@ -245,7 +73,7 @@ def fix(name, code, edits, test, timeout=30, debug=False):
program, path, path_cost = task
# If we have already seen this code, skip it.
- code = compose(program)
+ code = stringify(program)
if code in done:
continue
done.add(code)
@@ -253,12 +81,11 @@ def fix(name, code, edits, test, timeout=30, debug=False):
# Print some info about the current task.
if debug:
print('Cost {:.12f}'.format(path_cost))
- for step_type, idx, a, b, _ in path:
- print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b)))
+ for idx, a, b in path:
+ print('{}: {} → {}'.format(idx, stringify(a), stringify(b)))
# If the code is correct, we are done.
if test(code):
- path = postprocess(path)
return code, path, total_time, n_tested
n_tested += 1
@@ -274,66 +101,32 @@ def fix(name, code, edits, test, timeout=30, debug=False):
return '', [], total_time, n_tested
-# Return tuples (type, start, end, message) describing edits in [path].
-def fix_hints(code, path):
- program = list(annotate(code))
-
- for step_type, idx, a, b in path:
- if step_type == 'add_rule':
- fix_type = 'insert'
- msg = 'Add another rule.'
- start = program[idx].pos-1
- end = start+2
- elif step_type == 'add_part':
- fix_type = 'insert'
- msg = 'Add another goal to this rule.'
- start = program[idx].pos-1
- end = start+2
- elif step_type == 'remove_rule':
- fix_type = 'remove'
- msg = 'Remove this rule.'
- start = program[idx].pos
- end = program[idx + len(a) - 1].pos
- elif step_type == 'remove_part':
- fix_type = 'remove'
- msg = 'Remove this goal.'
- start = program[idx].pos
- end = idx + len(a) - 1
- if program[end].type in ('COMMA', 'PERIOD', 'SEMI'):
- end -= 1
- end = program[end].pos + len(program[end].val)
- elif step_type == 'change_part':
- fix_type = 'change'
- msg = 'Check this part.'
- first = 0
- while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]:
- first += 1
- last = len(a)-1
- while last < len(b) and last >= first and a[last] == b[last]:
- last -= 1
- start = program[idx+first].pos
- end = program[idx+last].pos + len(program[idx+last].val)
-
- program[idx:idx+len(a)] = [t.clone(pos=program[idx].pos) for t in b]
- yield fix_type, start, end, msg
-
-
-# Checks for typos in the code and suggest the nearst uploaded term by other users.
-def check_typos(code, names):
- for token in annotate(code):
- if token.type == 'NAME':
- nearest_name = ' '
- nearest_dist = 1000
- own_count = names.get(token.val, 0) # count of the token.val which is compared with the
- # each name in the names
- for name in names.items():
- if name[0] == token.val: # If the names are the skip the code
- continue
-
- distance = damerau_levenshtein(token.val, name[0])
-
- if distance < nearest_dist and distance > 0 and own_count < name[1]:
- nearest_dist = distance # Set best_dist and best_name if the less one is found
- nearest_name = name[0]
- if nearest_dist > 0 and nearest_dist/len(nearest_name) <= 1/3:
- yield 'typo', token.pos, token.pos + len(token.val) , 'Did you mean "{}"?'.format(nearest_name)
+def min_diff(a, b):
+ first = 0
+ while first < len(a) and first < len(b) and a[first] == b[first]:
+ first += 1
+ last = 0
+ while first-last < len(a) and first-last < len(b) and a[last-1] == b[last-1]:
+ last -= 1
+ return first, last
+
+def fix_hints(code, steps):
+ hints = []
+ tokens = tokenize(code)
+ for idx, a, b in steps:
+ start, end = min_diff(a, b)
+
+ if start >= len(a)+end:
+ hint_id = 'monkey_insert'
+ elif start >= len(b)+end:
+ hint_id = 'monkey_remove'
+ else:
+ hint_id = 'monkey_change'
+
+ pos_start = tokens[idx+start].pos
+ pos_end = tokens[idx+len(a)+end-1].pos + len(tokens[idx+len(a)+end-1].val)
+ pos_end = max(pos_end, pos_start+1)
+
+ hints += [{'id': hint_id, 'start': pos_start, 'end': pos_end}]
+ tokens[idx:idx+len(a)] = [t.clone(pos=tokens[idx].pos) for t in b]
+ return hints