create mobile friendly website

apache spark

Big Data - Informationen neu gelebt (Teil IV)

Apache Spark

Apache Spark ist, wie auch MapReduce, eine Plattform für verteilte Berechnungen in einem Cluster. Während bei MapReduce Daten bei der Verarbeitung aber immer wieder „auf Platte“ geschrieben werden müssen, ermöglicht Apache Spark eine Verarbeitung innerhalb des Arbeitsspeichers der Knoten des Big-Data-Cluster. Zudem können Zwischenergebnisse für weitere Verwendungen dort vorgehalten werden. Auf diese Weise kann Apache Spark gegenüber MapReduce deutliche Performance-Gewinne verzeichnen.

Geschichte

Die Entwicklung von Apache Spark begann 2009 an der University of California in Berkeley. Viele der Projekt­­mit­arbeiter hatten vorher an der Entwicklung des Hadoop MapReduce-Algorithmus mitgearbeitet und waren mit dessen Effizienz bei iterativen und interaktiven Ver­arbeitungen unzufrieden. Spark wurde daher von Anfang an dafür konzipiert, entsprechende Verarbeitungsalgo­rithmen sehr schnell ausführen zu können. Bereits seine erste, 2009 veröffentlichte, Version war in einigen Bereichen bis zu 20-mal schneller als MapReduce.

Vorteile

Verglichen mit MapReduce hat Apache Spark zwei wesentliche Vorteile: Zum einen ist es entworfen worden, um die Berechnungen in möglichst hoher Geschwindigkeit durchzuführen. Dies ermöglicht nicht nur, dass auch aufwendigste Batch-Verarbeitungen in ihrem vorgesehenen Zeitrahmen durchgeführt werden können. Es erlaubt aufgrund seiner Fähigkeit, sämtliche Berechnungen im Arbeits­­speicher durchführen zu können - auch interaktive Abfragen auf Terabyte von Daten, ohne mehrere Minuten oder gar Stunden auf Ergebnisse warten zu müssen.

Zum anderen wurde Apache Spark entwickelt, um eine Menge von Arbeitsgebieten im Umfeld von Big Data ab­zudecken, ohne dafür mehrere verteilte Systeme parallel betreiben zu müssen. Batch-Verarbeitung, iterative Algorithmen, interaktive Abfragen und Streaming stellen verschiedenste Anforderungen an Big-Data-Systeme. Vor Apache Spark mussten daher unterschiedliche Systeme zur Erfüllung dieser Anforderungen vorgehalten werden. Durch die Bündelung dieser heterogenen Datenver­ar­beitungsformen in einer einzigen Engine kann Apache Spark auf einfache und kostengünstige Weise verschiedene Verarbeitungstypen kombinieren und so die Analyse produktiver Daten wirksam unterstützen.

Spark bietet einfache APIs für Python, Java, Scala, SQL und R und ist zudem stark in andere Big-Data-Werkzeuge integriert. Es kann auf Hadoop Clustern laufen und auf nahe­zu jede Art von Hadoop-Datenquellen (Parquet, ORC, Avro, CSV etc.) sowie die NoSQL-Datenbank Cassandra zugreifen.

Der Spark Stack

Um aus unterschiedlichen Anwendungsgebieten stammende Anforderungen erfüllen zu können, unterteilt sich Apache Spark in den Spark-Kern (Core) und mehrere, darauf aufbauende und auf die jeweiligen Anforderungen spezialisierte Komponenten. Der Spark Core ist eine sogenannte Computational Engine. Er ist verantwortlich für das Scheduling sowie die Verteilung und Überwachung der aus vielen Tasks bestehenden und im Spark Cluster laufenden Applikationen.

Der Spark Core kann Daten In-Memory verarbeiten und ist somit sehr schnell. Zudem unterstützt er viele Verarbeitungsarten (Batch, iterativ, interaktiv). Der Spark Core kann so diverse spezialisierte, auf ihm aufbauende Kompo­nenten (wie SQL oder Machine Learning), be­dienen.

Die weiteren Komponenten sind untereinander sehr stark integriert und können daher wie Bibliotheken in einem Softwareprojekt frei kombiniert werden. Vorteil dieser Integra­tion ist, dass Komponenten der höheren Ebene gemeinsam von Verbesserungen des Spark Core profi­tieren. Statt diverse spezialisierte Systeme vorzuhalten, genügt nun ein einziges. Hierdurch können die Kosten des Betriebes der Big-Data-Anwendung gesenkt werden.

Darüber hinaus ist es möglich, Anwendungen zu entwickeln, welche die verschiedenen Komponenten und deren Verarbeitungsprozessmodelle nahtlos miteinander kombinieren. So kann z. B. eine Anwendung, die Machine Learning benutzt, die Daten in Echtzeit klassifizieren. Gleichzei­tig können Analysten die klassifizierten oder auch die Rohdaten ebenfalls in Echtzeit mit SQL abfragen. Zusätzlich können Data Scientists mit anspruchsvolleren Anforderungen auf diese Daten zugreifen und mit R, Python, Scala oder Java auswerten. Dennoch muss die IT immer nur ein System beaufsichtigen.

Der Aufbau des Spark Stacks ist in Abbildung 1 dargestellt. Die obere Ebene zeigt die Spark-Komponenten und ihren Haupteinsatzzweck, die untere Ebene die verwendbaren Cluster Management Systeme.

I/O: Spark vs. MapReduce

MapReduce ist ein sehr starres Programmiermodell. Um das gewünschte Ziel zu erreichen, ist es in der Regel erforderlich, mehrere MapReduce-Verarbeitungen mit­ein­ander zu kombinieren und so ganze Ketten von MapReduce Jobs zu entwickeln. Neben dem, verglichen mit dem Spark-Ansatz, erhöhten Programmieraufwand von MapReduce hemmt besonders der I/O die Performance der Verarbeitung. Daten müssen immer wieder aus dem Dateisystem (z. B. HDFS) gelesen und geschrieben werden. Der Ansatz von Spark ist hier ein anderer: Die Daten werden zu Beginn der Verarbeitung gelesen. Spark versucht anschließend, die Daten dauerhaft im Speicher zu halten und erst zum Ende der Verarbeitung wieder auf das verwendete Dateisystem zu schreiben (siehe Abbildung 2). Zudem ist es möglich, Verzweigungen zu bauen und so Daten einmalig zu lesen und in verschiedenen Strängen zu verarbeiten.

Architektur

Spark nutzt eine Master/Slave-Architektur bestehend aus einem zentralen Koordinator (Driver) und mehreren verteilten Worker Nodes. Driver und zugehörige Executors bilden ein Spark-Programm, welches durch einen externen Service, den Cluster Manager, gestartet wird. Sowohl der Driver als auch jeder Executor laufen in je einem separaten Java-Prozess.

Der Driver hat zwei Hauptaufgaben: Zunächst muss die Applikation in einzelne, physische Ausführungseinheiten (Tasks) übersetzt werden, um eine parallele Verarbeitung zu ermöglichen. Ein Spark-Programm erzeugt bei Er­stellung implizit einen Directed Acyclic Graph (DAG), welcher bei der Ausführung des Driver zu einem Ausführungsplan übersetzt wird und aus mehreren Stages bestehen kann. Jede Stage wiederum besteht aus einer Vielzahl an Tasks. Die zweite Hauptaufgabe ist die Verteilung der Tasks auf die Knoten des Cluster. Dies überlässt der Driver dem Cluster Management System (z. B. YARN).

Spark Executors stellen Worker-Prozesse dar, welche die individuellen Tasks ausführen, die Ergebnisse speichern und diese zurück an den Driver geben. Executors ermög­lichen die Verarbeitung der Daten und deren Zwischen­speicherung im Arbeitsspeicher. Wird ein Executor gestartet, registriert sich dieser beim Driver und bekommt einen Task zugewiesen. Jede Applikation erhält ihre eigenen Executor-Prozesse. Der Knoten eines Cluster kann somit mehrere Tasks verschiedener Applikationen in ihren eigenen Executor-Prozessen simultan bearbeiten.

Aufgabe des Cluster Manager ist es, die zur Verfügung stehenden Ressourcen des Cluster an die einzelnen Appli­ka­tionen zu verteilen und die Executors zu starten. Er ist austauschbar und kein integraler Bestandteil von Spark. Die Spark-Architektur ist in Abbildung 3 noch einmal veranschaulicht.

RDD: Resilient Distributed Dataset

Die Verarbeitung der Daten in Spark erfolgt mit Resilient Distributed Datasets (RDD). Ein RDD in Spark ist – vereinfacht ausgedrückt – eine unveränderliche, verteilte Sammlung von Objekten. Es ist die bevorzugte Abstrak­tionsschicht in Spark zur Verarbeitung von Daten und Speicherung von Zwischenergebnissen. Ein RDD unterteilt sich dabei in einzelne Partitionen. Diese können auf verschiedenen Knoten des Cluster berechnet werden und enthalten wiederum jede Art von Python-, Java- oder Scala-Objekten.

RDDs können auf zweierlei Weise erstellt werden: Entweder wird ein externes Dataset geladen (z. B. eine Datei aus HDFS) oder eine Sammlung von Objekten wird im Driver Programm verteilt. Sobald ein RDD erstellt ist, können zwei Arten von Operationen auf diesem durchgeführt werden: Transformationen und Aktionen. Bei einer Transformation werden die Daten eines RDD verändert und in ein neues RDD geschrieben.

Eine Transformation könnte z. B. die Filterung eines Textes nach einem bestimmten String sein. Das erste RDD enthält dann den gesamten Text, das zweite nur die Zeilen, in denen der gesuchte String vorkommt. Aktionen hin-gegen berechnen ein Ergebnis auf Basis eines RDD und speichern dieses (z. B. in HDFS) oder geben es an das Driver Programm zurück. Spark ermöglicht derzeit mehr als 80 verschiedene Transformationen von RDDs.

Abbildung 4 zeigt das Beispiel eines Spark-Programms in der Programmiersprache Scala. Zunächst wird in der ersten Zeile ein RDD erstellt, indem aus einer Datei gelesen wird. In der zweiten Zeile wird eine Transformation durchgeführt. Aus dem in Zeile 1 erstellten RDD lines_ds werden alle Zeilen herausgefiltert, die den String ORDIX enthalten. Hierdurch entsteht das zweite RDD mit der Bezeichnung lines_ordix. Die letzte Zeile enthält nun eine Aktion: Hier werden die Zeilen aus dem vorhergehenden RDD gezählt. Das Ergebnis ist nun die Anzahl aller Zeilen einer Datei, in denen der String ORDIX vorkommt.

Die RDDs werden in Form der sogenannten Lazy Evaluation berechnet. Konkret bedeutet dies, dass die Transforma­tion der Daten von einem RDD in ein anderes erst durchgeführt wird, wenn auf dem letzten RDD einer Kette eine Aktion ausgeführt wird. Dies hat zwei Vorteile: Erstens kann das Skript bis zur ersten Aktion komplett geschrieben werden, ohne dass der Entwickler nach jeder Codezeile erst das Ergebnis abwarten müsste. Zweitens kann durch Code Rewriting die Performance gesteigert werden. Dies ist aber nur möglich, wenn das Ziel des Skriptes bekannt ist.

Persistierung von RDDs

RDDs speichern in der Regel keine Daten. Sie beinhalten lediglich die Information, wie diese berechnet werden können. Bei einer Verzweigung im Programmablauf haben zwei oder mehr RDDs denselben Vorgänger. Dieses „Vorgänger-RDD“ muss daher für jeden Zweig einzeln berechnet werden. Wollte man mit dem Skript in Abbildung 4  auch zusätzlich die Zahl aller Zeilen ermitteln, die das Wort NEWS enthalten, könnte dies mit der folgenden Transformation geschehen:

val lines_news = lines_df.filter(line => line.contains("NEWS"))

Das RDD anz_lines würde somit in zwei Transformatio­nen verwendet werden und müsste, wenn es nicht persis­tiert wird, zweimal berechnet werden. Um dies zu vermeiden, können die Ergebnisse einer RDD-Transformation persistiert und somit für weitere Schritte vorgehalten werden. Dies geschieht über den Befehl rdd.persist(). Eine Persistierung der Daten kann im Speicher, im Dateisystem oder auch in beiden gleichzeitig erfolgen.

Neben einer Performance-Steigerung ermöglicht eine Persistierung auch eine interaktive Vorgehensweise: Die Daten können einmal aus dem Dateisystem in den Arbeitsspeicher gelesen werden und dort für vielfältige interaktive Analysen vorgehalten werden. Einmalig benötigte Daten werden hingegen nach deren Verwendung sofort entfernt.

Key Value RDDs & DataFrames

Neben den bereits gezeigten RDDs besitzt Spark zwei weitere Arten: Key Value RDDs und DataFrames.

Key Value RDDs werden auch Pair RDDs genannt und sind ein Spezial­fall der normalen RDDs. Sie sind besonders auf die Aufnahme von Key/Value-Wertepaaren ausgelegt. Sie unterstützen die gleichen Funktionen wie herkömmliche RDDs und bieten zusätzliche Funktionen zum Aggregieren, Gruppieren, Joinen und Sortieren der Daten.

DataFrames sind die bevorzugte Abstraktion in Spark. Sie unterstützen zahlreiche Datenformate und Speichersysteme. Ein DataFrame ist ein RDD, welches aus RowObjects besteht und zusätzlich Schemainformationen der Datentypen jeder Zeile beinhaltet. Vereinfacht entsprechen DataFrames den aus relationalen Datenbanken bekannten Tabellen. Da DataFrames jedoch ebenfalls RDDs sind, können auf diesen auch die für normale RDDs verwendeten Transformationen genutzt werden.

Darüber hinaus wird mit Spark SQL ein Interface zur Verfügung gestellt, um mit strukturierten, d. h. schemage­stützten Daten zu arbeiten. Spark SQL ermöglicht es, Daten aus einer Vielzahl von strukturierten Quellen (JSON, Hive, Parquet) zu laden und in DataFrames zu speichern.

Die Abfrage und Manipulation der Daten aus den DataFrames ist mit den von Spark zur Verfügung gestellten, an SQL angelehnten DataFrame-Funktionen möglich. Darüber hinaus bietet Spark auch eine SQL-Funktion, um DataFrames mithilfe der an den SQL-Standard ange­lehnten Hive Query Language (HQL) abzufragen. In Abbildung 5 werden Daten aus einer CSV-Datei in einen DataFrame importiert und anschließend abgefragt.

Fazit

Apache Spark ist gekennzeichnet durch seine Schnelligkeit und Vielfältigkeit in der Verarbeitung von Daten im Umfeld von Big Data. Einige Autoren der Spark-Literatur sind bereits der Ansicht, dass Apache Spark etablierte Big-Data-Technologien, wie vor allem MapReduce, ver­drängen könnte und neben Hadoop zur Haupttechno­logie im Big-Data-Umfeld aufsteigen wird. Ob dies wirklich geschieht, muss die Zukunft zeigen. Die Aussichten dafür stehen jedenfalls nicht schlecht.

Philipp Loer
()

 

Glossar

API
Das Application Programming Interface ist eine Programmierschnittstelle, die von einem Softwaresystem anderen Programmen zur Anbindung
an das System zur Verfügung gestellt wird.

DAG
Der Directed Acyclic Graph ist ein Konstrukt der Mathematik. Dieses Konstrukt ermöglicht die schnelle Ausführung von komplexen verteilten Algorithmen.

YARN
Der Yet Another Resource Negotiator ist eine Ressourcenverwaltung in Hadoop. Sie ermöglicht es, die Ressourcen eines Clusters dynamisch für verschiedene Jobs zu verwalten. So ermöglicht es YARN, durch Queues die Kapazitäten des Clusters für Jobs festzulegen.

ORC
Optimized Row Columnar ist ein spaltenorientiertes Dateiformat.

JSON
Die Java Script Object Notation ist ein kompaktes Datenformat in lesbarer Textform zum Zweck des Datenaustauschs zwischen Anwendungen.

Hive
Hive erweitert Hadoop um Data-Warehouse-Funktionalitäten, namentlich die Anfragesprache HiveQL und Indizes.

In-Memory
Eine In-Memory-Verarbeitung bedeutet, dass die Daten während der Verarbeitung nicht außerhalb des Arbeitsspeichers zwischengespeichert werden. Dies ist nicht zu verwechseln mit In-Memory-Datenbanken: Hier werden die Daten nicht nur während der Verarbeitung, sondern generell im Arbeitsspeicher gehalten.

Abbildungen:

bilder ppl 1 kl
Abb. 1: Spark-Komponenten
bilder ppl 2 kl
Abb. 2: Programmablauf Spark vs. MapReduce (MR)
bilder ppl 3 kl
Abb. 3: Spark-Architektur
val lines_df = sc.textFile("/user/spark/ordix.txt")
val lines_ordix = lines_df.fi lter(line => line.contains(
"ORDIX"))
val anz_ordix = lines_ordix.count()a
Abb. 4: Line Count
// Erstellen HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
// Import HiveContext
import org.apache.spark.sql.hive.HiveContext;
// Create Table Statement
hiveCtx.sql("CREATE TABLE BUECHER(ID INT, AUTOR STRING,
TITEL STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY
',' STORED AS TEXTFILE")
// Load Local Data
hiveCtx.sql("LOAD DATA LOCAL INPATH '/home/ordix/buecher.
txt' INTO TABLE BUECHER")
// Select
hiveCtx.sql("SELECT AUTOR, TITEL FROM BUECHER").show
Abb. 5: Einfache Abfrage eines RDD mit HQL

Logo ORDIX<sup>®</sup>  news lang

Weitere Artikel

Eine Übersicht über alle Artikel finden Sie im Inhaltsverzeichnis.

Impressum

Impressum der ORDIX® news 1/2016

Links

 

[1] ORDIX® news Artikel 1/2015 –
„Big Data – Informationen neu gelebt (Teil I) - Wie big ist Big Data?“:
http://www.ordix.de/images/ordix/onews_archiv/1_2015/ORDIX_news_1_2015_opf_files/WebSearch/page0018.html

[2] ORDIX® news Artikel 2/2015 –
„Big Data – Informationen neu gelebt (Teil II) -Apache Cassandra“:
http://www.ordix.de/images/ordix/onews_archiv/2_2015/ORDIX_news_2_2015_opf_files/WebSearch/page0004.html

[3] ORDIX® news Artikel 3/2015 –
„Big Data – Informationen neu gelebt (Teil III) - Apache Hadoop – auf die elefantöse Art“:
http://www.ordix.de/images/ordix/onews_archiv/3_2015/ORDIX_news_3_2015_opf_files/WebSearch/page0042.html

Quellen

 

[Q1] Holende et al.: „Learning Spark. Lightning-Fast Big Data Analysis“; 1. Auflage; Sebastopol: O'Reilly Media, 2015

[Q2] Bonaci, Marko; Zecevic, Petar: Spark IN ACTION; 1. Auflage; Shelter Island: Manning Publications Co., 2016

[Q3] White, Tom: „Hadoop: The Definitive Guide“; 4. Auflage; Sebastopol: O'Reilly Media, 2015

Bildnachweis

 

© freepik.com | starline |Technology background with circular mesh