"""Query handler that reads from SQLite database files and per-text data files."""

import sqlite3

import pandas as pd

import interface_sqlite3.encdec.de_code as dc
from interface_sqlite3.actual_queries import prepareQuery


# Result columns whose values are decoded when the database is encoded.
_COLUMNS_TO_DECODE = ('forma', 'lemma', 'cat_gr', 'disambiguatore')


# First version
class queryHandlerBasicSqlite:
    """Run queries against a read-only SQLite file and fetch text excerpts.

    The handler is configured from a dict (see ``__init__``) and optionally
    encodes query terms / decodes result columns through a key ring loaded
    from ``<dbPath>keys/``.
    """

    def __init__(self, dataConfig):
        """Store the paths and encoding flags found in *dataConfig*.

        Keys read from *dataConfig*:
            ``dbPath`` (required): base path; it is concatenated directly
                with file names, so it is presumably expected to end with a
                path separator -- TODO confirm against callers.
            ``dbfile_default`` (optional): db file used when ``query`` is
                called without ``dbFile``.
            ``db_encoded`` / ``texts_encoded`` (optional): only the literal
                ``True`` enables the corresponding encoding.

        Raises:
            Exception: if ``dbPath`` is missing or *dataConfig* does not
                support the required lookups.
        """
        try:
            dbPath = dataConfig['dbPath']
            dbfileDefault = dataConfig.get('dbfile_default')
        except (KeyError, TypeError, AttributeError) as err:
            raise Exception('Missing required input in Data Provider Configuration') from err
        self.dbPath = dbPath
        self.dbfileDefault = dbfileDefault

        # Encoding: anything other than the literal True counts as "not encoded".
        self.dbEncoded = dataConfig.get("db_encoded") is True
        self.textsEncoded = dataConfig.get("texts_encoded") is True
        self.keyRing = None
        if self.dbEncoded or self.textsEncoded:
            keyPath = self.dbPath + 'keys/'
            self.keyRing = dc.keyRing(keyPath, self.dbEncoded, self.textsEncoded)

    def query(self, queryData, pandas=False, dbFile=None):
        """Build the query described by *queryData*, execute it, return results.

        Formerly a query string was pre-generated outside and sent directly;
        now this method receives a query data object and lets
        ``prepareQuery`` create the (possibly complex) query from it.

        Args:
            queryData: dict describing the query. NOTE: it is modified in
                place (``'pandas'`` is set, and the search terms are replaced
                by their encoded form when the db is encoded), so reusing the
                same dict for a second call would encode the terms twice.
            pandas: if true, string queries are run through ``pd.read_sql``
                and results are decoded as a DataFrame; otherwise rows are
                returned as a list of dicts.
            dbFile: db file name appended to ``dbPath``; falls back to the
                configured default.

        Returns:
            The query results (decoded when the db is encoded).

        Raises:
            Exception: if neither *dbFile* nor a default db file is available.
        """
        # PREPARE THE QUERY
        queryData['pandas'] = pandas  # Mostly redundant
        if self.dbEncoded:
            queryData = self.encodeQuery(queryData)
        queryToExecute = prepareQuery(queryData)

        # Get the connection to the DB
        dbFileLocal = dbFile if dbFile is not None else self.dbfileDefault
        if dbFileLocal is None:
            raise Exception("No db file specified with no default given -- can't execute query")

        db = self.dbPath + dbFileLocal
        connection = sqlite3.connect(f"file:{db}?mode=ro", uri=True)
        # try/finally guarantees the connection is released even when the
        # query or the decoding step raises.
        try:
            if isinstance(queryToExecute, str):
                # If the query is a simple string, execute it here.
                if pandas:
                    results = pd.read_sql(queryToExecute, connection)
                else:
                    connection.row_factory = dict_factory
                    results = connection.cursor().execute(queryToExecute).fetchall()
            else:
                # If not a string, 'queryToExecute' should be a
                # method/function reference taking (connection, queryData).
                results = queryToExecute(connection, queryData)

            # Decode while the connection is still open, in case the results
            # are lazily tied to it.
            if self.dbEncoded:
                if pandas:
                    results = self.db_results_decode_pandas(results)
                else:
                    results = self.db_results_decode(results)
        finally:
            connection.close()
        return results

    def textQuery(self, queryData, getFormatting=False):
        """Return a slice of the text identified by ``queryData['sigla']``.

        Args:
            queryData: dict with ``sigla``, ``minChar`` and ``maxChar``.
            getFormatting: if true, also return the formatting codes for the
                same range (see ``getTextFormatting``).

        Returns:
            ``None`` if a required key is missing; otherwise the text, or a
            ``(text, formatCodes)`` tuple when *getFormatting* is true.
        """
        try:
            sigla = queryData['sigla']
            minChar = queryData['minChar']
            maxChar = queryData['maxChar']
        except (KeyError, TypeError):
            # Best effort: an incomplete request yields no text.
            return None

        with open(f"{self.dbPath}/itxt/{sigla}", 'r', encoding="utf-32-le") as file1:
            # Fixed-width encoding: 4 bytes per character.
            # NOTE(review): offset 4*minChar (vs. minChar-1 used for decoding
            # and formatting) presumably accounts for a 4-byte header with
            # 1-based character indices -- confirm against the file format.
            file1.seek(4 * minChar)
            cont = file1.read(maxChar - minChar)

        if self.textsEncoded:
            key = self.keyRing.textKeys.get(sigla)
            if key is not None:
                cont = dc.decodeTextByKey(cont, key, minChar - 1)

        if getFormatting:
            return cont, self.getTextFormatting(sigla, minChar, maxChar)
        return cont

    def getTextFormatting(self, sigla, minChar, maxChar):
        """Return the formatting codes of text *sigla* as a list of ints.

        Reads ``maxChar - minChar`` bytes from ``<dbPath>/ftxt/<sigla>``
        starting at byte offset ``minChar - 1``.
        """
        with open(f"{self.dbPath}/ftxt/{sigla}", 'rb') as file1:
            file1.seek(minChar - 1)
            # Iterating over bytes yields one int per byte.
            return list(file1.read(maxChar - minChar))

    def encodeQuery(self, queryData):
        """Encode the search terms of *queryData* in place and return it.

        Only the query types ``forma``, ``lemma``, ``formaLemma`` and
        ``lemmaForma`` carry terms to encode (``data`` and ``dataNorm``);
        any other query type is returned untouched.

        Raises:
            KeyError: if ``data`` or ``dataNorm`` is missing for one of the
                query types above.
        """
        queryType = queryData.get('queryType')
        if queryType in ["forma", "lemma", "formaLemma", "lemmaForma"]:
            # Only the lookups are guarded, so that an error raised by the
            # encoder itself is not reported as missing query data.
            try:
                data = queryData['data']
                dataNorm = queryData['dataNorm']
            except KeyError as err:
                raise KeyError(
                    f'Missing required data for query type {queryType}: {err}'
                ) from err
            encDict = self.keyRing.vettDictEnc
            queryData['data'] = [dc.db_encode(encDict, datum) for datum in data]
            queryData['dataNorm'] = [dc.db_encode(encDict, datum) for datum in dataNorm]
        return queryData

    def db_results_decode(self, result):
        """Decode, in place, the encoded columns of a list of row dicts."""
        decDict = self.keyRing.vettDictDec
        for row in result:
            # Only existing keys are reassigned, so iterating the dict while
            # updating it is safe.
            for key in row:
                if isColumnToDecode(key):
                    row[key] = dc.db_decode(decDict, row[key])
        return result

    def db_results_decode_pandas(self, df):
        """Decode, in place, the encoded columns of a DataFrame."""
        decDict = self.keyRing.vettDictDec
        for col in df.columns:
            if isColumnToDecode(col):
                df[col] = df[col].apply(lambda el: dc.db_decode(decDict, el))
        return df


# Utilities

# Dict factory non-Pandas queries
def dict_factory(cursor, row):
    """sqlite3 row factory mapping each column name to its value."""
    fields = [column[0] for column in cursor.description]
    return dict(zip(fields, row))


# Does the column data (in returned results) need decoding?
def isColumnToDecode(col):
    """Return True if column *col* holds encoded values.

    That is the case for the fixed set of encoded columns and for every
    column whose name starts with ``highlight``.
    """
    return col in _COLUMNS_TO_DECODE or col.startswith('highlight')