ETL für Bastler: Importieren von JSON-Files mit Python 3 und Jupyter Notebook in eine mySQL Datenbank

Moderne BI Selfservice Lösungen wie Tableau, Qlik oder Power BI bieten die Möglichkeit, JSON-Files direkt als Datenquelle anzubinden. Als Alternative dazu beschreibt die folgende Anleitung alle nötigen Schritte, um einen (sehr vereinfachten) ETL-Prozess mithilfe von Python 3 innerhalb von Jupyter Notebook zu realisieren (zu «macgyvern»). Dabei sollen die Daten einer OpenData Schnittstelle gelesen, transformiert und in eine (lokale) mySQL Datenbank gespeichert werden.

Benötigte Vorkenntnisse

  • Grundkenntnisse in Python 3
  • Verständnis von SQL
  • HTML und PHP Kenntnisse (optional)

Technische Voraussetzungen

Für Python und Jupyter Notebook wird die auf jupyter.org empfohlene Anaconda Distribution vorausgesetzt. (Zusätzlich muss die mySQL Connector Library installiert werden, falls diese nicht schon verfügbar ist).

Der im folgenden beschriebene Fall wurde in einer lokalen XAMPP Standard-Installation mit mySQL (MariaDB) und Apache Webserver auf macOS Mojave (MacBook Air Mid 2012) erfolgreich ausgeführt.

Ausgangslage

Die Schweizerischen Bundesbahnen SBB stellen auf data.sbb.ch, opentransportdata.swiss und opendata.swiss eine Auswahl von Datensätzen zu ihrem Bahnbetrieb zur Verfügung. Täglich werden u.a. Daten zur Pünktlichkeit der Bahnfahrten des Vortages im JSON und GEOJSON Format veröffentlicht.

Statt diese Quellen direkt in einer App anzubinden, sollen die Daten zum Zweck einer späteren Analyse über einen längeren Zeitraum täglich abgerufen und lokal gespeichert werden. Die folgenden Ausführungen beschreiben die Weiterverarbeitung einer dieser GEOJSON-Dateien.

Schritt für Schritt

Anmerkung: Der hier beschriebene Ansatz soll nicht als best practice verstanden werden, vielmehr als pragmatische, zielorientierte Lösung, die mit freundlicher Unterstützung von google.com, stackoverflow.com u.a. zustande gekommen ist. Womöglich existieren für einzelne Schritte geeignetere Libraries oder Konzepte. Ebenso wurden Lesbarkeit und Skalierung des Scripts der Vorrang gegenüber Eleganz, Effektivität und Qualität des Codes gegeben.

1. Laden des JSON-Files

Zu Beginn werden die benötigten Libraries für Verbindung und JSON-Interpretation importiert. Anschliessend wird die Anfrage abgeschickt, die Antwort der Quelle in einer Variable («rawData») gespeichert und zur Kontrolle in Jupyter ausgegeben. Da es sich bei der Quelle um ein GEOJSON Objekt handelt, wird direkt die Property «features» angesprochen.
(Nicht nur bei anders aufgebauten JSON Files setzt dies selbstverständlich eine Kenntnis der Datei-Struktur voraus, um den gewünschten Einstiegs-Punkt zu finden).
Um den Abbruch des Scripts durch Python aufgrund einer zu langen Ausgabe im Browser zu verhindern, werden die Daten mit [:3] auf die ersten drei Records begrenzt. (Alternativ bietet sich hier die Pandas Library an; siehe Appendix II).

import requests
import json
response = requests.get("gespeicherte_lokale_datei.geojson")
rawData = json.loads(response.text)
print(rawData['features'][:3])

2. Definieren der Attribute

Um eine iterative Verarbeitung der Datensätze zu ermöglichen und später (falls nötig) einzelne Parameter transformieren zu können, wird eine Liste der gewünschten Attribute angelegt. Aus Gründen der Übersichtlichkeit beschränkt sich dieses Beispiel auf die Eigenschaften «haltestellen_name», «betriebstag», «abfahrtszeit» und «geopos» («geometry»).

Um das Debugging zu erleichtern werden Variablen für die Anzahl maximal zu verarbeitenden Datensätze und für einen counter angelegt.

Zudem wird eine Liste «data» definiert, in welcher die transformierten Datensätze zwischengespeichert werden und die später als Grundlage zur Speicherung in der Datenbank dient.

data = []
counter = 0
max = 10
attributes = [
    "haltestellen_name",
    "betriebstag",
    "abfahrtszeit",
    "geopos"
]

3. Übernahme der Daten und Transformation einzelner Werte

In einer for-Schleife werden nun alle Datensätze (bzw. so viele wie «max» zulässt) abgearbeitet. Die Werte der vorher definierten Attribute werden (wenn vorhanden) übernommen und falls nötig, neu formatiert oder mittels entsprechender Logik in die benötigte Form gebracht.

Jeder einzelne Record erhält hierzu eine lokale Variable «loc» mit Typ Dictionary, die nach Vervollständigung in die Liste «data» übertragen wird.

Zusätzlich dazu werden hier aus der hierarchisch eine Stufe höher liegenden Property «geometry» die Geokoordinaten als «geopos» hinzugefügt.

for item in rawData["features"]:
    if(counter<max):
        #print(item)
        loc = {}           
        p = item["properties"]
        for attr in attributes:
            if(attr in p):
                loc[attr]=p[attr]
            else:
                loc[attr]="null"
        if("geometry" in item):
            if("coordinates" in item["geometry"]):
                loc["geopos"] = item["geometry"]["coordinates"]   
        #print(loc)
        #if(counter==0):
            #print("Attributes: "+ str(len(loc)))
        data.append(loc)
        counter+=1
#print(data)

4. Verbindung mit der Datenbank und Vorbereitung der Speicherung

Mithilfe der mySQL Connector Library wird die Kommunikation mit der Datenbank ermöglicht. Um SQL-Operationen ausführen zu können, benötigen wir ausserdem eine Instanz «mycursor» der Klasse «MySQLCursor» .

import mysql.connector
db = mysql.connector.connect(
    host="host_name",
    user="user_name",
    passwd="password",
    database="datenbank_name"
)
mycursor = db.cursor()

5. Vorbereitung des SQL-Statements

An dieser Stelle werden für alle Attribute und Werte leere Strings angelegt, die anschliessend zum eigentlichen SQL-Statement zusammengesetzt werden. (Da es hier noch zu Anpassungen in den Attributen kommen kann, wird ein «Klon» (Deep-Copy) der vorher definierten Attribute erstellt, die unabhängig von den oben verwendeten Eigenschaften beliebig erweitert werden kann, beispielsweise um die Records mit dem Import-Zeitpunkt als zusätzliche Eigenschaft anzureichern).

Der String «sql_attributes string» umfasst danach alle nötigen Spaltennamen während «sql_values_string» sämtliche dazu passenden Werte pro Eintrag enthält.

import copy
import datetime
from time import gmtime, strftime

table_name = "zieltabelle" 

# Bei Bedarf kann die bereits bestehende Tabelle hier geleert werden:
#sql_truncate = "TRUNCATE TABLE "+table_name+"; "
#mycursor.execute(sql_truncate)
#db.commit()

sql_attributes = copy.deepcopy(attributes)
sql_attributes.append("import_zeitpunkt")
sql_attributes_string = ", ".join(sql_attributes)
#print(sql_attributes)
#print(sql_attributes_string)

for item in data:
    sql_values = []
    sql_values_string = ""
    for attribute in sql_attributes:
        if(attribute in item):
            value = str(item[str(attribute)]).strip()
            sql_values.append('"'+str(value)+'"')
  
    importDateTime = strftime("%Y-%m-%d %H:%M:%S")
    sql_values.append('"'+str(importDateTime)+'"') 
    sql_values_string = ", ".join(sql_values)
    #print(sql_values)
    #print(sql_values_string)

6. Speicherung in der Datenbank

Achtung: Wir befinden uns noch immer in der äusseren for-Schleife auf Ebene «item»!
Nun wird aus den beiden Variablen «sql_attributes_string» und «sql_values_string» das eigentliche SQL-Statement zusammengesetzt und ausgeführt. (Der Statement-String wurde zur besseren Leserlichkeit mit Klammern auf mehrere Zeilen verteilt).

An dieser Stelle sei daran erinnert, dass der Umweg über die Listen «sql_values» und «sql_attributes» gemacht wurde, um den Code übersichtlicher und flexibler zu gestalten und gegebenenfalls später weitere Attribute einfach hinzufügen zu können.

    sql_insert = (
        "INSERT INTO "+table_name+" ("+sql_attributes_string+") "
        "VALUES ("+sql_values_string+");"
        )
    mycursor.execute(sql_insert)
    db.commit()
print("Viel Erfolg beim Ausprobieren!")

Falls alles geklappt hat, sind jetzt 10 Datensätze in der Datenbank gespeichert. Nun kann die Tabelle geleert und die Variable «max» auf einen sicher ausreichenden maximalen Wert vergrössert werden, um bei erneuter Ausführung alle Datensätze aus der Quelldatei einzulesen.

Falls Jupyter dabei unerwartete Fehler ausgibt, sind eventuell zusätzliche (Typen- und Werte-) Prüfungen auf den einzelnen Records nötig, deren Beschreibung jedoch den Umfang dieses Beitrages sprengen würde.

Appendix I: CREATE TABLE in Jupyter

Anstelle von phpMyAdmin oder ähnlichen Anwendungen kann die Zieltabelle in der Datenbank mit folgendem Script innerhalb Jupyter Notebook angelegt werden:

import mysql.connector
db = mysql.connector.connect(
    host="host_name",
    user="user_name",
    passwd="password",
    database="datenbank_name"
)
mycursor = db.cursor()
table_name = "zieltabelle"
sql_delete = "DROP TABLE IF EXISTS "+table_name
mycursor.execute(sql_delete)
db.commit()
sql_create = (
    "CREATE TABLE IF NOT EXISTS "+table_name+" "
    "(id INT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id), " 
    "UNIQUE INDEX id_UNIQUE (id ASC));"
)
mycursor.execute(sql_create)
sql_alter = (
    "ALTER TABLE  "+table_name+" "
        "ADD COLUMN haltestellen_name VARCHAR (128) NOT NULL DEFAULT 'null', "
        "ADD COLUMN geopos VARCHAR (128) NOT NULL DEFAULT 'null', "
        "ADD COLUMN betriebstag DATE  NOT NULL DEFAULT '1000-01-01', "
        "ADD COLUMN abfahrtszeit DATETIME  NOT NULL DEFAULT '1000-01-01 00:00:00', "
        "ADD COLUMN import_zeitpunkt DATETIME  NOT NULL DEFAULT '1000-01-01 00:00:00'; "
)
mycursor.execute(sql_alter)
db.commit()
print("done")

Appendix II: Python Pandas

Obwohl die vielseitige Library Python Pandas in diesem Beispiel einzig dazu verwendet wird, die befüllte Tabelle lesbarer direkt in Jupyter Notebook anzuzeigen, sind diese Tierchen zum Abschluss auf jeden Fall eine Erwähnung in ihrer Rolle als mächtige Helfer zur Datenanalyse wert 🐼.

import mysql.connector
import pandas as pd

db = mysql.connector.connect(
    host="host_name",
    user="user_name",
    passwd="password",
    database="datenbank_name "
)
mycursor = db.cursor()   
table_name = "zieltabelle"
sql_select = "SELECT * FROM "+table_name+"; "
mycursor.execute(sql_select)
rows = mycursor.fetchall()
df = pd.DataFrame(rows, columns=mycursor.column_names)
print(df.head(5))

 

Gutes Gelingen!

Beitrag teilen

Raphael Röthlin

Raphael Röthlin ist Infografiker bei der Ringier AG und bloggt aus dem Unterricht des CAS Business Intelligence und Analytics.

Alle Beiträge ansehen von Raphael Röthlin →

Schreibe einen Kommentar