From ee5da92aeffcc3505e2be2efeb1a235ab852709a Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Tue, 13 Oct 2015 16:24:39 +0200 Subject: Move monkey/monkey.py to monkey/__init__.py --- monkey/__init__.py | 325 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 325 insertions(+) create mode 100755 monkey/__init__.py (limited to 'monkey/__init__.py') diff --git a/monkey/__init__.py b/monkey/__init__.py new file mode 100755 index 0000000..6577450 --- /dev/null +++ b/monkey/__init__.py @@ -0,0 +1,325 @@ +#!/usr/bin/python3 + +import math +import time +import prolog.engine + +from .edits import classify_edits +from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify +from .util import damerau_levenshtein, PQueue + +# Starting from [code], find a sequence of edits that transforms it into a +# correct predicate for [name]. The [test] function is used to test generated +# programs. +# Return (solution, edits, time spent, #programs checked). If no solution is +# found within [timeout] seconds, return solution='' and edits=[]. +def fix(name, code, edits, test, timeout=30, debug=False): + # A dictionary of edits with costs for each edit type (insert, remove or + # change a line). Edits are tuples (before, after), where before and after + # are sequences of tokens. Variable names are normalized to A0, A1, A2,…. + inserts, removes, changes = classify_edits(edits) + + # Generate states that can be reached from the given program with one edit. + # The program is given as a list of tokens. + def step(program, path=None): + # Apply edits to program in order; skip tokens with index lower than + # last step. + start = path[-1][4] if path else 0 + + for i, token in enumerate(program): + # Get variable names in the current rule. + if i == 0 or program[i-1].type == 'PERIOD': + variables = [] + for t in program[i:]: + if t.type == 'VARIABLE' and not t.val.startswith('_'): + if t.val not in variables: + variables.append(t.val) + elif t.type == 'PERIOD': + break + + # Remember the first token in the current part. + if i == 0 or program[i-1].stop: + first = i + + # Don't edit already modified parts of the program. + if i < start: + continue + + # Add a new fact at beginning or after another rule. + if i == 0 or token.type == 'PERIOD': + new_start = i+1 if token.type == 'PERIOD' else 0 + n_rules = program.count(Token('PERIOD', '.')) + rule = 0 if i == 0 else program[i-1].rule+1 # index of new rule + + for new, cost in inserts.items(): + # New rule must start with correct predicate name. + if not (new[0].type == 'NAME' and new[0].val in name): + continue + + # Here we always insert a fact, so replace trailing :- with .. + if new[-1].type == 'FROM': + new = new[:-1] + + new_real = tuple([t.clone(rule=rule, part=0) for t in new + (Token('PERIOD', '.', stop=True),)]) + new_after = tuple([t.clone(rule=t.rule+1) for t in program[new_start:]]) + + new_program = program[:new_start] + new_real + new_after + new_step = ('add_rule', new_start, (), new_real, new_start) + new_cost = cost * math.pow(0.3, n_rules) + yield (new_program, new_step, new_cost) + + if token.stop and i > first: + real_last = last = i+1 + if token.type != 'FROM': + real_last -= 1 + part = program[first:real_last] + part_whole = program[first:last] + part_normal = tuple(rename_vars(part)) + + # Apply each edit a→b where a matches this part. + seen = False # has there been such an edit? + for (a, b), cost in changes.items(): + if part_normal == a: + seen = True + if b[-1].type == 'FROM': + b = b[:-1] + (b[-1].clone(stop=True),) + b_real = tuple([t.clone(rule=program[first].rule, + part=program[first].part) + for t in map_vars(a, b, part, variables)]) + + new_program = program[:first] + b_real + program[real_last:] + new_step = ('change_part', first, part, b_real, first) + yield (new_program, new_step, cost) + + + # Remove this part. + if token.type in ('COMMA', 'SEMI'): + if part_normal in removes.keys() or not seen: + new_after = list(program[last:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(part=t.part-1) + new_program = program[:first] + tuple(new_after) + new_step = ('remove_part', first, part_whole, (), first-1) + new_cost = removes.get(part_normal, 1.0) * 0.99 + yield (new_program, new_step, new_cost) + + # Remove this part at the end of the current rule. + if token.type == 'PERIOD' and token.part > 0: + if part_normal in removes.keys() or not seen: + if token.part == 0: # part is fact, remove rule + new_after = list(program[last+1:]) + for j, t in enumerate(new_after): + new_after[j] = t.clone(rule=t.rule-1) + new_program = program[:first] + tuple(new_after) + new_step = ('remove_rule', first, part, (), first) + else: # part is subgoal, remove part + new_after = list(program[last-1:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(part=t.part-1) + new_program = program[:first-1] + tuple(new_after) + new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1) + + new_cost = removes.get(part_normal, 1.0) * 0.99 + yield (new_program, new_step, new_cost) + + + # Insert a new part (goal) after this part. + if token.type in ('COMMA', 'FROM'): + for new, cost in inserts.items(): + # Don't try to insert a head into the body. + if new[-1].type == 'FROM': + continue + + new_real = tuple([t.clone(rule=program[first].rule, + part=program[first].part+1) + for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)]]) + + new_after = list(program[last:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(rule=program[first].rule, part=t.part+1) + + new_program = program[:last] + new_real + tuple(new_after) + new_step = ('add_part', last, (), new_real, last) + new_cost = cost * 0.4 + yield (new_program, new_step, new_cost) + + # Insert a new part (goal) at the end of current rule. + if token.type == 'PERIOD': + for new, cost in inserts.items(): + # Don't try to insert a head into the body. + if new[-1].type == 'FROM': + continue + + prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',') + new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \ + tuple([t.clone(rule=token.rule, part=token.part+1) + for t in map_vars([], new, [], variables)]) + + new_after = list(program[last-1:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(rule=t.rule, part=t.part+1) + + new_program = program[:last-1] + new_real + tuple(new_after) + new_step = ('add_part', last-1, (), new_real, last) + new_cost = cost * 0.4 + yield (new_program, new_step, new_cost) + + # Return a cleaned-up list of steps. + def postprocess(steps): + new_steps = [] + for step in steps: + # Remove the last field from each step as it is unnecessary after a + # path has been found. + step = step[:4] + if new_steps: + prev = new_steps[-1] + if prev[0] == 'remove_part' and step[0] == 'remove_part' and \ + prev[1] == step[1]: + new_steps[-1] = ('remove_part', prev[1], prev[2]+step[2], ()) + continue + + if prev[0] == 'remove_part' and step[0] == 'add_part' and \ + prev[1] == step[1]: + new_steps[-1] = ('change_part', prev[1], prev[2], step[3]) + continue + + if prev[0] == 'change_part' and step[0] == 'change_part' and \ + prev[1] == step[1] and step[2] == prev[3]: + new_steps[-1] = ('change_part', prev[1], prev[2], step[3]) + continue + + if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \ + prev[1] == step[1] and step[2] == prev[3][:-1]: + new_steps[-1] = (prev[0], prev[1], prev[2], step[3]+(prev[3][-1],)) + continue + + if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \ + step[1] == prev[1]+1 and step[2] == prev[3][1:]: + new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+step[3]) + continue + new_steps.append(step) + for step in new_steps: + print('index {}: {} {} → {}'.format( + step[1], step[0], stringify(step[2]), stringify(step[3]))) + return new_steps + + # Main loop: best-first search through generated programs. + todo = PQueue() # priority queue of candidate solutions + done = set() # programs we have already visited + + # Each program gets a task with the sequence of edits that generated it and + # the associated cost. First candidate with cost 1 is the initial program. + program = tuple(annotate(code)) + todo.push((program, (), 1.0), -1.0) + + n_tested = 0 + start_time = time.monotonic() + total_time = 0 + while total_time < timeout: + # Get next candidate. + task = todo.pop() + if task == None: + break + program, path, path_cost = task + + # If we have already seen this code, skip it. + code = compose(program) + if code in done: + continue + done.add(code) + + # Print some info about the current task. + if debug: + print('Cost {:.12f}'.format(path_cost)) + for step_type, idx, a, b, _ in path: + print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b))) + + # If the code is correct, we are done. + if test(code): + path = postprocess(path) + return code, path, total_time, n_tested + n_tested += 1 + + # Otherwise generate new solutions. + for new_program, new_step, new_cost in step(program, path): + new_path_cost = path_cost * new_cost + if new_path_cost < 0.01: + continue + new_path = path + (new_step,) + todo.push((new_program, new_path, new_path_cost), -new_path_cost) + + total_time = time.monotonic() - start_time + + return '', [], total_time, n_tested + +# Return tuples (type, start, end, message) describing edits in [path]. +def fix_hints(code, path): + program = list(annotate(code)) + + for step_type, idx, a, b in path: + if step_type == 'add_rule': + fix_type = 'insert' + msg = 'Add another rule.' + start = program[idx].pos-1 + end = start+2 + elif step_type == 'add_part': + fix_type = 'insert' + msg = 'Add another goal to this rule.' + start = program[idx].pos-1 + end = start+2 + elif step_type == 'remove_rule': + fix_type = 'remove' + msg = 'Remove this rule.' + start = program[idx].pos + end = program[idx + len(a) - 1].pos + elif step_type == 'remove_part': + fix_type = 'remove' + msg = 'Remove this goal.' + start = program[idx].pos + end = idx + len(a) - 1 + if program[end].type in ('COMMA', 'PERIOD', 'SEMI'): + end -= 1 + end = program[end].pos + len(program[end].val) + elif step_type == 'change_part': + fix_type = 'change' + msg = 'Check this part.' + first = 0 + while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]: + first += 1 + last = len(a)-1 + while last < len(b) and last >= first and a[last] == b[last]: + last -= 1 + start = program[idx+first].pos + end = program[idx+last].pos + len(program[idx+last].val) + + program[idx:idx+len(a)] = [t.clone(pos=program[idx].pos) for t in b] + yield fix_type, start, end, msg + + +# Checks for typos in the code and suggest the nearst uploaded term by other users. +def check_typos(code, names): + for token in annotate(code): + if token.type == 'NAME': + nearest_name = ' ' + nearest_dist = 1000 + own_count = names.get(token.val, 0) # count of the token.val which is compared with the + # each name in the names + for name in names.items(): + if name[0] == token.val: # If the names are the skip the code + continue + + distance = damerau_levenshtein(token.val, name[0]) + + if distance < nearest_dist and distance > 0 and own_count < name[1]: + nearest_dist = distance # Set best_dist and best_name if the less one is found + nearest_name = name[0] + if nearest_dist > 0 and nearest_dist/len(nearest_name) <= 1/3: + yield 'typo', token.pos, token.pos + len(token.val) , 'Did you mean "{}"?'.format(nearest_name) -- cgit v1.2.1