# IMPORT ESSENZIALI
# Per il parsing dell'XML -- questo pacchetto è incluso anche nel più generale lxml
import xml.etree.ElementTree as ET
# Utilities per leggere/scrivere files csv
import csv
# Utilities per gestire i character encodings
import unicodedata
# Dizionari ordinati
from collections import OrderedDict
# IMPORT OPZIONALI
# Per fare un stima della velocità delle varie istruzioni
from datetime import datetime
# Generatore di numeri casuali -- può sempre servire in fase di testing
from random import *
# Può servire per alcuni test
import sys
# La funzione BASE: traceElems
def traceElems(node: ET.Element, condition, parents: list = [], coords: list = []):
res = []
jj = 0
for child in node:
if condition(child):
res.append({'a_par': parents+[node],
'coords': coords+[jj], 'child': child})
else:
res = res + traceElems(child, condition, parents+[node], coords+[jj])
jj = jj+1
return res
# Funzione-base per stoppare traceElems
def isLeafOrC(aa: ET.Element):
if(aa.tag=='c' or len(aa)==0):
return True
else:
return False
# Funzioni-utilità che servono solo a visualizzare meglio i dati sul notebook.
def shownode(node: ET.Element):
return (node.tag, node.attrib, node.text.replace('\t','').replace('n','').strip() \
if type(node.text) is str else '')
def shownodelist(el: ET.Element):
return list(map(shownode, el))
# Utility copiata da INTERNEZZ -- versione 'multipla' del metodo str.index:
def indices(lst, element):
result = []
offset = -1
while True:
try:
offset = lst.index(element, offset+1)
except ValueError:
return result
result.append(offset)
import_dir = '/Users/federicaspinelli/Google Drive/OVI:CNR/LAVORO 2020/SELEZIONE CONTENUTI/01_ASPO/XDAMS/'
export_dir = '/Users/federicaspinelli/Google Drive/OVI:CNR/CSV/ASPO/marcovaldi/'
ts1 = datetime.timestamp(datetime.now())
treeDatini = ET.parse(import_dir + 'export_aspoMV001--marcovaldi.xml')
rootDatini = treeDatini.getroot()
ts2 = datetime.timestamp(datetime.now())
print(ts2 - ts1)
0.05707383155822754
cLevs = set(map(lambda a : a.attrib['level'], rootDatini.iter('c')))
print(cLevs)
{'item', 'fonds'}
ts1 = datetime.timestamp(datetime.now())
allCs = {}
for label in cLevs:
def tempFilt(aa: ET.Element):
if(aa.tag=='c' and aa.attrib['level']==label):
return True
else:
return False
allCs[label] = traceElems(rootDatini, tempFilt);
print('# di tag "c", livello ' + label + ', primo passaggio:', len(allCs[label]))
print()
print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
# di tag "c", livello item, primo passaggio: 827 # di tag "c", livello fonds, primo passaggio: 1 Tempo trascorso: 0.004173994064331055
ts1 = datetime.timestamp(datetime.now())
allCs2 = {}
for label in cLevs:
partial = allCs[label]
print('# di tag "c", livello ' + label + ', primo passaggio:', len(partial))
allCs2[label] = partial
partialUpdate = []
while True:
def tempFilt(aa: ET.Element):
if(aa.tag=='c' and aa.attrib['level']==label):
return True
else:
return False
for node in partial:
partialUpdate = partialUpdate + traceElems(node['child'], tempFilt)
#print(len(partialUpdate))
partial = partialUpdate
if(len(partialUpdate)==0):
break
allCs2[label] = allCs2[label] + partial
partialUpdate = []
print('# di tag "c", livello ' + label + ', totali:', len(allCs2[label]))
print()
print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
# di tag "c", livello item, primo passaggio: 827 # di tag "c", livello item, totali: 827 # di tag "c", livello fonds, primo passaggio: 1 # di tag "c", livello fonds, totali: 1 Tempo trascorso: 0.040377140045166016
def traduttoreItem(elem):
# Variabile che contiene l'output della traduzione:
csvProt = {}
# Processo i nodi-parent di 'elem'
par_tags = list(map(lambda a: a.tag, elem['a_par']))
par_attributes = list(map(lambda a: a.attrib, elem['a_par']))
# e0: Le varie id dei nodi parent
for ii in indices(par_tags, 'c'):
key = 'id_' + par_attributes[ii]['level']
csvProt[key] = par_attributes[ii]['id']
# Processo i nodi-child di 'elem'
toProc = traceElems(elem['child'], isLeafOrC)
first = True
for node in toProc:
tags = list(map(lambda a: a.tag, node['a_par'])) + [node['child'].tag]
attributes = list(map(lambda a: a.attrib, node['a_par'])) + [node['child'].attrib]
content = node['child'].text
# Da controllare solo per il primo nodo
# (informazioni a livello del nodo, uguali per tutti i figli)
if(first):
# e1 ID della item
csvProt['id'] = attributes[tags.index('c')]['id']
# e2 Audience: external o internal
try:
csvProt['audience'] = attributes[tags.index('c')]['audience']
except:
pass
# e3 Otherlevel
try:
csvProt['altro_livello'] = attributes[tags.index('c')]['otherlevel']
except:
pass
first = False
# La 'ciccia': si processa il contenuto vero e proprio
# e4 Repository (qui dovrebbe essere sempre l'Archivio di Prato)
if('repository' in tags):
csvProt['repository'] = content
# e8 Tipologia
try:
ii = tags.index('materialspec')
if(attributes[ii]['label']=='tipologia'):
csvProt['tipologia'] = content
except:
pass
# e9 Segnature buste e registri
try:
ii = tags.index('container')
type1 = attributes[ii]['type']
if(type1=='busta'):
csvProt['segnatura_busta'] = content
elif(type1=='inserto'):
csvProt['segnatura_inserto'] = content
except:
pass
# e9 Segnatura codice (solo Marcovaldi ha segnatura in unitid)
try:
ii = tags.index('unitid')
type1 = attributes[ii]['type']
if(type1.find('chiave')>=0):
csvProt['segnatura_codice'] = content
except:
pass
# e11: Il titolo da unittitle
try:
aa = csvProt['titolo_aspo']
except:
try:
ii = tags.index('unittitle')
try:
csvProt['titolo_aspo'] = str(node['a_par'][ii].text).replace('\t','').replace('\n','').strip()
except:
csvProt['titolo_aspo'] = str(content).replace('\t','').replace('\n','').strip()
except:
pass
# e12 Scope-content head & body
elif('scopecontent' in tags):
if('head' in tags):
csvProt['scope-content_head'] = content
else:
if('p' in tags):
csvProt['scope-content_body'] = content
# e14 Nome della compagnia
try:
ii = tags.index('corpname')
if(attributes[ii]['role']=='compagnia'):
try:
authId = attributes[ii]['authfilenumber']
csvProt['compagnia'] = '{nome: ' + content + ', authID: ' + authId + '}'
except:
csvProt['compagnia'] = '{nome: ' + content + '}'
except:
pass
# e16 Persona + ruolo
try:
ii = tags.index('persname')
key = 'persona_' + attributes[ii]['role']
try:
authId = attributes[ii]['authfilenumber']
csvProt[key] = '{"nome": ' + "\"" + content + "\"" + ', "authID": ' + "\"" + authId + "\"" + '}'
except:
csvProt[key] = '{"nome": ' + "\"" + content + "\"" + '}'
except:
pass
# e17 Date varie: tutte quelle con 'type' definito
try:
ii = tags.index('date')
key = 'data_' + attributes[ii]['type']
csvProt[key] = content
except:
pass
# e18 Data 1: periodo
if('unitdate' in tags):
csvProt['data_periodo'] = content
# e19 Luogo + 'ruolo'
try:
ii = tags.index('geogname')
key = 'luogo_' + attributes[ii]['role']
try:
authId = attributes[ii]['authfilenumber']
csvProt[key] = '{luogo: ' + content + ', authID: ' + authId + '}'
except:
csvProt[key] = '{luogo: ' + content + '}'
except:
pass
# e20 Supporto fisico
try:
ii = tags.index('physfacet')
if(attributes[ii]['type']=='supporto'):
csvProt['supporto'] = content
except:
pass
# e21 Physdesc
if('extent' in tags):
csvProt['numero'] = content
if('genreform' in tags):
csvProt['genere'] = content
# e23 Consistenza
try:
ii = tags.index('extent')
type1 = attributes[ii]['unit']
cvsProt['consistenza'] = type1 + ': ' + content
except:
pass
# e24 Note
if('note' in tags):
csvProt['nota'] = content
# e26 Oggetto digitale allegato (nome)
# Questo è un campo multiplo; per il momento, salviamo tutto
# su una cella concatenando e separando i campi con una pipe
# '| '
try:
ii = tags.index('daoloc')
out = attributes[ii]['title']
try:
csvProt['oggetto_digitale'] = csvProt['oggetto_digitale'] + ' | ' + out
except:
csvProt['oggetto_digitale'] = out
except:
pass
return csvProt
# Di pari passo alla funzione, definisco un dict contenente tutti gli header;
# servirà per il CSV.
itemHeader = OrderedDict()
# e1 ID dell'entità
itemHeader.update({'id': '<c level="X" id=#>'})
# e2 Audience: external o internal
itemHeader.update({'audience': '<c level="item" audience=#>'})
# bioghist
itemHeader.update({'bioghist': '<bioghist=#>'})
# arrangement
itemHeader.update({'arrangement': '<arrangement=#>'})
# relatedmaterial
itemHeader.update({'relatedmaterial': '<relatedmaterial=#>'})
# e3 Scope content, head & body
itemHeader.update(
{'scope-content_head': '<scopecontent><head>#',
'scope-content_body': '<scopecontent><p>#'})
# e4 Titolo
itemHeader.update({'titolo_aspo': '<unittitle>#'})
# e5 Nome della compagnia
itemHeader.update({'compagnia': '<corpname>#'})
# e6 Soggetto
itemHeader.update({'soggetto': '<subject>#'})
# e8 Date varie: tutte quelle con 'type' definito
itemHeader.update(
{'data_inizio': '<date type="inizio">#',
'data_fine': '<date type="fine">#',
'data_chiusura': '<date type="chiusura">#'})
# e7 Data 1: periodo
itemHeader.update({'data_periodo': '<unitdate>#'})
# e9 Origine
itemHeader.update({'origine': '<origination label=#1>#2, #1 - #2'})
# e10 Tipologia
itemHeader.update({'tipologia': '<materialspec label="tipologia">#'})
# e11 Persona + ruolo
itemHeader.update(
{'persona_tenutario': '<persname role="tenutario">#',
'persona_destinatario': '<persname role="destinatario">#',
'persona_mittente': '<persname role="mittente">#',
'persona_indirizzata': '<persname role="indirizzata">#',
'persona_mano': '<persname role="mano">#',})
# e12 Segnature buste e registri Datini
itemHeader.update(
{'segnatura_registri_1': '<container type="%numero un%">#',
'segnatura_registri_2': '<container type="%numero sott%">#',
'segnatura_inserto': '<container type="inserto">#',
'segnatura_busta': '<container type="busta">#'})
# e17 Segnatura codice
itemHeader.update({'segnatura_codice': '<unitid type="chiave">#'})
# e13 Luogo + 'ruolo'
itemHeader.update(
{"luogo_partenza": '<geogname role="partenza">#',
"luogo_arrivo": '<geogname role="arrivo">#',
"luogo_luogo": '<geogname role="luogo">#'})
# e14 Supporto fisico
itemHeader.update({'supporto': '<physfacet type="supporto">#'})
itemHeader.update({'numero': '<extent>#'})
itemHeader.update({'genere': '<genreform>#'})
# e15 Repository (qui dovrebbe essere sempre l'Archivio di Prato)
itemHeader.update({'repository': '<repository>#'})
# Note
itemHeader.update({'nota': '<note>#'})
# Odd
itemHeader.update({'altre_informazioni': '<odd>#'})
# e16 Consistenza
itemHeader.update({'consistenza': '<extent unit=#1>#2, #1: #2'})
# e18 Oggetto digitale allegato (nome)
itemHeader.update({'oggetto_digitale': '<daoloc title=#>'})
# e19 Otherlevel
itemHeader.update({'altro_livello': '<c otherlevel=#>'})
#0: Le varie id dei nodi parent
itemHeader.update(
{'id_subfonds': '<c level="subfonds" id=#>',
'id_fonds': '<c level="fonds" id=#>',
'id_series': '<c level="series" id=#>',
'id_subseries': '<c level="subseries" id=#>',
'id_file': '<c level="file" id=#>',
'id_otherlevel': '<c level="otherlevel" id=#>',
'id_collection': '<c level="collection" id=#>'})
test = allCs2['item'][1]
toShow = traduttoreItem(test)
for key in toShow.keys():
print(key + ': ' + str(toShow[key]))
print()
id_fonds: IT-ASPO-MV001-0000827 id: IT-ASPO-MV001-0000001 audience: external tipologia: carteggio repository: Archivio di Stato di Prato segnatura_codice: 2 titolo_aspo: ANGIOLINI GINO DI FALCONIERI�a�ANGIOLINI LOTTO DI FALCONIERI PRETE DI GALCIANA (SER) persona_mittente: {"nome": "ANGIOLINI GINO DI FALCONIERI", "authID": "IT-ASPO-AU00003-0005595"} persona_destinatario: {"nome": "ANGIOLINI LOTTO DI FALCONIERI PRETE DI GALCIANA (SER)", "authID": "IT-ASPO-AU00003-0005596"} luogo_partenza: {luogo: AREZZO, authID: IT-ASPO-GEO0001-0000496} luogo_arrivo: {luogo: PRATO, authID: IT-ASPO-GEO0001-0000532} data_inizio: 24/09/**** data_periodo: 24/09/**** numero: 1 segnatura_busta: 7028 nota: data al banco di Leo di ser Iacopo in Prato oggetto_digitale: MV000000201.jpg | MV000000202.jpg
# Do it! Export del CSV - items.
ts1 = datetime.timestamp(datetime.now())
# Apro il file per l'export
with open(export_dir + "data_item.csv", "w", newline="") as csv_file:
# Definisco la classe-motore per l'export
writer = csv.DictWriter(csv_file, fieldnames=list(itemHeader.keys()))
# Scrivo l'intestazione
writer.writeheader()
# Scrivo la seconda riga, esplicativa
writer.writerow(itemHeader)
# Scrivo gli item tradotti, uno a uno
for ii in range(len(allCs2['item'])):
test = allCs2['item'][ii]
writer.writerow(traduttoreItem(test))
print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
Tempo trascorso: 0.22597312927246094
ts1 = datetime.timestamp(datetime.now())
fondsKeys = set()
for ii in range(len(allCs2['fonds'])):
test = allCs2['fonds'][ii]
fondsKeys = fondsKeys.union( traduttoreItem(test).keys() )
fondsHeader = OrderedDict()
for key in itemHeader:
if(key in fondsKeys):
fondsHeader[key] = itemHeader[key]
with open(export_dir + "data_fonds.csv", "w", newline="") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=list(fondsHeader.keys()))
writer.writeheader()
writer.writerow(fondsHeader)
for ii in range(len(allCs2['fonds'])):
test = allCs2['fonds'][ii]
writer.writerow(traduttoreItem(test))
print('Tempo trascorso:', datetime.timestamp(datetime.now()) - ts1)
Tempo trascorso: 0.05441093444824219