summaryrefslogtreecommitdiff
path: root/monkey/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'monkey/__init__.py')
-rwxr-xr-xmonkey/__init__.py325
1 files changed, 325 insertions, 0 deletions
diff --git a/monkey/__init__.py b/monkey/__init__.py
new file mode 100755
index 0000000..6577450
--- /dev/null
+++ b/monkey/__init__.py
@@ -0,0 +1,325 @@
+#!/usr/bin/python3
+
+import math
+import time
+import prolog.engine
+
+from .edits import classify_edits
+from prolog.util import Token, annotate, compose, map_vars, normalized, rename_vars, stringify
+from .util import damerau_levenshtein, PQueue
+
+# Starting from [code], find a sequence of edits that transforms it into a
+# correct predicate for [name]. The [test] function is used to test generated
+# programs.
+# Return (solution, edits, time spent, #programs checked). If no solution is
+# found within [timeout] seconds, return solution='' and edits=[].
+def fix(name, code, edits, test, timeout=30, debug=False):
+ # A dictionary of edits with costs for each edit type (insert, remove or
+ # change a line). Edits are tuples (before, after), where before and after
+ # are sequences of tokens. Variable names are normalized to A0, A1, A2,….
+ inserts, removes, changes = classify_edits(edits)
+
+ # Generate states that can be reached from the given program with one edit.
+ # The program is given as a list of tokens.
+ def step(program, path=None):
+ # Apply edits to program in order; skip tokens with index lower than
+ # last step.
+ start = path[-1][4] if path else 0
+
+ for i, token in enumerate(program):
+ # Get variable names in the current rule.
+ if i == 0 or program[i-1].type == 'PERIOD':
+ variables = []
+ for t in program[i:]:
+ if t.type == 'VARIABLE' and not t.val.startswith('_'):
+ if t.val not in variables:
+ variables.append(t.val)
+ elif t.type == 'PERIOD':
+ break
+
+ # Remember the first token in the current part.
+ if i == 0 or program[i-1].stop:
+ first = i
+
+ # Don't edit already modified parts of the program.
+ if i < start:
+ continue
+
+ # Add a new fact at beginning or after another rule.
+ if i == 0 or token.type == 'PERIOD':
+ new_start = i+1 if token.type == 'PERIOD' else 0
+ n_rules = program.count(Token('PERIOD', '.'))
+ rule = 0 if i == 0 else program[i-1].rule+1 # index of new rule
+
+ for new, cost in inserts.items():
+ # New rule must start with correct predicate name.
+ if not (new[0].type == 'NAME' and new[0].val in name):
+ continue
+
+ # Here we always insert a fact, so replace trailing :- with ..
+ if new[-1].type == 'FROM':
+ new = new[:-1]
+
+ new_real = tuple([t.clone(rule=rule, part=0) for t in new + (Token('PERIOD', '.', stop=True),)])
+ new_after = tuple([t.clone(rule=t.rule+1) for t in program[new_start:]])
+
+ new_program = program[:new_start] + new_real + new_after
+ new_step = ('add_rule', new_start, (), new_real, new_start)
+ new_cost = cost * math.pow(0.3, n_rules)
+ yield (new_program, new_step, new_cost)
+
+ if token.stop and i > first:
+ real_last = last = i+1
+ if token.type != 'FROM':
+ real_last -= 1
+ part = program[first:real_last]
+ part_whole = program[first:last]
+ part_normal = tuple(rename_vars(part))
+
+ # Apply each edit a→b where a matches this part.
+ seen = False # has there been such an edit?
+ for (a, b), cost in changes.items():
+ if part_normal == a:
+ seen = True
+ if b[-1].type == 'FROM':
+ b = b[:-1] + (b[-1].clone(stop=True),)
+ b_real = tuple([t.clone(rule=program[first].rule,
+ part=program[first].part)
+ for t in map_vars(a, b, part, variables)])
+
+ new_program = program[:first] + b_real + program[real_last:]
+ new_step = ('change_part', first, part, b_real, first)
+ yield (new_program, new_step, cost)
+
+
+ # Remove this part.
+ if token.type in ('COMMA', 'SEMI'):
+ if part_normal in removes.keys() or not seen:
+ new_after = list(program[last:])
+ for j, t in enumerate(new_after):
+ if t.rule > token.rule:
+ break
+ new_after[j] = t.clone(part=t.part-1)
+ new_program = program[:first] + tuple(new_after)
+ new_step = ('remove_part', first, part_whole, (), first-1)
+ new_cost = removes.get(part_normal, 1.0) * 0.99
+ yield (new_program, new_step, new_cost)
+
+ # Remove this part at the end of the current rule.
+ if token.type == 'PERIOD' and token.part > 0:
+ if part_normal in removes.keys() or not seen:
+ if token.part == 0: # part is fact, remove rule
+ new_after = list(program[last+1:])
+ for j, t in enumerate(new_after):
+ new_after[j] = t.clone(rule=t.rule-1)
+ new_program = program[:first] + tuple(new_after)
+ new_step = ('remove_rule', first, part, (), first)
+ else: # part is subgoal, remove part
+ new_after = list(program[last-1:])
+ for j, t in enumerate(new_after):
+ if t.rule > token.rule:
+ break
+ new_after[j] = t.clone(part=t.part-1)
+ new_program = program[:first-1] + tuple(new_after)
+ new_step = ('remove_part', first-1, (program[first-1],) + part, (), first-1)
+
+ new_cost = removes.get(part_normal, 1.0) * 0.99
+ yield (new_program, new_step, new_cost)
+
+
+ # Insert a new part (goal) after this part.
+ if token.type in ('COMMA', 'FROM'):
+ for new, cost in inserts.items():
+ # Don't try to insert a head into the body.
+ if new[-1].type == 'FROM':
+ continue
+
+ new_real = tuple([t.clone(rule=program[first].rule,
+ part=program[first].part+1)
+ for t in map_vars([], new, [], variables) + [Token('COMMA', ',', stop=True)]])
+
+ new_after = list(program[last:])
+ for j, t in enumerate(new_after):
+ if t.rule > token.rule:
+ break
+ new_after[j] = t.clone(rule=program[first].rule, part=t.part+1)
+
+ new_program = program[:last] + new_real + tuple(new_after)
+ new_step = ('add_part', last, (), new_real, last)
+ new_cost = cost * 0.4
+ yield (new_program, new_step, new_cost)
+
+ # Insert a new part (goal) at the end of current rule.
+ if token.type == 'PERIOD':
+ for new, cost in inserts.items():
+ # Don't try to insert a head into the body.
+ if new[-1].type == 'FROM':
+ continue
+
+ prepend = Token('FROM', ':-') if token.part == 0 else Token('COMMA', ',')
+ new_real = (prepend.clone(stop=True, rule=token.rule, part=token.part),) + \
+ tuple([t.clone(rule=token.rule, part=token.part+1)
+ for t in map_vars([], new, [], variables)])
+
+ new_after = list(program[last-1:])
+ for j, t in enumerate(new_after):
+ if t.rule > token.rule:
+ break
+ new_after[j] = t.clone(rule=t.rule, part=t.part+1)
+
+ new_program = program[:last-1] + new_real + tuple(new_after)
+ new_step = ('add_part', last-1, (), new_real, last)
+ new_cost = cost * 0.4
+ yield (new_program, new_step, new_cost)
+
+ # Return a cleaned-up list of steps.
+ def postprocess(steps):
+ new_steps = []
+ for step in steps:
+ # Remove the last field from each step as it is unnecessary after a
+ # path has been found.
+ step = step[:4]
+ if new_steps:
+ prev = new_steps[-1]
+ if prev[0] == 'remove_part' and step[0] == 'remove_part' and \
+ prev[1] == step[1]:
+ new_steps[-1] = ('remove_part', prev[1], prev[2]+step[2], ())
+ continue
+
+ if prev[0] == 'remove_part' and step[0] == 'add_part' and \
+ prev[1] == step[1]:
+ new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
+ continue
+
+ if prev[0] == 'change_part' and step[0] == 'change_part' and \
+ prev[1] == step[1] and step[2] == prev[3]:
+ new_steps[-1] = ('change_part', prev[1], prev[2], step[3])
+ continue
+
+ if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
+ prev[1] == step[1] and step[2] == prev[3][:-1]:
+ new_steps[-1] = (prev[0], prev[1], prev[2], step[3]+(prev[3][-1],))
+ continue
+
+ if prev[0] in ('add_part', 'change_part') and step[0] == 'change_part' and \
+ step[1] == prev[1]+1 and step[2] == prev[3][1:]:
+ new_steps[-1] = (prev[0], prev[1], prev[2], (prev[3][0],)+step[3])
+ continue
+ new_steps.append(step)
+ for step in new_steps:
+ print('index {}: {} {} → {}'.format(
+ step[1], step[0], stringify(step[2]), stringify(step[3])))
+ return new_steps
+
+ # Main loop: best-first search through generated programs.
+ todo = PQueue() # priority queue of candidate solutions
+ done = set() # programs we have already visited
+
+ # Each program gets a task with the sequence of edits that generated it and
+ # the associated cost. First candidate with cost 1 is the initial program.
+ program = tuple(annotate(code))
+ todo.push((program, (), 1.0), -1.0)
+
+ n_tested = 0
+ start_time = time.monotonic()
+ total_time = 0
+ while total_time < timeout:
+ # Get next candidate.
+ task = todo.pop()
+ if task == None:
+ break
+ program, path, path_cost = task
+
+ # If we have already seen this code, skip it.
+ code = compose(program)
+ if code in done:
+ continue
+ done.add(code)
+
+ # Print some info about the current task.
+ if debug:
+ print('Cost {:.12f}'.format(path_cost))
+ for step_type, idx, a, b, _ in path:
+ print('index {}: {} {} → {}'.format(idx, step_type, stringify(a), stringify(b)))
+
+ # If the code is correct, we are done.
+ if test(code):
+ path = postprocess(path)
+ return code, path, total_time, n_tested
+ n_tested += 1
+
+ # Otherwise generate new solutions.
+ for new_program, new_step, new_cost in step(program, path):
+ new_path_cost = path_cost * new_cost
+ if new_path_cost < 0.01:
+ continue
+ new_path = path + (new_step,)
+ todo.push((new_program, new_path, new_path_cost), -new_path_cost)
+
+ total_time = time.monotonic() - start_time
+
+ return '', [], total_time, n_tested
+
+# Return tuples (type, start, end, message) describing edits in [path].
+def fix_hints(code, path):
+ program = list(annotate(code))
+
+ for step_type, idx, a, b in path:
+ if step_type == 'add_rule':
+ fix_type = 'insert'
+ msg = 'Add another rule.'
+ start = program[idx].pos-1
+ end = start+2
+ elif step_type == 'add_part':
+ fix_type = 'insert'
+ msg = 'Add another goal to this rule.'
+ start = program[idx].pos-1
+ end = start+2
+ elif step_type == 'remove_rule':
+ fix_type = 'remove'
+ msg = 'Remove this rule.'
+ start = program[idx].pos
+ end = program[idx + len(a) - 1].pos
+ elif step_type == 'remove_part':
+ fix_type = 'remove'
+ msg = 'Remove this goal.'
+ start = program[idx].pos
+ end = idx + len(a) - 1
+ if program[end].type in ('COMMA', 'PERIOD', 'SEMI'):
+ end -= 1
+ end = program[end].pos + len(program[end].val)
+ elif step_type == 'change_part':
+ fix_type = 'change'
+ msg = 'Check this part.'
+ first = 0
+ while idx+first < len(program)-1 and first < len(a) and first < len(b) and a[first] == b[first]:
+ first += 1
+ last = len(a)-1
+ while last < len(b) and last >= first and a[last] == b[last]:
+ last -= 1
+ start = program[idx+first].pos
+ end = program[idx+last].pos + len(program[idx+last].val)
+
+ program[idx:idx+len(a)] = [t.clone(pos=program[idx].pos) for t in b]
+ yield fix_type, start, end, msg
+
+
+# Checks for typos in the code and suggest the nearst uploaded term by other users.
+def check_typos(code, names):
+ for token in annotate(code):
+ if token.type == 'NAME':
+ nearest_name = ' '
+ nearest_dist = 1000
+ own_count = names.get(token.val, 0) # count of the token.val which is compared with the
+ # each name in the names
+ for name in names.items():
+ if name[0] == token.val: # If the names are the skip the code
+ continue
+
+ distance = damerau_levenshtein(token.val, name[0])
+
+ if distance < nearest_dist and distance > 0 and own_count < name[1]:
+ nearest_dist = distance # Set best_dist and best_name if the less one is found
+ nearest_name = name[0]
+ if nearest_dist > 0 and nearest_dist/len(nearest_name) <= 1/3:
+ yield 'typo', token.pos, token.pos + len(token.val) , 'Did you mean "{}"?'.format(nearest_name)