#!/usr/bin/python3 # coding=utf-8 # CodeQ: an online programming tutor. # Copyright (C) 2015 UL FRI # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import sys, json, sqlite3, psycopg2 from utils import filenamefy sqc = None # sqlite3 connection handle pgc = None # postgresql connection handle def log(line): sys.stdout.write('{0}\n'.format(line)) def _no_cleanup(row, data): pass def _no_conversion(row): return row, None def copy_table(src_qry, dst_qry, conversion_fn, cleanup_fn, sequence): conversion_fn = conversion_fn or _no_conversion cleanup_fn = cleanup_fn or _no_cleanup src_cur = sqc.cursor() try: dst_cur = pgc.cursor() try: row = None for row in src_cur.execute(src_qry): new_row, cleanup_data = conversion_fn(row) dst_cur.execute(dst_qry, new_row) cleanup_fn(new_row, cleanup_data) if sequence and row: dst_cur.execute("select setval('{0}', {1})".format(sequence, row[0])) pgc.commit() except: pgc.rollback() raise finally: dst_cur.close() finally: src_cur.close() def convert_auth_user(row): log('Converting auth_user id={0}'.format(row[0])) return (row[0], row[1], row[2], row[3], row[4] != 0, row[5] != 0, row[6], row[7]), None def _handle_trace_insert(trace, i): length = (trace[i+2] << 8) + trace[i+3] js = { 'typ': 'ins', 'off': (trace[i] << 8) + trace[i+1], 'txt': trace[i+4:i+4+length].decode() } return js, i + 4 + length def _handle_trace_remove(trace, i): js = { 'typ': 'rm', 'off': (trace[i] << 8) + trace[i+1], 'len': (trace[i+2] << 8) + trace[i+3] } return js, i + 4 def _handle_trace_solve(trace, i): length = (trace[i] << 8) + trace[i+1] js = { 'typ': 'prolog_solve', 'query': trace[i+2:i+2+length].decode() } return js, i + 2 + length def _handle_trace_solve_all(trace, i): length = (trace[i] << 8) + trace[i+1] js = { 'typ': 'slva', 'qry': trace[i+2:i+2+length].decode() } return js, i + 2 + length def _handle_trace_test(trace, i): js = { 'typ': 'tst', 'tot': trace[i], 'pas': trace[i+1] } return js, i + 2 def _handle_trace_next(trace, i): return {'typ': 'prolog_next'}, i def _handle_trace_stop(trace, i): return {'typ': 'prolog_stop'}, i def _handle_trace_hint(trace, i): return {'typ': 'hnt'}, i _trace_handler = { 1: _handle_trace_insert, # insert 2: _handle_trace_remove, # remove 3: _handle_trace_solve, # solve 4: _handle_trace_solve_all, # solve all 5: _handle_trace_next, # next solution 7: _handle_trace_stop, # stop/end 8: _handle_trace_test, # test 9: _handle_trace_hint # hint } def convert_trace(trace, i): type = trace[i] i += 1 dt = int(((trace[i] << 8) + (trace[i+1])) * 100.0) i += 2 handler = _trace_handler.get(type) if handler: js, i = handler(trace, i) js['dt'] = dt return json.dumps(js), i return None, i def convert_attempt(row): log('Converting tutor_attempt id={0}'.format(row[0])) old_trace = bytes(row[5]) new_trace = [] n = len(old_trace) i = 0 while i < n: trace, i = convert_trace(old_trace, i) if trace: new_trace.append(trace) return (row[0], row[1] != 0, row[2], row[3], row[4], '[' + ','.join(new_trace) + ']'), None def convert_problem_group(row): return (row[0], filenamefy(row[1])), None def convert_problem(row): return (row[0], row[1], filenamefy(row[2])), None if __name__ == '__main__': sqc = sqlite3.connect('db.sqlite3') # pgc = psycopg2.connect(host='localhost', port=5432, database='codeq', user='codeq', password='c0d3q') pgc = psycopg2.connect(host='172.31.200.40', port=5432, database='codeq', user='codeq', password='c0d3q') pgc.autocommit = False try: log('Creating language prolog') cur = pgc.cursor() try: cur.execute("insert into language (identifier) values ('prolog') returning id") _language_prolog_id = cur.fetchone()[0] finally: cur.close() log('Copying auth_group -> user_group') copy_table( 'select id, name from auth_group', 'insert into user_group (id, name) values (%s, %s)', None, None, 'user_group_id_seq') log('Copying auth_user -> codeq_user') copy_table( 'select id, username, password, email, is_superuser, is_active, date_joined, last_login from auth_user order by id', 'insert into codeq_user (id, username, password, email, is_admin, is_active, date_joined, last_login) values (%s, %s, %s, %s, %s, %s, %s, %s)', convert_auth_user, None, 'codeq_user_id_seq') log ('Copying auth_user_groups -> user_in_group') copy_table( 'select user_id, group_id from auth_user_groups', 'insert into user_in_group (codeq_user_id, user_group_id) values (%s, %s)', None, None, None) log('Copying tutor_group -> problem_group') copy_table( 'select id, name from tutor_group', 'insert into problem_group (id, identifier) values (%s, %s)', convert_problem_group, None, 'problem_group_id_seq') log('Copying tutor_problem -> problem') copy_table( 'select id, group_id, name, visible from tutor_problem', 'insert into problem (id, language_id, problem_group_id, identifier) values (%s, ' + str(_language_prolog_id) + ', %s, %s)', convert_problem, None, 'problem_id_seq') log('Copying tutor_attempt -> solution') copy_table( 'select id, done, content, problem_id, user_id, trace from tutor_attempt order by id', 'insert into solution (id, done, content, problem_id, codeq_user_id, trace) values (%s, %s, %s, %s, %s, %s)', convert_attempt, None, 'solution_id_seq') finally: sqc.close() pgc.close()