# %% import csv import json import openpyxl as op import re # BASIC CONFIGURATION DATA_FOLDER = './data/' OUTPUT_FOLDER = './output/' ONTO_FILENAME = 'manoscritti_dariah' # No extension! ent_filename = ONTO_FILENAME + '_entities.csv' rel_filename = ONTO_FILENAME + '_relations.csv' # PART I: parse xlsx to (multiple) csv # Excel configuration XLSX_FILENAME = 'Struttura_NEW.xlsx' ENTITIES_SHEETNAME = 'Entità' RELATIONS_SHEETNAME = 'Relazioni' # %% # Import the defining xlsx through openpyxl input_data = op.load_workbook(DATA_FOLDER + XLSX_FILENAME) # Read relevant sheets entities_sheet = input_data[ENTITIES_SHEETNAME] relations_sheet = input_data[RELATIONS_SHEETNAME] # Parse sheet data into a dict (assuming the xlsx has headers) # Get non-empty headers for entities sheet entities_keys = [cell for cell in next(entities_sheet.values) if cell] # Get entity rows as list of dicts raw_entities = [{key: row[ind] for ind, key in enumerate(entities_keys)} for row in entities_sheet.values][1:] # Get non-empty headers for relations sheet relations_keys = [cell for cell in next(relations_sheet.values) if cell] # Get relation rows as list of dicts raw_relations = [{key: row[ind] for ind, key in enumerate(relations_keys)} for row in relations_sheet.values][1:] # %% # NOTE: # a. Non ci sono, al momento, _constraints_ di unicità per le relazioni che siano imposti tramite il "foglio master". # b. Non ci sono neanche constraint di esistenza, TRANNE l'id univoca, intesa NON come quella del sistema ma quella della comunità di riferimento, e tipica del settore di dominio considerato; nel caso specifico, la SEGNATURA. # c. Per identificare le informazioni 'atomiche' non si usa un campo dedicato, ma una logica. AL MOMENTO la logica è che si considera atomica una entità che non è mai 'prima' in una relazione. L'ORDINE DELLE RELAZIONI E' IMPORTANTE a differenza di quanto assumevo inizialmente. 'Atomiche' è probabilmente un _misnomer_, dato che può trattarsi di informazioni composite, come ad esempio una data. Si tratta più precisamente (forse) di ATTRIBUTI, nel senso di informazioni "figlie" che hanno significato solo in associazione all'Entità _parent_ -- proprio come nel caso delle date. # d. Si effettua un controllo di unicità sulle entità, basato sul nome normalizzato (parole trattate con: sostituzione del _whitespace_ contiguo con spazio singolo ' ', title() e strip()). Nessuna entità può avere nome vuoto. Eventuali nomi duplicati vengono segnalati per un controllo manuale. # e. Si effettua un controllo di unicità sulle relazioni, che riguarda tutta la terna SOGGETTO-RELAZIONE-OGGETTO (normalizzata in modo simile ai nomi di entità, ma nella RELAZIONE gli spazi sono underscores e si usa lower() invece che title()). Nessuno dei membri della terna può essere vuoto; il nome della relazione inversa invece è opzionale e non viene controllato. # f. Si effettuano controlli di consistenza sulle relazioni: # .f1. Nessuna relazione con entità non definite come SOGGETTO. # .f2. Warning sulle entità "orfane", ovvero non presenti in alcuna relazione. # g. Una relazione può avere un OGGETTO non definito nel foglio Entità. E' sottointeso, in questi casi, che si tratta di un'informazione 'atomica' / un attributo. # # TODO: completare secondo le specifiche sopra -> Fatto al 90% # TODO: effettuare il merge con i "miei" CSV, che hanno informazioni in più! --> Fatto, solo da riverificare # TODO: ottimizzare un po' la scrittura del codice -> Non prioritario # Process entities: def normalize_ent_name(name: str) -> str: return re.sub(r'\s+', ' ', name.strip().title()) # 1. Filter out unnamed entities, normalize entity names, collect aliases, discover duplicates clean_entities = {} for ent in raw_entities: entity_names = ent['Concetto'] if not isinstance(entity_names, str): continue aliases = [normalize_ent_name(al) for al in re.split(r'\r\n|\r|\n', entity_names) if al.strip()] if not aliases: continue entity_name = aliases[0] entity_same_as = aliases[1:] if clean_entities.get(entity_name): # DUPLICATE! clean_entities[entity_name].append({'Alias': entity_same_as, 'Raw': ent}) else: clean_entities[entity_name] = [{'Alias': entity_same_as, 'Raw': ent}] # TEMP - filter out a few of the entities manually clean_entities.pop(normalize_ent_name('Circolazione Del Manoscritto'), None) clean_entities.pop(normalize_ent_name('Luogo Di Conservazione'), None) clean_entities.pop(normalize_ent_name('Possessore'), None) clean_entities.pop(normalize_ent_name('Volume'), None) all_entities = clean_entities.keys() duplicated_entities = [ent_name for ent_name, ent_val in clean_entities.items() if len(ent_val)>1] if duplicated_entities: print('WARNING! DUPLICATED ENTITIES:') for ent in duplicated_entities: print(ent) # %% # Process relations: # 1. Filter ill-formed relations and normalize entity names def normalize_rel_name(name: str) -> str: return re.sub(r'\s+', '_', name.strip().lower()).replace('__', '_') clean_relations = [] for rel in raw_relations: subj = rel['Soggetto'] obj = rel['Oggetto'] if not isinstance(subj, str) or not isinstance(obj, str): continue subj = re.sub(r'\s+', ' ', subj.strip().title()) obj = re.sub(r'\s+', ' ', obj.strip().title()) if subj==obj: continue rel_name = rel['Relazione'] if isinstance(rel_name, str): rel_name = normalize_rel_name(rel_name) rel_name_inv = rel.get('Relazione Inversa (opzionale)') if isinstance(rel_name_inv, str): rel_name_inv = normalize_rel_name(rel_name_inv) clean_rel = {'Soggetto': subj, 'Relazione': rel_name, 'Oggetto': obj, 'Relazione Inversa': rel_name_inv} clean_relations.append(clean_rel) # TEMP - filter out a few of the relations manually def manual_remove(rel: dict) -> bool: if rel['Soggetto']==normalize_ent_name('Circolazione del manoscritto'): return True if rel['Soggetto']==normalize_ent_name('Fascicolo') and rel['Oggetto']==normalize_ent_name('Data'): return True # return False clean_relations = [rel for rel in clean_relations if not manual_remove(rel)] # In this case, I don't really care if a relation appears twice # Get all Entities that appear as Subjects all_subjects = set(rel['Soggetto'] for rel in clean_relations) # Get all Entities that appear either as Subjects or Objects all_cited_entities = set(sum([[rel['Soggetto'], rel['Oggetto']] for rel in clean_relations], [])) # Get entities that appear in the relations sheet but NOT in the entities sheet undefined_entities = all_cited_entities - all_entities # Get all entities that appear purely as Subjects; it is (mostly) OK if they are not defined in the Entity sheet. atomic_entities = all_cited_entities - all_subjects # Entities which are not 'atomic' and are undefined are a problem problematic_entities = undefined_entities - atomic_entities # Get entities which do not appear in any relation, and are NOT classified as Subclasses. subclass_entities = {key for key, val in clean_entities.items() if val[0]['Raw']['Sottoclasse di']} unused_entities = (all_entities - all_cited_entities) - subclass_entities # %% #### # MANUS ONLINE (MOL) API: https://api.iccu.sbn.it/devportal/apis #### # %% if problematic_entities: print('ERROR: some non-atomic entities are undefined') print() for ent in sorted(list(problematic_entities)): print(ent) # %% if unused_entities: print('Unused entities:') print() for ent in sorted(list(unused_entities)): print(ent) # %% if undefined_entities: print("Undefined 'atomic' entities:") print() for ent in sorted(list(undefined_entities)): print(ent) # %% # Export results if not problematic_entities and not duplicated_entities: nonatomic_entities = {ent: ent_val for ent, ent_val in clean_entities.items() if ent not in atomic_entities} entities_to_export = [{'ENTITÀ': ent, 'ATTRIBUTO (LITERAL)': None, 'SAME AS': ', '.join(ent_val[0]['Alias'])} for ent, ent_val in nonatomic_entities.items()] # atomic_rels = [rel for rel in clean_relations if rel['Oggetto'] in atomic_entities] entities_to_export += [{'ENTITÀ': rel['Soggetto'], 'ATTRIBUTO (LITERAL)': rel['Oggetto'], 'SAME AS': None} for rel in atomic_rels] entities_to_export = sorted(entities_to_export, key=lambda x: x['ENTITÀ']) with open(OUTPUT_FOLDER + ent_filename, 'w', encoding='utf-8', newline='\n') as ent_csv: headers = ['ENTITÀ', 'ATTRIBUTO (LITERAL)', 'SAME AS'] writer = csv.DictWriter(ent_csv, fieldnames=headers) writer.writeheader() writer.writerows(entities_to_export) relations_to_export = [{'ENTITÀ 1': rel['Oggetto'], 'ENTITÀ 2': rel['Soggetto'], 'NOME RELAZIONE': rel['Relazione'], 'NOME RELAZIONE INVERSA': rel['Relazione Inversa']} for rel in clean_relations] # Add subclass relations relations_to_export += [{'ENTITÀ 1': ent, 'ENTITÀ 2': clean_entities[ent][0]['Raw']['Sottoclasse di'], 'NOME RELAZIONE': 'is_subclass_of', 'NOME RELAZIONE INVERSA': 'is_superclass_of'} for ent in subclass_entities] relations_to_export = sorted(relations_to_export, key=lambda x: (x['ENTITÀ 1'], x['ENTITÀ 2']) ) with open(OUTPUT_FOLDER + rel_filename, 'w', encoding='utf-8', newline='\n') as rel_csv: headers = ['ENTITÀ 1', 'ENTITÀ 2', 'NOME RELAZIONE', 'NOME RELAZIONE INVERSA'] writer = csv.DictWriter(rel_csv, fieldnames=headers) writer.writeheader() writer.writerows(relations_to_export) # %%