# CodeQ: an online programming tutor. # Copyright (C) 2015 UL FRI # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import collections from . import get_connection, return_connection __all__ = ['CodeqUser', 'Solution'] class CodeqUser(collections.namedtuple('CodeqUser', ['id', 'username', 'password', 'name', 'email', 'is_admin', 'is_active', 'date_joined', 'last_login', 'gui_lang', 'robot_address', 'saml_data', 'gui_layout'])): __sql_prefix = 'select id, username, password, name, email, is_admin, is_active, date_joined, last_login, gui_lang, robot_address, saml_data, gui_layout from codeq_user' @staticmethod def get(**kwargs): return _general_get(kwargs, CodeqUser, CodeqUser.__sql_prefix) @staticmethod def list(): return _general_list(CodeqUser, CodeqUser.__sql_prefix) @staticmethod def filter(**kwargs): return _general_filter(kwargs, CodeqUser, CodeqUser.__sql_prefix) @staticmethod def solved_problems(user_id, language): return _run_sql('select g.identifier, p.identifier from solution s inner join problem p on p.id = s.problem_id inner join problem_group g on g.id = p.problem_group_id inner join language l on l.id = p.language_id where s.codeq_user_id = %s and l.identifier = %s and s.done = True', (user_id, language), fetch_one=False) class Problem(collections.namedtuple('Problem', ['id', 'language', 'group', 'identifier'])): __sql_prefix = '''\ select p.id, l.identifier, g.identifier, p.identifier from problem p inner join problem_group g on g.id = p.problem_group_id inner join language l on l.id = p.language_id''' __sql_order = 'p.language_id, p.problem_group_id, p.id' @staticmethod def get(**kwargs): kwargs = {'p.'+k: v for k, v in kwargs.items()} return _general_get(kwargs, Problem, Problem.__sql_prefix) @staticmethod def list(): return _general_list(Problem, Problem.__sql_prefix, order=Problem.__sql_order) @staticmethod def filter(**kwargs): kwargs = {'p.'+k: v for k, v in kwargs.items()} return _general_filter(kwargs, Problem, Problem.__sql_prefix, order=Problem.__sql_order) # get a list of problems with the given language identifier @staticmethod def filter_language(language): kwargs = {'l.identifier': language} return _general_filter(kwargs, Problem, Problem.__sql_prefix, order=Problem.__sql_order) # known as Attempt in the original code class Solution(collections.namedtuple('Solution', ['id', 'done', 'content', 'problem_id', 'codeq_user_id', 'trace'])): __sql_prefix = 'select id, done, content, problem_id, codeq_user_id, trace from solution' @staticmethod def get(**kwargs): return _general_get(kwargs, Solution, Solution.__sql_prefix) @staticmethod def list(): return _general_list(Solution, Solution.__sql_prefix) @staticmethod def filter(**kwargs): return _general_filter(kwargs, Solution, Solution.__sql_prefix) def _no_row_conversion(row): return row def _general_get(kwargs_dict, clazz, sql_select, row_conversion_fn=_no_row_conversion): conditions = [] parameters = [] for field_name, field_value in kwargs_dict.items(): conditions.append(field_name + ' = %s') parameters.append(field_value) if len(conditions) == 0: return None conn = get_connection() try: cur = conn.cursor('crsr1') # a named cursor: scrolling is done on the server cur.arraysize = 1 # scroll unit in the number of rows try: cur.execute(sql_select + ' where ' + ' and '.join(conditions), parameters) row = cur.fetchone() if row: return clazz(*row_conversion_fn(row)) return None finally: cur.close() finally: conn.commit() return_connection(conn) def _general_filter(kwargs_dict, clazz, sql_select, row_conversion_fn=_no_row_conversion, order='id'): conditions = [] parameters = [] for field_name, field_value in kwargs_dict.items(): conditions.append(field_name + ' = %s') parameters.append(field_value) if len(conditions) == 0: return _general_list(clazz, sql_select) conn = get_connection() try: cur = conn.cursor('crsr2') # a named cursor: scrolling is done on the server cur.arraysize = 10000 # scroll unit in the number of rows try: cur.execute(sql_select + ' where ' + ' and '.join(conditions) + ' order by ' + order, parameters) result = [] row = cur.fetchone() while row: result.append(clazz(*row_conversion_fn(row))) row = cur.fetchone() return result finally: cur.close() finally: conn.commit() return_connection(conn) def _general_list(clazz, sql_select, row_conversion_fn=_no_row_conversion, order='id'): conn = get_connection() try: cur = conn.cursor('crsr3') # a named cursor: scrolling is done on the server cur.arraysize = 10000 # scroll unit in the number of rows try: cur.execute(sql_select + ' order by ' + order) result = [] row = cur.fetchone() while row: result.append(clazz(*row_conversion_fn(row))) row = cur.fetchone() return result finally: cur.close() finally: conn.commit() return_connection(conn) def _run_sql(sql, params, fetch_one=False): conn = get_connection() try: cur = conn.cursor() try: cur.execute(sql, params) if fetch_one: return cur.fetchone() else: return cur.fetchall() finally: cur.close() finally: conn.commit() return_connection(conn)