#!/usr/bin/python3

import math
import time

import prolog.engine
from .edits import classify_edits
from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify
from .util import damerau_levenshtein, PQueue


# Starting from [code], find a sequence of edits that transforms it into a
# correct predicate for [name]. The [test] function is called with the
# composed program text and must return a true value for a correct program.
#
# [edits] is passed to classify_edits to obtain the insert / remove / change
# edit tables with their costs. If [debug] is true, the search prints every
# candidate it expands as well as the final cleaned-up path.
#
# Return (solution, edits, time spent, #programs checked). If no solution is
# found within [timeout] seconds (or the queue of candidates runs empty),
# return solution='' and edits=[]. Each returned edit is a tuple
# (step type, token index, tokens before, tokens after).
def fix(name, code, edits, test, timeout=30, debug=False):
    # A dictionary of edits with costs for each edit type (insert, remove or
    # change a line). Edits are tuples (before, after), where before and after
    # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
    inserts, removes, changes = classify_edits(edits)

    # Generate states that can be reached from the given program with one edit.
    # The program is given as a tuple of tokens. Yields tuples
    # (new program, step, cost), where step is
    # (type, index, tokens before, tokens after, resume index); the resume
    # index is where the next call starts editing (see [start] below).
    def step(program, path=None):
        # Apply edits to program in order; skip tokens with index lower than
        # last step.
        start = path[-1][4] if path else 0

        # The program does not change while we iterate, so count its rules
        # only once.
        n_rules = program.count(Token('PERIOD', '.'))

        for i, token in enumerate(program):
            # Get variable names in the current rule.
            if i == 0 or program[i-1].type == 'PERIOD':
                variables = []
                for t in program[i:]:
                    if t.type == 'VARIABLE' and not t.val.startswith('_'):
                        if t.val not in variables:
                            variables.append(t.val)
                    elif t.type == 'PERIOD':
                        break

            # Remember the first token in the current part.
            if i == 0 or program[i-1].stop:
                first = i

            # Don't edit already modified parts of the program.
            if i < start:
                continue

            # Add a new fact at beginning or after another rule.
            if i == 0 or token.type == 'PERIOD':
                new_start = i+1 if token.type == 'PERIOD' else 0
                rule = 0 if i == 0 else program[i-1].rule+1  # index of new rule

                for new, cost in inserts.items():
                    # New rule must start with correct predicate name.
                    # NOTE(review): `in name` is a substring test if [name] is
                    # a string and a membership test if it is a collection —
                    # confirm which one callers pass.
                    if not (new[0].type == 'NAME' and new[0].val in name):
                        continue

                    # Here we always insert a fact, so replace trailing :- with ..
                    if new[-1].type == 'FROM':
                        new = new[:-1]

                    new_real = tuple(t.clone(rule=rule, part=0)
                                     for t in new + (Token('PERIOD', '.', stop=True),))
                    # Every following token now belongs to the next rule.
                    new_after = tuple(t.clone(rule=t.rule+1) for t in program[new_start:])

                    new_program = program[:new_start] + new_real + new_after
                    new_step = ('add_rule', new_start, (), new_real, new_start)
                    # Inserting a rule gets less likely the more rules exist.
                    new_cost = cost * math.pow(0.3, n_rules)
                    yield (new_program, new_step, new_cost)

            if token.stop and i > first:
                # [last] is one past the stop token; [real_last] excludes the
                # stop token itself unless it is ':-', which stays in the part.
                real_last = last = i+1
                if token.type != 'FROM':
                    real_last -= 1
                part = program[first:real_last]
                part_whole = program[first:last]
                part_normal = tuple(rename_vars(part))

                # Apply each edit a→b where a matches this part.
                seen = False  # has there been such an edit?
                for (a, b), cost in changes.items():
                    if part_normal == a:
                        seen = True
                        if b[-1].type == 'FROM':
                            b = b[:-1] + (b[-1].clone(stop=True),)
                        b_real = tuple(t.clone(rule=program[first].rule, part=program[first].part)
                                       for t in map_vars(a, b, part, variables))

                        new_program = program[:first] + b_real + program[real_last:]
                        new_step = ('change_part', first, part, b_real, first)
                        yield (new_program, new_step, cost)

                # Remove this part.
                if token.type in ('COMMA', 'SEMI'):
                    if part_normal in removes or not seen:
                        # Renumber the remaining parts of the current rule.
                        new_after = list(program[last:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(part=t.part-1)
                        new_program = program[:first] + tuple(new_after)
                        new_step = ('remove_part', first, part_whole, (), first-1)
                        new_cost = removes.get(part_normal, 1.0) * 0.99
                        yield (new_program, new_step, new_cost)

                # Remove this part at the end of the current rule. Only
                # subgoals (part > 0) are removed here; a part with index 0
                # ending in a period is a fact, and whole rules are never
                # removed by this generator.
                if token.type == 'PERIOD' and token.part > 0:
                    if part_normal in removes or not seen:
                        # Keep the period, drop the separator before the part.
                        new_after = list(program[last-1:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(part=t.part-1)
                        new_program = program[:first-1] + tuple(new_after)
                        new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1)
                        new_cost = removes.get(part_normal, 1.0) * 0.99
                        yield (new_program, new_step, new_cost)

                # Insert a new part (goal) after this part.
                if token.type in ('COMMA', 'FROM'):
                    for new, cost in inserts.items():
                        # Don't try to insert a head into the body.
                        if new[-1].type == 'FROM':
                            continue

                        new_real = tuple(
                            t.clone(rule=program[first].rule, part=program[first].part+1)
                            for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)])

                        # Shift part indices of the rest of the current rule.
                        new_after = list(program[last:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(rule=program[first].rule, part=t.part+1)

                        new_program = program[:last] + new_real + tuple(new_after)
                        new_step = ('add_part', last, (), new_real, last)
                        new_cost = cost * 0.4

                        yield (new_program, new_step, new_cost)

                # Insert a new part (goal) at the end of current rule.
                if token.type == 'PERIOD':
                    for new, cost in inserts.items():
                        # Don't try to insert a head into the body.
                        if new[-1].type == 'FROM':
                            continue

                        # A fact (part 0) needs ':-' before its first goal;
                        # a rule with a body needs ','.
                        prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',')
                        new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \
                            tuple(t.clone(rule=token.rule, part=token.part+1)
                                  for t in map_vars([], new, [], variables))

                        # The period (and anything else left in this rule)
                        # moves to the new last part.
                        new_after = list(program[last-1:])
                        for j, t in enumerate(new_after):
                            if t.rule > token.rule:
                                break
                            new_after[j] = t.clone(rule=t.rule, part=t.part+1)

                        new_program = program[:last-1] + new_real + tuple(new_after)
                        new_step = ('add_part', last-1, (), new_real, last)
                        new_cost = cost * 0.4

                        yield (new_program, new_step, new_cost)

    # Return a cleaned-up list of steps: consecutive steps that touch the same
    # place are merged into one. The merged path is printed only if [debug]
    # is set.
    def postprocess(steps):
        new_steps = []
        for cur in steps:
            # Remove the last field from each step as it is unnecessary after a
            # path has been found.
            cur = cur[:4]
            if new_steps:
                prev = new_steps[-1]
                # Two removals at the same index: one removal of both parts.
                if prev[0] == 'remove_part' and cur[0] == 'remove_part' and \
                   prev[1] == cur[1]:
                    new_steps[-1] = ('remove_part', prev[1], prev[2]+cur[2], ())
                    continue

                # Removal followed by insertion at the same index: a change.
                if prev[0] == 'remove_part' and cur[0] == 'add_part' and \
                   prev[1] == cur[1]:
                    new_steps[-1] = ('change_part', prev[1], prev[2], cur[3])
                    continue

                # Two chained changes a→b, b→c at the same index: a→c.
                if prev[0] == 'change_part' and cur[0] == 'change_part' and \
                   prev[1] == cur[1] and cur[2] == prev[3]:
                    new_steps[-1] = ('change_part', prev[1], prev[2], cur[3])
                    continue

                # Change of everything but the last token produced by the
                # previous step: fold it in, keeping that last token.
                if prev[0] in ('add_part', 'change_part') and cur[0] == 'change_part' and \
                   prev[1] == cur[1] and cur[2] == prev[3][:-1]:
                    new_steps[-1] = (prev[0], prev[1], prev[2], cur[3]+(prev[3][-1],))
                    continue

                # Change of everything but the first token produced by the
                # previous step: fold it in, keeping that first token.
                if prev[0] in ('add_part', 'change_part') and cur[0] == 'change_part' and \
                   cur[1] == prev[1]+1 and cur[2] == prev[3][1:]:
                    new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+cur[3])
                    continue

            new_steps.append(cur)

        if debug:
            for cur in new_steps:
                print('index {}: {} {} → {}'.format(
                    cur[1], cur[0], stringify(cur[2]), stringify(cur[3])))
        return new_steps

    # Main loop: best-first search through generated programs.
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # programs we have already visited

    # Each program gets a task with the sequence of edits that generated it and
    # the associated cost. First candidate with cost 1 is the initial program.
    # Costs of consecutive edits are multiplied; the negated cost is used as
    # the queue priority.
    program = tuple(annotate(code))
    todo.push((program, (), 1.0), -1.0)

    n_tested = 0
    start_time = time.monotonic()
    total_time = 0
    while total_time < timeout:
        # Get next candidate.
        task = todo.pop()
        if task is None:
            break

        program, path, path_cost = task

        # If we have already seen this code, skip it.
        code = compose(program)
        if code in done:
            continue
        done.add(code)

        # Print some info about the current task.
        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for step_type, idx, a, b, _ in path:
                print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b)))

        # If the code is correct, we are done.
        if test(code):
            # Measure the time now so that it includes the final test.
            total_time = time.monotonic() - start_time
            return code, postprocess(path), total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        for new_program, new_step, new_cost in step(program, path):
            new_path_cost = path_cost * new_cost
            # Prune candidates whose accumulated cost dropped too low.
            if new_path_cost < 0.01:
                continue
            new_path = path + (new_step,)
            todo.push((new_program, new_path, new_path_cost), -new_path_cost)

        total_time = time.monotonic() - start_time

    return '', [], time.monotonic() - start_time, n_tested


# Return tuples (type, start, end, message) describing edits in [path].
# [path] is a sequence of (step type, token index, tokens before, tokens after)
# tuples, i.e. the edits returned by fix. [start] and [end] are computed from
# the .pos attributes of the tokens of [code] (presumably character offsets —
# confirm against annotate).
def fix_hints(code, path):
    program = list(annotate(code))

    for step_type, idx, a, b in path:
        # NOTE(review): program[idx] raises IndexError if idx == len(program),
        # e.g. for an insertion after the last token — confirm that such steps
        # cannot reach this function.
        if step_type == 'add_rule':
            fix_type = 'insert'
            msg = 'Add another rule.'
            start = program[idx].pos-1
            end = start+2
        elif step_type == 'add_part':
            fix_type = 'insert'
            msg = 'Add another goal to this rule.'
            start = program[idx].pos-1
            end = start+2
        elif step_type == 'remove_rule':
            fix_type = 'remove'
            msg = 'Remove this rule.'
            start = program[idx].pos
            end = program[idx + len(a) - 1].pos
        elif step_type == 'remove_part':
            fix_type = 'remove'
            msg = 'Remove this goal.'
            start = program[idx].pos
            end = idx + len(a) - 1
            # Do not highlight a trailing separator.
            if program[end].type in ('COMMA', 'PERIOD', 'SEMI'):
                end -= 1
            end = program[end].pos + len(program[end].val)
        elif step_type == 'change_part':
            fix_type = 'change'
            msg = 'Check this part.'
            # Skip the tokens at the beginning and at the end of the part that
            # are the same before and after the change.
            first = 0
            while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]:
                first += 1
            last = len(a)-1
            while last < len(b) and last >= first and a[last] == b[last]:
                last -= 1
            start = program[idx+first].pos
            end = program[idx+last].pos + len(program[idx+last].val)

        # Apply the edit to our copy of the program so that token indices in
        # the following steps match. All new tokens get the position of the
        # first replaced token.
        program[idx:idx+len(a)] = [t.clone(pos=program[idx].pos) for t in b]
        yield fix_type, start, end, msg


# Check for typos in [code] and suggest the nearest name used by other users.
# [names] maps a name to its count. For every NAME token yield a tuple
# ('typo', start, end, message) if there is a different name that is used more
# often than the token's own name and whose Damerau-Levenshtein distance from
# it is at most a third of the suggested name's length.
def check_typos(code, names):
    for token in annotate(code):
        if token.type != 'NAME':
            continue

        # How often the token's own name occurs; only more frequent names are
        # considered as suggestions.
        own_count = names.get(token.val, 0)

        nearest_name = None
        nearest_dist = None
        for name, count in names.items():
            # Skip the token's own name.
            if name == token.val:
                continue

            distance = damerau_levenshtein(token.val, name)

            # Remember the closest name found so far; on equal distances the
            # first one wins.
            if distance > 0 and own_count < count and \
               (nearest_dist is None or distance < nearest_dist):
                nearest_dist = distance
                nearest_name = name

        # No candidate found, or an empty candidate (would divide by zero and
        # is not a useful suggestion anyway).
        if not nearest_name:
            continue

        if nearest_dist/len(nearest_name) <= 1/3:
            yield 'typo', token.pos, token.pos + len(token.val), 'Did you mean "{}"?'.format(nearest_name)