Slowly-Changing Dimensions – nach welchen Kategorien erfolgte die Marktsegmentierung im letzten Jahresabschluss und inwieweit hat das neue Segmentierungsschema Einfluss auf die Vergleichbarkeit mit den aktuellen Berichtszahlen? Solche und ähnliche Fragen muss ein Data Warehouse (DWH) jederzeit beantworten können. Im Zeitalter der relationalen Datenbanken hatte sich dafür das Konzept Slowly-Changing-Dimension bewährt. Sind aber die klassischen DWH Best Practices noch ein guter Wegweiser in einer Big Data Umgebung? Wie kann ich effizient historisierte Daten korrigieren und nachladen, auch wenn es sich um Terrabyte an Daten handelt? Wie kann ich dabei die Nachvollziehbarkeit und Reproduzierbarkeit frühere Analyseergebnisse gewährleisten, ohne durch zu viel Overhead die Abfrageperformanz zu beeinträchtigen? Am Beispiel von Google BigQuery möchten wir demonstrieren, wie man ein altes DWH Problem in der neuen cloudbasierten Big Data Architektur besser lösen kann.

In den letzten Jahren migrieren immer mehr Unternehmen ihr DWH auf Cloud-basierte Plattformen, um die drastisch steigenden Anforderungen bezüglich Volume, Geschwindigkeit und Varietät der zu analysierenden Daten auch in Zukunft erfolgreich meistern zu können. Google zum Beispiel bietet mit BigQuery eine hochskalierbare Analytical Data Warehouse Lösung als Managed Service auf der Google Cloud Platform (GCP) an. Ähnliche Angebote sind AWS Redshift und Snowflake.

Die technische Architektur von BigQuery unterscheidet sich dabei grundlegend von der klassischer relationaler Datenbanken (RDBMS, z.B. Oracle), welche bisher die gängigste Plattform für DWH waren. Als DWH Entwickler frage ich mich nun, inwiefern BigQuery DWH-typische Anforderungen hinsichtlich Datenhistorisierung unterstützt und ob z.B. Slowly-Changing Dimension Typ 2 (SCD-Typ-2) auch für BigQuery der optimale Designansatz ist. Datenhistorisierung ist dabei eine sehr wichtige DWH Kernanforderung, um die Ergebnisse früherer Reports und Analysen jederzeit nachvollziehen und auf Basis des damaligen Datenbestands wieder reproduzieren zu können. Deshalb werde ich mich in diesem Artikel v.a. auf diesen Anwendungsfall konzentrieren.

Datenhistorisierung in der relationalen Welt

RDBMS skalieren v.a. vertikal, d.h. mehr Speicherbedarf erfordert irgendwann den zeitaufwendigen Kauf einer größeren und teureren Hardware mit samt einem anschließenden Migrationsprojekt. Daher besteht ein großer Anreiz darin, mittels SCD-Typ-2 Speicherplatz zu sparen, und nur die Datensätze zu versionieren, die sich tatsächlich geändert haben.

In meinen bisherigen DWH-Projekten habe ich bei der Erstellung von ETL-Prozessen, welche einen Full Load durchführen, fast immer die Erfahrung gemacht, dass die Performanz maßgeblich durch die Datenhistorisierung beeinträchtigt wird. Den geringsten Einfluss dabei hat das Hinzufügen neuer Datensätze sowie das Schließen nicht mehr im Quelldatensatz vorhandener Datensätze. In beiden Fällen reicht es aus, die Business Key Spalten zu vergleichen, welche in der Regel indiziert sind. Die “teuerste” Operation ist hingegen festzustellen, welche Datensätze zwar auf beiden Seiten vorhanden sind, aber nicht die gleichen Spaltenwerte haben. Dazu muss ich sämtliche Datenspalten paarweise miteinander vergleichen und bei optionalen Spalten muss ich zusätzlich noch die NOT NULL Prüfung hinzufügen, da NULL == NULL immer zu FALSE ausgewertet wird. Im Endeffekt wird ein Full Table Scan auf beiden Tabellen, Quelle und Ziel, durchgeführt. Bei Tabellen mit hundert Spalten ist dies sehr aufwendig und skaliert schlecht mit steigender Anzahl der Datensätze und das obwohl die zeilenorientierte Datenstruktur von RDBMS-Systemen auf solche Abfragemuster (SELECT über alle Spalten einer Tabelle) hin optimiert ist.

Abbildung 1 – Slowly-Changing Dimensions-Typ-2 bei Full Loads

Letztendlich hat sich in RDBMS Umgebungen der SCD-Typ-2 Designansatz trotzdem häufig als geeigneter Kompromiss zwischen Speicherplatzverbrauch (und deren hohen Kosten) und Performanz der ETL-Prozesse erwiesen. Gilt das aber auch dann, wenn ich ein DWH in BigQuery aufbauen möchte?

Die BigQuery Architektur

BigQuery liegt ein fundamental anderes Architekturkonzept zugrunde. Zunächst einmal sind Rechenkapazität und Speicherplatz völlig unabhängig voneinander und sehr flexibel horizontal skalierbar. Mehr Speicherbedarf für historisierte Daten, auf die meistens nur selten zugegriffen wird, benötigt nicht gleichzeitig mehr CPU-Ressourcen, weshalb ich in so einer Architektur nicht zu dem oben beschriebenen Kompromiss gezwungen bin.

Dieses Skalierungsmuster schlägt sich auch in im BigQuery Preismodell nieder. Für 1 TB Speicherplatz (“active storage”) werden Stand 08/2020 etwa 23 USD pro Monat berechnet (Speicherort Frankfurt). Sofern auf eine Tabellenpartition länger als 90 Tage nicht zugegriffen wird, was bei historischen Daten häufig der Fall ist, reduzieren sich die Speicherkosten (“long-term storage”) sogar auf 16 USD pro Monat. Für die Abfrage und Auswertung (“on-demand queries”) von 1 TB an Daten werden z.Z. 5 USD berechnet, und zwar jedes Mal und nicht pro Monat. Der tägliche Abgleich großer historischer Datenmengen auf mögliche Änderungen hin, kann da schnell ins Geld gehen.

Ein zweiter fundamentaler Architekturunterschied zu RDBMS ist, dass BigQuery die Daten spaltenorientiert abspeichert (Abb. 2), was Abfragen ganzer Tabellenzeilen wie bei der SCD-Typ-2 Delta-Ermittlung noch teurer macht. Bei zeilenorientierter Speicherung kann ich das Glück haben, dass der gesuchte Datensatz in einer Datei bzgl. Im gleichen Block auf der Festplatte liegt. Bei spaltenorientierter Speicherung ist die Anzahl der zu lesenden Dateien mindestes so groß wie die Spaltenzahl und jede dieser Dateien enthält viele Daten, nach denen ich gerade gar nicht suche, da sie sich auf andere Zeilen beziehen. Das erste, was ich mir bei BigQuery also abgewöhnt habe, war ein SELECT * FROM über alle Spalten einer Tabelle durchzuführen, nur um zu schauen, was denn so an Daten darin zu finden sind. Sofern eine Tabelle nicht partitioniert ist oder ich keinen Filter auf die Partitionsspalte setze, würde das unweigerlich einen Full Table Scan auslösen, da die Dateien aller Spalten gelesen werden müssen, selbst wenn ich nur die ersten 100 Zeilen sehen möchte. Zur Erinnerung: abgerechnet wird in BigQuery v.a. nach verarbeiteter, d.h. vom Plattenspeicher gelesener Datenmenge.

Abbildung 2 – BigQuery Speicherstruktur
Abbildung 2 – BigQuery Speicherstruktur

Neben der Eingrenzung auf die jeweils relevanten Spalten in einer SELECT-Abfrage ist die Partitionierung von Tabellen in BigQuery das Mittel der Wahl, um den Aufwand (sowohl den technischen als auch den finanziellen) der Datenverarbeitung zu reduzieren. Da BigQuery keine Indizes und auch keine Primary Keys kennt, ist hier eine adäquate Partitionierung von Tabellen noch wichtiger als bei klassischen RDBMS.

Mit einem klassischen SCD-Typ-2 Designansatz habe ich in BigQuery also deutliche Nachteile. Verkürzt gesagt würde ich durch diesen Ansatz billigen Speicherplatz gegen teure Rechenkraft (zur Delta-Ermittlung) eintauschen. Ferner könnte ich auch nie die optimale Performanz erzielen, da zeilenbasierte Abfragen quer zur spaltenorientierten Speicherstruktur laufen. Dadurch würde ich eher gegen BigQuery arbeiten und nicht mit BigQuery.

Functional ETL & Snapshots

Ein zunächst radikal erscheinender alternativer Ansatz wäre, die Daten nach der Truncate-Insert Methode zu laden, und zwar jeweils für eine ganze Partition. Da die vorhandenen Datenversionen aber erhalten bleiben sollen, reduziert sich das für gewöhnlich auf Insert-only bzw. Append-only Operationen, die einerseits extrem schnell sind, andererseits aber auch den größten Speicherverbrauch aufweisen. Es hängt somit neben der Datenmenge v.a. auch von der Ladefrequenz ab, ob dieser Mehrverbrauch noch wirtschaftlich ist.

Wie der Name Slowly-Changing-Dimension schon sagt, stehen hier aber meistens Anwendungsfälle im Fokus, bei denen die Daten sich nicht sehr häufig ändern, z.B. Kunden- oder Vertragsstammdaten. Anstatt alle fünf Minuten reicht es also meistens aus, wenn diese Daten nur einmal täglich durch einen Batch-Lauf ins DWH geladen werden.

Um das Ganze anschaulicher zu machen, möchte ich diesen alternativen ETL-Ansatz an folgendem Anwendungsfall demonstrieren (Abb. 3):

Abbildung 3 – Full Load Datenhistorisierung mittels Snapshot Logik

Aus dem HR-System eines Großkonzerns werden täglich zwei Dateien geliefert, die komplette Mitarbeiterliste (employees) sowie die aktuellen Stammdaten aller Abteilungen (departments). Es handelt sich in beiden Fällen also um Full Loads. Diese Dateien könnten z.B. in Google Cloud Storage (das Pendant zu AWS S3) abgelegt sein, was als Staging Area des DWH dient, oder aber direkt in Staging Tables in BigQuery importiert werden.

Im nächtlichen Ladelauf für den 25. Juni werden nun die jeweils neuesten Dateien vom ETL-Prozess genommen, miteinander gejoint und als neue Partition in die Zieltabelle eingefügt. Sofern eine Datenquelle nicht geliefert hat (hier z.B. employees) wird automatisch die jeweilige Datei vom Vortag genommen. Über die Zeit entsteht so in der Zieltabelle eine chronologische Sequenz an täglichen Datenscheiben (Snapshots). Jede von Ihnen enthält den kompletten Datenbestand zum jeweiligen Ladestichtag. Das hat mehrere Vorteile:

  1. Es ist kein kostenintensiver Delta-Abgleich notwendig; die Daten in der Zieltabelle sind unveränderlich.
  2. Abfragen auf historische Datenbestände ist einfach und effizient über entsprechende Partitionsfilter möglich (sog. time travelling).
  3. Die Datenscheiben sind voneinander völlig unabhängig und können so auch ggf. parallel beladen werden; es gibt also keine Seiteneffekte und das Ergebnis hängt allein von den Inputdaten und dem Ladedatum als Parameter ab (Zustandslosigkeit).
  4. Die mehrfache Ausführung eines Ladelaufs erzeugt bei unveränderten Quelldaten immer den exakt gleichen Zustand in der Zieltabelle, ohne Duplikate zu erzeugen; die Ladelogik ist also idempotent.

Unveränderliche Daten, frei von Seiteneffekten, Zustandslosigkeit und Idempotenz sind die Kerneigenschaften eines funktionalen Designs. Fast alle verteilten Systeme, z.B. Hadoop/Map–Reduce, Apache Spark, etc. folgen dem funktionalen Paradigma, da es einen hohen Grad an Parallelisierung ermöglicht.

Die unter Punkt 3 genannte Unabhängigkeit der Datenscheiben macht zudem Reloads früherer Datenscheiben extrem einfach (Abb. 4). Sollte sich zum Beispiel nachträglich herausstellen, dass die employees Daten vom 23. Juni fehlerhaft waren, muss lediglich die entsprechende Partition der Zieltabelle komplett gelöscht und der Ladelauf mit dem Parameter load_date = 2020-06-23 erneut gestartet werden. BigQuery bietet mit der Query Option WRITE_TRUNCATE die Möglichkeit, beide Schritte in einer Transaction durchzuführen. Der Parameter load_date = 2020-06-23 selektiert wieder jeweils die neuste Datenscheibe, deren Ladedatum kleiner oder gleich 23. Juni 2020 ist.

Abbildung 4 – Reload einer früheren Datenscheibe - Snapshot

Das Löschen einer ganzen Partition ist dabei viel schneller als das Löschen einzelner Datensätze und ist neben der Laufzeit auch von der Kostenseite her viel günstiger. BigQuery muss dazu nämlich keine Dateien lesen, sondern lediglich eine leicht zu bestimmende Teilmenge an Dateien komplett löschen. Die zu verarbeitende Datenmenge ist also 0 KB und somit wird auch nichts berechnet. Genau auf diese funktionale Verarbeitungslogik, Daten nur einmal zu schreiben und anschließend nicht mehr zu ändern, ist BigQuery, hin optimiert, um auch extrem große Datenmengen bewältigen zu können.

Sollte ich herausfinden, dass ich einen Fehler im Ladeprozess habe, so dass die Daten der letzten drei Monate fehlerhaft geladen wurden, könnte ich nach dem Bugfix bei Bedarf den Reload aller betroffenen Partitionen aufgrund ihrer Unabhängigkeit, in gewissen Kapazitätsgrenzeng, gleichzeitig durchführen da BigQuery ad-hoc die Rechenkapazität hochskalieren kann.

Bisher habe ich nur Full Load Szenarien betrachtet. Wie müsste ich meinen Ansatz anpassen, um auch Delta Load Szenarien abzudecken, wenn die Datenquellen also inkrementell nur neue und geänderte Datensätze liefern? Der Hauptunterschied beschränkt sich im Prinzip darauf, dass ich jede Staging Tabelle kumulativ von Beginn an bis zum jeweiligen Ladedatum (z.B. 23. Juni 2020), welches die Bedeutung einer Watermark hat, auslese. Die Datenmenge ist dabei fast identisch zu der im Full Load Szenario, da jede Datenscheibe nun deutlich kleiner ist. Da allerdings Datensätze mehrfach geändert sein könnten, ist hier eine zusätzliche Deduplizierung pro Business Key notwendig.

Abbildung 5 – Delta Load Datenhistorisierun

Zu beachtende Grenzen

Das BigQuery Feature Tabellenpartition ist also ein geeignetes Mittel, um die Snapshot Logik effizient zu implementieren. Dabei muss man aber im Auge behalten, dass z.Z. BigQuery maximal 4000 Partitionen pro Tabelle unterstützt. Bei täglicher Beladung kann ich also maximal eine Datenhistorie von 4000 Tagen vorhalten, was allerdings mehr als 10 Jahre sind und in den meisten Fällen ausreichen sollte.

Bei einer stündlichen Beladung käme dieser Ansatz sehr viel schneller an seine Grenzen. Nicht nur würde der Overhead um den Faktor 24 ansteigen, sondern auch die maximal mögliche Historie würde sich auf um den gleichen Faktor verringern. Schon nach knapp 167 Tagen wäre die maximale Anzahl an Partitionen erreicht.

In meinen bisherigen DWH Projekten war es aber häufig der Fall, dass nur die Daten des laufenden Monats oder Quartals auf Tagesbasis vorliegen mussten. Für frühere Zeiträume wurden nur die Monatsenddaten vorgehalten. Aufgrund der 1:1-Beziehung zwischen Snapshot und Tabellenpartition und deren Unabhängigkeit voneinander, ist es relativ einfach, nichtmehr benötigte Datenhistorien zu löschen. Dass reduziert den Overhead an Daten enorm und auch die 4000 Partitionen reichen dann für viel längere Zeiträume.

Bei datums- oder zeitstempelbasierter Partitionierung unterstützt BigQuery z.Z. nur “DAILY” und “HOURLY”. Eine monatliche oder minütlich Partitionierung ist also nicht möglich.

Data Lineage und Snapshot genaue Wiederherstellbarkeit

Der bisher von mir skizzierte funktionale ETL-Ansatz hat allerdings noch eine entscheidende Schwäche. Es ist bisher nicht nachvollziehbar, aus welchen konkreten Datenzuständen in den einzelnen Quellen eine konkrete Partition (Snapshot) in der Zieltabelle erzeugt wurde.

Beim Beispiel in Abb. 3 wurde die Datenscheibe 25.06. in der Zieltabelle mit Mitarbeiterdaten vom 24.06. Erzeugt.

{employees: 2020-06-24; departments: 2020-06-25} --> {target table: 2020-06-25}

Sollten das Quellsystem die Mitarbeiterdaten vom 25.06. aber inzwischen in die Staging Area nachgeliefert haben, würde ein erneuter Ladelauf per 25.05. nun ein anderes Resultat liefern, gemäß der Logik: nehme die jeweils jüngsten verfügbaren Quelldaten per 25.06.:

{employees: 2020-06-25; departments: 2020-06-25} --> {target table: 2020-06-25}

Da in diesem Zuge die ursprüngliche Version der Partition {target table: 2020-06-25} gelöscht wurde, kann ich diesen Zustand nicht mehr rekonstruieren. Ich kann jetzt auch nicht mehr nachvollziehen, warum ein Report vom 25.06. zu Ergebnissen kam, die nun nicht mehr zu der Datenscheibe 25.06. in die Zieltabelle passen.

Dieses Problem lässt sich aber relativ einfach dadurch lösen, dass ich die Angabe aller verwendeten Snapshots aus den Quellen als zusätzliche Metadatenspalte in die Zieltabelle hinzufüge. Im gegebenen Beispiel werden die Daten aus zwei Quelltabellen geladen, künftig könnten aber ggf. noch weitere hinzukommen. Ich möchte also nicht für jede Quelltabelle eine eigene Spalte im Tabellenschema anlegen. Zum einen um die Anzahl der Metadatenspalten möglichst gering und v.a. konstant zu halten. Zum anderen möchte ich auch nicht die Datenzeilen duplizieren, indem ich für jede Quelltabelle eine eigene Zeile einfüge.

In BigQuery kann ich hingegen “nested and repeated fields” definieren (Abb. 6). Im Prinzip ist das eine Spalte, die pro Zeile ein Array an Datentupeln aufnehmen kann. Die Anzahl der Array-Zeilen kann dabei pro Datensatz unterschiedlich sein. Wenn ich also später weitere Quellen in den Ladeprozess mit aufnehme, muss ich keine Änderung an der Metadatenspalte vornehmen. Jedes Array hat dann einfach eine Zeile mehr pro Datensatz.

Abbildung 6 – Nested and Repeated Fields: Schema
Abbildung 6 – Nested and Repeated Fields: Schema

Wie in Abb.7 zu sehen, dupliziere ich durch diese Arrays auch nicht meine Datenspalten. Im Endeffekt habe ich jetzt einen idealen Zustand hinsichtlich Data Lineage erreicht. Jede Datenzeile sagt mir ganz genau, aus welchen Quelltabellen und aus welchem exakten Datenstand er erzeugt wurde.

Abbildung 7 – Nested and Repeated Fields - Daten

Wenn ich nun meinen ETL Code um zwei optionale Parameter (emp_snapshot, dept_snapshot) erweitere, kann ich ganz exakt steuern, auf Basis welchen Datenzustands ich eine Datenscheibe in der Zieltabelle erzeuge und lade.

Für einen Proof-of-Concept habe ich die ETL Logik in Python implementiert und diese mittels Google Cloud Functions als Serverless Function in GCP verfügbar gemacht. Einen Ladelauf kann ich so per HTTP Request starten. Für das Scheduling und die Orchestrierung vieler Ladeprozesse und ihrer Abhängigkeiten bietet sich z.B. Apache Airflow an (Abb. 8).

Abbildung 8 – PoC Aufbau in GCP

Im täglichen Ladelauf würde ich standardmäßig den HTTP Request immer mit dem aktuellen Ladedatum parametrisieren.

Der Ladelauf nimmt aus jeder Quelltabelle die jeweils jüngsten verfügbaren Daten zu diesem Stichtag, d.h. die beiden Parameter emp_snapshot und dept_snapshot werden dynamisch in Abhängigkeit vom Parameter snapshot, bestimmt. Dazu habe ich in meinem ETL-Code eine Hilfsfunktion get_latest_source_snapshots(snapshot) implementiert (Listing 1).

 

SELECT
   "scd_bronze" as dataset,
   "employees_full" as table_name,
   max(snapshot) as source_snapshot
FROM
   fast-analytics-platform.scd_bronze.employees_full
WHERE
   snapshot<= DATE (@snapshot)   
UNION ALL
SELECT
   "scd_bronze" as dataset,
   "departments_full" as table_name,
   max(snapshot) as source_snapshot
FROM
   fast-analytics-platform.scd_bronze.departments_full
WHERE
    snapshot <= DATE (@snapshot) Listing 1 – Bestimmung der aktuellen Snapshots

Am Ende gibt diese Funktion ein Python Dictionary zurück, dessen Werte auf die ETL-Parameter emp_snapshot und dept_snapshot gemappt werden.

{“employees_full”: “2020-06-23”,
“departments_full”: “2020-06-23”}

Muss ich hingegen einen bestimmten Datenzustand rekonstruieren, bei dem die generische Regel nicht zum gewünschten Ergebnis führt, kann ich beim HTTP-Request genau vorgeben, welche Snapshots aus den Quelltabellen zu nehmen sind, indem ich die Werte für die ETL-Parameter emp_snapshot und dept_snapshot beim Funktionsaufruf explizit vorgebe.

Die eigentliche Ladelogik bildet im Kern dieses parametrisierte SQL Statement.

SELECT
   DATE(@snapshot) as snapshot,
   CURRENT_TIMESTAMP() as insert_ts,
   ARRAY<STRUCT<dataset STRING, table STRING, snapshot DATE >> [
      ("scd_bronze", "employees_full", DATE(@emp_snapshot)),
      ("scd_bronze", "departments_full", DATE(@dept_snapshot))
   ] as source_snapshots,
   emp.employee_id,
   emp.last_name,
   emp.first_name,
   emp.email,
   emp.job_title,
   dept.department_code,
   dept.name as department_name,
   dept.building,
   dept.head_of_dept
FROM
(
   SELECT *
   FROM fast-analytics-platform.scd_bronze.employees_full
   WHERE snapshot = DATE(@emp_snapshot)
) as emp
left outer join
(
   SELECT *
   FROM fast-analytics-platform.scd_bronze.departments_full
   WHERE snapshot = DATE(@dept_snapshot)
) as dept
ON emp.department_code = dept.department_code

Listing 2 – Ladelogik inkl. Metadatenspalte

Diese Query wird dann zusammen mit der Option write_disposition = „WRITE_TRUNCATE“ an die BigQuery API übergeben, so dass eine ggf. bereits vorhandene Partition innerhalb der gleichen Transaktion gelöscht und durch die neu geladenen Daten ersetzt wird.

Jetzt könnte man natürlich einwenden, dass dieser Ansatz einen sehr großen Overhead verursacht. Innerhalb eines Snapshots, d.h. einer Tabellenpartition, enthält die Metadatenspalte immer exakt den gleichen Wert und das u.U. für hunderttausende oder gar Millionen von Datensätzen. Ja, das ist auch so, allerdings tut dieser Overhead aufgrund der spaltenorientierten Speicherstruktur von BigQuery nicht so sehr weh. Da jede Spalte unabhängig von den anderen in eigenen Dateien abgespeichert wird, kann BigQuery sich oft wiederholende Spaltenwerte stark komprimieren. Zum anderen haben die Metadaten keinerlei negativen Einfluss auf die Performanz meiner Abfragen auf diese Tabelle. Solange ich die Spalte source_snapshots nicht mit in die SELECT-Klausel aufnehme, werden die Dateien dieser Spalte von BigQuery bei der Ausführung meiner Abfrage komplett ignoriert. Auch hier zeigt sich wieder, dass ich mich von meinem gewohnten RDBMS-geschulten Denken hin und wieder lösen sollte.


Jetzt teilen auf:

Jetzt kommentieren