From ee5da92aeffcc3505e2be2efeb1a235ab852709a Mon Sep 17 00:00:00 2001 From: Timotej Lazar Date: Tue, 13 Oct 2015 16:24:39 +0200 Subject: Move monkey/monkey.py to monkey/__init__.py --- monkey/__init__.py | 325 +++++++++++++++++++++++++++++++++++++++++++++++++++++ monkey/monkey.py | 325 ----------------------------------------------------- monkey/test.py | 4 +- 3 files changed, 327 insertions(+), 327 deletions(-) create mode 100755 monkey/__init__.py delete mode 100755 monkey/monkey.py diff --git a/monkey/__init__.py b/monkey/__init__.py new file mode 100755 index 0000000..6577450 --- /dev/null +++ b/monkey/__init__.py @@ -0,0 +1,325 @@ +#!/usr/bin/python3 + +import math +import time +import prolog.engine + +from .edits import classify_edits +from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify +from .util import damerau_levenshtein, PQueue + +# Starting from [code], find a sequence of edits that transforms it into a +# correct predicate for [name]. The [test] function is used to test generated +# programs. +# Return (solution, edits, time spent, #programs checked). If no solution is +# found within [timeout] seconds, return solution='' and edits=[]. +def fix(name, code, edits, test, timeout=30, debug=False): + # A dictionary of edits with costs for each edit type (insert, remove or + # change a line). Edits are tuples (before, after), where before and after + # are sequences of tokens. Variable names are normalized to A0, A1, A2,…. + inserts, removes, changes = classify_edits(edits) + + # Generate states that can be reached from the given program with one edit. + # The program is given as a list of tokens. + def step(program, path=None): + # Apply edits to program in order; skip tokens with index lower than + # last step. + start = path[-1][4] if path else 0 + + for i, token in enumerate(program): + # Get variable names in the current rule. + if i == 0 or program[i-1].type == 'PERIOD': + variables = [] + for t in program[i:]: + if t.type == 'VARIABLE' and not t.val.startswith('_'): + if t.val not in variables: + variables.append(t.val) + elif t.type == 'PERIOD': + break + + # Remember the first token in the current part. + if i == 0 or program[i-1].stop: + first = i + + # Don't edit already modified parts of the program. + if i < start: + continue + + # Add a new fact at beginning or after another rule. + if i == 0 or token.type == 'PERIOD': + new_start = i+1 if token.type == 'PERIOD' else 0 + n_rules = program.count(Token('PERIOD', '.')) + rule = 0 if i == 0 else program[i-1].rule+1 # index of new rule + + for new, cost in inserts.items(): + # New rule must start with correct predicate name. + if not (new[0].type == 'NAME' and new[0].val in name): + continue + + # Here we always insert a fact, so replace trailing :- with .. + if new[-1].type == 'FROM': + new = new[:-1] + + new_real = tuple([t.clone(rule=rule, part=0) for t in new + (Token('PERIOD', '.', stop=True),)]) + new_after = tuple([t.clone(rule=t.rule+1) for t in program[new_start:]]) + + new_program = program[:new_start] + new_real + new_after + new_step = ('add_rule', new_start, (), new_real, new_start) + new_cost = cost * math.pow(0.3, n_rules) + yield (new_program, new_step, new_cost) + + if token.stop and i > first: + real_last = last = i+1 + if token.type != 'FROM': + real_last -= 1 + part = program[first:real_last] + part_whole = program[first:last] + part_normal = tuple(rename_vars(part)) + + # Apply each edit a→b where a matches this part. + seen = False # has there been such an edit? + for (a, b), cost in changes.items(): + if part_normal == a: + seen = True + if b[-1].type == 'FROM': + b = b[:-1] + (b[-1].clone(stop=True),) + b_real = tuple([t.clone(rule=program[first].rule, + part=program[first].part) + for t in map_vars(a, b, part, variables)]) + + new_program = program[:first] + b_real + program[real_last:] + new_step = ('change_part', first, part, b_real, first) + yield (new_program, new_step, cost) + + + # Remove this part. + if token.type in ('COMMA', 'SEMI'): + if part_normal in removes.keys() or not seen: + new_after = list(program[last:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(part=t.part-1) + new_program = program[:first] + tuple(new_after) + new_step = ('remove_part', first, part_whole, (), first-1) + new_cost = removes.get(part_normal, 1.0) * 0.99 + yield (new_program, new_step, new_cost) + + # Remove this part at the end of the current rule. + if token.type == 'PERIOD' and token.part > 0: + if part_normal in removes.keys() or not seen: + if token.part == 0: # part is fact, remove rule + new_after = list(program[last+1:]) + for j, t in enumerate(new_after): + new_after[j] = t.clone(rule=t.rule-1) + new_program = program[:first] + tuple(new_after) + new_step = ('remove_rule', first, part, (), first) + else: # part is subgoal, remove part + new_after = list(program[last-1:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(part=t.part-1) + new_program = program[:first-1] + tuple(new_after) + new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1) + + new_cost = removes.get(part_normal, 1.0) * 0.99 + yield (new_program, new_step, new_cost) + + + # Insert a new part (goal) after this part. + if token.type in ('COMMA', 'FROM'): + for new, cost in inserts.items(): + # Don't try to insert a head into the body. + if new[-1].type == 'FROM': + continue + + new_real = tuple([t.clone(rule=program[first].rule, + part=program[first].part+1) + for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)]]) + + new_after = list(program[last:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(rule=program[first].rule, part=t.part+1) + + new_program = program[:last] + new_real + tuple(new_after) + new_step = ('add_part', last, (), new_real, last) + new_cost = cost * 0.4 + yield (new_program, new_step, new_cost) + + # Insert a new part (goal) at the end of current rule. + if token.type == 'PERIOD': + for new, cost in inserts.items(): + # Don't try to insert a head into the body. + if new[-1].type == 'FROM': + continue + + prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',') + new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \ + tuple([t.clone(rule=token.rule, part=token.part+1) + for t in map_vars([], new, [], variables)]) + + new_after = list(program[last-1:]) + for j, t in enumerate(new_after): + if t.rule > token.rule: + break + new_after[j] = t.clone(rule=t.rule, part=t.part+1) + + new_program = program[:last-1] + new_real + tuple(new_after) + new_step = ('add_part', last-1, (), new_real, last) + new_cost = cost * 0.4 + yield (new_program, new_step, new_cost) + + # Return a cleaned-up list of steps. + def postprocess(steps): + new_steps = [] + for step in steps: + # Remove the last field from each step as it is unnecessary after a + # path has been found. + step = step[:4] + if new_steps: + prev = new_steps[-1] + if prev[0] == 'remove_part' and step[0] == 'remove_part' and \ + prev[1] == step[1]: + new_steps[-1] = ('remove_part', prev[1], prev[2]+step[2], ()) + continue + + if prev[0] == 'remove_part' and step[0] == 'add_part' and \ + prev[1] == step[1]: + new_steps[-1] = ('change_part', prev[1], prev[2], step[3]) + continue + + if prev[0] == 'change_part' and step[0] == 'change_part' and \ + prev[1] == step[1] and step[2] == prev[3]: + new_steps[-1] = ('change_part', prev[1], prev[2], step[3]) + continue + + if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \ + prev[1] == step[1] and step[2] == prev[3][:-1]: + new_steps[-1] = (prev[0], prev[1], prev[2], step[3]+(prev[3][-1],)) + continue + + if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \ + step[1] == prev[1]+1 and step[2] == prev[3][1:]: + new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+step[3]) + continue + new_steps.append(step) + for step in new_steps: + print('index {}: {} {} → {}'.format( + step[1], step[0], stringify(step[2]), stringify(step[3]))) + return new_steps + + # Main loop: best-first search through generated programs. + todo = PQueue() # priority queue of candidate solutions + done = set() # programs we have already visited + + # Each program gets a task with the sequence of edits that generated it and + # the associated cost. First candidate with cost 1 is the initial program. + program = tuple(annotate(code)) + todo.push((program, (), 1.0), -1.0) + + n_tested = 0 + start_time = time.monotonic() + total_time = 0 + while total_time < timeout: + # Get next candidate. + task = todo.pop() + if task == None: + break + program, path, path_cost = task + + # If we have already seen this code, skip it. + code = compose(program) + if code in done: + continue + done.add(code) + + # Print some info about the current task. + if debug: + print('Cost {:.12f}'.format(path_cost)) + for step_type, idx, a, b, _ in path: + print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b))) + + # If the code is correct, we are done. + if test(code): + path = postprocess(path) + return code, path, total_time, n_tested + n_tested += 1 + + # Otherwise generate new solutions. + for new_program, new_step, new_cost in step(program, path): + new_path_cost = path_cost * new_cost + if new_path_cost < 0.01: + continue + new_path = path + (new_step,) + todo.push((new_program, new_path, new_path_cost), -new_path_cost) + + total_time = time.monotonic() - start_time + + return '', [], total_time, n_tested + +# Return tuples (type, start, end, message) describing edits in [path]. +def fix_hints(code, path): + program = list(annotate(code)) + + for step_type, idx, a, b in path: + if step_type == 'add_rule': + fix_type = 'insert' + msg = 'Add another rule.' + start = program[idx].pos-1 + end = start+2 + elif step_type == 'add_part': + fix_type = 'insert' + msg = 'Add another goal to this rule.' + start = program[idx].pos-1 + end = start+2 + elif step_type == 'remove_rule': + fix_type = 'remove' + msg = 'Remove this rule.' + start = program[idx].pos + end = program[idx + len(a) - 1].pos + elif step_type == 'remove_part': + fix_type = 'remove' + msg = 'Remove this goal.' + start = program[idx].pos + end = idx + len(a) - 1 + if program[end].type in ('COMMA', 'PERIOD', 'SEMI'): + end -= 1 + end = program[end].pos + len(program[end].val) + elif step_type == 'change_part': + fix_type = 'change' + msg = 'Check this part.' + first = 0 + while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]: + first += 1 + last = len(a)-1 + while last < len(b) and last >= first and a[last] == b[last]: + last -= 1 + start = program[idx+first].pos + end = program[idx+last].pos + len(program[idx+last].val) + + program[idx:idx+len(a)] = [t.clone(pos=program[idx].pos) for t in b] + yield fix_type, start, end, msg + + +# Checks for typos in the code and suggest the nearst uploaded term by other users. +def check_typos(code, names): + for token in annotate(code): + if token.type == 'NAME': + nearest_name = ' ' + nearest_dist = 1000 + own_count = names.get(token.val, 0) # count of the token.val which is compared with the + # each name in the names + for name in names.items(): + if name[0] == token.val: # If the names are the skip the code + continue + + distance = damerau_levenshtein(token.val, name[0]) + + if distance < nearest_dist and distance > 0 and own_count < name[1]: + nearest_dist = distance # Set best_dist and best_name if the less one is found + nearest_name = name[0] + if nearest_dist > 0 and nearest_dist/len(nearest_name) <= 1/3: + yield 'typo', token.pos, token.pos + len(token.val) , 'Did you mean "{}"?'.format(nearest_name) diff --git a/monkey/monkey.py b/monkey/monkey.py deleted file mode 100755 index 6577450..0000000 --- a/monkey/monkey.py +++ /dev/null @@ -1,325 +0,0 @@ -#!/usr/bin/python3 - -import math -import time -import prolog.engine - -from .edits import classify_edits -from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify -from .util import damerau_levenshtein, PQueue - -# Starting from [code], find a sequence of edits that transforms it into a -# correct predicate for [name]. The [test] function is used to test generated -# programs. -# Return (solution, edits, time spent, #programs checked). If no solution is -# found within [timeout] seconds, return solution='' and edits=[]. -def fix(name, code, edits, test, timeout=30, debug=False): - # A dictionary of edits with costs for each edit type (insert, remove or - # change a line). Edits are tuples (before, after), where before and after - # are sequences of tokens. Variable names are normalized to A0, A1, A2,…. - inserts, removes, changes = classify_edits(edits) - - # Generate states that can be reached from the given program with one edit. - # The program is given as a list of tokens. - def step(program, path=None): - # Apply edits to program in order; skip tokens with index lower than - # last step. - start = path[-1][4] if path else 0 - - for i, token in enumerate(program): - # Get variable names in the current rule. - if i == 0 or program[i-1].type == 'PERIOD': - variables = [] - for t in program[i:]: - if t.type == 'VARIABLE' and not t.val.startswith('_'): - if t.val not in variables: - variables.append(t.val) - elif t.type == 'PERIOD': - break - - # Remember the first token in the current part. - if i == 0 or program[i-1].stop: - first = i - - # Don't edit already modified parts of the program. - if i < start: - continue - - # Add a new fact at beginning or after another rule. - if i == 0 or token.type == 'PERIOD': - new_start = i+1 if token.type == 'PERIOD' else 0 - n_rules = program.count(Token('PERIOD', '.')) - rule = 0 if i == 0 else program[i-1].rule+1 # index of new rule - - for new, cost in inserts.items(): - # New rule must start with correct predicate name. - if not (new[0].type == 'NAME' and new[0].val in name): - continue - - # Here we always insert a fact, so replace trailing :- with .. - if new[-1].type == 'FROM': - new = new[:-1] - - new_real = tuple([t.clone(rule=rule, part=0) for t in new + (Token('PERIOD', '.', stop=True),)]) - new_after = tuple([t.clone(rule=t.rule+1) for t in program[new_start:]]) - - new_program = program[:new_start] + new_real + new_after - new_step = ('add_rule', new_start, (), new_real, new_start) - new_cost = cost * math.pow(0.3, n_rules) - yield (new_program, new_step, new_cost) - - if token.stop and i > first: - real_last = last = i+1 - if token.type != 'FROM': - real_last -= 1 - part = program[first:real_last] - part_whole = program[first:last] - part_normal = tuple(rename_vars(part)) - - # Apply each edit a→b where a matches this part. - seen = False # has there been such an edit? - for (a, b), cost in changes.items(): - if part_normal == a: - seen = True - if b[-1].type == 'FROM': - b = b[:-1] + (b[-1].clone(stop=True),) - b_real = tuple([t.clone(rule=program[first].rule, - part=program[first].part) - for t in map_vars(a, b, part, variables)]) - - new_program = program[:first] + b_real + program[real_last:] - new_step = ('change_part', first, part, b_real, first) - yield (new_program, new_step, cost) - - - # Remove this part. - if token.type in ('COMMA', 'SEMI'): - if part_normal in removes.keys() or not seen: - new_after = list(program[last:]) - for j, t in enumerate(new_after): - if t.rule > token.rule: - break - new_after[j] = t.clone(part=t.part-1) - new_program = program[:first] + tuple(new_after) - new_step = ('remove_part', first, part_whole, (), first-1) - new_cost = removes.get(part_normal, 1.0) * 0.99 - yield (new_program, new_step, new_cost) - - # Remove this part at the end of the current rule. - if token.type == 'PERIOD' and token.part > 0: - if part_normal in removes.keys() or not seen: - if token.part == 0: # part is fact, remove rule - new_after = list(program[last+1:]) - for j, t in enumerate(new_after): - new_after[j] = t.clone(rule=t.rule-1) - new_program = program[:first] + tuple(new_after) - new_step = ('remove_rule', first, part, (), first) - else: # part is subgoal, remove part - new_after = list(program[last-1:]) - for j, t in enumerate(new_after): - if t.rule > token.rule: - break - new_after[j] = t.clone(part=t.part-1) - new_program = program[:first-1] + tuple(new_after) - new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1) - - new_cost = removes.get(part_normal, 1.0) * 0.99 - yield (new_program, new_step, new_cost) - - - # Insert a new part (goal) after this part. - if token.type in ('COMMA', 'FROM'): - for new, cost in inserts.items(): - # Don't try to insert a head into the body. - if new[-1].type == 'FROM': - continue - - new_real = tuple([t.clone(rule=program[first].rule, - part=program[first].part+1) - for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)]]) - - new_after = list(program[last:]) - for j, t in enumerate(new_after): - if t.rule > token.rule: - break - new_after[j] = t.clone(rule=program[first].rule, part=t.part+1) - - new_program = program[:last] + new_real + tuple(new_after) - new_step = ('add_part', last, (), new_real, last) - new_cost = cost * 0.4 - yield (new_program, new_step, new_cost) - - # Insert a new part (goal) at the end of current rule. - if token.type == 'PERIOD': - for new, cost in inserts.items(): - # Don't try to insert a head into the body. - if new[-1].type == 'FROM': - continue - - prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',') - new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \ - tuple([t.clone(rule=token.rule, part=token.part+1) - for t in map_vars([], new, [], variables)]) - - new_after = list(program[last-1:]) - for j, t in enumerate(new_after): - if t.rule > token.rule: - break - new_after[j] = t.clone(rule=t.rule, part=t.part+1) - - new_program = program[:last-1] + new_real + tuple(new_after) - new_step = ('add_part', last-1, (), new_real, last) - new_cost = cost * 0.4 - yield (new_program, new_step, new_cost) - - # Return a cleaned-up list of steps. - def postprocess(steps): - new_steps = [] - for step in steps: - # Remove the last field from each step as it is unnecessary after a - # path has been found. - step = step[:4] - if new_steps: - prev = new_steps[-1] - if prev[0] == 'remove_part' and step[0] == 'remove_part' and \ - prev[1] == step[1]: - new_steps[-1] = ('remove_part', prev[1], prev[2]+step[2], ()) - continue - - if prev[0] == 'remove_part' and step[0] == 'add_part' and \ - prev[1] == step[1]: - new_steps[-1] = ('change_part', prev[1], prev[2], step[3]) - continue - - if prev[0] == 'change_part' and step[0] == 'change_part' and \ - prev[1] == step[1] and step[2] == prev[3]: - new_steps[-1] = ('change_part', prev[1], prev[2], step[3]) - continue - - if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \ - prev[1] == step[1] and step[2] == prev[3][:-1]: - new_steps[-1] = (prev[0], prev[1], prev[2], step[3]+(prev[3][-1],)) - continue - - if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \ - step[1] == prev[1]+1 and step[2] == prev[3][1:]: - new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+step[3]) - continue - new_steps.append(step) - for step in new_steps: - print('index {}: {} {} → {}'.format( - step[1], step[0], stringify(step[2]), stringify(step[3]))) - return new_steps - - # Main loop: best-first search through generated programs. - todo = PQueue() # priority queue of candidate solutions - done = set() # programs we have already visited - - # Each program gets a task with the sequence of edits that generated it and - # the associated cost. First candidate with cost 1 is the initial program. - program = tuple(annotate(code)) - todo.push((program, (), 1.0), -1.0) - - n_tested = 0 - start_time = time.monotonic() - total_time = 0 - while total_time < timeout: - # Get next candidate. - task = todo.pop() - if task == None: - break - program, path, path_cost = task - - # If we have already seen this code, skip it. - code = compose(program) - if code in done: - continue - done.add(code) - - # Print some info about the current task. - if debug: - print('Cost {:.12f}'.format(path_cost)) - for step_type, idx, a, b, _ in path: - print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b))) - - # If the code is correct, we are done. - if test(code): - path = postprocess(path) - return code, path, total_time, n_tested - n_tested += 1 - - # Otherwise generate new solutions. - for new_program, new_step, new_cost in step(program, path): - new_path_cost = path_cost * new_cost - if new_path_cost < 0.01: - continue - new_path = path + (new_step,) - todo.push((new_program, new_path, new_path_cost), -new_path_cost) - - total_time = time.monotonic() - start_time - - return '', [], total_time, n_tested - -# Return tuples (type, start, end, message) describing edits in [path]. -def fix_hints(code, path): - program = list(annotate(code)) - - for step_type, idx, a, b in path: - if step_type == 'add_rule': - fix_type = 'insert' - msg = 'Add another rule.' - start = program[idx].pos-1 - end = start+2 - elif step_type == 'add_part': - fix_type = 'insert' - msg = 'Add another goal to this rule.' - start = program[idx].pos-1 - end = start+2 - elif step_type == 'remove_rule': - fix_type = 'remove' - msg = 'Remove this rule.' - start = program[idx].pos - end = program[idx + len(a) - 1].pos - elif step_type == 'remove_part': - fix_type = 'remove' - msg = 'Remove this goal.' - start = program[idx].pos - end = idx + len(a) - 1 - if program[end].type in ('COMMA', 'PERIOD', 'SEMI'): - end -= 1 - end = program[end].pos + len(program[end].val) - elif step_type == 'change_part': - fix_type = 'change' - msg = 'Check this part.' - first = 0 - while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]: - first += 1 - last = len(a)-1 - while last < len(b) and last >= first and a[last] == b[last]: - last -= 1 - start = program[idx+first].pos - end = program[idx+last].pos + len(program[idx+last].val) - - program[idx:idx+len(a)] = [t.clone(pos=program[idx].pos) for t in b] - yield fix_type, start, end, msg - - -# Checks for typos in the code and suggest the nearst uploaded term by other users. -def check_typos(code, names): - for token in annotate(code): - if token.type == 'NAME': - nearest_name = ' ' - nearest_dist = 1000 - own_count = names.get(token.val, 0) # count of the token.val which is compared with the - # each name in the names - for name in names.items(): - if name[0] == token.val: # If the names are the skip the code - continue - - distance = damerau_levenshtein(token.val, name[0]) - - if distance < nearest_dist and distance > 0 and own_count < name[1]: - nearest_dist = distance # Set best_dist and best_name if the less one is found - nearest_name = name[0] - if nearest_dist > 0 and nearest_dist/len(nearest_name) <= 1/3: - yield 'typo', token.pos, token.pos + len(token.val) , 'Did you mean "{}"?'.format(nearest_name) diff --git a/monkey/test.py b/monkey/test.py index 6736702..9936cdc 100755 --- a/monkey/test.py +++ b/monkey/test.py @@ -9,7 +9,7 @@ from termcolor import colored from db.models import CodeqUser, Problem, Solution from .edits import classify_edits, trace_graph from .graph import graphviz -from .monkey import fix, fix_hints +from . import fix, fix_hints from prolog.util import annotate, compose, stringify import server.problems from .util import indent @@ -20,13 +20,13 @@ if len(sys.argv) < 2: sys.exit(1) pid = int(sys.argv[1]) -name = Problem.get(id=pid).name language, problem_group, problem = Problem.get_identifier(pid) solved_problems = [p for p in CodeqUser.solved_problems(1, language) if p != (problem_group, problem)] other_solutions = server.problems.solutions_for_problems(language, solved_problems) problem_module = server.problems.load_problem(language, problem_group, problem, 'common') +name = server.problems.load_problem(language, problem_group, problem, 'en').name # Testing function. def test(code): -- cgit v1.2.1