Spark Guide
Important Notice
© 2010-2017 Cloudera, Inc. All rights reserved.
Cloudera, the Cloudera logo, and any other product or service names or slogans contained
in this document are trademarks of Cloudera and its suppliers or licensors, and may not
be copied, imitated or used, in whole or in part, without the prior written permission
of Cloudera or the applicable trademark holder.
Hadoop and the Hadoop elephant logo are trademarks of the Apache Software
Foundation. All other trademarks, registered trademarks, product names and company
names or logos mentioned in this document are the property of their respective owners.
Reference to any products, services, processes or other information, by trade name,
trademark, manufacturer, supplier or otherwise does not constitute or imply
endorsement, sponsorship or recommendation thereof by us.
Complying with all applicable copyright laws is the responsibility of the user. Without
limiting the rights under copyright, no part of this document may be reproduced, stored
in or introduced into a retrieval system, or transmitted in any form or by any means
(electronic, mechanical, photocopying, recording, or otherwise), or for any purpose,
without the express written permission of Cloudera.
The information in this document is subject to change without notice. Cloudera shall
not be liable for any damages resulting from technical errors or omissions which may
be present in this document, or from use of this document.
Cloudera, Inc.
395 Page Mill Road
Palo Alto, CA 94306
info@cloudera.com
US: 1-888-789-1488
Intl: 1-650-362-0488
www.cloudera.com
Release Information
Note:
This page contains information related to Spark 1.6, which is included with CDH. For information about
the separately available parcel for Cloudera Distribution of Apache Spark 2, see the documentation
for Cloudera Distribution of Apache Spark 2.
Unsupported Features
The following Spark features are not supported:
• Spark SQL:
– Thrift JDBC/ODBC server
– Spark SQL CLI
• Spark Dataset API
• SparkR
• GraphX
• Spark on Scala 2.11
• Mesos cluster manager
Related Information
• Managing Spark
• Monitoring Spark Applications
• Spark Authentication
• Spark Encryption
• Cloudera Spark forum
• Apache Spark documentation
Running Your First Spark Application
$ SPARK_HOME/bin/spark-shell
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version ...
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information
...
SQL context available as sqlContext.
scala>
• Python:
$ SPARK_HOME/bin/pyspark
Python 2.6.6 (r266:84292, Jul 23 2015, 15:22:56)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-11)] on linux2
Type "help", "copyright", "credits" or "license" for more information
...
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version ...
/_/
3. Within a shell, run the word count application using the following code examples, substituting your own values for
namenode_host, path/to/input, and path/to/output:
• Scala
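A minimal sketch of the word count in the Scala shell (the NameNode port 8020 and the input and output paths are
placeholders):

val lines = sc.textFile("hdfs://namenode_host:8020/path/to/input")
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://namenode_host:8020/path/to/output")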
• Python
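A minimal sketch of the same word count in the pyspark shell (paths are placeholders):

lines = sc.textFile("hdfs://namenode_host:8020/path/to/input")
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda v1, v2: v1 + v2)
counts.saveAsTextFile("hdfs://namenode_host:8020/path/to/output")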
Spark Application Overview
Invoking an action inside a Spark application triggers the launch of a job to fulfill it. Spark examines the dataset on
which that action depends and formulates an execution plan. The execution plan assembles the dataset transformations
into stages. A stage is a collection of tasks that run the same code, each on a different subset of the data.
Developing Spark Applications
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkWordCount {
  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

    // get threshold
    val threshold = args(1).toInt

    // read in text file and split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
}
import sys

from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # create Spark context with Spark configuration
    sc = SparkContext(conf=SparkConf().setAppName("Python Spark Count"))

    # get threshold
    threshold = int(sys.argv[2])

    # split the input into words, count each word, and keep words seen at least threshold times
    tokenized = sc.textFile(sys.argv[1]).flatMap(lambda line: line.split(" "))
    wordCounts = tokenized.map(lambda word: (word, 1)).reduceByKey(lambda v1, v2: v1 + v2)
    filtered = wordCounts.filter(lambda pair: pair[1] >= threshold)

    # count characters in the remaining words
    charCounts = filtered.flatMap(lambda pair: pair[0]) \
        .map(lambda c: (c, 1)) \
        .reduceByKey(lambda v1, v2: v1 + v2)

    chars = charCounts.collect()
    print repr(chars)[1:-1]
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;
// get threshold
final int threshold = Integer.parseInt(args[1]);
}
).reduceByKey(
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
);
// count characters
JavaPairRDD<Character, Integer> charCounts = filtered.flatMap(
new FlatMapFunction<Tuple2<String, Integer>, Character>() {
@Override
public Iterable<Character> call(Tuple2<String, Integer> s) {
Collection<Character> chars = new ArrayList<Character>(s._1().length());
for (char c : s._1().toCharArray()) {
chars.add(c);
}
return chars;
}
}
).mapToPair(
new PairFunction<Character, Character, Integer>() {
@Override
public Tuple2<Character, Integer> call(Character c) {
return new Tuple2<Character, Integer>(c, 1);
}
}
).reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
}
);
System.out.println(charCounts.collect());
}
}
Because Java 7 does not support anonymous functions, this Java program is considerably more verbose than Scala and
Python, but still requires a fraction of the code needed in an equivalent MapReduce program. Java 8 supports anonymous
functions and their use can further streamline the Java application.
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0-cdh5.7.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
$ mvn package
If you use the example input file, the output should look something like:
(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1),
(r,2), (l,1), (c,1)
If you use the example input file, the output should look something like:
(e,6), (p,2), (a,4), (t,2), (i,1), (b,1), (u,1), (h,1), (o,2), (n,4), (f,1), (v,1),
(r,2), (l,1), (c,1)
[(u'a', 4), (u'c', 1), (u'e', 6), (u'i', 1), (u'o', 2), (u'u', 1), (u'b', 1), (u'f',
1), (u'h', 1), (u'l', 1), (u'n', 4), (u'p', 2), (u'r', 2), (u't', 2), (u'v', 1)]
3. Create a Kafka word count Python program adapted from the Spark Streaming example kafka_wordcount.py. This
version divides the input stream into batches of 10 seconds and counts the words in each batch:
from __future__ import print_function
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 10)
    # create a DStream from the Kafka topic and count the words in each 10-second batch
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    counts = kvs.map(lambda x: x[1]).flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
4. Submit the application using spark-submit with dynamic allocation disabled and pass in your ZooKeeper server
and topic wordcounttopic. To run locally, you must specify at least two worker threads: one to receive and one
to process data.
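A sketch of the submission command, assuming the Kafka integration classes are provided by the Spark examples JAR
shipped with CDH (the JAR path and ZooKeeper host are placeholders):

spark-submit --master local[2] --conf "spark.dynamicAllocation.enabled=false" \
    --jars SPARK_HOME/lib/spark-examples.jar kafka_wordcount.py \
    zookeeper_server:2181 wordcounttopic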
hello
hello
hello
hello
hello
hello
gb
gb
gb
gb
gb
gb
Depending on how fast you type, in the Spark Streaming application window you will see output like:
-------------------------------------------
Time: 2016-01-06 14:18:00
-------------------------------------------
(u'hello', 6)
(u'gb', 2)
-------------------------------------------
Time: 2016-01-06 14:18:10
-------------------------------------------
(u'gb', 4)
from __future__ import print_function
import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    # enable the write-ahead log and checkpointing so received data survives a driver failure
    checkpoint = "hdfs://ns1/user/systest/checkpoint"
    sparkConf = SparkConf()
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    sc = SparkContext(appName="PythonStreamingKafkaWordCount", conf=sparkConf)
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint(checkpoint)
Note:
Hive and Impala tables and related SQL syntax are interchangeable in most respects. Because Spark
uses the underlying Hive infrastructure, with Spark SQL you write DDL statements, DML statements,
and queries using the HiveQL syntax. For interactive query performance, you can access the same
tables through Impala using impala-shell or the Impala JDBC and ODBC interfaces.
If you use spark-shell, a HiveContext is already created for you and is available as the sqlContext variable.
If you use spark-submit, use code like the following at the start of the program:
Python:
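A minimal sketch (the application name is a placeholder):

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

sc = SparkContext(conf=SparkConf().setAppName("Spark SQL App"))
sqlContext = HiveContext(sc)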
The host from which the Spark application is submitted or on which spark-shell or pyspark runs must have a Hive
gateway role defined in Cloudera Manager and client configurations deployed.
When a Spark job accesses a Hive view, Spark must have privileges to read the data files in the underlying Hive tables.
Currently, Spark cannot use fine-grained privileges based on the columns or the WHERE clause in the view definition.
If Spark does not have the required privileges on the underlying data files, a SparkSQL query against the view returns
an empty result set, rather than an error.
• JSON:
• Parquet:
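A sketch of reading each format into a DataFrame in the Scala shell (paths are placeholders):

val json_df = sqlContext.read.json("/path/to/file.json")
val parquet_df = sqlContext.read.parquet("/path/to/file.parquet")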
$ spark-shell
6. Create DataFrames containing the contents of the sample_07 and sample_08 tables:
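One way to create them in the Scala shell:

scala> val df_07 = sqlContext.sql("SELECT * from sample_07")
scala> val df_08 = sqlContext.sql("SELECT * from sample_08")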
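The output below comes from showing the rows of df_07 with salaries above 150,000, for example:

scala> df_07.filter(df_07("salary") > 150000).show()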
+-------+--------------------+---------+------+
| code| description|total_emp|salary|
+-------+--------------------+---------+------+
|11-1011| Chief executives| 299160|151370|
|29-1022|Oral and maxillof...| 5040|178440|
|29-1023| Orthodontists| 5350|185340|
|29-1024| Prosthodontists| 380|169360|
|29-1061| Anesthesiologists| 31030|192780|
|29-1062|Family and genera...| 113250|153640|
|29-1063| Internists, general| 46260|167270|
|29-1064|Obstetricians and...| 21340|183600|
|29-1067| Surgeons| 50260|191410|
|29-1069|Physicians and su...| 237400|155150|
+-------+--------------------+---------+------+
8. Create the DataFrame df_09 by joining df_07 and df_08, retaining only the code and description columns.
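One way to express the join in the shell:

scala> val df_09 = df_07.join(df_08, df_07("code") === df_08("code")).select(df_07.col("code"), df_07.col("description"))
scala> df_09.show()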
+-------+--------------------+
| code| description|
+-------+--------------------+
|00-0000| All Occupations|
|11-0000|Management occupa...|
|11-1011| Chief executives|
|11-1021|General and opera...|
|11-1031| Legislators|
|11-2011|Advertising and p...|
|11-2021| Marketing managers|
|11-2022| Sales managers|
|11-2031|Public relations ...|
|11-3011|Administrative se...|
|11-3021|Computer and info...|
|11-3031| Financial managers|
|11-3041|Compensation and ...|
|11-3042|Training and deve...|
|11-3049|Human resources m...|
|11-3051|Industrial produc...|
|11-3061| Purchasing managers|
|11-3071|Transportation, s...|
|11-9011|Farm, ranch, and ...|
|11-9012|Farmers and ranchers|
+-------+--------------------+
scala> df_09.write.saveAsTable("sample_09")
The equivalent program in Python, which you could submit using spark-submit, is:

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

if __name__ == "__main__":
    sqlContext = HiveContext(SparkContext(conf=SparkConf().setAppName("Data Frame Join")))

    # create the DataFrames, join them on code, and keep only code and description
    df_07 = sqlContext.sql("SELECT * from sample_07")
    df_08 = sqlContext.sql("SELECT * from sample_08")
    df_09 = df_07.join(df_08, df_07.code == df_08.code).select(df_07.code, df_07.description)

    df_09.show()
    df_09.write.saveAsTable("sample_09")
    tbls = sqlContext.sql("show tables")
    tbls.show()
Instead of displaying the tables using Beeline, the show tables query is run using the Spark SQL API.
Performance and Storage Considerations for Spark SQL DROP TABLE PURGE
The PURGE clause in the Hive DROP TABLE statement causes the underlying data files to be removed immediately,
without being transferred into a temporary holding area (the HDFS trashcan).
Although the PURGE clause is recognized by the Spark SQL DROP TABLE statement, this clause is currently not passed
along to the Hive statement that performs the “drop table” operation behind the scenes. Therefore, if you know the
PURGE behavior is important in your application for performance, storage, or security reasons, do the DROP TABLE
directly in Hive, for example through the beeline shell, rather than through Spark SQL.
The immediate deletion aspect of the PURGE clause could be significant in cases such as:
• If the cluster is running low on storage space and it is important to free space immediately, rather than waiting
for the HDFS trashcan to be periodically emptied.
• If the underlying data files reside on the Amazon S3 filesystem. Moving files to the HDFS trashcan from S3 involves
physically copying the files, meaning that the default DROP TABLE behavior on S3 involves significant performance
overhead.
• If the underlying data files contain sensitive information and it is important to remove them entirely, rather than
leaving them to be cleaned up by the periodic emptying of the trashcan.
• If restrictions on HDFS encryption zones prevent files from being moved to the HDFS trashcan. This restriction
primarily applies to CDH 5.7 and lower. With CDH 5.8 and higher, each HDFS encryption zone has its own HDFS
trashcan, so the normal DROP TABLE behavior works correctly without the PURGE clause.
$ wget --no-check-certificate \
https://raw.githubusercontent.com/apache/spark/branch-1.6/data/mllib/sample_movielens_data.txt
$ hdfs dfs -copyFromLocal sample_movielens_data.txt /user/hdfs
2. Run the Spark MLlib MovieLens example application, which calculates recommendations based on movie reviews:
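A sketch of the command, assuming the Spark examples JAR installed with CDH (the JAR path and ALS parameters are
illustrative):

spark-submit --master local --class org.apache.spark.examples.mllib.MovieLensALS \
    /usr/lib/spark/lib/spark-examples.jar \
    --rank 5 --numIterations 5 --lambda 1.0 --kryo sample_movielens_data.txt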
... WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
Test RMSE = 1.5378651281107205.
You see this on a system with no libgfortran. The same error occurs after installing libgfortran on RHEL 6
because it installs version 4.4, not 4.6+.
After installing libgfortran 4.8 on RHEL 7, you should see something like this:
<property>
<name>fs.s3a.access.key</name>
<value>...</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>...</value>
</property>
sc.hadoopConfiguration.set("fs.s3a.access.key", "...")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "...")
scala> df_07.write.parquet("s3a://s3-to-ec2/sample_07.parquet")
sqlContext.setConf("spark.sql.avro.compression.codec","codec")
The supported codec values are uncompressed, snappy, and deflate. Specify the level to use with deflate
compression in spark.sql.avro.deflate.level. For an example, see Figure 6: Writing Deflate Compressed
Records on page 25.
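For instance, assuming an existing DataFrame df (the compression level is illustrative):

sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")
df.write.avro("output dir")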
Spark SQL
You can write SQL queries to query an Avro file. You must first create a temporary table and then query it:
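A sketch in the Scala shell (the Avro file name is a placeholder):

sqlContext.sql("""CREATE TEMPORARY TABLE episodes
  USING com.databricks.spark.avro
  OPTIONS (path "episodes.avro")""")
val results = sqlContext.sql("SELECT * FROM episodes")
results.show()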
Limitations
Because Spark is converting data types, keep the following in mind:
• Enumerated types are erased - Avro enumerated types become strings when they are read into Spark because
Spark does not support enumerated types.
• Unions on output - Spark writes everything as unions of the given type along with a null option.
• Avro schema changes - Spark reads everything into an internal representation. Even if you just read and then write
the data, the schema for the output is different.
• Spark schema reordering - Spark reorders the elements in its schema when writing them to disk so that the
elements being partitioned on are the last elements. For an example, see Figure 7: Writing Partitioned Data on
page 26.
API Examples
This section provides examples of using the spark-avro API in all supported languages.
Scala Examples
The easiest way to work with Avro data files in Spark applications is by using the DataFrame API. The spark-avro
library includes avro methods in SQLContext for reading and writing Avro files:
import com.databricks.spark.avro._
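A minimal sketch of the read and write calls this import enables (directory names are placeholders):

val df = sqlContext.read.avro("input dir")
df.write.avro("output dir")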
import com.databricks.spark.avro._
import sqlContext.implicits._
val df = Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
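Given the partitioned-data discussion that follows, the example presumably finishes by writing the DataFrame
partitioned by year and month, along the lines of:

df.write.partitionBy("year", "month").avro("output dir")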
import com.databricks.spark.avro._

// read the partitioned Avro data back into a DataFrame ("input dir" is a placeholder)
val df = sqlContext.read.avro("input dir")
df.printSchema()
df.filter("year = 2011").collect().foreach(println)
This code automatically detects the partitioned data and joins it all, so it is treated the same as unpartitioned data.
This also queries only the directory required, to decrease disk I/O.
root
|-- title: string (nullable = true)
|-- rating: double (nullable = true)
|-- year: integer (nullable = true)
|-- month: integer (nullable = true)
[Git,2.0,2011,7]
import com.databricks.spark.avro._

// write an existing DataFrame df with a custom record name and namespace
// (the option values here are illustrative)
val parameters = Map("recordName" -> "AvroTest", "recordNamespace" -> "com.cloudera.spark")
df.write.options(parameters).avro("output dir")
Java Example
Use the DataFrame API to query Avro files in Java. This example is almost identical to Figure 5: Scala Example with
Format on page 25:
import org.apache.spark.sql.*;
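A sketch of the rest of the example, assuming an existing JavaSparkContext named jsc and placeholder directories:

SQLContext sqlContext = new SQLContext(jsc);

// read Avro data into a DataFrame using the spark-avro data source
DataFrame df = sqlContext.read().format("com.databricks.spark.avro").load("input dir");

// write it back out in Avro format
df.write().format("com.databricks.spark.avro").save("output dir");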
Python Example
Use the DataFrame API to query Avro files in Python. This example is almost identical to Figure 5: Scala Example with
Format on page 25:
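A sketch, assuming the spark-avro library is on the classpath (paths are placeholders):

df = sqlContext.read.format("com.databricks.spark.avro").load("input dir")
df.write.format("com.databricks.spark.avro").save("output dir")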
sqlContext.setConf("spark.sql.parquet.compression.codec","codec")
The supported codec values are: uncompressed, gzip, lzo, and snappy. The default is gzip.
For an example of writing Parquet files to Amazon S3, see Reading and Writing Data Sources From and To Amazon S3
on page 22.
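For instance, to write an existing DataFrame df with Snappy compression (the output path is a placeholder):

sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
df.write.parquet("/user/spark/output.parquet")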
Building Applications
Follow these best practices when building Spark Scala and Java applications:
• Compile against the same version of Spark that you are running.
• Build a single assembly JAR ("Uber" JAR) that includes all dependencies. In Maven, add the Maven assembly plug-in
to build a JAR containing all dependencies:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
This plug-in manages the merge procedure for all available JAR files during the build. Exclude Spark, Hadoop, and
Kafka (CDH 5.5 and higher) classes from the assembly JAR, because they are already available on the cluster and
contained in the runtime classpath. In Maven, specify Spark, Hadoop, and Kafka dependencies with scope provided.
For example:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0-cdh5.5.0</version>
<scope>provided</scope>
</dependency>
$ mkdir libs
$ cd libs
$ curl http://lizier.me/joseph/software/jidt/download.php?file=infodynamics-dist-1.3.zip
> infodynamics-dist-1.3.zip
$ unzip infodynamics-dist-1.3.zip
$ mvn deploy:deploy-file \
-Durl=file:///HOME/.m2/repository -Dfile=libs/infodynamics.jar \
-DgroupId=org.jlizier.infodynamics -DartifactId=infodynamics -Dpackaging=jar -Dversion=1.3
<dependency>
<groupId>org.jlizier.infodynamics</groupId>
<artifactId>infodynamics</artifactId>
<version>1.3</version>
</dependency>
6. Add the Maven assembly plug-in to the plugins section in the pom.xml file.
7. Package the library JARs in a module:
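Assuming a standard Maven project layout, the packaging step is:

$ mvn package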
2. In the Environment tab of the Spark Web UI application (http://driver_host:4040/environment/), validate that
the spark.jars property contains the library. For example:
3. In the Spark shell, test that you can import some of the required Java classes from the third-party library. For
example, if you use the JIDT library, import MatrixUtils:
$ spark-shell
...
scala> import infodynamics.utils.MatrixUtils;
spark-submit \
--class com.cloudera.example.YarnExample \
--master yarn \
--deploy-mode cluster \
--conf "spark.eventLog.dir=hdfs:///user/spark/eventlog" \
lib/yarn-example.jar \
10
– Python:
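A sketch of the equivalent submission for a Python application (the script name is a placeholder):

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --conf "spark.eventLog.dir=hdfs:///user/spark/eventlog" \
    lib/pi.py \
    10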
spark.master spark://mysparkmaster.acme.com:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///user/spark/eventlog
# Set spark executor memory
spark.executor.memory 2g
spark.logConf true
Cloudera recommends placing configuration properties that you want to use for every application in
spark-defaults.conf. See Application Properties for more information.
Important:
• If you use Cloudera Manager, do not use these command-line instructions.
• This information applies specifically to CDH 5.7.x. If you use a lower version of CDH, see the
documentation for that version located at Cloudera Documentation.
To configure properties for all Spark applications using the command line, edit the file
SPARK_HOME/conf/spark-defaults.conf.
Important:
• If you use Cloudera Manager, do not use these command-line instructions.
• This information applies specifically to CDH 5.7.x. If you use a lower version of CDH, see the
documentation for that version located at Cloudera Documentation.
To specify logging properties for all users on a machine by using the command line, edit the file
SPARK_HOME/conf/log4j.properties. To set it just for yourself or for a specific application, copy
SPARK_HOME/conf/log4j.properties.template to log4j.properties in your working directory or any
directory in your application's classpath.
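For example, to reduce console noise you might override the root logger level in your copy (a minimal sketch of a
log4j.properties entry):

log4j.rootCategory=WARN, console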
Running Spark Applications
spark-submit Syntax
Example: Running SparkPi on YARN on page 38 demonstrates how to run one of the sample applications, SparkPi,
packaged with Spark. It computes an approximation to the value of pi.
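The general form of the command is:

spark-submit --option value \
    application jar | python file [application arguments]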
Option Description
application jar Path to a JAR file containing a Spark application and all dependencies. The
path must be globally visible inside your cluster; see Advanced Dependency
Management.
python file Path to a Python file containing a Spark application. The path must be globally
visible inside your cluster; see Advanced Dependency Management.
application arguments Arguments to pass to the main method of your main class.
spark-submit Options
You specify spark-submit options using the form --option value instead of --option=value. (Use a space instead
of an equals sign.)
Option Description
class For Java and Scala applications, the fully qualified classname of the class
containing the main method of the application. For example,
org.apache.spark.examples.SparkPi.
conf Spark configuration property in key=value format. For values that contain
spaces, surround "key=value" with double quotation marks.
deploy-mode Deployment mode: cluster and client. In cluster mode, the driver runs on
worker hosts. In client mode, the driver runs locally as an external client. Use
cluster mode with production jobs; client mode is more appropriate for
interactive and debugging uses, where you want to see your application output
immediately. To see the effect of the deployment mode when running on
YARN, see Deployment Modes on page 35.
Default: client.
driver-class-path Configuration and classpath entries to pass to the driver. JARs added with
--jars are automatically included in the classpath.
driver-cores Number of cores used by the driver in cluster mode.
Default: 1.
driver-memory Maximum heap size (represented as a JVM string; for example 1024m, 2g,
and so on) to allocate to the driver. Alternatively, you can use the
spark.driver.memory property.
Master Description
local Run Spark locally with one worker thread (that is, no parallelism).
Master Description
local[K] Run Spark locally with K worker threads. (Ideally, set this to the number of
cores on your host.)
local[*] Run Spark locally with as many worker threads as logical cores on your host.
spark://host:port Run using the Spark Standalone cluster manager with the Spark Master on
the specified host and port (7077 by default).
yarn Run using a YARN cluster manager. The cluster location is determined by
HADOOP_CONF_DIR or YARN_CONF_DIR. See Configuring the Environment
on page 37.
Deployment Modes
In YARN, each application instance has an ApplicationMaster process, which is the first container started for that
application. The ApplicationMaster is responsible for requesting resources from the ResourceManager; once the resources
are allocated, it instructs NodeManagers to start containers on its behalf. ApplicationMasters eliminate
the need for an active client: the process starting the application can terminate, and coordination continues from a
process managed by YARN running on the cluster.
For the option to specify the deployment mode, see spark-submit Options on page 34.
Cluster mode is not well suited to using Spark interactively. Spark applications that require user input, such as
spark-shell and pyspark, require the Spark driver to run inside the client process that initiates the Spark application.
If you are using a Cloudera Manager deployment, these properties are configured automatically.
The command prints status until the job finishes or you press control-C. Terminating the spark-submit process
in cluster mode does not terminate the Spark application as it does in client mode. To monitor the status of the running
application, run yarn application -list.
Option Description
archives Comma-separated list of archives to be extracted into the working directory
of each executor. The path must be globally visible inside your cluster; see
Advanced Dependency Management.
executor-cores Number of processor cores to allocate on each executor. Alternatively, you
can use the spark.executor.cores property.
executor-memory Maximum heap size to allocate to each executor. Alternatively, you can use
the spark.executor.memory property.
num-executors Total number of YARN containers to allocate for this application. Alternatively,
you can use the spark.executor.instances property.
queue YARN queue to submit to. For more information, see Assigning Applications
and Queries to Resource Pools.
Default: default.
During initial installation, Cloudera Manager tunes properties according to your cluster environment.
In addition to the command-line options, the following properties are available:
Property Description
spark.yarn.driver.memoryOverhead Amount of extra off-heap memory that can be requested from YARN per
driver. Combined with spark.driver.memory, this is the total memory that
YARN can use to create a JVM for a driver process.
spark.yarn.executor.memoryOverhead Amount of extra off-heap memory that can be requested from YARN, per
executor process. Combined with spark.executor.memory, this is the total
memory YARN can use to create a JVM for an executor process.
Dynamic Allocation
Dynamic allocation allows Spark to dynamically scale the cluster resources allocated to your application based on the
workload. When dynamic allocation is enabled and a Spark application has a backlog of pending tasks, it can request
executors. When the application becomes idle, its executors are released and can be acquired by other applications.
Starting with CDH 5.5, dynamic allocation is enabled by default. Table 5: Dynamic Allocation Properties on page 40
describes properties to control dynamic allocation.
If you set spark.dynamicAllocation.enabled to false or use the --num-executors command-line argument
or set the spark.executor.instances property when running a Spark application, dynamic allocation is disabled.
For more information on how dynamic allocation works, see resource allocation policy.
When Spark dynamic resource allocation is enabled, all resources are allocated to the first submitted available job,
causing subsequent applications to be queued. To allow applications to acquire resources in parallel, allocate resources
to pools, run the applications in those pools, and enable applications running in those pools to be preempted. See
Dynamic Resource Pools.
See Dynamic Resource Pools.
If you are using Spark Streaming, see the recommendation in Spark Streaming and Dynamic Allocation on page 13.
Property Description
spark.dynamicAllocation.enabled Whether dynamic allocation is enabled.
Default: true.
spark.dynamicAllocation.executorIdleTimeout The length of time an executor must be idle before it is removed.
Default: 60 s.
spark.dynamicAllocation.initialExecutors The initial number of executors for a Spark application when dynamic allocation
is enabled.
Default: 1.
spark.dynamicAllocation.minExecutors The lower bound for the number of executors.
Default: 0.
spark.dynamicAllocation.maxExecutors The upper bound for the number of executors.
Default: Integer.MAX_VALUE.
spark.dynamicAllocation.schedulerBacklogTimeout The length of time pending tasks must be backlogged before new executors
are requested.
Default: 1 s.
You must manually upload the JAR each time you upgrade Spark to a new minor CDH release.
2. Set spark.yarn.jar to the HDFS path:
spark.yarn.jar=hdfs://namenode:8020/user/spark/share/lib/spark-assembly.jar
Using PySpark
Apache Spark provides APIs in non-JVM languages such as Python. Many data scientists use Python because it has a
rich variety of numerical libraries with a statistical, machine-learning, or optimization focus.
Self-Contained Dependencies
In a common situation, a custom Python package contains functionality you want to apply to each element of an RDD.
You can use a map() function call to make sure that each Spark executor imports the required package, before calling
any of the functions inside that package. The following shows a simple example:
def import_my_special_package(x):
import my.special.package
return x
int_rdd = sc.parallelize([1, 2, 3, 4])
int_rdd.map(lambda x: import_my_special_package(x))
int_rdd.collect()
You create a simple RDD of four elements and call it int_rdd. Then you apply the function
import_my_special_package to every element of the int_rdd. This function imports my.special.package and
then returns the original argument passed to it. Calling this function as part of a map() operation ensures that each
Spark executor imports my.special.package when needed.
If you only need a single file inside my.special.package, you can direct Spark to make this available to all executors
by using the --py-files option in your spark-submit command and specifying the local path to the file. You can
also specify this programmatically by using the sc.addPyFiles() function. If you use functionality from a package
that spans multiple files, you can make an egg for the package, because the --py-files flag also accepts a path to
an egg file.
If you have a self-contained dependency, you can make the required Python dependency available to your executors
in two ways:
• If you depend on only a single file, you can use the --py-files command-line option, or programmatically add
the file to the SparkContext with sc.addPyFiles(path) and specify the local path to that Python file.
• If you have a dependency on a self-contained module (a module with no other dependencies), you can create an
egg or zip file of that module and use either the --py-files command-line option or programmatically add the
module to the SparkContext with sc.addPyFiles(path) and specify the local path to that egg or zip file.
Complex Dependencies
Some operations rely on complex packages that also have many dependencies. For example, the following code snippet
imports the Python pandas data analysis library:
def import_pandas(x):
import pandas
return x
pandas depends on NumPy, SciPy, and many other packages. Although pandas is too complex to distribute as a *.py
file, you can create an egg for it and its dependencies and send that to executors.
# Install python-devel:
yum install python-devel
# Install non-Python dependencies required by SciPy that are not installed by default:
yum install atlas atlas-devel lapack-devel blas-devel
# install virtualenv:
pip install virtualenv
if [ -z "${PYSPARK_PYTHON}" ]; then
export PYSPARK_PYTHON=
fi
Important:
Cloudera does not support IPython or Jupyter notebooks on CDH. The instructions that were formerly
here have been removed to avoid confusion about the support status of these components.
Shuffle Overview
A Spark dataset comprises a fixed number of partitions, each of which comprises a number of records. For the datasets
returned by narrow transformations, such as map and filter, the records required to compute the records in a single
partition reside in a single partition in the parent dataset. Each object is only dependent on a single object in the parent.
Operations such as coalesce can result in a task processing multiple input partitions, but the transformation is still
considered narrow because the input records used to compute any single output record can still only reside in a limited
subset of the partitions.
Spark also supports transformations with wide dependencies, such as groupByKey and reduceByKey. In these
dependencies, the data required to compute the records in a single partition can reside in many partitions of the parent
dataset. To perform these transformations, all of the tuples with the same key must end up in the same partition,
processed by the same task. To satisfy this requirement, Spark performs a shuffle, which transfers data around the
cluster and results in a new stage with a new set of partitions.
sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()
It runs a single action, count, which depends on a sequence of three transformations on a dataset derived from a text
file. This code runs in a single stage, because none of the outputs of these three transformations depend on data that
comes from different partitions than their inputs.
In contrast, this Scala code finds how many times each character appears in all the words that appear more than 1,000
times in a text file:
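A sketch of that computation, mirroring the word count example earlier in this guide (the input path comes from
args(0)):

val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()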
This example has three stages. The two reduceByKey transformations each trigger stage boundaries, because computing
their outputs requires repartitioning the data by keys.
A final example is this more complicated transformation graph, which includes a join transformation with multiple
dependencies:
The pink boxes show the resulting stage graph used to run it:
At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by
tasks in the child stage. Because they incur high disk and network I/O, stage boundaries can be expensive and should
be avoided when possible. The number of data partitions in a parent stage may be different than the number of
partitions in a child stage. Transformations that can trigger a stage boundary typically accept a numPartitions
argument, which specifies into how many partitions to split the data in the child stage. Just as the number of reducers
is an important parameter in MapReduce jobs, the number of partitions at stage boundaries can determine an
application's performance. Tuning the Number of Partitions on page 48 describes how to tune this number.
Mapping each value into a single-element set and then merging the sets with reduceByKey results in unnecessary object
creation, because a new set must be allocated for each record. Instead, use aggregateByKey, which performs the map-side
aggregation more efficiently, as in the sketch below:
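A sketch, assuming rdd is a pair RDD whose values are strings:

val zero = new scala.collection.mutable.HashSet[String]()
rdd.aggregateByKey(zero)(
  (set, v) => set += v,
  (set1, set2) => set1 ++= set2)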
• flatMap-join-groupBy. When two datasets are already grouped by key and you want to join them and keep
them grouped, use cogroup. This avoids the overhead associated with unpacking and repacking the groups.
rdd1 = someRdd.reduceByKey(...)
rdd2 = someOtherRdd.reduceByKey(...)
rdd3 = rdd1.join(rdd2)
Because no partitioner is passed to reduceByKey, the default partitioner is used, resulting in rdd1 and rdd2 both
being hash-partitioned. These two reduceByKey transformations result in two shuffles. If the datasets have the same
number of partitions, a join requires no additional shuffling. Because the datasets are partitioned identically, the set
of keys in any single partition of rdd1 can only occur in a single partition of rdd2. Therefore, the contents of any single
output partition of rdd3 depends only on the contents of a single partition in rdd1 and single partition in rdd2, and
a third shuffle is not required.
For example, if someRdd has four partitions, someOtherRdd has two partitions, and both the reduceByKeys use
three partitions, the set of tasks that run would look like this:
If rdd1 and rdd2 use different partitioners or use the default (hash) partitioner with different numbers of partitions,
only one of the datasets (the one with the fewer number of partitions) needs to be reshuffled for the join:
To avoid shuffles when joining two datasets, you can use broadcast variables. When one of the datasets is small enough
to fit in memory in a single executor, it can be loaded into a hash table on the driver and then broadcast to every
executor. A map transformation can then reference the hash table to do lookups.
For example, loading data from a small number of large, unsplittable files can produce too few partitions to use all
available cores. In this case, invoking repartition with a high number
of partitions (which triggers a shuffle) after loading the data allows the transformations that follow to use more of the
cluster's CPU.
Another example arises when using the reduce or aggregate action to aggregate data into the driver. When
aggregating over a high number of partitions, the computation can quickly become bottlenecked on a single thread in
the driver merging all the results together. To lighten the load on the driver, first use reduceByKey or aggregateByKey
to perform a round of distributed aggregation that divides the dataset into a smaller number of partitions. The values
in each partition are merged with each other in parallel, before being sent to the driver for a final round of aggregation.
See treeReduce and treeAggregate for examples of how to do that.
This method is especially useful when the aggregation is already grouped by a key. For example, consider an application
that counts the occurrences of each word in a corpus and pulls the results into the driver as a map. One approach,
which can be accomplished with the aggregate action, is to compute a local map at each partition and then merge
the maps at the driver. The alternative approach, which can be accomplished with aggregateByKey, is to perform
the count in a fully distributed way, and then simply collectAsMap the results to the driver.
Secondary Sort
The repartitionAndSortWithinPartitions transformation repartitions the dataset according to a partitioner
and, within each resulting partition, sorts records by their keys. This transformation pushes sorting down into the
shuffle machinery, where large amounts of data can be spilled efficiently and sorting can be combined with other
operations.
For example, Apache Hive on Spark uses this transformation inside its join implementation. It also acts as a vital
building block in the secondary sort pattern, in which you group records by key and then, when iterating over the
values that correspond to a key, have them appear in a particular order. This scenario occurs in algorithms that need
to group events by user and then analyze the events for each user, based on the time they occurred.
• The --executor-memory/spark.executor.memory property controls the executor heap size, but executors
can also use some memory off heap, for example, Java NIO direct buffers. The value of the
spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full
memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
• YARN may round the requested memory up slightly. The yarn.scheduler.minimum-allocation-mb and
yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values,
respectively.
The following diagram (not to scale with defaults) shows the hierarchy of memory properties in Spark and YARN:
As described in Spark Execution Model on page 8, Spark groups datasets into stages. The number of tasks in a stage
is the same as the number of partitions in the last dataset in the stage. The number of partitions in a dataset is the
same as the number of partitions in the datasets on which it depends, with the following exceptions:
• The coalesce transformation creates a dataset with fewer partitions than its parent dataset.
• The union transformation creates a dataset with the sum of its parents' number of partitions.
• The cartesian transformation creates a dataset with the product of its parents' number of partitions.
Datasets with no parents, such as those produced by textFile or hadoopFile, have their partitions determined by
the underlying MapReduce InputFormat used. Typically, there is a partition for each HDFS block being read. The
number of partitions for datasets produced by parallelize is specified in the method, or
spark.default.parallelism if not specified. To determine the number of partitions in a dataset, call
rdd.partitions().size().
If the number of tasks is smaller than the number of slots available to run them, CPU usage is suboptimal. In addition, more
memory is used by any aggregation operations that occur in each task. In join, cogroup, or *ByKey operations,
objects are held in hashmaps or in-memory buffers to group or sort. join, cogroup, and groupByKey use these data
structures in the tasks for the stages that are on the fetching side of the shuffles they trigger. reduceByKey and
aggregateByKey use data structures in the tasks for the stages on both sides of the shuffles they trigger. If the records
in these aggregation operations exceed memory, the following issues can occur:
• Increased garbage collection, which can lead to pauses in computation.
• Spilling data to disk, causing disk I/O and sorting, which leads to job stalls.
To increase the number of partitions if the stage is reading from Hadoop:
• Use the repartition transformation, which triggers a shuffle.
• Configure your InputFormat to create more splits.
• Write the input data to HDFS with a smaller block size.
If the stage is receiving input from another stage, the transformation that triggered the stage boundary accepts a
numPartitions argument:
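For example, where X is the number of partitions to use in the child stage:

val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)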
Determining the optimal value for X requires experimentation. Find the number of partitions in the parent dataset,
and then multiply that by 1.5 until performance stops improving.
You can also calculate X using a formula, but some quantities in the formula are difficult to calculate. The main goal is
to run enough tasks so that the data destined for each task fits in the memory available to that task. The memory
available to each task is:
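A rough sketch, using the legacy shuffle memory properties that apply to this Spark release:

(spark.executor.memory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction) / spark.executor.cores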
The in-memory size of the total shuffle data is more difficult to determine. The closest heuristic is to find the ratio
between shuffle spill memory and the shuffle spill disk for a stage that ran. Then, multiply the total shuffle write by
this number. However, this can be compounded if the stage is performing a reduction:
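A rough sketch of the resulting estimate for the number of partitions:

(observed shuffle write) * (observed shuffle spill memory) * (spark.executor.cores) /
((observed shuffle spill disk) * (spark.executor.memory) * (spark.shuffle.memoryFraction) * (spark.shuffle.safetyFraction))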
Then, round up slightly, because too many partitions is usually better than too few.
When in doubt, err on the side of a larger number of tasks (and thus partitions). This contrasts with recommendations
for MapReduce, which unlike Spark, has a high startup overhead for tasks.
Spark and Hadoop Integration
In CDH 5.4, to enable dynamic allocation when running the action, specify the following in the Oozie workflow:
<spark-opts>--conf spark.dynamicAllocation.enabled=true
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
</spark-opts>
If you have enabled the shuffle service in Cloudera Manager, you do not need to specify
spark.shuffle.service.enabled.
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-core</artifactId>
<version>${crunch.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.crunch</groupId>
<artifactId>crunch-spark</artifactId>
<version>${crunch.version}</version>
<scope>provided</scope>
</dependency>
2. Use SparkPipeline where you would have used MRPipeline in the declaration of your Crunch pipeline.
SparkPipeline takes either a String that contains the connection string for the Spark master (local for local
mode, yarn for YARN) or a JavaSparkContext instance.
3. As you would for a Spark application, use spark-submit to start the pipeline with your Crunch application
app-jar-with-dependencies.jar file.
For an example, see Crunch demo. After building the example, run with the following command:
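A sketch, assuming the demo builds an assembly JAR named crunch-demo-1.0-SNAPSHOT-job.jar whose main class is
com.example.WordCount (input and output paths are placeholders):

spark-submit --class com.example.WordCount crunch-demo-1.0-SNAPSHOT-job.jar \
    hdfs://namenode_host:8020/user/hdfs/input hdfs://namenode_host:8020/user/hdfs/output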