import sqlite3 import pandas as pd import interface_sqlite3.encdec.de_code as dc from engine.data_interface.QueryHandlerAbstract import QueryHandlerAbstract # First version class queryHandlerBasicSqlite(QueryHandlerAbstract): def __init__(self, dataConfig): try: dbPath = dataConfig['dbPath'] dbfileDefault = dataConfig['dbfile_default'] except: raise Exception('Missing required input in Data Provider Configuration') self.dbPath = dbPath self.dbfileDefault = dbfileDefault # Encoding self.dbEncoded = True if dataConfig.get("db_encoded") is True else False self.textsEncoded = True if dataConfig.get("texts_encoded") is True else False self.keyRing = None if self.dbEncoded or self.textsEncoded: keyPath = self.dbPath + 'keys/' self.keyRing = dc.keyRing(keyPath, self.dbEncoded, self.textsEncoded) def query(self, queryData, pandas=False, dbFile=None): # Formerly the query string was pre-generated outside and # sent here _in lieu_ of the query data # Now the method processes a query data OBJECT and creates the query # accordingly if self.dbEncoded: queryData = self.encodeQuery(queryData) queryString = prepareQueryString(queryData) dbfileLocal = dbFile if dbFile is not None else self.dbfileDefault db = self.dbPath + dbfileLocal connection = sqlite3.connect(f"file:{db}?mode=ro", uri=True) # PANDAS? if pandas: results = pd.read_sql(queryString, connection) if(self.dbEncoded): results = self.db_results_decode_pandas(results) else: connection.row_factory = dict_factory queryReponse = connection.cursor().execute(queryString) results = queryReponse.fetchall() if(self.dbEncoded): results = self.db_results_decode(results) connection.close() return results def textQuery(self, queryData): try: sigla = queryData['sigla'] minChar = queryData['minChar'] maxChar = queryData['maxChar'] except: return None with open(f"{self.dbPath}/itxt/{sigla}", 'r', encoding="utf-32-le") as file1: file1.seek(4*minChar) cont = file1.read(maxChar-minChar) if self.textsEncoded and self.keyRing.textKeys.get(sigla) is not None: key = self.keyRing.textKeys.get(sigla) cont = dc.decodeTextByKey(cont, key, minChar-1) return cont def formatQuery(self, queryData): try: sigla = queryData['sigla'] minChar = queryData['minChar'] maxChar = queryData['maxChar'] except: return None with open(f"{self.dbPath}/ftxt/{sigla}", 'r', encoding="utf-32-le") as file1: file1.seek(4*minChar) formbytes = file1.read(maxChar-minChar) form = [byte for byte in formbytes] return form def encodeQuery(self, queryData): type = queryData.get('queryType') if type in ["forma", "lemma", "formaLemma", "lemmaForma"]: try: data = queryData['data'] dataNorm = queryData['dataNorm'] data = [dc.db_encode(self.keyRing.vettSpec, datum) for datum in data] dataNorm = [dc.db_encode(self.keyRing.vettSpec, datum) for datum in dataNorm] queryData['data'] = data queryData['dataNorm'] = dataNorm except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) return queryData def db_results_decode(self, result): for row in result: for key, value in row.items(): if isColumnToDecode(key): row[key] = dc.db_decode(self.keyRing.vettSpec, value) return result def db_results_decode_pandas(self, df): for col in df.columns: if isColumnToDecode(col): df[col] = df[col].apply( lambda el: dc.db_decode(self.keyRing.vettSpec, el) ) return df # Utilities def prepareQueryString(queryData): type = queryData.get('queryType') # KeyError protected -- returns None if the key is not defined ################# if type=='occ_tables': return "SELECT name FROM sqlite_master WHERE type='table'" ################# if type=='forma': try: data = queryData['data'] dataNorm = queryData['dataNorm'] except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) joinedQueryData = "'" + "' OR spec LIKE '".join(data) + "'" if len(dataNorm)==0: return f"SELECT spec AS forma, nocc AS occ, cod FROM form WHERE spec LIKE {joinedQueryData} ORDER BY idfor" else: joinedQueryDataNorm = "'" + "' OR norm LIKE '".join(dataNorm) + "'" return f"SELECT DISTINCT spec AS forma, nocc AS occ, cod FROM form WHERE (spec LIKE {joinedQueryData}) OR (norm LIKE {joinedQueryDataNorm}) ORDER BY idfor" ################### elif type=='lemma': try: data = queryData['data'] dataNorm = queryData['dataNorm'] except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) joinedQueryData = "'" + "' OR spec LIKE '".join(data) + "'" if len(dataNorm)==0: return f"SELECT spec AS lemma, cat AS cat_gr, omo AS disambiguatore, nocc AS occ, cod FROM lem WHERE spec LIKE {joinedQueryData} ORDER BY idlem" else: joinedQueryDataNorm = "'" + "' OR norm LIKE '".join(dataNorm) + "'" return f"SELECT DISTINCT spec AS lemma, cat AS cat_gr, omo AS disambiguatore, nocc AS occ, cod FROM lem WHERE (spec LIKE {joinedQueryData}) OR (norm LIKE {joinedQueryDataNorm}) ORDER BY idlem" ######################## elif type=='lemmaForma': try: data = queryData['data'] dataNorm = queryData['dataNorm'] except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) joinedQueryData = "'" + "' OR form.spec LIKE '".join(data) + "'" if len(dataNorm)==0: return f"SELECT lem.spec AS lemma, lem.cat AS cat_gr, form.spec AS forma, lem.omo AS disambiguatore, pfl.nocc AS occ, lem.cod FROM pfl INNER JOIN form ON form.cod = pfl.forma INNER JOIN lem ON lem.cod != 0 AND lem.cod = pfl.lemma WHERE lem.spec LIKE {joinedQueryData} ORDER BY lem.idlem" else: joinedQueryData = "'" + "' OR lem.spec LIKE '".join(data) + "'" joinedQueryDataNorm = "'" + "' OR lem.norm LIKE '".join(dataNorm) + "'" return f"SELECT DISTINCT lem.spec AS lemma, lem.cat AS cat_gr, form.spec AS forma, lem.omo AS disambiguatore, pfl.nocc AS occ, lem.cod FROM pfl INNER JOIN form ON form.cod = pfl.forma INNER JOIN lem ON lem.cod = pfl.lemma WHERE (lem.spec LIKE {joinedQueryData}) OR (lem.norm LIKE {joinedQueryDataNorm}) ORDER BY lem.idlem" ######################## elif type=='formaLemma': try: data = queryData['data'] dataNorm = queryData['dataNorm'] except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) joinedQueryData = "'" + "' OR form.spec LIKE '".join(data) + "'" if len(dataNorm)==0: return f"SELECT form.spec AS forma, lem.spec AS lemma, lem.cat AS cat_gr, lem.omo AS disambiguatore, pfl.nocc AS occ, form.cod FROM pfl INNER JOIN form ON form.cod = pfl.forma INNER JOIN lem ON lem.cod = pfl.lemma WHERE form.spec LIKE {joinedQueryData} ORDER BY form.idfor" else: joinedQueryDataNorm = "'" + "' OR form.norm LIKE '".join(dataNorm) + "'" return f"SELECT DISTINCT form.spec AS forma, lem.spec AS lemma, lem.cat AS cat_gr, lem.omo AS disambiguatore, pfl.nocc AS occ, form.cod FROM pfl INNER JOIN form ON form.cod = pfl.forma INNER JOIN lem ON lem.cod = pfl.lemma WHERE (form.spec LIKE {joinedQueryData}) OR (form.norm LIKE {joinedQueryDataNorm}) ORDER BY form.idfor" ################# elif type=='pfl': try: codList = queryData['codList'] except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) strlist = ",".join(str(c) for c in codList) return f"SELECT DISTINCT lemma as codLemma, forma as codForma FROM pfl WHERE lemma IN ({strlist})" ################### elif type=='texts': try: codList = queryData['codList'] table = queryData['table'] subtype = queryData['querySubtype'] formCodList = queryData.get('formCodList') # KeyError-safe (None if absent) except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) strlist = ",".join(str(c) for c in codList) if subtype==0: return f"SELECT tab.cod, tab.ntx, tab.pitxt, tab.elemlen, tab.mappa, tab.numperiod, tab.links, tab.numorg, intbib.sigla, tab.vol, tab.pag, tab.riga, tab.col, tab.tipostanza, tab.stanza, tab.verso, tab.numbrano, lem.spec AS lemma, lem.cat AS cat_gr, lem.omo AS disambiguatore FROM {table} AS tab INNER JOIN intbib ON tab.ntx = intbib.ntx INNER JOIN lem ON tab.indlem = lem.cod WHERE tab.cod IN ({strlist})" elif subtype==1: return f"SELECT tab.cod, tab.ntx, tab.pitxt, tab.elemlen, tab.mappa, tab.numperiod, tab.links, tab.numorg, intbib.sigla, tab.vol, tab.pag, tab.riga, tab.col, tab.tipostanza, tab.stanza, tab.verso, tab.numbrano, lem.spec AS lemma, lem.cat AS cat_gr, lem.omo AS disambiguatore FROM {table} AS tab INNER JOIN intbib ON tab.ntx = intbib.ntx INNER JOIN lem ON tab.indlem = lem.cod WHERE tab.indlem IN ({strlist})" elif subtype==2: if formCodList is None: return None strform = ",".join(str(c) for c in formCodList) return f"SELECT tab.cod, tab.ntx, tab.pitxt, tab.elemlen, tab.mappa, tab.numperiod, tab.links, tab.numorg, intbib.sigla, tab.vol, tab.pag, tab.riga, tab.col, tab.tipostanza, tab.stanza, tab.verso, tab.numbrano, lem.spec AS lemma, lem.cat AS cat_gr, lem.omo AS disambiguatore FROM {table} AS tab INNER JOIN intbib ON tab.ntx = intbib.ntx INNER JOIN lem ON tab.indlem = lem.cod WHERE tab.indlem IN ({strlist}) OR (tab.indlem = 0 AND tab.cod IN ({strform}))" ###################### elif type=='contexts': try: table = queryData['table'] ntxlocal = queryData['ntxlocal'] mappalocal = queryData['mappalocal'] parole = queryData['parole'] except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) return f"SELECT tab.pitxt, tab.elemlen FROM {table} AS tab WHERE tab.ntx = {ntxlocal} AND tab.mappa <= {mappalocal+int(parole/2)} AND tab.mappa >= {mappalocal-int(parole/2)}" ################# elif type=='bib': try: row = queryData['row'] sigla = row['sigla'] except KeyError as err: raise KeyError('Missing required data for query type ' + type + ': ' + str(err)) return f"SELECT [Anno iniziale], [Anno finale], [Data codificata], [Titolo Abbreviato], [Autore], [Titolo], [Curatore], [Data descrittiva], [Area generica], [Area specifica], [Genere], [Forma], [Tipo], IQ FROM datibib WHERE Sigla='{sigla}'" ################# elif type=='rif': try: row = queryData['row'] numorg = row['numorg'] ntx = row['ntx'] except: return None return f"SELECT head AS Rif_organico, full AS Rif_completo FROM org WHERE (indice='{numorg}' AND ntx='{ntx}')" ################# elif type=='highlight': try: row = queryData['row'] col = queryData['col'] except: return None return f"SELECT spec as highlight FROM form WHERE cod={row[col]}" ################# elif type =='singlecontext': try: subtype = queryData['querySubtype'] table = queryData['table'] parole = queryData['parole'] periodi = queryData['periodi'] brani = queryData['brani'] ntxlocal = queryData['ntxlocal'] mappalocal = queryData['mappalocal'] periodlocal = queryData['periodlocal'] numbranolocal = queryData['numbranolocal'] except: return None if subtype == 'parole': return f"SELECT tab.pitxt, tab.elemlen FROM {table} AS tab WHERE tab.ntx = {ntxlocal} AND tab.mappa <= {mappalocal+int(parole/2)} AND tab.mappa >= {mappalocal-int(parole/2)}" elif subtype == 'periodi': return f"SELECT piniz, pfin FROM periodi WHERE ntx = {ntxlocal} AND numperiod <= {periodlocal+int(periodi/2)} AND numperiod >= {periodlocal-int(periodi/2)}" elif subtype == 'brani': return f"SELECT piniz, pfin FROM linkbase WHERE {ntxlocal} = ntx AND tipo = 2 AND id BETWEEN {numbranolocal-int(brani/2)} AND {numbranolocal+int(brani/2)}" ################# elif type =='links': try: subtype = queryData['querySubtype'] ntxlocal = queryData['ntxlocal'] pinizlocal = queryData['pinizlocal'] pfinlocal = queryData['pfinlocal'] pitxtlocal = queryData['pitxtlocal'] except: return None if subtype == 'nota': return f"SELECT ta.ntx, ta.id, ta.piniz, ta.pfin, tb.mappain, tb.mappafin FROM linkbase AS tb INNER JOIN linknoteass AS ta ON tb.ntx = ta.ntx AND tb.id = ta.id WHERE (((tb.tipo= 1) AND (tb.ntx = {ntxlocal})) AND ((tb.piniz BETWEEN {pinizlocal} AND {pfinlocal}) OR ({pitxtlocal} BETWEEN tb.piniz AND tb.pfin)))" if subtype == 'testo_associato': return f"SELECT ta.ntx, ta.id, ta.piniz, ta.pfin, tb.mappain, tb.mappafin FROM linkbase AS tb INNER JOIN linknoteass AS ta ON tb.ntx = ta.ntx AND tb.id = ta.id WHERE (((tb.tipo= 2) AND (tb.ntx = {ntxlocal})) AND ((tb.piniz BETWEEN {pinizlocal} AND {pfinlocal}) OR ({pitxtlocal} BETWEEN tb.piniz AND tb.pfin)))" ##### else: raise ValueError('Unrecognized query type: ' + type) # Dict factory non-Pandas queries def dict_factory(cursor, row): fields = [column[0] for column in cursor.description] return {key: value for key, value in zip(fields, row)} # Does the column data (in returned results) need decoding? def isColumnToDecode(col): columns = ['forma', 'lemma', 'cat_gr', 'disambiguatore'] if col in columns or col.startswith('highlight'): return True return False