#!/usr/bin/python3

import math
import time

from .edits import classify_edits
from prolog.engine import test
from prolog.util import Token, compose, decompose, map_vars, normalized, rename_vars, stringify
from .util import PQueue


def fix(name, code, edits, aux_code='', timeout=30, debug=False):
    """Search for a sequence of edits that turns [code] into a correct program.

    Starting from [code], candidate programs are generated by applying one
    edit at a time and are explored best-first. Each candidate is checked
    with prolog.engine.test for predicate [name]; [aux_code] (available facts
    and predicates) is appended to the candidate when testing.

    Parameters:
        name: name of the predicate being solved (passed to test()).
        code: source text of the starting program.
        edits: known edits, passed unchanged to classify_edits().
        aux_code: extra code appended (after a newline) before testing.
        timeout: search budget in seconds; checked once per tested program.
        debug: if true, print the cost and edit path of every candidate.

    Returns:
        (solution, edits, time spent, #programs checked). If no solution is
        found before the timeout or before the queue runs out, solution is ''
        and edits is [].
    """
    # A dictionary of edits with costs for each edit type (insert, remove or
    # change a line). Edits are tuples (before, after), where before and after
    # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
    inserts, removes, changes = classify_edits(edits)

    def instantiate(tokens, mapping):
        """Return [tokens] as a tuple with every variable renamed via [mapping]."""
        return tuple(Token('VARIABLE', mapping[t.val]) if t.type == 'VARIABLE' else t
                     for t in tokens)

    def step(lines, rules, path=None):
        """Generate states reachable from the given program with one edit.

        Program code is given as a tuple of [lines], where each line is a
        sequence of tokens. Rule ranges are given in [rules] as half-open
        (start, end) index pairs into [lines] (see prolog.util.decompose).

        Yields (new_lines, new_rules, new_step, new_cost) tuples, where
        new_step is (edit type, line index, (before, after)).
        """
        # Apply edits in order from top to bottom; skip lines with index lower
        # than last step.
        start_line = path[-1][1] if path else 0

        # Decrease probability as we add more rules. The factor does not
        # depend on the individual insert, so compute it once.
        add_rule_factor = math.pow(0.3, len(rules))

        for start, end in rules:
            rule_tokens = [t for line in lines[start:end] for t in line]

            # Prepend a new rule (fact) before this rule (only if no line in
            # the current rule has been modified yet).
            if start_line == 0 or start > start_line:
                for after, cost in inserts.items():
                    new_lines = lines[:start] + (after,) + lines[start:]
                    new_rules = []
                    for old_start, old_end in rules:
                        if old_start == start:
                            new_rules.append((start, start + 1))
                        # Ranges are half-open: a rule that ends exactly at
                        # the insertion point lies entirely before the new
                        # line and must not grow to include it.
                        new_rules.append((old_start + (0 if old_start < start else 1),
                                          old_end + (0 if old_end <= start else 1)))
                    new_step = ('add_rule', start, (tuple(), after))

                    yield (new_lines, new_rules, new_step, cost * add_rule_factor)

            # Apply some edits for each line in this rule.
            for line_idx in range(start, end):
                if line_idx < start_line:
                    continue
                line = lines[line_idx]
                line_normal = tuple(rename_vars(line))

                # Apply edits whose left-hand side matches this line.
                seen = False  # has there been such an edit?
                for (before, after), cost in changes.items():
                    if line_normal != before:
                        continue
                    seen = True
                    mapping = map_vars(before, after, line, rule_tokens)
                    after_real = instantiate(after, mapping)
                    new_lines = lines[:line_idx] + (after_real,) + lines[line_idx+1:]
                    new_step = ('change_line', line_idx, (tuple(line), after_real))

                    yield (new_lines, rules, new_step, cost)

                # Remove the current line: either a known removal, or a line
                # for which no change edit matched.
                if line_normal in removes or not seen:
                    new_lines = lines[:line_idx] + lines[line_idx+1:]
                    new_rules = []
                    for old_start, old_end in rules:
                        new_start = old_start - (0 if old_start <= line_idx else 1)
                        new_end = old_end - (0 if old_end <= line_idx else 1)
                        # Drop rules that became empty.
                        if new_end > new_start:
                            new_rules.append((new_start, new_end))
                    new_step = ('remove_line', line_idx, (tuple(line), ()))
                    new_cost = removes.get(line_normal, 1.0) * 0.99

                    yield (new_lines, new_rules, new_step, new_cost)

                # Add a line after this one.
                idx = line_idx + 1
                for after, cost in inserts.items():
                    # Don't try to insert a head into the body.
                    if after[-1].type == 'FROM':
                        continue
                    mapping = map_vars([], after, [], rule_tokens)
                    after_real = instantiate(after, mapping)

                    new_lines = lines[:idx] + (after_real,) + lines[idx:]
                    # Here a rule ending at idx is the current rule (or one
                    # containing this line), so its end does move.
                    new_rules = [(old_start + (0 if old_start < idx else 1),
                                  old_end + (0 if old_end < idx else 1))
                                 for old_start, old_end in rules]
                    new_step = ('add_subgoal', idx, ((), after_real))

                    yield (new_lines, new_rules, new_step, cost * 0.4)

    def postprocess(steps):
        """Return a cleaned-up list of steps.

        - multiple line edits in a single line are merged into one
        - an added subgoal immediately followed by removal of the next line
          is reported as a single changed line
        """
        new_steps = []
        for edit in steps:
            if new_steps:
                prev = new_steps[-1]

                # A change applied to the result of the previous edit on the
                # same line: keep the first "before" and the last "after".
                if prev[1] == edit[1] and \
                   prev[0] in ('add_rule', 'add_subgoal', 'change_line') and \
                   edit[0] == 'change_line' and \
                   normalized(prev[2][1]) == normalized(edit[2][0]):
                    new_steps[-1] = (prev[0], prev[1], (prev[2][0], edit[2][1]))
                    continue

                if prev[0] == 'add_subgoal' and edit[0] == 'remove_line' and \
                   prev[1]+1 == edit[1]:
                    new_steps[-1] = ('change_line', prev[1], (edit[2][0], prev[2][1]))
                    continue

            new_steps.append(edit)

        return new_steps

    # Main loop: best-first search through generated programs.
    todo = PQueue()  # priority queue of candidate solutions
    done = set()     # programs we have already visited

    # Each program gets a task with the sequence of edits that generated it and
    # the associated cost. First candidate with cost 1 is the initial program.
    # Costs are multiplied along a path and pushed negated as the priority.
    lines, rules = decompose(code)
    todo.push(((tuple(lines), tuple(rules)), (), 1.0), -1.0)

    n_tested = 0
    start_time = time.monotonic()
    total_time = 0
    while total_time < timeout:
        # Get next candidate; None signals that nothing is left to try.
        task = todo.pop()
        if task is None:
            break
        (lines, rules), path, path_cost = task

        # If we have already seen this code, skip it.
        program = compose(lines, rules)
        if program in done:
            continue
        done.add(program)

        # Print some info about the current task.
        if debug:
            print('Cost {:.12f}'.format(path_cost))
            for step_type, line, (before, after) in path:
                print('line {}: {} {} → {}'.format(line, step_type, stringify(before), stringify(after)))

        # If the code is correct, we are done.
        if test(name, program + '\n' + aux_code):
            # Refresh the clock so the reported time includes this last test.
            total_time = time.monotonic() - start_time
            return program, postprocess(path), total_time, n_tested
        n_tested += 1

        # Otherwise generate new solutions.
        for new_lines, new_rules, new_step, new_cost in step(lines, rules, path):
            new_path_cost = path_cost * new_cost
            # Prune paths whose accumulated cost has dropped too low.
            if new_path_cost < 0.01:
                continue
            new_path = path + (new_step,)
            todo.push(((tuple(new_lines), tuple(new_rules)), new_path, new_path_cost), -new_path_cost)

        total_time = time.monotonic() - start_time

    return '', [], total_time, n_tested