# coding=utf-8 import sys import importlib.machinery import threading import os.path from db import get_connection, return_connection import logging _path_prefix = os.environ.get('CODEQ_PROBLEMS') or '/var/local/codeq-problems' _module_loading_lock = threading.RLock() # TODO: make a more fine-grained locking import machinery def load_module(fullname): # return importlib.import_module(fullname) with _module_loading_lock: mod = sys.modules.get(fullname, None) if mod is None: parts = fullname.split('.') d = os.path.join(_path_prefix, *parts[:-1]) ff = importlib.machinery.FileFinder(d, (importlib.machinery.SourceFileLoader, ['.py'])) spec = ff.find_spec(fullname) if spec is None: logging.error('ERROR: there is no problem module {0}'.format(fullname)) return None mod = type(sys)(fullname) mod.__loader__ = spec.loader mod.__package__ = spec.parent mod.__spec__ = spec if spec.has_location: mod.__file__ = spec.origin mod.__cached__ = spec.cached sys.modules[fullname] = mod try: spec.loader.exec_module(mod) except: try: del sys.modules[fullname] except KeyError: pass raise return mod def load_language(language, tail_module): return load_module('{0}.{1}'.format(language, tail_module)) def load_problem(language, problem_group, problem, tail_module): return load_module('{0}.problems.{1}.{2}.{3}'.format(language, problem_group, problem, tail_module)) def load_facts(language, fact_module): return load_module('{0}.facts.{1}'.format(language, fact_module)) def load_problems(language, tuples, tail_module): modules = [] for problem_group, problem in tuples: mod = '{0}.problems.{1}.{2}.{3}'.format(language, problem_group, problem, tail_module) modules.append(load_module(mod)) return modules def get_facts(language, problem_module): try: facts = problem_module.facts except AttributeError as e: return None if facts is None: return None module = load_facts(language, facts) if module: try: return module.facts except AttributeError as e: return None return None def solutions_for_problems(language, tuples): if not tuples: return '' modules = load_problems(language, tuples, 'common') solutions = [] for module in modules: try: solutions.append(module.solution) except AttributeError as me: pass return '\n'.join(solutions) def list_problems(language): conn = get_connection() try: cur = conn.cursor() try: cur.arraysize = 1000 cur.execute('select l.identifier, l.name, g.identifier, g.name, p.identifier, p.name from problem p inner join language l on l.id = p.language_id and l.identifier = \''+language+'\' inner join problem_group g on g.id = p.problem_group_id order by l.identifier, g.identifier, p.identifier') result = [] previous_language = '' previous_group = '' current_sublist = None row = cur.fetchone() while row: current_language = row[0] current_group = row[2] if previous_language != current_language or previous_group != current_group: current_sublist = [] result.append({'identifier': {'language': current_language, 'group': current_group}, 'name': {'language': row[1], 'group': row[3]}, 'problems': current_sublist}) previous_group = current_group previous_language = current_language current_sublist.append({'identifier': row[4], 'name': row[5]}) row = cur.fetchone() return result finally: cur.close() finally: conn.commit() return_connection(conn)