123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- import sqlite3
- import polars as pl
- import interface_sqlite3.encdec.de_code as dc
- from interface_sqlite3.actual_queries import prepareQuery
- # First version
class queryHandlerBasicSqlite:
    """Query handler backed by SQLite database files and flat text files.

    Databases are opened read-only (``mode=ro`` URI) for each call to
    :meth:`query`.  Text excerpts are read from ``<dbPath>/itxt/<sigla>``
    and their formatting codes from ``<dbPath>/ftxt/<sigla>``.  When the
    configuration enables encoding, a ``dc.keyRing`` is loaded from
    ``dbPath + 'keys/'`` and used to encode query terms and decode results.
    """

    def __init__(self, dataConfig):
        """Read the data-provider configuration.

        Args:
            dataConfig: mapping with the required key ``'dbPath'`` and the
                optional keys ``'dbfile_default'``, ``'db_encoded'`` and
                ``'texts_encoded'``.  The two encoding flags are enabled
                only when their value is literally ``True``.

        Raises:
            Exception: if ``'dbPath'`` is missing or ``dataConfig`` is not
                a mapping.
        """
        try:
            dbPath = dataConfig['dbPath']
            dbfileDefault = dataConfig.get('dbfile_default')
        except (KeyError, TypeError, AttributeError) as err:
            raise Exception('Missing required input in Data Provider Configuration') from err
        self.dbPath = dbPath
        self.dbfileDefault = dbfileDefault

        # Encoding: anything other than the literal True leaves the flag off.
        self.dbEncoded = dataConfig.get("db_encoded") is True
        self.textsEncoded = dataConfig.get("texts_encoded") is True
        self.keyRing = None
        if self.dbEncoded or self.textsEncoded:
            # dbPath is concatenated as-is, so it presumably ends with a
            # path separator -- confirm against the deployed configuration.
            keyPath = self.dbPath + 'keys/'
            self.keyRing = dc.keyRing(keyPath, self.dbEncoded, self.textsEncoded)

    def query(self, queryData, polars=False, dbFile=None):
        """Build and execute the query described by ``queryData``.

        ``prepareQuery`` turns the query-data object into either an SQL
        string, which is executed here, or a callable, which is invoked as
        ``callable(connection, queryData)``.

        Args:
            queryData: query description passed to ``prepareQuery``; it is
                first passed through :meth:`encodeQuery` when the database
                is encoded.
            polars: for string queries, return a polars DataFrame instead
                of a list of row dicts.
            dbFile: database file name appended to ``dbPath``; falls back
                to the configured default when ``None``.

        Returns:
            A list of dicts (string query, ``polars=False``), a polars
            DataFrame (string query, ``polars=True``), or whatever the
            prepared callable returns.

        Raises:
            Exception: if neither ``dbFile`` nor a default file is given.
        """
        if self.dbEncoded:
            queryData = self.encodeQuery(queryData)
        queryToExecute = prepareQuery(queryData)

        # Resolve the database file and open it read-only.
        dbFileLocal = dbFile if dbFile is not None else self.dbfileDefault
        if dbFileLocal is None:
            raise Exception("No db file specified with no default given -- can't execute query")
        db = self.dbPath + dbFileLocal
        connection = sqlite3.connect(f"file:{db}?mode=ro", uri=True)

        # try/finally so the connection is released even when execution or
        # decoding raises.
        try:
            if isinstance(queryToExecute, str):
                if polars:
                    # NOTE(review): `pl.read_sql` is the name used by older
                    # polars releases -- confirm against the pinned version.
                    results = pl.read_sql(queryToExecute, connection)
                    if self.dbEncoded:
                        results = self.db_results_decode_polars(results)
                else:
                    connection.row_factory = dict_factory
                    queryResponse = connection.cursor().execute(queryToExecute)
                    results = queryResponse.fetchall()
                    if self.dbEncoded:
                        results = self.db_results_decode(results)
            else:
                # Not a string: 'queryToExecute' should be a function
                # reference; its result is decoded as a polars DataFrame.
                results = queryToExecute(connection, queryData)
                if self.dbEncoded:
                    results = self.db_results_decode_polars(results)
        finally:
            connection.close()
        return results

    def textQuery(self, queryData, getFormatting=False):
        """Read a slice of the text file identified by ``queryData['sigla']``.

        Args:
            queryData: mapping with the keys ``'sigla'``, ``'minChar'`` and
                ``'maxChar'``.
            getFormatting: when true, also return the formatting codes for
                the same range.

        Returns:
            ``None`` if a required key is missing; otherwise the text
            slice, or a ``(text, formatCodes)`` tuple when
            ``getFormatting`` is true.
        """
        try:
            sigla = queryData['sigla']
            minChar = queryData['minChar']
            maxChar = queryData['maxChar']
        except (KeyError, TypeError):
            return None

        with open(f"{self.dbPath}/itxt/{sigla}", 'r', encoding="utf-32-le") as file1:
            # UTF-32 uses 4 bytes per character, so this skips minChar
            # characters before reading maxChar - minChar of them.
            file1.seek(4*minChar)
            cont = file1.read(maxChar-minChar)

        if self.textsEncoded:
            key = self.keyRing.textKeys.get(sigla)
            if key is not None:
                # NOTE(review): the text is read starting at character
                # index minChar, while the key offset here (and the seek in
                # getTextFormatting) uses minChar-1 -- confirm the intended
                # indexing base.
                cont = dc.decodeTextByKey(cont, key, minChar-1)

        if not getFormatting:
            return cont
        return cont, self.getTextFormatting(sigla, minChar, maxChar)

    def getTextFormatting(self, sigla, minChar, maxChar):
        """Return formatting codes for a text range as a list of ints.

        Reads ``maxChar - minChar`` bytes from ``<dbPath>/ftxt/<sigla>``
        starting at byte offset ``minChar - 1``; each byte is one code.
        """
        with open(f"{self.dbPath}/ftxt/{sigla}", 'rb') as file1:
            file1.seek(minChar-1)
            formatCodes = list(file1.read(maxChar-minChar))
        return formatCodes

    def encodeQuery(self, queryData):
        """Encode the search terms of a query for an encoded database.

        For the query types ``forma``, ``lemma``, ``formaLemma`` and
        ``lemmaForma`` the lists under ``'data'`` and ``'dataNorm'`` are
        replaced, in place, by their encoded counterparts.  Other query
        types are returned untouched.

        Returns:
            The same ``queryData`` object (mutated when encoding applies).

        Raises:
            KeyError: if ``'data'`` or ``'dataNorm'`` is missing for a
                query type that requires them.
        """
        queryType = queryData.get('queryType')
        if queryType in ["forma", "lemma", "formaLemma", "lemmaForma"]:
            # Only the lookups sit inside the try, so a KeyError raised by
            # the encoder itself is not misreported as missing input.
            try:
                data = queryData['data']
                dataNorm = queryData['dataNorm']
            except KeyError as err:
                raise KeyError('Missing required data for query type ' + queryType + ': ' + str(err)) from err
            encDict = self.keyRing.vettDictEnc
            queryData['data'] = [dc.db_encode(encDict, datum) for datum in data]
            queryData['dataNorm'] = [dc.db_encode(encDict, datum) for datum in dataNorm]

        return queryData

    def db_results_decode(self, result):
        """Decode, in place, the encoded columns of a list of row dicts."""
        for row in result:
            # Only existing keys are reassigned, so the dict size does not
            # change while it is being iterated.
            for key in row:
                if isColumnToDecode(key):
                    row[key] = dc.db_decode(self.keyRing.vettDictDec, row[key])
        return result

    def db_results_decode_polars(self, df):
        """Return ``df`` with every encoded column decoded element-wise."""
        for col in df.columns:
            if isColumnToDecode(col):
                # NOTE(review): `with_column` / `apply` are the names used
                # by older polars releases -- confirm against the pinned
                # polars version before upgrading.
                df = df.with_column(pl.col(col).apply(lambda el: dc.db_decode(self.keyRing.vettDictDec, el)))
        return df
- # Utilities
- # Dict factory for non-polars queries (rows returned as plain dicts)
def dict_factory(cursor, row):
    """Row factory mapping each column name of ``cursor`` to its row value.

    The column name is the first item of every entry in
    ``cursor.description``; names and values are paired positionally.
    """
    column_names = (entry[0] for entry in cursor.description)
    return dict(zip(column_names, row))
- # Does the column data (in returned results) need decoding?
def isColumnToDecode(col):
    """Tell whether a result column holds encoded data.

    True for the columns 'forma', 'lemma', 'cat_gr' and 'disambiguatore',
    and for any column whose name starts with 'highlight'.
    """
    encoded_names = ('forma', 'lemma', 'cat_gr', 'disambiguatore')
    return col in encoded_names or col.startswith('highlight')
|