TP4-5 Spark
TP4-5 Spark
TP4-5 Spark
Big Data
Objectifs du TP
Utilisation de Spark pour réaliser des traitements par lot et des traitements en
streaming.
1/22
Atelier 4-4 - Apache Spark - TP
Big Data
Outils et Versions
Apache Hadoop [http://hadoop.apache.org/] Version: 2.7.2
Java
[http://www.oracle.com/technetwork/java/javase/downloads/index.html]
Version 1.8
Spark
Présentation
2/22
Atelier 4-4 - Apache Spark - TP
Big Data
Spark et Hadoop
Dans ce TP, nous allons exécuter Spark sur Hadoop YARN. YARN s'occupera ainsi
de la gestion des ressources pour le déclenchement et l'exécution des Jobs
Spark.
Installation
3/22
Atelier 4-4 - Apache Spark - TP
Big Data
./start-hadoop.sh
Vous pourrez vérifier que tous les démons sont lancés en tapant: jps . Un
résultat semblable au suivant pourra être visible:
880 Jps
257 NameNode
613 ResourceManager
456 SecondaryNameNode
La même opération sur les noeuds esclaves (auquels vous accédez à partir de
votre machine hôte en tapant docker exec -it hadoop-slave1 bash ) devrait
donner:
176 NodeManager
65 DataNode
311 Jps
Error
Si le message suivant s'affiche: put: `.': No such file or directory , c'est parce que
l'arborescence du répertoire principal n'est pas créée dans HDFS. Pour le faire, il suffit
d'exécuter la commande suivante avant la commande de chargement :
hadoop fs mkdir -p .
Pour vérifier que spark est bien installé, taper la commande suivante:
4/22
Atelier 4-4 - Apache Spark - TP
Big Data
spark-shell
Vous pourrez tester spark avec un code scala simple comme suit (à exécuter
ligne par ligne):
Ce code vient de (1) charger le fichier file1.txt de HDFS, (2) séparer les mots
selon les caractères d'espacement, (3) appliquer un map sur les mots obtenus
qui produit le couple (<mot>, 1), puis un reduce qui permet de faire la somme des1
des mots identiques.
5/22
Atelier 4-4 - Apache Spark - TP
Big Data
L'API de Spark
A un haut niveau d'abstraction, chaque application Spark consiste en un
programme driver qui exécute la fonction main de l'utilisateur et lance plusieurs
opérations parallèles sur le cluster. L'abstraction principale fournie par Spark est
un RDD (Resilient Distributed Dataset), qui représente une collection d'éléments
partitionnés à travers les noeuds du cluster, et sur lesquelles on peut opérer en
parallèle. Les RDDs sont créés à partir d'un fichier dans HDFS par exemple, puis
le transforment. Les utilisateurs peuvent demander à Spark de sauvegarder un
RDD en mémoire, lui permettant ainsi d'être réutilisé efficacement à travers
plusieurs opérations parallèles.
les actions, qui retournent une valeur au programme driver après avoir
exécuté un calcul sur le Dataset.
6/22
Atelier 4-4 - Apache Spark - TP
Big Data
Par exemple, un map est une transformation qui passe chaque élément du
dataset via une fonction, et retourne un nouvel RDD représentant les résultats.
Un reduce est une action qui agrège tous les éléments du RDD en utilisant une
certaine fonction et retourne le résultat final au programme.
Toutes les transformations dans Spark sont lazy, car elles ne calculent pas le
résultat immédiatement. Elles se souviennent des transformations appliquées à
un dataset de base (par ex. un fichier). Les transformations ne sont calculées
que quand une action nécessite qu'un résultat soit retourné au programme
principal. Cela permet à Spark de s'exécuter plus efficacement.
Exemple
L'exemple que nous allons présenter ici par étapes permet de relever les mots les
plus fréquents dans un fichier. Pour cela, le code suivant est utilisé:
7/22
Atelier 4-4 - Apache Spark - TP
Big Data
8/22
Atelier 4-4 - Apache Spark - TP
Big Data
9/22
Atelier 4-4 - Apache Spark - TP
Big Data
10/22
Atelier 4-4 - Apache Spark - TP
Big Data
Nous allons dans cette partie créer un projet Spark Batch en Java (un simple
WordCount), le charger sur le cluster et lancer le job.
<groupId>spark.batch</groupId>
<artifactId>wordcount</artifactId>
<version>1</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
11/22
Atelier 4-4 - Apache Spark - TP
Big Data
<version>1.7.22</version>
</dependency>
</dependencies>
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.util.Arrays;
12/22
Atelier 4-4 - Apache Spark - TP
Big Data
master est une URL d'un cluster Spark, Mesos ou YARN, ou bien une
chaîne spéciale local pour lancer le job en mode local.
Warning
Nous avons indiqué ici que notre master est local pour les besoins du test, mais plus tard,
en le packageant pour le cluster, nous allons enlever cette indication. Il est en effet
déconseillé de la hard-coder dans le programme, il faudrait plutôt l'indiquer comme option
de commande à chaque fois que nous lançons le job.
Le reste du code de l'application est la version en Java de l'exemple en scala que nous
avions fait avec spark-shell.
Pour tester le code sur votre machine, procéder aux étapes suivantes:
src/main/resources/loremipsum.txt src/main/resources/out
13/22
Atelier 4-4 - Apache Spark - TP
Big Data
Pour exécuter le code sur le cluster, modifier comme indiqué les lignes en jaune
dans ce qui suit:
14/22
Atelier 4-4 - Apache Spark - TP
Big Data
Lancer ensuite une configuration de type Maven, avec les commandes package
install. Un fichier intitulé worcount-1.jar sera créé sous le répertoire target.
Nous allons maintenant copier ce fichier dans docker. Pour cela, naviguer vers le
répertoire du projet avec votre terminal (ou plus simplement utiliser le terminal
dans IntelliJ), et taper la commande suivante:
Revenir à votre contenaire master, et lancer un job Spark en utilisant ce fichier jar
généré, avec la commande spark-submit , un script utilisé pour lancer des
applications spark sur un cluster.
Attention
Vérifiez bien que le fichier purchases existe dans le répertoire input de HDFS (et que le
répertoire output n'existe pas)!
Si ce n'est pas le cas, vous pouvez le charger avec les commandes suivantes:
Si tout se passe bien, vous devriez trouver, dans le répertoire output, deux
fichiers part-00000 et part-00001, qui ressemblent à ce qui suit:
15/22
Atelier 4-4 - Apache Spark - TP
Big Data
Mode cluster: où tout le job s'exécute dans le cluster, c'est à dire les
Spark Executors (qui exécutent les vraies tâches) et le Spark Driver (qui
ordonnance les Executors). Ce dernier sera encapsulé dans un YARN
Application Master.
Mode client : où Spark Driver s'exécute sur la machine cliente (tel que
votre propre ordinateur portable). Si votre machine s'éteint, le job
s'arrête. Ce mode est approprié pour les jobs interactifs.
Si tout se passe bien, vous devriez obtenir un répertoire output2 dans HDFS avec
les fichiers usuels.
16/22
Atelier 4-4 - Apache Spark - TP
Big Data
Erreur
En cas d'erreur ou d'interruption du job sur Yarn, vous pourrez consulter les fichiers logs
pour chercher le message d'erreur (le message affiché sur la console n'est pas assez
explicite). Pour cela, sur votre navigateur, aller à l'adresse:
http://localhost:8041/logs/userlogs et suivez toujours les derniers liens jusqu'à
stderr.
Spark Streaming
Spark est connu pour supporter également le traitement des données en
streaming. Les données peuvent être lues à partir de plusieurs sources tel que
Kafka, Flume, Kinesis ou des sockets TCP, et peuvent être traitées en utilisant des
algorithmes complexes. Ensuite, les données traitées peuvent être stockéessur
des systèmes de fichiers, des bases de données ou des dashboards. Il est
même possible de réaliser des algorithmes de machine learning et de traitement
de graphes sur les flux de données.
Environnement et Code
17/22
Atelier 4-4 - Apache Spark - TP
Big Data
1. Commencer par créer un nouveau projet Maven, avec le fichier pom suivant:
<groupId>spark.streaming</groupId>
<artifactId>stream</artifactId>
<version>1</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
18/22
Atelier 4-4 - Apache Spark - TP
Big Data
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
JavaReceiverInputDStream<String> lines =
jssc.socketTextStream("localhost", 9999);
JavaDStream<String> words =
lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator()
JavaPairDStream<String, Integer> pairs =
words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts =
pairs.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
}
Le stream ici sera diffusé par une petite commande utilitaire qui se trouve dans
la majorité des systèmes Unix-like.
Exécuter votre classe Stream. Vous verrez défiler sur votre console des
lignes en continu: l'application est en écoute sur localhost:9999.
nc -lk 9999
19/22
Atelier 4-4 - Apache Spark - TP
Big Data
A chaque fois que vous entrez quelque chose sur le terminal, l'application
l'intercepte, et l'affichage sur l'écran de la console change, comme suit:
Ensuite, pour voir le résultat final du comptage, arrêter l'exécution en cliquant sur
le carré rouge, puis observer la console, vous verrez un affichage qui ressemble à
ceci:
20/22
Atelier 4-4 - Apache Spark - TP
Big Data
21/22