#!/usr/bin/python3 import math import time from .edits import classify_edits from .prolog.engine import test from .prolog.util import compose, decompose, map_vars, rename_vars, stringify from .util import PQueue, Token # Starting from [code], find a sequence of [edits] that transforms it into a # correct predicate for [name]. Append [aux_code] when testing (available facts # and predicates). # Return (solution, edits, time spent, #programs checked). If no solution is # found within [timeout] seconds, solution='' and edits=[]. def fix(name, code, edits, aux_code='', timeout=30, debug=False): todo = PQueue() # priority queue of candidate solutions done = set() # programs we have already visited inserts, removes, changes = classify_edits(edits) # Generate states that can be reached from the given program with one edit. # Program code is given as a list of [lines], where each line is a list of # tokens. Rule ranges are given in [rules] (see prolog.util.decompose). def step(lines, rules, prev=None): # Apply edits in order from top to bottom; skip lines with index lower # than last step. start_line = prev[1] if prev else 0 for start, end in rules: rule_lines = lines[start:end] rule_tokens = [t for line in rule_lines for t in line] for line_idx in range(start, end): if line_idx < start_line: continue line = lines[line_idx] line_normal = list(line) rename_vars(line_normal) line_normal = tuple(line_normal) seen = False # Apply each edit that matches this line. for (before, after), cost in changes.items(): if line_normal == before: seen = True mapping = map_vars(before, after, line, rule_tokens) after_real = tuple([t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after]) new_lines = lines[:line_idx] + (after_real,) + lines[line_idx+1:] new_step = ('change_line', line_idx, (tuple(line), after_real)) yield (new_lines, rules, new_step, cost) # Remove the current line. if line_normal in removes.keys() or not seen: new_lines = lines[:line_idx] + lines[line_idx+1:] new_rules = [] for old_start, old_end in rules: new_start, new_end = (old_start - (0 if old_start <= line_idx else 1), old_end - (0 if old_end <= line_idx else 1)) if new_end > new_start: new_rules.append((new_start, new_end)) new_step = ('remove_line', line_idx, (tuple(line), ())) new_cost = removes[line_normal] if line_normal in removes.keys() else 0.9 yield (new_lines, new_rules, new_step, new_cost) # Add a line to the current rule. for after, cost in inserts.items(): mapping = map_vars([], after, [], rule_tokens) after_real = [t if t.type != 'VARIABLE' else Token('VARIABLE', mapping[t.val]) for t in after] after_real = tuple(after_real) new_lines = lines[:end] + (after_real,) + lines[end:] new_rules = [] for old_start, old_end in rules: new_rules.append((old_start + (0 if old_start < end else 1), old_end + (0 if old_end < end else 1))) new_step = ('add_subgoal', end, ((), after_real)) yield (new_lines, new_rules, new_step, cost) # Add a new fact at the end. if len(rules) < 2: for after, cost in inserts.items(): new_lines = lines + (after,) new_rules = rules + (((len(lines), len(lines)+1)),) new_step = ('add_rule', len(new_lines)-1, (tuple(), tuple(after))) yield (new_lines, new_rules, new_step, cost) # Return a cleaned-up list of steps: # - multiple line edits in a single line are merged into one # - check if any lines can be removed from the program def postprocess(steps): new_steps = [] i = 0 while i < len(steps): if i < len(steps)-1 and \ steps[i][0] == 'change_line' and steps[i+1][0] == 'change_line' and \ steps[i][1] == steps[i+1][1] and steps[i][2][1] == steps[i+1][2][0]: new_steps.append(('change_line', steps[i][1], (steps[i][2][0], steps[i+1][2][1]))) i += 1 else: new_steps.append(steps[i]) i += 1 return new_steps # Add the initial program to the queue. lines, rules = decompose(code) todo.push(((tuple(lines), tuple(rules)), (), 1.0), -1.0) start_time = time.monotonic() n_tested = 0 while True: # Have we been at it for too long? total_time = time.monotonic() - start_time if total_time > timeout: break # Get next candidate. task = todo.pop() if task == None: break (lines, rules), path, path_cost = task # If we have already seen this code, skip it. code = compose(lines, rules) if code in done: continue done.add(code) if debug: print('Cost {:.12f}'.format(path_cost)) for step_type, line, (before, after) in path: print('line {}: {} {} → {}'.format(line, step_type, stringify(before), stringify(after))) # If the code is correct, we are done. if test(name, code + '\n' + aux_code): path = postprocess(path) return code, path, total_time, n_tested n_tested += 1 # Otherwise generate new solutions. path = list(path) prev_step = path[-1] if path else None for new_lines, new_rules, new_step, new_cost in step(lines, rules, prev_step): new_path_cost = path_cost * new_cost * 0.99 if new_path_cost < 0.005: continue new_path = tuple(path + [new_step]) todo.push(((tuple(new_lines), tuple(new_rules)), new_path, new_path_cost), -new_path_cost) return '', [], total_time, n_tested # Return a list of character ranges modified by the sequence [edits]. def fix_ranges(edits): marks = [] for step_type, line, (before, after) in edits: if step_type == 'change_line': marks.append(('change', (before[0].pos, before[-1].pos+len(before[-1].val)))) elif step_type == 'remove_line': marks.append(('remove', (before[0].pos, before[-1].pos+len(before[-1].val)))) elif step_type == 'add_subgoal' or step_type == 'add_rule': marks.append((step_type, line)) return marks