EAD_to_CSV_marcovaldi.ipynb 26 KB

Inizio radunando tutti gli **IMPORT** necessari per girare il notebook, per chiarezza.
# IMPORT ESSENZIALI

# Per il parsing dell'XML -- questo pacchetto è incluso anche nel più generale lxml
import xml.etree.ElementTree as ET
# Utilities per leggere/scrivere files csv
import csv
# Utilities per gestire i character encodings
import unicodedata
# Dizionari ordinati
from collections import OrderedDict

# IMPORT OPZIONALI

# Per fare un stima della velocità delle varie istruzioni
from datetime import datetime
# Generatore di numeri casuali -- può sempre servire in fase di testing
from random import *
# Può servire per alcuni test
import sys
# FUNZIONI **ElementTree** ha una funzione built-in, **iter**, che scorre (molto velocemente) su tutti i 'nodi' dell'albero di dati che rappresenta l'XML. La funzione *iter* purtroppo però non traccia i nodi 'parents'. Ho esteso quindi la libreria scrivendo una mia versione di *iter*, **'traceElems'**, che dovrebbe riuscire a fornirci tutto quello di cui abbiamo bisogno. *traceElems* traccia tutti i nodi nell'albero tenendo conto dei 'parents', e restituisce tutti quelli per cui la funzione-argomento 'condition' ritorna True. **NON** indaga i nodi **figli** di quelli che sono restituiti.
# La funzione BASE: traceElems
def traceElems(node: ET.Element, condition, parents: list = [], coords: list = []):
    res = []
    jj = 0
    for child in node:
        if condition(child):
            res.append({'a_par': parents+[node],
                        'coords': coords+[jj], 'child': child})
        else:
            res = res + traceElems(child, condition, parents+[node], coords+[jj])
        jj = jj+1   
    return res

# Funzione-base per stoppare traceElems
def isLeafOrC(aa: ET.Element):
    if(aa.tag=='c' or len(aa)==0):
        return True
    else:
        return False
# Funzioni-utilità che servono solo a visualizzare meglio i dati sul notebook.
def shownode(node: ET.Element):
    return (node.tag, node.attrib, node.text.replace('\t','').replace('n','').strip() \
                               if type(node.text) is str else '')

def shownodelist(el: ET.Element):
    return list(map(shownode, el))


# Utility copiata da INTERNEZZ -- versione 'multipla' del metodo str.index:
def indices(lst, element):
    result = []
    offset = -1
    while True:
        try:
            offset = lst.index(element, offset+1)
        except ValueError:
            return result
        result.append(offset)
# AL LAVORO
**DA CAMBIARE A SECONDA DEL COMPUTER**: directory di input e output
import_dir = '/Users/federicaspinelli/Google Drive/OVI:CNR/LAVORO 2020/SELEZIONE CONTENUTI/01_ASPO/XDAMS/'
export_dir = '/Users/federicaspinelli/Google Drive/OVI:CNR/CSV/ASPO/marcovaldi/'
Importo il file XML del Datini, tracciando il tempo necessario
ts1 = datetime.timestamp(datetime.now())

treeDatini = ET.parse(import_dir + 'export_aspoMV001--marcovaldi.xml')
rootDatini = treeDatini.getroot()

ts2 = datetime.timestamp(datetime.now())
print(ts2 - ts1)
0.05707383155822754
Uso *iter* per trovare tutti i nodi con label **'c'** nel file Datini, e mi faccio restituire il valore dell'attributo **'level'**; salvo tutti i *levels* nella variabile **cLevs**
cLevs = set(map(lambda a : a.attrib['level'], rootDatini.iter('c')))
print(cLevs)
{'item', 'fonds'}
A questo punto metto al lavoro la funzione **traceElems**: registro TUTTI i nodi **'c'** dividendoli in base all'attributo **'level'**; mi faccio stampare il numero di elementi per ogni livello ed il tempo trascorso. **OCCHIO:** per come è costruita, questa routine non va ad investigare dentro i livelli restituiti -- quindi si perde eventuali sotto-livelli con la stessa label di quelli che trova durante il primo scan. La presenza di sotto-livelli di questo tipo va controllata separatamente.
ts1 = datetime.timestamp(datetime.now())

allCs = {}

for label in cLevs:
    def tempFilt(aa: ET.Element):
        if(aa.tag=='c' and aa.attrib['level']==label):
            return True
        else:
            return False
       
    allCs[label] = traceElems(rootDatini, tempFilt);
    print('# di tag "c", livello ' + label + ', primo passaggio:', len(allCs[label]))
print()
print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
# di tag "c", livello item, primo passaggio: 827
# di tag "c", livello fonds, primo passaggio: 1

Tempo trascorso: 0.004173994064331055
Notare che l'elaborazione è piuttosto veloce (sul mio laptop) malgrado la dimensione del file. Rimane il problema dei livelli dentro a livelli omonimi. Vediamo di affrontarlo.
ts1 = datetime.timestamp(datetime.now())

allCs2 = {}

for label in cLevs:
    partial = allCs[label]
    print('# di tag "c", livello ' + label + ', primo passaggio:', len(partial))
    allCs2[label] = partial
    partialUpdate = []
    while True:
        def tempFilt(aa: ET.Element):
            if(aa.tag=='c' and aa.attrib['level']==label):
                 return True
            else:
                 return False
        for node in partial:
            partialUpdate = partialUpdate + traceElems(node['child'], tempFilt)
        #print(len(partialUpdate))
        partial = partialUpdate
        if(len(partialUpdate)==0):
            break
        allCs2[label] = allCs2[label] + partial
        partialUpdate = []

    print('# di tag "c", livello ' + label + ', totali:', len(allCs2[label]))

print()
print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
# di tag "c", livello item, primo passaggio: 827
# di tag "c", livello item, totali: 827
# di tag "c", livello fonds, primo passaggio: 1
# di tag "c", livello fonds, totali: 1

Tempo trascorso: 0.040377140045166016
A questo punto diventa facile visualizzare tutti i dettagli dei vari elementi **'c'**, di qualunque livello; un esempio è fornito nella prossima cella. Si può cambiare l'elemento da visualizzare cambiando il valore delle variabili *ii* e *level*
def traduttoreItem(elem):
    # Variabile che contiene l'output della traduzione:
    csvProt = {}

    # Processo i nodi-parent di 'elem'
    par_tags = list(map(lambda a: a.tag, elem['a_par']))
    par_attributes = list(map(lambda a: a.attrib, elem['a_par']))
    # e0: Le varie id dei nodi parent
    for ii in indices(par_tags, 'c'):
        key = 'id_' + par_attributes[ii]['level']
        csvProt[key] = par_attributes[ii]['id']
    
    # Processo i nodi-child di 'elem'
    toProc = traceElems(elem['child'], isLeafOrC)
    first = True
    for node in toProc:
        tags = list(map(lambda a: a.tag, node['a_par'])) + [node['child'].tag]
        attributes = list(map(lambda a: a.attrib, node['a_par'])) + [node['child'].attrib]
        content = node['child'].text

        # Da controllare solo per il primo nodo
        # (informazioni a livello del nodo, uguali per tutti i figli)
        if(first):
            # e1 ID della item
            csvProt['id'] = attributes[tags.index('c')]['id']
            # e2 Audience: external o internal
            try:
                csvProt['audience'] = attributes[tags.index('c')]['audience']
            except:
                pass
            # e3 Otherlevel
            try:
                csvProt['altro_livello'] = attributes[tags.index('c')]['otherlevel']
            except:
                pass
            first = False

        # La 'ciccia': si processa il contenuto vero e proprio
        # e4 Repository (qui dovrebbe essere sempre l'Archivio di Prato)
        if('repository' in tags):
            csvProt['repository'] = content
        
        # e8 Tipologia
        try:
            ii = tags.index('materialspec')
            if(attributes[ii]['label']=='tipologia'): 
                csvProt['tipologia'] = content
        except:
            pass
        
        # e9 Segnature buste e registri
        try:
            ii = tags.index('container')
            type1 = attributes[ii]['type']
            if(type1=='busta'):
                csvProt['segnatura_busta'] = content
            elif(type1=='inserto'):
                csvProt['segnatura_inserto'] = content
        except:
            pass

        # e9 Segnatura codice (solo Marcovaldi ha segnatura in unitid)
        try:
            ii = tags.index('unitid')
            type1 = attributes[ii]['type']
            if(type1.find('chiave')>=0):
               csvProt['segnatura_codice'] = content
        except:
            pass

        # e11: Il titolo da unittitle
        try:
            aa = csvProt['titolo_aspo']
        except:
            try:
                ii = tags.index('unittitle')
                try:
                    csvProt['titolo_aspo'] = str(node['a_par'][ii].text).replace('\t','').replace('\n','').strip()
                except:
                    csvProt['titolo_aspo'] = str(content).replace('\t','').replace('\n','').strip()
            except:
                pass
        
        # e12 Scope-content head & body
        elif('scopecontent' in tags):
            if('head' in tags):
                csvProt['scope-content_head'] = content
            else:
                if('p' in tags):
                    csvProt['scope-content_body'] = content
        
        # e14 Nome della compagnia
        try:
            ii = tags.index('corpname')
            if(attributes[ii]['role']=='compagnia'):
                try:
                    authId = attributes[ii]['authfilenumber']
                    csvProt['compagnia'] = '{nome: ' + content + ', authID: ' + authId + '}'
                except:
                    csvProt['compagnia'] = '{nome: ' + content + '}'
        except:
            pass
       
       # e16 Persona + ruolo
        try:
            ii = tags.index('persname')
            key = 'persona_' + attributes[ii]['role']
            try:
                authId = attributes[ii]['authfilenumber']
                csvProt[key] = '{"nome": ' + "\"" + content + "\"" + ', "authID": ' + "\"" + authId + "\"" + '}'
            except:
                csvProt[key] = '{"nome": ' + "\"" + content + "\"" + '}'
        except:
            pass
        
        # e17 Date varie: tutte quelle con 'type' definito
        try:
            ii = tags.index('date')
            key = 'data_' + attributes[ii]['type']
            csvProt[key] = content
        except:
            pass

        # e18 Data 1: periodo
        if('unitdate' in tags):
            csvProt['data_periodo'] = content        

        # e19 Luogo + 'ruolo'
        try:
            ii = tags.index('geogname')
            key = 'luogo_' + attributes[ii]['role']
            try:
                authId = attributes[ii]['authfilenumber']
                csvProt[key] = '{luogo: ' + content + ', authID: ' + authId + '}'
            except:
                csvProt[key] = '{luogo: ' + content + '}'
        except:
            pass
        
        # e20 Supporto fisico
        try:
            ii = tags.index('physfacet')
            if(attributes[ii]['type']=='supporto'):
                csvProt['supporto'] = content
        except:
            pass

        # e21 Physdesc 
        if('extent' in tags):
            csvProt['numero'] = content
        if('genreform' in tags):
            csvProt['genere'] = content  

        # e23 Consistenza
        try:
            ii = tags.index('extent')
            type1 = attributes[ii]['unit']
            cvsProt['consistenza'] = type1 + ': ' + content
        except:
            pass
        
        # e24 Note
        if('note' in tags):
            csvProt['nota'] = content
   
        # e26 Oggetto digitale allegato (nome)
        # Questo è un campo multiplo; per il momento, salviamo tutto
        # su una cella concatenando e separando i campi con una pipe
        # '| '
        try:
            ii = tags.index('daoloc')
            out = attributes[ii]['title']
            try:
                csvProt['oggetto_digitale'] = csvProt['oggetto_digitale'] + ' | ' + out
            except:
                csvProt['oggetto_digitale'] = out
        except:
            pass  
    
    return csvProt


# Di pari passo alla funzione, definisco un dict contenente tutti gli header;
# servirà per il CSV.
itemHeader = OrderedDict()

# e1 ID dell'entità
itemHeader.update({'id': '<c level="X" id=#>'})

# e2 Audience: external o internal
itemHeader.update({'audience': '<c level="item" audience=#>'})

# bioghist
itemHeader.update({'bioghist': '<bioghist=#>'})

# arrangement
itemHeader.update({'arrangement': '<arrangement=#>'})

# relatedmaterial
itemHeader.update({'relatedmaterial': '<relatedmaterial=#>'})

# e3 Scope content, head & body
itemHeader.update(
{'scope-content_head': '<scopecontent><head>#',
 'scope-content_body': '<scopecontent><p>#'})

# e4 Titolo
itemHeader.update({'titolo_aspo': '<unittitle>#'})

# e5 Nome della compagnia
itemHeader.update({'compagnia': '<corpname>#'})

# e6 Soggetto
itemHeader.update({'soggetto': '<subject>#'})

# e8 Date varie: tutte quelle con 'type' definito
itemHeader.update(
{'data_inizio': '<date type="inizio">#',
 'data_fine': '<date type="fine">#',
 'data_chiusura': '<date type="chiusura">#'})

# e7 Data 1: periodo
itemHeader.update({'data_periodo': '<unitdate>#'})

# e9 Origine
itemHeader.update({'origine': '<origination label=#1>#2, #1 - #2'})

# e10 Tipologia
itemHeader.update({'tipologia': '<materialspec label="tipologia">#'})

# e11 Persona + ruolo
itemHeader.update(
{'persona_tenutario': '<persname role="tenutario">#', 
 'persona_destinatario': '<persname role="destinatario">#',
 'persona_mittente': '<persname role="mittente">#',
 'persona_indirizzata': '<persname role="indirizzata">#',
 'persona_mano': '<persname role="mano">#',})

# e12 Segnature buste e registri Datini
itemHeader.update(
{'segnatura_registri_1': '<container type="%numero un%">#',
 'segnatura_registri_2': '<container type="%numero sott%">#',
 'segnatura_inserto': '<container type="inserto">#',
 'segnatura_busta': '<container type="busta">#'})

# e17 Segnatura codice
itemHeader.update({'segnatura_codice': '<unitid type="chiave">#'})

# e13 Luogo + 'ruolo'
itemHeader.update(
{"luogo_partenza": '<geogname role="partenza">#',
 "luogo_arrivo": '<geogname role="arrivo">#',
 "luogo_luogo": '<geogname role="luogo">#'})

# e14 Supporto fisico
itemHeader.update({'supporto': '<physfacet type="supporto">#'})

itemHeader.update({'numero': '<extent>#'})

itemHeader.update({'genere': '<genreform>#'})

# e15 Repository (qui dovrebbe essere sempre l'Archivio di Prato)
itemHeader.update({'repository': '<repository>#'})

# Note
itemHeader.update({'nota': '<note>#'})

# Odd
itemHeader.update({'altre_informazioni': '<odd>#'})

# e16 Consistenza
itemHeader.update({'consistenza': '<extent unit=#1>#2, #1: #2'})

# e18 Oggetto digitale allegato (nome)
itemHeader.update({'oggetto_digitale': '<daoloc title=#>'})

# e19 Otherlevel
itemHeader.update({'altro_livello': '<c otherlevel=#>'})

#0: Le varie id dei nodi parent
itemHeader.update(
{'id_subfonds': '<c level="subfonds" id=#>',
 'id_fonds': '<c level="fonds" id=#>',
 'id_series': '<c level="series" id=#>',
 'id_subseries': '<c level="subseries" id=#>',
 'id_file': '<c level="file" id=#>',
 'id_otherlevel': '<c level="otherlevel" id=#>',
 'id_collection': '<c level="collection" id=#>'})
Test della funzione traduttore **NB:** l'ho definita basandomi sugli item, ma sembra funzionare decentemente anche sugli altri livelli!
test = allCs2['item'][1]
toShow = traduttoreItem(test)
for key in toShow.keys():
    print(key + ': ' + str(toShow[key]))
    print()
id_fonds: IT-ASPO-MV001-0000827

id: IT-ASPO-MV001-0000001

audience: external

tipologia: carteggio

repository: Archivio di Stato di Prato

segnatura_codice: 2

titolo_aspo: ANGIOLINI GINO DI FALCONIERI�a�ANGIOLINI LOTTO DI FALCONIERI PRETE DI GALCIANA (SER)

persona_mittente: {"nome": "ANGIOLINI GINO DI FALCONIERI", "authID": "IT-ASPO-AU00003-0005595"}

persona_destinatario: {"nome": "ANGIOLINI LOTTO DI FALCONIERI PRETE DI GALCIANA (SER)", "authID": "IT-ASPO-AU00003-0005596"}

luogo_partenza: {luogo: AREZZO, authID: IT-ASPO-GEO0001-0000496}

luogo_arrivo: {luogo: PRATO, authID: IT-ASPO-GEO0001-0000532}

data_inizio: 24/09/****

data_periodo: 24/09/****

numero: 1

segnatura_busta: 7028

nota: data al banco di Leo di ser Iacopo in Prato

oggetto_digitale: MV000000201.jpg | MV000000202.jpg

# Export Produciamo il CSV per gli item tracciando, al solito, il tempo impiegato.
# Do it! Export del CSV - items.

ts1 = datetime.timestamp(datetime.now())

# Apro il file per l'export
with open(export_dir + "data_item.csv", "w", newline="") as csv_file:
    # Definisco la classe-motore per l'export
    writer = csv.DictWriter(csv_file, fieldnames=list(itemHeader.keys()))
    # Scrivo l'intestazione
    writer.writeheader()
    # Scrivo la seconda riga, esplicativa
    writer.writerow(itemHeader)
    # Scrivo gli item tradotti, uno a uno
    for ii in range(len(allCs2['item'])):
        test = allCs2['item'][ii]
        writer.writerow(traduttoreItem(test))

print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
Tempo trascorso: 0.22597312927246094
# Altri livelli
Definisco un dizionario ridotto per l'header delle *subseries*, poi esporto -- per il momento con lo stesso traduttore usato per gli *item*
*Rinse & Repeat* con i livelli *series*, *subfonds* e *fonds*
ts1 = datetime.timestamp(datetime.now())

fondsKeys = set()
for ii in range(len(allCs2['fonds'])):
    test = allCs2['fonds'][ii]
    fondsKeys = fondsKeys.union( traduttoreItem(test).keys() )

fondsHeader = OrderedDict()
for key in itemHeader:
    if(key in fondsKeys):
        fondsHeader[key] = itemHeader[key]


with open(export_dir + "data_fonds.csv", "w", newline="") as csv_file:
    writer = csv.DictWriter(csv_file, fieldnames=list(fondsHeader.keys()))
    writer.writeheader()
    writer.writerow(fondsHeader)
    for ii in range(len(allCs2['fonds'])):
        test = allCs2['fonds'][ii]
        writer.writerow(traduttoreItem(test))

print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
Tempo trascorso: 0.05441093444824219