# CodeQ: an online programming tutor. # Copyright (C) 2015 UL FRI # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>.

import collections
import math

from .action import expand, parse
from .graph import Node
from prolog.util import (
    Token, normalized, parse as prolog_parse, rename_vars, rename_vars_ast,
    rename_vars_list, interesting_ranges, stringify, tokenize)
from .util import get_line, avg, logistic


def get_edits_from_trace(trace, test, id):
    """Extract candidate edits from a single student's solution trace.

    Replays the recorded editor actions in *trace* character by character,
    tracking "open edits" (interesting token ranges of the last parsed
    program) and closing them when the first correct version is reached.

    Parameters:
      trace -- the raw serialized action trace for one solution
      test  -- callable taking program code, returning True if correct
      id    -- identifier recorded for each observed edit (the user id;
               name shadows the builtin but is kept for interface stability)

    Returns a triple (edits, submissions, queries):
      edits       -- dict {(path, start_tokens, end_tokens): [id, …]}
      submissions -- set of (code, correct) pairs seen at 'test' actions
      queries     -- set of queries run by the student
    """
    submissions = set()  # Program versions at 'test' actions.
    queries = set()      # Queries run by the student.

    # For each observed edit, store a list of features (with repeats) of ASTs
    # where they were observed.
    edits = collections.defaultdict(list)

    def add_edit(path, start, end, tree):
        # Ignore no-op edits (identical normalized before/after token lists).
        if start == end:
            return
        edits[(path, start, end)].append(id)

    # Parse trace actions and ensure there is a separate action for each
    # inserted/removed character.
    try:
        actions = parse(trace)
        expand(actions)
    except Exception:
        # Only a few traces fail to parse, so just skip them.
        actions = []

    # State variables.
    open_edits = []
    code_next = ''      # Program code after applying the current action.
    done = False        # Set to True on first correct version.
    prev_tree = None    # Last successfully parsed AST (dedupes reparses).
    prev_action = None  # Previous insert/remove action (for run detection).

    for action_id, action in enumerate(actions):
        code = code_next
        code_next = action.apply(code)

        if action.type in {'solve', 'solve_all', 'test'}:
            if action.type == 'solve' or action.type == 'solve_all':
                queries.add(action.query.rstrip(' .'))
            elif action.type == 'test':
                correct = test(code)
                submissions.add((code, correct))
                if correct:
                    # Ignore actions after the first correct version.
                    done = True
                    break

            # Whenever the student runs the program, open a new edit for every
            # interesting token range in the current tree (unless an edit with
            # the same character span is already being tracked).
            tree = prolog_parse(code)
            if tree and tree.leaves() and tree != prev_tree:
                for terminals, path in interesting_ranges(tree):
                    pos_start = terminals[0].pos
                    pos_end = terminals[-1].pos + len(terminals[-1].val)

                    # If there is an open edit with the same range, don't add a new one.
                    found = False
                    for e_start_tree, e_start_tokens, e_path, e_pos_start, e_pos_end in open_edits:
                        if e_pos_start == pos_start and e_pos_end == pos_end:
                            found = True
                            break
                    if not found:
                        #print('OPENING {}'.format(terminals))
                        open_edits.append([tree, terminals, path, pos_start, pos_end])
                prev_tree = tree

        if action.type in {'insert', 'remove'}:
            # Shift the character span of every open edit to account for the
            # single inserted/removed character; drop edits that collapse.
            new_open_edits = []
            for start_tree, start_tokens, path, pos_start, pos_end in open_edits:
                new_pos_start, new_pos_end = pos_start, pos_end
                if action.type == 'remove':
                    if action.offset < pos_end:
                        new_pos_end -= 1
                        if action.offset < pos_start:
                            new_pos_start -= 1
                elif action.type == 'insert':
                    if action.offset < pos_start:
                        new_pos_start += 1
                        new_pos_end += 1
                    elif action.offset == pos_start:
                        # Inserting at the left boundary grows the range
                        # instead of shifting it.
                        new_pos_end += 1
                    elif action.offset < pos_end:
                        new_pos_end += 1
                    elif action.offset == pos_end:
                        # Inserting exactly at the right boundary: only extend
                        # the range if this looks like a continuation of a run
                        # of edits at this position, and the typed character
                        # does not match the next original terminal (in which
                        # case the student is likely just retyping what
                        # followed the range).
                        if (prev_action is None or
                                prev_action.type in {'insert', 'remove'} and
                                prev_action.offset == action.offset - 1):
                            orig_next = None
                            for terminal in start_tree.leaves():
                                if terminal.pos >= start_tokens[-1].pos + len(start_tokens[-1].val):
                                    orig_next = terminal
                                    break
                            if not (orig_next and orig_next.val[0] == action.text):
                                new_pos_end += 1
                if new_pos_start != new_pos_end:
                    new_open_edits.append([start_tree, start_tokens, path, new_pos_start, new_pos_end])
            open_edits = new_open_edits

        prev_action = action

    # Close all edits still open at the first correct version: the edit maps
    # the (normalized) original tokens to whatever now occupies the range.
    if done:
        for start_tree, start_tokens, path, pos_start, pos_end in open_edits:
            end_tokens = tokenize(code[pos_start:pos_end])
            names = {}
            start_normal = rename_vars_list(start_tokens, names)
            end_normal = rename_vars_list(end_tokens, names)
            norm_tree = rename_vars_ast(start_tree, names)
            add_edit(path, tuple(start_normal), tuple(end_normal), norm_tree)

    return edits, submissions, queries


def get_edits_from_solutions(solutions, test):
    """Aggregate edits, submissions and queries over many solution traces.

    Parameters:
      solutions -- iterable of solution objects with .trace and
                   .codeq_user_id attributes
      test      -- correctness-checking callable passed through to
                   get_edits_from_trace

    Returns a triple (edits, submissions, queries):
      edits       -- dict {(path, a, b): probability-like weight}, with
                    singleton edits discarded and weights squashed through
                    a logistic curve
      submissions -- Counter over normalized (code, correct) pairs
      queries     -- Counter over normalized query strings
    """
    # For each observed edit, store a list of features (with repeats) of ASTs
    # where they were observed.
    submissions = collections.Counter()
    queries = collections.Counter()
    edits = collections.defaultdict(list)

    for solution in solutions:
        trace = solution.trace
        uid = solution.codeq_user_id
        trace_edits, trace_submissions, trace_queries = get_edits_from_trace(trace, test, uid)

        # Update edits.
        for edit, features in trace_edits.items():
            edits[edit].extend(features)

        # Update submission/query counters (use normalized variables).
        for code, correct in trace_submissions:
            code = stringify(rename_vars(tokenize(code)))
            submissions[(code, correct)] += 1
        for query in trace_queries:
            code = stringify(rename_vars(tokenize(query)))
            queries[code] += 1

    # Discard edits that only occur in one trace.
    singletons = [edit for edit in edits if len(edits[edit]) < 2]
    for edit in singletons:
        del edits[edit]

    # Count occurrences per edit and per edit-start (only values are
    # rebound while iterating, which is safe for a dict).
    n_start = collections.Counter()
    n_start_all = 0
    for (path, a, b), features in edits.items():
        edits[(path, a, b)] = len(features)
        n_start[(path, a)] += len(features)
        n_start_all += len(features)

    # Find the probability of each edit a → b.
    new_edits = {}
    for (path, a, b), count in edits.items():
        if a != b:
            p = count / n_start[(path, a)]
            new_edits[(path, a, b)] = p
    edits = new_edits

    # Tweak the edit distribution to improve search.
    avg_p = avg(edits.values())
    for edit, p in edits.items():
        edits[edit] = logistic(p, k=3, x_0=avg_p)

    return edits, submissions, queries


def classify_edits(edits):
    """Split an edit dict keyed by (before, after) into three categories.

    Returns (inserts, removes, changes):
      inserts -- {after: cost} where the before-part is empty
      removes -- {before: cost} where the after-part is empty
      changes -- {(before, after): cost} for everything else
    """
    inserts = {}
    removes = {}
    changes = {}
    for (before, after), cost in edits.items():
        if after and not before:
            inserts[after] = cost
        elif before and not after:
            removes[before] = cost
        else:
            changes[(before, after)] = cost
    return inserts, removes, changes


# Extract edits and other data from existing traces for each problem.
if __name__ == '__main__':
    import pickle
    from db.models import CodeqUser, Problem, Solution
    import server.problems
    from termcolor import colored

    # Ignore traces from these users.
    ignored_users = [
        1,    # admin
        231,  # test
        360,  # test2
        358,  # sasha
    ]

    edits, submissions, queries = {}, {}, {}
    for problem in Problem.list():
        pid = problem.id
        solutions = [s for s in Solution.filter(problem_id=pid, done=True)
                     if s.codeq_user_id not in ignored_users]
        if not solutions:
            print('No traces for {}'.format(problem.identifier))
            continue

        # Testing function.
        solved_problems = [p for p in CodeqUser.solved_problems(1, problem.language)
                           if p != (problem.group, problem.identifier)]
        other_solutions = server.problems.solutions_for_problems(problem.language, solved_problems)
        problem_module = server.problems.load_problem(problem.language, problem.group,
                                                      problem.identifier, 'common')

        def test(code):
            correct, hints = problem_module.test(code, solved_problems)
            return correct

        print('Analyzing traces for {}… '.format(problem.identifier), end='', flush=True)
        print('{} traces… '.format(len(solutions)), end='', flush=True)
        try:
            edits[pid], submissions[pid], queries[pid] = get_edits_from_solutions(solutions, test)
            print('{} edits, {} submissions, {} queries'.format(
                len(edits[pid]), len(submissions[pid]), len(queries[pid])))
        except Exception:
            # Best effort per problem: report the failure and move on.
            import traceback
            traceback.print_exc()

    # Use a context manager so the pickle file is flushed and closed.
    with open('edits.pickle', 'wb') as f:
        pickle.dump((edits, submissions, queries), f)