Examples with a practical guide for PySpark
pyspark.sql module
Module Context
Important classes of Spark SQL and DataFrames:
A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over
tables, cache tables, and read Parquet files. To create a SparkSession, use the following builder
pattern:
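For example (a minimal sketch; the master URL, application name and config option are placeholders):
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()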
builder
A class attribute having a Builder to construct SparkSession instances
appName(name) [source]
Sets a name for the application, which will be shown in the Spark web UI.
enableHiveSupport() [source]
Enables Hive support, including connectivity to a persistent Hive metastore, support for
Hive serdes, and Hive user-defined functions.
getOrCreate() [source]
Gets an existing SparkSession or, if there is no existing one, creates a new one based
on the options set in this builder.
This method first checks whether there is a valid global default SparkSession, and if yes,
return that one. If no valid global default SparkSession exists, the method creates a new
SparkSession and assigns the newly created SparkSession as the global default.
In case an existing SparkSession is returned, the config options specified in this builder
will be applied to the existing SparkSession.
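As a quick illustration of this behaviour (the option names k1 and k2 are arbitrary placeholders):
>>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
>>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
True
>>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
>>> s1.conf.get("k1") == s2.conf.get("k1")
True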
master(master) [source]
Sets the Spark master URL to connect to, such as “local” to run locally, “local[4]” to run
locally with 4 cores, or “spark://master:7077” to run on a Spark standalone cluster.
catalog
Interface through which the user may create, drop, alter or query underlying databases,
tables, functions etc.
Returns: Catalog
conf
Runtime configuration interface for Spark.
This is the interface through which the user can get and set all Spark and Hadoop
configurations that are relevant to Spark SQL. When getting the value of a config, this defaults
to the value set in the underlying SparkContext, if any.
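For example, a configuration value can be set and read back through this interface (a minimal sketch; the shuffle-partition setting is used purely as an illustration):
>>> spark.conf.set("spark.sql.shuffle.partitions", "50")
>>> spark.conf.get("spark.sql.shuffle.partitions")
'50'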
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True) [source]
Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
When schema is a list of column names, the type of each column will be inferred from data.
When schema is None, it will try to infer the schema (column names and types) from data,
which should be an RDD of Row, or namedtuple, or dict.
If schema inference is needed, samplingRatio is used to determine the ratio of rows used
for schema inference. The first row will be used if samplingRatio is None.
Parameters: data – an RDD of any kind of SQL data representation (e.g. Row, tuple, int,
boolean, etc.), or list, or pandas.DataFrame.
schema – a pyspark.sql.types.DataType or a datatype string or a list
of column names, default is None. The data type string format equals to
pyspark.sql.types.DataType.simpleString, except that top level
struct type can omit the struct<> and atomic types use typeName() as
their format, e.g. use byte instead of tinyint for
pyspark.sql.types.ByteType. We can also use int as a short name for
IntegerType.
samplingRatio – the sample ratio of rows used for inferring
>>> spark.createDataFrame(df.toPandas()).collect()
[Row(name='Alice', age=1)]
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
[Row(0=1, 1=2)]
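A schema can also be passed explicitly as a DDL-formatted string instead of being inferred (a small sketch using an inline list of tuples):
>>> spark.createDataFrame([('Alice', 1)], "name: string, age: int").collect()
[Row(name='Alice', age=1)]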
newSession() [source]
Returns a new SparkSession as new session, that has separate SQLConf, registered
temporary views and UDFs, but shared SparkContext and table cache.
>>> spark.range(3).collect()
[Row(id=0), Row(id=1), Row(id=2)]
read
Returns a DataFrameReader that can be used to read data in as a DataFrame.
Returns: DataFrameReader
readStream
Returns a DataStreamReader that can be used to read data streams as a streaming
DataFrame.
Note: Evolving.
Returns: DataStreamReader
sparkContext
Returns the underlying SparkContext.
sql(sqlQuery) [source]
Returns a DataFrame representing the result of the given query.
Returns: DataFrame
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2='row1'), Row(f1=2, f2='row2'), Row(f1=3, f2='row3')]
stop() [source]
Stop the underlying SparkContext.
streams
Returns a StreamingQueryManager that allows managing all the StreamingQuery instances
active on this context.
Note: Evolving.
Returns: StreamingQueryManager
table(tableName) [source]
Returns the specified table as a DataFrame.
Returns: DataFrame
>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
udf
Returns a UDFRegistration for UDF registration.
Returns: UDFRegistration
version
The version of Spark on which this application is running.
SQLContext is the entry point for working with structured data (rows and columns) in Spark, in Spark 1.x.
As of Spark 2.0, it is replaced by SparkSession. However, the class is kept here for
backward compatibility.
A SQLContext can be used to create DataFrames, register DataFrames as tables, execute SQL over
tables, cache tables, and read Parquet files.
cacheTable(tableName) [source]
Caches the specified table in-memory.
clearCache() [source]
Removes all cached tables from the in-memory cache.
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True) [source]
Creates a DataFrame from an RDD, a list or a pandas.DataFrame.
When schema is a list of column names, the type of each column will be inferred from data.
When schema is None, it will try to infer the schema (column names and types) from data,
which should be an RDD of Row, or namedtuple, or dict.
If schema inference is needed, samplingRatio is used to determine the ratio of rows used
for schema inference. The first row will be used if samplingRatio is None.
Parameters: data – an RDD of any kind of SQL data representation (e.g. Row, tuple,
int, boolean, etc.), or list, or pandas.DataFrame.
schema – a pyspark.sql.types.DataType or a datatype string or a list
of column names, default is None. The data type string format equals to
pyspark.sql.types.DataType.simpleString, except that top level
struct type can omit the struct<> and atomic types use typeName() as
their format, e.g. use byte instead of tinyint for
pyspark.sql.types.ByteType. We can also use int as a short name for
pyspark.sql.types.IntegerType.
samplingRatio – the sample ratio of rows used for inferring
verifySchema – verify data types of every row against schema.
Returns: DataFrame
>>> sqlContext.createDataFrame(df.toPandas()).collect()
[Row(name='Alice', age=1)]
>>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
[Row(0=1, 1=2)]
The data source is specified by the source and a set of options. If source is not specified,
the default data source configured by spark.sql.sources.default will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame and created
external table.
Returns: DataFrame
dropTempTable(tableName) [source]
Removes the temporary table from the catalog.
If the key is not set and defaultValue is set, return defaultValue. If the key is not set and
defaultValue is not set, return the system default value.
>>> sqlContext.getConf("spark.sql.shuffle.partitions")
'200'
>>> sqlContext.getConf("spark.sql.shuffle.partitions", u"10")
'10'
>>> sqlContext.setConf("spark.sql.shuffle.partitions", u"50")
>>> sqlContext.getConf("spark.sql.shuffle.partitions", u"10")
'50'
getOrCreate(sc) [source]
Gets the existing SQLContext or creates a new one with the given SparkContext.
Parameters: sc – SparkContext
newSession() [source]
Returns a new SQLContext as new session, that has separate SQLConf, registered
temporary views and UDFs, but shared SparkContext and table cache.
>>> sqlContext.range(3).collect()
[Row(id=0), Row(id=1), Row(id=2)]
read
Returns a DataFrameReader that can be used to read data in as a DataFrame.
Returns: DataFrameReader
readStream
Returns a DataStreamReader that can be used to read data streams as a streaming
DataFrame.
Note: Evolving.
Returns: DataStreamReader
Temporary tables exist only during the lifetime of this instance of SQLContext.
sql(sqlQuery) [source]
Returns a DataFrame representing the result of the given query.
Returns: DataFrame
streams
Returns a StreamingQueryManager that allows managing all the StreamingQuery instances
active on this context.
Note: Evolving.
table(tableName) [source]
Returns the specified table or view as a DataFrame.
Returns: DataFrame
tableNames(dbName=None) [source]
Returns a list of names of tables in the database dbName.
Parameters: dbName – string, name of the database to use. Default to the current
database.
Returns: list of table names, in string
tables(dbName=None) [source]
Returns a DataFrame containing names of tables in the given database.
The returned DataFrame has two columns: tableName and isTemporary (a column with
BooleanType indicating if a table is a temporary one or not).
udf
Returns a UDFRegistration for UDF registration.
Returns: UDFRegistration
uncacheTable(tableName) [source]
Removes the specified table from the in-memory cache.
Configuration for Hive is read from hive-site.xml on the classpath. It supports running both SQL
and HiveQL commands.
refreshTable(tableName) [source]
Invalidates and refreshes all the cached metadata of the given table. For performance
reasons, Spark SQL or the external data source library it uses might cache certain metadata
about a table, such as the location of blocks. When those change outside of Spark SQL, users
should call this function to invalidate the cache.
returnType can be optionally specified when f is a Python function but not when f is a user-
defined function. Please see below.
Spark uses the return type of the given user-defined function as the return
type of the registered user-defined function. returnType should not be
specified. In this case, this API works as if register(name, f).
In addition to a name and the function itself, the return type can be optionally specified. When
the return type is not specified, it is inferred via reflection.
>>> spark.udf.registerJavaFunction(
... "javaStringLength2", "test.org.apache.spark.sql.JavaStringLength")
>>> spark.sql("SELECT javaStringLength2('test')").collect()
[Row(UDF:javaStringLength2(test)=4)]
>>> spark.udf.registerJavaFunction(
...     "javaStringLength3", "test.org.apache.spark.sql.JavaStringLength", "integer")
>>> spark.sql("SELECT javaStringLength3('test')").collect()
[Row(UDF:javaStringLength3(test)=4)]
A DataFrame is equivalent to a relational table in Spark SQL, and can be created using various
functions in SparkSession:
people = spark.read.parquet("...")
Once created, it can be manipulated using the various domain-specific-language (DSL) functions
defined in: DataFrame, Column.
To select a column from the data frame, use the apply method:
ageCol = people.age
agg(*exprs) [source]
Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).
alias(alias) [source]
Returns a new DataFrame with an alias set.
The result of this algorithm has the following deterministic bound: if the DataFrame has N
elements and we request the quantile at probability p up to error err, then the algorithm will
return a sample x from the DataFrame so that the exact rank of x is close to (p * N). More
precisely, floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
This method implements a variation of the Greenwald-Khanna algorithm (with some speed
optimizations). The algorithm was first presented in Space-efficient Online Computation of
Quantile Summaries (http://dx.doi.org/10.1145/375663.375670) by Greenwald and Khanna.
Note that null values will be ignored in numerical columns before calculation. For columns
only containing null values, an empty list is returned.
Parameters: col – str, list. Can be a single column name, or a list of names for multiple
columns.
probabilities – a list of quantile probabilities. Each number must belong to
[0, 1]. For example, 0 is the minimum, 0.5 is the median, 1 is the maximum.
relativeError – The relative target precision to achieve (>= 0). If set to
zero, the exact quantiles are computed, which could be very expensive.
Note that values greater than 1 are accepted but give the same result as
1.
Returns: the approximate quantiles at the given probabilities. If the input col is a
string, the output is a list of floats. If the input col is a list or tuple of strings,
the output is also a list, but each element in it is a list of floats, i.e., the output
is a list of list of floats.
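For instance, approximate quartiles of the age column could be requested like this (using the two-row df from the other examples; the returned list is omitted because it depends on the data):
>>> df.approxQuantile("age", [0.25, 0.5, 0.75], 0.05)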
cache() [source]
Persists the DataFrame with the default storage level (MEMORY_AND_DISK).
Note: The default storage level has changed to MEMORY_AND_DISK to match Scala in 2.0.
checkpoint(eager=True) [source]
Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan
may grow exponentially. It will be saved to files inside the checkpoint directory set with
SparkContext.setCheckpointDir().
Note: Experimental
coalesce(numPartitions) [source]
Returns a new DataFrame that has exactly numPartitions partitions.
Similar to coalesce defined on an RDD, this operation results in a narrow dependency, e.g. if
you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of the
100 new partitions will claim 10 of the current partitions. If a larger number of partitions is
requested, it will stay at the current number of partitions.
However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your
computation taking place on fewer nodes than you like (e.g. one node in the case of
numPartitions = 1). To avoid this, you can call repartition(). This will add a shuffle step, but
means the current upstream partitions will be executed in parallel (per whatever the current
partitioning is).
>>> df.coalesce(1).rdd.getNumPartitions()
1
colRegex(colName) [source]
Selects a column based on the column name specified as a regex and returns it as a Column.
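For example (a small self-contained sketch; cdf is created only for this illustration):
>>> cdf = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "Col2"])
>>> cdf.select(cdf.colRegex("`(Col1)?+.+`")).show()
+----+
|Col2|
+----+
|   1|
|   2|
|   3|
+----+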
collect() [source]
Returns all the records as a list of Row.
>>> df.collect()
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
columns
Returns all column names as a list.
>>> df.columns
['age', 'name']
count() [source]
Returns the number of rows in this DataFrame.
>>> df.count()
2
createGlobalTempView(name) [source]
Creates a global temporary view with this DataFrame.
The lifetime of this temporary view is tied to this Spark application. Throws
TempTableAlreadyExistsException if the view name already exists in the catalog.
>>> df.createGlobalTempView("people")
>>> df2 = spark.sql("select * from global_temp.people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> df.createGlobalTempView("people")
Traceback (most recent call last):
...
AnalysisException: u"Temporary table 'people' already exists;"
>>> spark.catalog.dropGlobalTempView("people")
createOrReplaceGlobalTempView(name) [source]
Creates or replaces a global temporary view using the given name.
>>> df.createOrReplaceGlobalTempView("people")
>>> df2 = df.filter(df.age > 3)
>>> df2.createOrReplaceGlobalTempView("people")
>>> df3 = spark.sql("select * from global_temp.people")
>>> sorted(df3.collect()) == sorted(df2.collect())
True
>>> spark.catalog.dropGlobalTempView("people")
createOrReplaceTempView(name) [source]
Creates or replaces a local temporary view with this DataFrame.
The lifetime of this temporary table is tied to the SparkSession that was used to create this
DataFrame.
>>> df.createOrReplaceTempView("people")
>>> df2 = df.filter(df.age > 3)
>>> df2.createOrReplaceTempView("people")
>>> df3 = spark.sql("select * from people")
>>> sorted(df3.collect()) == sorted(df2.collect())
True
>>> spark.catalog.dropTempView("people")
createTempView(name) [source]
Creates a local temporary view with this DataFrame.
The lifetime of this temporary table is tied to the SparkSession that was used to create this
DataFrame. Throws TempTableAlreadyExistsException if the view name already exists in
the catalog.
>>> df.createTempView("people")
>>> df2 = spark.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> df.createTempView("people")
Traceback (most recent call last):
...
AnalysisException: u"Temporary table 'people' already exists;"
>>> spark.catalog.dropTempView("people")
crossJoin(other) [source]
Returns the cartesian product with another DataFrame.
Computes a pair-wise frequency table of the given columns. Also known as a contingency
table. The number of distinct values for each column should be less than 1e4. At most 1e6
non-zero pair frequencies will be returned. The first column of each row will be the distinct
values of col1 and the column names will be the distinct values of col2. The name of the first
column will be $col1_$col2. Pairs that have no occurrences will have zero as their counts.
DataFrame.crosstab() and DataFrameStatFunctions.crosstab() are aliases.
Parameters: col1 – The name of the first column. Distinct items will make the first item
of each row.
col2 – The name of the second column. Distinct items will make the
column names of the DataFrame.
cube(*cols) [source]
Create a multi-dimensional cube for the current DataFrame using the specified columns, so
we can run aggregation on them.
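For example, with the two-row df used throughout these examples:
>>> df.cube("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
| null|   2|    1|
| null|   5|    1|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+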
describe(*cols) [source]
Computes basic statistics for numeric and string columns.
This includes count, mean, stddev, min, and max. If no columns are given, this function
computes statistics for all numerical or string columns.
Note: This function is meant for exploratory data analysis, as we make no guarantee
about the backward compatibility of the schema of the resulting DataFrame.
>>> df.describe(['age']).show()
+-------+------------------+
|summary| age|
+-------+------------------+
| count| 2|
| mean| 3.5|
| stddev|2.1213203435596424|
| min| 2|
| max| 5|
+-------+------------------+
>>> df.describe().show()
+-------+------------------+-----+
|summary| age| name|
+-------+------------------+-----+
| count| 2| 2|
| mean| 3.5| null|
| stddev|2.1213203435596424| null|
| min| 2|Alice|
| max| 5| Bob|
+-------+------------------+-----+
Use summary for expanded statistics and control over which statistics to compute.
distinct() [source]
Returns a new DataFrame containing the distinct rows in this DataFrame.
>>> df.distinct().count()
2
drop(*cols) [source]
Returns a new DataFrame that drops the specified column. This is a no-op if schema doesn’t
contain the given column name(s).
Parameters: cols – a string name of the column to drop, or a Column to drop, or a list of
string name of the columns to drop.
>>> df.drop('age').collect()
[Row(name='Alice'), Row(name='Bob')]
>>> df.drop(df.age).collect()
[Row(name='Alice'), Row(name='Bob')]
dropDuplicates(subset=None) [source]
Return a new DataFrame with duplicate rows removed, optionally only considering certain
columns.
For a static batch DataFrame, it just drops duplicate rows. For a streaming DataFrame, it will
keep all data across triggers as intermediate state to drop duplicate rows. You can use
withWatermark() to limit how late the duplicate data can be, and the system will accordingly limit
the state. In addition, data older than the watermark will be dropped to avoid any
possibility of duplicates.
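A small self-contained sketch of dropDuplicates (the people DataFrame is created only for this illustration):
>>> from pyspark.sql import Row
>>> people = spark.createDataFrame([
...     Row(name='Alice', age=5, height=80),
...     Row(name='Alice', age=5, height=80),
...     Row(name='Alice', age=10, height=80)])
>>> people.dropDuplicates().count()
2
>>> people.dropDuplicates(['name', 'height']).count()
1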
drop_duplicates(subset=None)
drop_duplicates() is an alias for dropDuplicates().
Parameters: how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a
row only if all its values are null.
thresh – int, default None If specified, drop rows that have less than
thresh non-null values. This overwrites the how parameter.
subset – optional list of column names to consider.
>>> df4.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
dtypes
Returns all column names and their data types as a list.
>>> df.dtypes
[('age', 'int'), ('name', 'string')]
explain(extended=False) [source]
Prints the (logical and physical) plans to the console for debugging purpose.
Parameters: extended – boolean, default False. If False, prints only the physical plan.
>>> df.explain()
== Physical Plan ==
Scan ExistingRDD[age#0,name#1]
>>> df.explain(True)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
Parameters: value – int, long, float, string, bool or dict. Value to replace null values
with. If the value is a dict, then subset is ignored and value must be a
mapping from column name (string) to replacement value. The
replacement value must be an int, long, float, boolean, or string.
subset – optional list of column names to consider. Columns specified in
subset that do not have matching data type are ignored. For example, if
value is a string, and subset contains a non-string column, then the non-
string column is simply ignored.
>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
| 5| 50| Bob|
| 50| 50| Tom|
| 50| 50| null|
+---+------+-----+
>>> df5.na.fill(False).show()
+----+-------+-----+
| age| name| spy|
+----+-------+-----+
| 10| Alice|false|
| 5| Bob|false|
|null|Mallory| true|
+----+-------+-----+
filter(condition) [source]
Filters rows using the given condition.
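The condition can be either a Column of BooleanType or a string of SQL expression. For example, with the two-row df used throughout:
>>> df.filter(df.age > 3).collect()
[Row(age=5, name='Bob')]
>>> df.filter("age > 3").collect()
[Row(age=5, name='Bob')]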
first() [source]
Returns the first row as a Row.
>>> df.first()
Row(age=2, name='Alice')
foreach(f) [source]
Applies the f function to each Row of this DataFrame.
foreachPartition(f) [source]
Applies the f function to each partition of this DataFrame.
Note: This function is meant for exploratory data analysis, as we make no guarantee
about the backward compatibility of the schema of the resulting DataFrame.
Parameters: cols – Names of the columns to calculate frequent items for as a list or
tuple of strings.
support – The frequency with which to consider an item ‘frequent’. Default
is 1%. The support must be greater than 1e-4.
groupBy(*cols) [source]
Groups the DataFrame using the specified columns, so we can run aggregation on them. See
GroupedData for all the available aggregate functions.
Parameters: cols – list of columns to group by. Each element should be a column name
(string) or an expression (Column).
>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
[Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(df.name).avg().collect())
[Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(['name', df.age]).count().collect())
[Row(name='Alice', age=2, count=1), Row(name='Bob', age=5, count=1)]
groupby(*cols)
groupby() is an alias for groupBy().
head(n=None) [source]
Returns the first n rows.
Note: This method should only be used if the resulting array is expected to be small, as
all the data is loaded into the driver’s memory.
>>> df.head()
Row(age=2, name='Alice')
>>> df.head(1)
[Row(age=2, name='Alice')]
intersect(other) [source]
Return a new DataFrame containing rows only in both this frame and another frame.
isLocal() [source]
Returns True if the collect() and take() methods can be run locally (without any Spark
executors).
isStreaming
Returns true if this Dataset contains one or more sources that continuously return data as it
arrives. A Dataset that reads data from a streaming source must be executed as a
StreamingQuery using the start() method in DataStreamWriter. Methods that return a
single answer (e.g., count() or collect()) will throw an AnalysisException when there is
a streaming source present.
Note: Evolving
The following performs a full outer join between df1 and df2.
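A sketch of that call, assuming df1 and df2 are two DataFrames that share a name column and that df2 also has a height column (neither frame is defined in this excerpt):
>>> df1.join(df2, df1.name == df2.name, 'outer').select(df1.name, df2.height).collect()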
limit(num) [source]
Limits the result count to the number specified.
>>> df.limit(1).collect()
[Row(age=2, name='Alice')]
>>> df.limit(0).collect()
[]
localCheckpoint(eager=True) [source]
Returns a locally checkpointed version of this Dataset. Checkpointing can be used to truncate
the logical plan of this DataFrame, which is especially useful in iterative algorithms where the
plan may grow exponentially. Local checkpoints are stored in the executors using the caching
subsystem and therefore they are not reliable.
Note: Experimental
na
Returns a DataFrameNaFunctions for handling missing values.
orderBy(*cols, **kwargs)
Returns a new DataFrame sorted by the specified column(s).
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
Note: The default storage level has changed to MEMORY_AND_DISK to match Scala in 2.0.
printSchema() [source]
Prints out the schema in the tree format.
>>> df.printSchema()
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
Parameters: weights – list of doubles as weights with which to split the DataFrame.
Weights will be normalized if they don’t sum up to 1.0.
seed – The seed for sampling.
>>> splits[1].count()
3
rdd
Returns the content as a pyspark.RDD of Row.
registerTempTable(name) [source]
Registers this DataFrame as a temporary table using the given name.
The lifetime of this temporary table is tied to the SparkSession that was used to create this
DataFrame.
>>> df.registerTempTable("people")
>>> df2 = spark.sql("select * from people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> spark.catalog.dropTempView("people")
Changed in version 1.6: Added optional arguments to specify the partitioning columns. Also
made numPartitions optional if partitioning columns are specified.
>>> df.repartition(10).rdd.getNumPartitions()
10
>>> data = df.union(df).repartition("age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 5| Bob|
| 2|Alice|
| 2|Alice|
+---+-----+
>>> data = data.repartition(7, "age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
| 2|Alice|
| 5| Bob|
+---+-----+
>>> data.rdd.getNumPartitions()
7
>>> data = data.repartition("name", "age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
| 5| Bob|
| 5| Bob|
| 2|Alice|
| 2|Alice|
+---+-----+
Parameters: to_replace – bool, int, long, float, string, list or dict. Value to be replaced.
rollup(*cols) [source]
Create a multi-dimensional rollup for the current DataFrame using the specified columns, so
we can run aggregation on them.
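For example, with the two-row df (note that, unlike cube, rollup does not produce the (null, age) subtotals):
>>> df.rollup("name", df.age).count().orderBy("name", "age").show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|null|    2|
|Alice|null|    1|
|Alice|   2|    1|
|  Bob|null|    1|
|  Bob|   5|    1|
+-----+----+-----+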
Note: This is not guaranteed to provide exactly the fraction specified of the total count of
the given DataFrame.
>>> df = spark.range(10)
>>> df.sample(0.5, 3).count()
4
>>> df.sample(fraction=0.5, seed=3).count()
4
>>> df.sample(withReplacement=True, fraction=0.5, seed=3).count()
1
>>> df.sample(1.0).count()
10
>>> df.sample(fraction=1.0).count()
10
>>> df.sample(False, fraction=1.0).count()
10
schema
Returns the schema of this DataFrame as a pyspark.sql.types.StructType.
>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
select(*cols) [source]
Projects a set of expressions and returns a new DataFrame.
Parameters: cols – list of column names (string) or expressions (Column). If one of the
column names is ‘*’, that column is expanded to include all columns in the
current DataFrame.
>>> df.select('*').collect()
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
>>> df.select('name', 'age').collect()
[Row(name='Alice', age=2), Row(name='Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect()
[Row(name='Alice', age=12), Row(name='Bob', age=15)]
selectExpr(*expr) [source]
Projects a set of SQL expressions and returns a new DataFrame.
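This is a variant of select() that accepts SQL expressions. For example:
>>> df.selectExpr("age * 2", "abs(age)").collect()
[Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]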
>>> df
DataFrame[age: int, name: string]
>>> df.show()
+---+-----+
|age| name|
+---+-----+
| 2|Alice|
| 5| Bob|
+---+-----+
>>> df.show(truncate=3)
+---+----+
|age|name|
+---+----+
| 2| Ali|
| 5| Bob|
+---+----+
>>> df.show(vertical=True)
-RECORD 0-----
age | 2
name | Alice
-RECORD 1-----
age | 5
name | Bob
>>> df.sort(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name='Bob'), Row(age=2, name='Alice')]
Returns a new DataFrame with each partition sorted by the specified column(s).
stat
Returns a DataFrameStatFunctions for statistic functions.
storageLevel
Get the DataFrame’s current storage level.
>>> df.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df.cache().storageLevel
StorageLevel(True, True, False, True, 1)
>>> df2.persist(StorageLevel.DISK_ONLY_2).storageLevel
StorageLevel(True, False, False, False, 2)
subtract(other) [source]
Return a new DataFrame containing rows in this frame but not in another frame.
summary(*statistics) [source]
Computes specified statistics for numeric and string columns. Available statistics are: count,
mean, stddev, min, max, and arbitrary approximate percentiles specified as a percentage (e.g.,
75%).
If no statistics are given, this function computes count, mean, stddev, min, approximate
quartiles (percentiles at 25%, 50%, and 75%), and max.
Note: This function is meant for exploratory data analysis, as we make no guarantee
about the backward compatibility of the schema of the resulting DataFrame.
>>> df.summary().show()
+-------+------------------+-----+
|summary| age| name|
+-------+------------------+-----+
| count| 2| 2|
| mean| 3.5| null|
| stddev|2.1213203435596424| null|
| min| 2|Alice|
| 25%| 2| null|
| 50%| 2| null|
| 75%| 5| null|
| max| 5| Bob|
+-------+------------------+-----+
take(num) [source]
Returns the first num rows as a list of Row.
>>> df.take(2)
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
toDF(*cols) [source]
Returns a new DataFrame with the specified new column names.
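For example, with the two-row df:
>>> df.toDF('f1', 'f2').collect()
[Row(f1=2, f2='Alice'), Row(f1=5, f2='Bob')]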
toJSON(use_unicode=True) [source]
Converts a DataFrame into an RDD of strings.
Each row is turned into a JSON document as one element in the returned RDD.
>>> df.toJSON().first()
'{"age":2,"name":"Alice"}'
toLocalIterator() [source]
Returns an iterator that contains all of the rows in this DataFrame. The iterator will consume as
much memory as the largest partition in this DataFrame.
>>> list(df.toLocalIterator())
[Row(age=2, name='Alice'), Row(age=5, name='Bob')]
toPandas() [source]
Returns the contents of this DataFrame as a pandas.DataFrame.
Note: This method should only be used if the resulting Pandas’s DataFrame is expected
to be small, as all the data is loaded into the driver’s memory.
>>> df.toPandas()
age name
0 2 Alice
1 5 Bob
union(other) [source]
Return a new DataFrame containing the union of rows in this and another frame.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication
of elements), use this function followed by distinct().
Also as standard in SQL, this function resolves columns by position (not by name).
unionAll(other) [source]
Return a new DataFrame containing the union of rows in this and another frame.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication
of elements), use this function followed by distinct().
Also as standard in SQL, this function resolves columns by position (not by name).
unionByName(other) [source]
Returns a new DataFrame containing the union of rows in this and another frame.
This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set
union (that does deduplication of elements), use this function followed by distinct().
The difference between this function and union() is that this function resolves columns by
name (not by position):
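A small self-contained sketch (df1 and df2 are created only for this illustration):
>>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
>>> df2 = spark.createDataFrame([[4, 5, 6]], ["col2", "col1", "col0"])
>>> df1.unionByName(df2).show()
+----+----+----+
|col0|col1|col2|
+----+----+----+
|   1|   2|   3|
|   6|   5|   4|
+----+----+----+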
unpersist(blocking=False) [source]
Marks the DataFrame as non-persistent, and remove all blocks for it from memory and disk.
where(condition)
where() is an alias for filter().
withColumn(colName, col) [source]
Returns a new DataFrame by adding a column or replacing the existing column that has the
same name. The column expression must be an expression over this DataFrame; attempting to
add a column from some other DataFrame will raise an error.
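For example, with the two-row df:
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]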
The current watermark is computed by looking at the MAX(eventTime) seen across all of the
partitions in the query minus a user specified delayThreshold. Due to the cost of coordinating
this value across partitions, the actual watermark used is only guaranteed to be at least
delayThreshold behind the actual event time. In some cases we may still process records that
arrive more than delayThreshold late.
Parameters: eventTime – the name of the column that contains the event time of the
row.
delayThreshold – the minimum delay to wait for data to arrive late, relative
to the latest record that has been processed in the form of an interval (e.g.
“1 minute” or “5 hours”).
Note: Evolving
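A sketch of the call, assuming a streaming DataFrame sdf with a string column name and a timestamp column time (not defined in this excerpt):
>>> sdf.select('name', sdf.time).withWatermark('time', '10 minutes')
DataFrame[name: string, time: timestamp]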
write
Interface for saving the content of the non-streaming DataFrame out into external storage.
Returns: DataFrameWriter
writeStream
Interface for saving the content of the streaming DataFrame out into external storage.
Note: Evolving.
Returns: DataStreamWriter
Note: Experimental
agg(*exprs) [source]
Compute aggregates and returns the result as a DataFrame.
The available aggregate functions are avg, max, min, sum, count.
If exprs is a single dict mapping from string to string, then the key is the column to perform
aggregation on, and the value is the aggregate function.
Parameters: exprs – a dict mapping from column name (string) to aggregate functions
(string), or a list of Column.
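For example, with the two-row df:
>>> gdf = df.groupBy(df.name)
>>> sorted(gdf.agg({"*": "count"}).collect())
[Row(name='Alice', count(1)=1), Row(name='Bob', count(1)=1)]
>>> from pyspark.sql import functions as F
>>> sorted(gdf.agg(F.min(df.age)).collect())
[Row(name='Alice', min(age)=2), Row(name='Bob', min(age)=5)]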
apply(udf) [source]
Maps each group of the current DataFrame using a pandas udf and returns the result as a
DataFrame.
This function does not support partial aggregation, and requires shuffling all the data in the
DataFrame.
Note: Experimental
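A small self-contained sketch of a grouped-map pandas UDF (vdf and normalize are defined only for this illustration; the output is omitted):
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> vdf = spark.createDataFrame(
...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
>>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
... def normalize(pdf):
...     # subtract the group mean and divide by the group standard deviation
...     return pdf.assign(v=(pdf.v - pdf.v.mean()) / pdf.v.std())
>>> vdf.groupby("id").apply(normalize).show()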
avg(*cols) [source]
Computes average values for each numeric column for each group.
Parameters: cols – list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().avg('age').collect()
[Row(avg(age)=3.5)]
>>> df3.groupBy().avg('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]
count() [source]
Counts the number of records for each group.
>>> sorted(df.groupBy(df.age).count().collect())
[Row(age=2, count=1), Row(age=5, count=1)]
max(*cols) [source]
Computes the max value for each numeric column for each group.
>>> df.groupBy().max('age').collect()
[Row(max(age)=5)]
>>> df3.groupBy().max('age', 'height').collect()
[Row(max(age)=5, max(height)=85)]
mean(*cols) [source]
Computes average values for each numeric column for each group.
Parameters: cols – list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().mean('age').collect()
[Row(avg(age)=3.5)]
>>> df3.groupBy().mean('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]
min(*cols) [source]
Computes the min value for each numeric column for each group.
Parameters: cols – list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().min('age').collect()
[Row(min(age)=2)]
>>> df3.groupBy().min('age', 'height').collect()
[Row(min(age)=2, min(height)=80)]
# Compute the sum of earnings for each year by course with each course as a separate
column
>>> df4.groupBy("year").pivot("course").sum("earnings").collect()
[Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
sum(*cols) [source]
Computes the sum for each numeric column for each group.
Parameters: cols – list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().sum('age').collect()
[Row(sum(age)=7)]
>>> df3.groupBy().sum('age', 'height').collect()
[Row(sum(age)=7, sum(height)=165)]
df.colName
df["colName"]
Parameters: alias – strings of desired column names (collects all positional arguments
passed)
metadata – a dict of information to be stored in metadata attribute of the
corresponding :class: StructField (optional, keyword only argument)
>>> df.select(df.age.alias("age2")).collect()
[Row(age2=2), Row(age2=5)]
>>> df.select(df.age.alias("age3", metadata={'max': 99})).schema['age3'].metadata['max']
99
asc()
Returns a sort expression based on the ascending order of the given column name
astype(dataType)
astype() is an alias for cast().
bitwiseAND(other)
Compute bitwise AND of this expression with another expression.
Parameters: other – a value or Column to calculate bitwise and(&) against this Column.
bitwiseOR(other)
Compute bitwise OR of this expression with another expression.
Parameters: other – a value or Column to calculate bitwise or(|) against this Column.
bitwiseXOR(other)
Compute bitwise XOR of this expression with another expression.
Parameters: other – a value or Column to calculate bitwise xor(^) against this Column.
cast(dataType) [source]
Convert the column into type dataType.
>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages='2'), Row(ages='5')]
>>> df.select(df.age.cast(StringType()).alias('ages')).collect()
[Row(ages='2'), Row(ages='5')]
contains(other)
Contains the other element. Returns a boolean Column based on a string match.
>>> df.filter(df.name.contains('o')).collect()
[Row(age=5, name='Bob')]
desc()
Returns a sort expression based on the descending order of the given column name.
endswith(other)
String ends with. Returns a boolean Column based on a string match.
>>> df.filter(df.name.endswith('ice')).collect()
[Row(age=2, name='Alice')]
>>> df.filter(df.name.endswith('ice$')).collect()
[]
eqNullSafe(other)
Equality test that is safe for null values.
Note: Unlike Pandas, PySpark doesn’t consider NaN values to be NULL. See the NaN
Semantics for details.
getField(name) [source]
An expression that gets a field by name in a StructField.
getItem(key) [source]
An expression that gets an item at position ordinal out of a list, or gets an item by key out of
a dict.
isNotNull()
True if the current expression is NOT null.
isNull()
True if the current expression is null.
isin(*cols) [source]
A boolean expression that is evaluated to true if the value of this expression is contained by
the evaluated values of the arguments.
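For example, with the two-row df:
>>> df[df.name.isin("Bob", "Mike")].collect()
[Row(age=5, name='Bob')]
>>> df[df.age.isin([1, 2, 3])].collect()
[Row(age=2, name='Alice')]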
like(other)
SQL like expression. Returns a boolean Column based on a SQL LIKE match.
>>> df.filter(df.name.like('Al%')).collect()
[Row(age=2, name='Alice')]
name(*alias, **kwargs)
name() is an alias for alias().
otherwise(value) [source]
Evaluates a list of conditions and returns one of multiple possible result expressions. If
Column.otherwise() is not invoked, None is returned for unmatched conditions.
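For example, with the two-row df (the flag alias is added here only for readability):
>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 3, 1).otherwise(0).alias("flag")).collect()
[Row(name='Alice', flag=0), Row(name='Bob', flag=1)]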
over(window) [source]
Define a windowing column.
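A minimal sketch using the two-row df (the window specification here is illustrative only):
>>> from pyspark.sql import Window
>>> from pyspark.sql import functions as F
>>> w = Window.partitionBy("name").orderBy("age")
>>> df.select(df.name, df.age, F.rank().over(w).alias("rank")).collect()
[Row(name='Alice', age=2, rank=1), Row(name='Bob', age=5, rank=1)]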
rlike(other)
SQL RLIKE expression (LIKE with Regex). Returns a boolean Column based on a regex
match.
>>> df.filter(df.name.rlike('ice$')).collect()
[Row(age=2, name='Alice')]
startswith(other)
String starts with. Returns a boolean Column based on a string match.
>>> df.filter(df.name.startswith('Al')).collect()
[Row(age=2, name='Alice')]
>>> df.filter(df.name.startswith('^Al')).collect()
[]
cacheTable(tableName) [source]
Caches the specified table in-memory.
clearCache() [source]
Removes all cached tables from the in-memory cache.
The data source is specified by the source and a set of options. If source is not specified,
the default data source configured by spark.sql.sources.default will be used.
Optionally, a schema can be provided as the schema of the returned DataFrame and created
external table.
Returns: DataFrame
The data source is specified by the source and a set of options. If source is not specified,
the default data source configured by spark.sql.sources.default will be used. When path
is specified, an external table is created from the data at the given path. Otherwise a
managed table is created.
Optionally, a schema can be provided as the schema of the returned DataFrame and created
table.
Returns: DataFrame
currentDatabase() [source]
Returns the current default database in this session.
dropGlobalTempView(viewName) [source]
Drops the global temporary view with the given view name in the catalog. If the view has been
cached before, then it will also be uncached. Returns true if this view is dropped successfully,
false otherwise.
dropTempView(viewName) [source]
Drops the local temporary view with the given view name in the catalog. If the view has been
cached before, then it will also be uncached. Returns true if this view is dropped successfully,
false otherwise.
Note that the return type of this method was None in Spark 2.0, but changed to Boolean in
Spark 2.1.
isCached(tableName) [source]
Returns true if the table is currently cached in-memory.
Note: the order of arguments here is different from that of its JVM counterpart because Python
does not support method overloading.
listDatabases() [source]
Returns a list of databases available across all sessions.
listFunctions(dbName=None) [source]
Returns a list of functions registered in the specified database.
If no database is specified, the current database is used. This includes all temporary
functions.
listTables(dbName=None) [source]
Returns a list of tables/views in the specified database.
If no database is specified, the current database is used. This includes all temporary views.
recoverPartitions(tableName) [source]
Recovers all the partitions of the given table and updates the catalog.
refreshByPath(path) [source]
Invalidates and refreshes all the cached data (and the associated metadata) for any
DataFrame that contains the given data source path.
refreshTable(tableName) [source]
Invalidates and refreshes all the cached data and metadata of the given table.
setCurrentDatabase(dbName) [source]
Sets the current default database in this session.
uncacheTable(tableName) [source]
Removes the specified table from the in-memory cache.
Row can be used to create a row object by using named arguments; the fields will be sorted by
name. It is not allowed to omit a named argument to represent that the value is None or missing;
such a value should be explicitly set to None.
Row can also be used to create another Row-like class, which can then be used to create Row
objects, such as
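(a sketch, assuming Row has already been imported from pyspark.sql):
>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> Person("Alice", 11)
Row(name='Alice', age=11)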
asDict(recursive=False) [source]
Returns the Row as a dict.
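For example:
>>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
True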
Parameters: how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a
row only if all its values are null.
thresh – int, default None. If specified, drop rows that have less than
thresh non-null values. This overwrites the how parameter.
subset – optional list of column names to consider.
>>> df4.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
Parameters: value – int, long, float, string, bool or dict. Value to replace null values
with. If the value is a dict, then subset is ignored and value must be a
mapping from column name (string) to replacement value. The
replacement value must be an int, long, float, boolean, or string.
subset – optional list of column names to consider. Columns specified in
subset that do not have matching data type are ignored. For example, if
value is a string, and subset contains a non-string column, then the non-
string column is simply ignored.
>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
| 5| 50| Bob|
| 50| 50| Tom|
| 50| 50| null|
+---+------+-----+
>>> df5.na.fill(False).show()
+----+-------+-----+
| age| name| spy|
+----+-------+-----+
| 10| Alice|false|
| 5| Bob|false|
|null|Mallory| true|
+----+-------+-----+
Parameters: to_replace – bool, int, long, float, string, list or dict. Value to be replaced.
If the value is a dict, then value is ignored or can be omitted, and
to_replace must be a mapping between a value and a replacement.
value – bool, int, long, float, string, list or None. The replacement value
must be a bool, int, long, float, string or None. If value is a list, value
should be of the same length and type as to_replace. If value is a scalar
and to_replace is a sequence, then value is used as a replacement for
each item in to_replace.
subset – optional list of column names to consider. Columns specified in
subset that do not have matching data type are ignored. For example, if
value is a string, and subset contains a non-string column, then the non-
string column is simply ignored.
The result of this algorithm has the following deterministic bound: if the DataFrame has N
elements and we request the quantile at probability p up to error err, then the algorithm will
return a sample x from the DataFrame so that the exact rank of x is close to (p * N). More
precisely, floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
This method implements a variation of the Greenwald-Khanna algorithm (with some speed
optimizations). The algorithm was first presented in Space-efficient Online Computation of
Quantile Summaries (http://dx.doi.org/10.1145/375663.375670) by Greenwald and Khanna.
Note that null values will be ignored in numerical columns before calculation. For columns
only containing null values, an empty list is returned.
Parameters: col – str, list. Can be a single column name, or a list of names for multiple
columns.
probabilities – a list of quantile probabilities. Each number must belong to
[0, 1]. For example, 0 is the minimum, 0.5 is the median, 1 is the maximum.
relativeError – The relative target precision to achieve (>= 0). If set to
zero, the exact quantiles are computed, which could be very expensive.
Note that values greater than 1 are accepted but give the same result as
1.
Returns: the approximate quantiles at the given probabilities. If the input col is a
string, the output is a list of floats. If the input col is a list or tuple of strings,
the output is also a list, but each element in it is a list of floats, i.e., the output
is a list of list of floats.
Parameters: col1 – The name of the first column. Distinct items will make the first item
of each row.
col2 – The name of the second column. Distinct items will make the
column names of the DataFrame.
Note: This function is meant for exploratory data analysis, as we make no guarantee
about the backward compatibility of the schema of the resulting DataFrame.
Parameters: cols – Names of the columns to calculate frequent items for as a list or
tuple of strings.
support – The frequency with which to consider an item ‘frequent’. Default
is 1%. The support must be greater than 1e-4.
For example:
>>> # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
Note: Experimental
currentRow = 0
Creates a WindowSpec with the frame boundaries defined, from start (inclusive) to end
(inclusive).
Both start and end are relative from the current row. For example, “0” means “current row”,
while “-1” means one off before the current row, and “5” means the five off after the current
row.
Both start and end are relative positions from the current row. For example, “0” means
“current row”, while “-1” means the row before the current row, and “5” means the fifth row
after the current row.
unboundedFollowing = 9223372036854775807
unboundedPreceding = -9223372036854775808
Note: Experimental
orderBy(*cols) [source]
Defines the ordering columns in a WindowSpec.
partitionBy(*cols) [source]
Defines the partitioning columns in a WindowSpec.
Both start and end are relative from the current row. For example, “0” means “current row”,
while “-1” means one off before the current row, and “5” means the five off after the current
row.
Both start and end are relative positions from the current row. For example, “0” means
“current row”, while “-1” means the row before the current row, and “5” means the fifth row
after the current row.
The frame is unbounded if end is Window.unboundedFollowing, or any value greater than or
equal to min(sys.maxsize, 9223372036854775807).
This function will go through the input once to determine the input schema if inferSchema is
enabled. To avoid going through the entire data once, disable inferSchema option or specify
the schema explicitly using schema.
Parameters: path – string, or list of strings, for input path(s), or RDD of Strings storing
CSV rows.
schema – an optional pyspark.sql.types.StructType for the input
schema or a DDL-formatted string (For example col0 INT, col1
DOUBLE).
sep – sets a single character as a separator for each field and value. If
None is set, it uses the default value, ,.
encoding – decodes the CSV files by the given encoding type. If None is
set, it uses the default value, UTF-8.
quote – sets a single character used for escaping quoted values where
the separator can be part of the value. If None is set, it uses the default
value, ". If you would like to turn off quotations, you need to set an empty
string.
escape – sets a single character used for escaping quotes inside an
already quoted value. If None is set, it uses the default value, \.
comment – sets a single character used for skipping lines beginning with
this character. By default (None), it is disabled.
header – uses the first line as names of columns. If None is set, it uses
the default value, false.
inferSchema – infers the input schema automatically from data. It
requires one extra pass over the data. If None is set, it uses the default
value, false.
ignoreLeadingWhiteSpace – A flag indicating whether or not leading
whitespaces from values being read should be skipped. If None is set, it
uses the default value, false.
ignoreTrailingWhiteSpace – A flag indicating whether or not trailing
whitespaces from values being read should be skipped. If None is set, it
uses the default value, false.
mode – allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, PERMISSIVE.
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
[('_c0', 'string'), ('_c1', 'string')]
>>> rdd = sc.textFile('python/test_support/sql/ages.csv')
>>> df2 = spark.read.csv(rdd)
>>> df2.dtypes
[('_c0', 'string'), ('_c1', 'string')]
format(source) [source]
Specifies the input data source format.
Parameters: source – string, name of the data source, e.g. ‘json’, ‘parquet’.
>>> df = spark.read.format('json').load('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]
Partitions of the table will be retrieved in parallel if either column or predicates is specified.
lowerBound, upperBound and numPartitions are needed when column is specified.
Note: Don’t create too many partitions in parallel on a large cluster; otherwise Spark
might crash your external database systems.
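A hedged sketch of a partitioned JDBC read (the URL, table name, bounds and credentials below are hypothetical placeholders):
>>> df = spark.read.jdbc(
...     url="jdbc:postgresql://dbhost:5432/shop",
...     table="sales",
...     column="id", lowerBound=1, upperBound=100000, numPartitions=10,
...     properties={"user": "reader", "password": "secret"})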
JSON Lines (newline-delimited JSON) is supported by default. For JSON (one record per file),
set the multiLine parameter to true.
If the schema parameter is not specified, this function goes through the input once to
determine the input schema.
Parameters: path – string representing the path to the JSON dataset, or a list of paths, or
an RDD of Strings storing JSON objects.
schema – an optional pyspark.sql.types.StructType for the input
schema or a DDL-formatted string (For example col0 INT, col1
DOUBLE).
primitivesAsString – infers all primitive values as a string type. If None is
set, it uses the default value, false.
prefersDecimal – infers all floating-point values as a decimal type. If the
values do not fit in decimal, then it infers them as doubles. If None is set, it
uses the default value, false.
allowComments – ignores Java/C++ style comment in JSON records. If
None is set, it uses the default value, false.
allowUnquotedFieldNames – allows unquoted JSON field names. If
None is set, it uses the default value, false.
allowSingleQuotes – allows single quotes in addition to double quotes. If
None is set, it uses the default value, true.
allowNumericLeadingZero – allows leading zeros in numbers (e.g.
00012). If None is set, it uses the default value, false.
allowBackslashEscapingAnyCharacter – allows accepting quoting of all
character using backslash quoting mechanism. If None is set, it uses the
default value, false.
mode –
allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, PERMISSIVE.
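For example, a minimal sketch using the sample file that ships with the Spark source tree:
>>> df = spark.read.json('python/test_support/sql/people.json')
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]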
Parameters: path – optional string or a list of string for file-system backed data
sources.
format – optional string for format of the data source. Default to ‘parquet’.
schema – optional pyspark.sql.types.StructType for the input schema
or a DDL-formatted string (For example col0 INT, col1 DOUBLE).
options – all other string options
>>> df = spark.read.format("parquet").load('python/test_support/sql/parquet_p
... opt1=True, opt2=1, opt3='str')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
>>> df = spark.read.format('json').load(['python/test_support/sql/people.json',
... 'python/test_support/sql/people1.json'])
>>> df.dtypes
[('age', 'bigint'), ('aka', 'string'), ('name', 'string')]
options(**options) [source]
Adds input options for the underlying data source.
orc(path) [source]
Loads ORC files, returning the result as a DataFrame.
Note: Currently ORC support is only available together with Hive support.
>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
parquet(*paths) [source]
Loads Parquet files, returning the result as a DataFrame.
You can set the following Parquet-specific option(s) for reading Parquet files:
mergeSchema: sets whether we should merge schemas collected from all Parquet
part-files. This will override spark.sql.parquet.mergeSchema. The default value
is specified in spark.sql.parquet.mergeSchema.
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
schema(schema) [source]
Specifies the input schema.
Some data sources (e.g. JSON) can infer the input schema automatically from data. By
specifying the schema here, the underlying data source can skip the schema inference step,
and thus speed up data loading.
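For instance, a hedged sketch with a DDL-formatted schema string (the CSV path below is a hypothetical placeholder):
>>> df = spark.read.schema("col0 INT, col1 DOUBLE").csv('/tmp/input.csv')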
table(tableName) [source]
Returns the specified table as a DataFrame.
>>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
>>> df.createOrReplaceTempView('tmpTable')
>>> spark.read.table('tmpTable').dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
Each line in the text file is a new row in the resulting DataFrame.
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value='hello'), Row(value='this')]
>>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
>>> df.collect()
[Row(value='hello\nthis')]
>>> (df.write.format('parquet')
... .bucketBy(100, 'year', 'month')
... .mode("overwrite")
... .saveAsTable('bucketed_table'))
header – writes the names of columns as the first line. If None is set, it
uses the default value, false.
nullValue – sets the string representation of a null value. If None is set, it
uses the default value, empty string.
dateFormat – sets the string that indicates a date format. Custom date
formats follow the formats at java.text.SimpleDateFormat. This applies
to date type. If None is set, it uses the default value, yyyy-MM-dd.
timestampFormat – sets the string that indicates a timestamp format.
Custom date formats follow the formats at java.text.SimpleDateFormat.
This applies to timestamp type. If None is set, it uses the default value,
yyyy-MM-dd'T'HH:mm:ss.SSSXXX.
ignoreLeadingWhiteSpace – a flag indicating whether or not leading
whitespaces from values being written should be skipped. If None is set, it
uses the default value, true.
ignoreTrailingWhiteSpace – a flag indicating whether or not trailing
whitespaces from values being written should be skipped. If None is set, it
uses the default value, true.
charToEscapeQuoteEscaping – sets a single character used for
escaping the escape for the quote character. If None is set, the default
value is the escape character when escape and quote characters are
different, \0 otherwise.
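A hedged example of writing CSV with some of these options (the output path is a hypothetical placeholder):
>>> (df.write
...     .option("header", True)
...     .option("dateFormat", "yyyy-MM-dd")
...     .csv('/tmp/output_csv'))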
format(source) [source]
Specifies the underlying output data source.
Parameters: source – string, name of the data source, e.g. ‘json’, ‘parquet’.
It requires that the schema of the DataFrame is the same as the schema of the table.
Note: Don’t create too many partitions in parallel on a large cluster; otherwise Spark
might crash your external database systems.
mode(saveMode) [source]
Specifies the behavior when data or table already exists.
Options include:
append: Append contents of this DataFrame to existing data.
overwrite: Overwrite existing data.
error or errorifexists: Throw an exception if data already exists.
ignore: Silently ignore this operation if data already exists.
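For example, a minimal sketch (the output path is a hypothetical placeholder):
>>> df.write.mode('overwrite').parquet('/tmp/output_parquet')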
options(**options) [source]
Adds output options for the underlying data source.
Note: Currently ORC support is only available together with Hive support.
partitionBy(*cols) [source]
Partitions the output by the given columns on the file system.
If specified, the output is laid out on the file system similar to Hive’s partitioning scheme.
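A hedged sketch, assuming df has year and month columns (the output path is a hypothetical placeholder):
>>> df.write.partitionBy('year', 'month').parquet('/tmp/partitioned_output')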
The data source is specified by the format and a set of options. If format is not specified,
the default data source configured by spark.sql.sources.default will be used.
In the case the table already exists, behavior of this function depends on the save mode,
specified by the mode function (default to throwing an exception). When mode is Overwrite,
the schema of the DataFrame does not need to be the same as that of the existing table.
>>> (df.write.format('parquet')
... .bucketBy(100, 'year', 'month')
... .sortBy('day')
... .mode("overwrite")
... .saveAsTable('sorted_bucketed_table'))
The DataFrame must have only one column that is of string type. Each row becomes a new
line in the output file.
pyspark.sql.types module
class pyspark.sql.types.DataType [source]
Base class for data types.
fromInternal(obj) [source]
Converts an internal SQL object into a native Python object.
json() [source]
jsonValue() [source]
needConversion() [source]
Does this type need conversion between Python object and internal SQL object?
simpleString() [source]
toInternal(obj) [source]
Converts a Python object into an internal SQL object.
The data type representing None, used for the types that cannot be inferred.
EPOCH_ORDINAL = 719163
fromInternal(v) [source]
needConversion() [source]
toInternal(d) [source]
fromInternal(ts) [source]
needConversion() [source]
toInternal(dt) [source]
The DecimalType must have fixed precision (the maximum total number of digits) and scale (the
number of digits to the right of the decimal point). For example, (5, 2) can support values in the
range [-999.99, 999.99].
The precision can be up to 38; the scale must be less than or equal to the precision.
When creating a DecimalType, the default precision and scale is (10, 0). When inferring schema from
decimal.Decimal objects, it will be DecimalType(38, 18).
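For example, a hedged sketch of using DecimalType inside a schema:
>>> from pyspark.sql.types import StructType, StructField, DecimalType
>>> schema = StructType([StructField("price", DecimalType(5, 2), True)])
>>> schema.simpleString()
'struct<price:decimal(5,2)>'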
jsonValue() [source]
simpleString() [source]
simpleString() [source]
simpleString() [source]
simpleString() [source]
simpleString() [source]
fromInternal(obj) [source]
jsonValue() [source]
needConversion() [source]
simpleString() [source]
toInternal(obj) [source]
fromInternal(obj) [source]
jsonValue() [source]
needConversion() [source]
simpleString() [source]
toInternal(obj) [source]
fromInternal(obj) [source]
jsonValue() [source]
needConversion() [source]
simpleString() [source]
toInternal(obj) [source]
typeName() [source]
Iterating a StructType will iterate its StructFields. A contained StructField can be accessed
by name or position.
fieldNames() [source]
Returns all field names in a list.
fromInternal(obj) [source]
jsonValue() [source]
needConversion() [source]
simpleString() [source]
toInternal(obj) [source]
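A hedged illustration of building a StructType and accessing its fields by name or position:
>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>> schema = StructType([
...     StructField("name", StringType(), True),
...     StructField("age", IntegerType(), True)])
>>> schema.fieldNames()
['name', 'age']
>>> schema["name"] == schema[0]
True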
pyspark.sql.functions module
A collection of built-in functions.
pyspark.sql.functions.abs(col)
Computes the absolute value.
pyspark.sql.functions.acos(col)
Returns: inverse cosine of col, as if computed by java.lang.Math.acos()
Parameters: rsd – maximum estimation error allowed (default = 0.05). For rsd < 0.01, it is
more efficient to use countDistinct()
>>> df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect()
[Row(distinct_ages=2)]
pyspark.sql.functions.array(*cols) [source]
Creates a new array column.
Parameters: cols – list of column names (string) or list of Column expressions that have the
same data type.
pyspark.sql.functions.asc(col)
Returns a sort expression based on the ascending order of the given column name.
pyspark.sql.functions.ascii(col)
Computes the numeric value of the first character of the string column.
pyspark.sql.functions.asin(col)
Returns: inverse sine of col, as if computed by java.lang.Math.asin()
pyspark.sql.functions.atan(col)
Returns: inverse tangent of col, as if computed by java.lang.Math.atan()
pyspark.sql.functions.atan2(col1, col2)
Parameters: col1 – coordinate on y-axis
col2 – coordinate on x-axis
Returns: the theta component of the point (r, theta) in polar coordinates that corresponds
to the point (x, y) in Cartesian coordinates, as if computed by
java.lang.Math.atan2()
pyspark.sql.functions.avg(col)
Aggregate function: returns the average of the values in a group.
pyspark.sql.functions.base64(col)
Computes the BASE64 encoding of a binary column and returns it as a string column.
pyspark.sql.functions.bin(col) [source]
Returns the string representation of the binary value of the given column.
>>> df.select(bin(df.age).alias('c')).collect()
[Row(c='10'), Row(c='101')]
pyspark.sql.functions.bitwiseNOT(col)
Computes bitwise not.
pyspark.sql.functions.broadcast(df) [source]
Marks a DataFrame as small enough for use in broadcast joins.
pyspark.sql.functions.cbrt(col)
Computes the cube-root of the given value.
pyspark.sql.functions.ceil(col)
Computes the ceiling of the given value.
pyspark.sql.functions.coalesce(*cols) [source]
Returns the first column that is not null.
>>> cDf = spark.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
>>> cDf.show()
+----+----+
| a| b|
+----+----+
|null|null|
| 1|null|
|null| 2|
+----+----+
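A hedged continuation of this example, selecting the first non-null value of the two columns:
>>> cDf.select(coalesce(cDf["a"], cDf["b"])).show()
+--------------+
|coalesce(a, b)|
+--------------+
|          null|
|             1|
|             2|
+--------------+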
pyspark.sql.functions.col(col)
Returns a Column based on the given column name.
pyspark.sql.functions.collect_list(col)
Aggregate function: returns a list of objects with duplicates.
pyspark.sql.functions.collect_set(col)
Aggregate function: returns a set of objects with duplicate elements eliminated.
pyspark.sql.functions.column(col)
Returns a Column based on the given column name.
pyspark.sql.functions.concat(*cols) [source]
Concatenates multiple input columns together into a single column. If all inputs are binary, concat
returns an output as binary. Otherwise, it returns as string.
>>> a = range(20)
>>> b = [2 * x for x in range(20)]
>>> df = spark.createDataFrame(zip(a, b), ["a", "b"])
>>> df.agg(corr("a", "b").alias('c')).collect()
[Row(c=1.0)]
pyspark.sql.functions.cos(col)
Parameters: col – angle in radians
Returns: cosine of the angle, as if computed by java.lang.Math.cos().
pyspark.sql.functions.cosh(col)
Parameters: col – hyperbolic angle
Returns: hyperbolic cosine of the angle, as if computed by java.lang.Math.cosh()
pyspark.sql.functions.count(col)
Aggregate function: returns the number of items in a group.
>>> a = [1] * 10
>>> b = [1] * 10
>>> df = spark.createDataFrame(zip(a, b), ["a", "b"])
>>> df.agg(covar_pop("a", "b").alias('c')).collect()
[Row(c=0.0)]
Returns a new Column for the sample covariance of col1 and col2.
>>> a = [1] * 10
>>> b = [1] * 10
>>> df = spark.createDataFrame(zip(a, b), ["a", "b"])
>>> df.agg(covar_samp("a", "b").alias('c')).collect()
[Row(c=0.0)]
pyspark.sql.functions.crc32(col) [source]
Calculates the cyclic redundancy check value (CRC32) of a binary column and returns the value
as a bigint.
pyspark.sql.functions.create_map(*cols) [source]
Creates a new map column.
Parameters: cols – list of column names (string) or list of Column expressions that are
grouped as key-value pairs, e.g. (key1, value1, key2, value2, …).
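A hedged sketch, assuming a DataFrame df with rows ('Alice', 2) and ('Bob', 5) in columns name and age:
>>> df.select(create_map('name', 'age').alias("map")).collect()
[Row(map={'Alice': 2}), Row(map={'Bob': 5})]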
pyspark.sql.functions.cume_dist()
Window function: returns the cumulative distribution of values within a window partition, i.e. the
fraction of rows that are below the current row.
pyspark.sql.functions.current_date() [source]
Returns the current date as a DateType column.
pyspark.sql.functions.current_timestamp() [source]
Returns the current timestamp as a TimestampType column.
[Row(next_date=datetime.date(2015, 4, 9))]
A pattern could be for instance dd.MM.yyyy and could return a string like ‘18.03.1993’. All pattern
letters of the Java class java.text.SimpleDateFormat can be used.
Note: Whenever possible, use specialized functions like year; these benefit from a specialized
implementation.
Parameters: format – ‘year’, ‘yyyy’, ‘yy’, ‘month’, ‘mon’, ‘mm’, ‘day’, ‘dd’, ‘hour’, ‘minute’,
‘second’, ‘week’, ‘quarter’
pyspark.sql.functions.dayofmonth(col) [source]
Extract the day of the month of a given date as integer.
pyspark.sql.functions.dayofweek(col) [source]
Extract the day of the week of a given date as integer.
pyspark.sql.functions.dayofyear(col) [source]
Extract the day of the year of a given date as integer.
pyspark.sql.functions.degrees(col)
Converts an angle measured in radians to an approximately equivalent angle measured in
degrees. :param col: angle in radians :return: angle in degrees, as if computed by
java.lang.Math.toDegrees()
pyspark.sql.functions.dense_rank()
Window function: returns the rank of rows within a window partition, without any gaps.
The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking
sequence when there are ties. That is, if you were ranking a competition using dense_rank and
had three people tie for second place, you would say that all three were in second place and that
the next person came in third. rank, by contrast, gives sequential numbers, so the person who
came in third place (after the ties) would register as coming in fifth.
pyspark.sql.functions.desc(col)
Returns a sort expression based on the descending order of the given column name.
pyspark.sql.functions.exp(col)
Computes the exponential of the given value.
pyspark.sql.functions.explode(col) [source]
Returns a new row for each element in the given array or map.
pyspark.sql.functions.explode_outer(col) [source]
Returns a new row for each element in the given array or map. Unlike explode, if the array/map is
null or empty then null is produced.
>>> df = spark.createDataFrame(
... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)],
... ("id", "an_array", "a_map")
... )
>>> df.select("id", "an_array", explode_outer("a_map")).show()
+---+----------+----+-----+
| id| an_array| key|value|
+---+----------+----+-----+
| 1|[foo, bar]| x| 1.0|
| 2| []|null| null|
| 3| null|null| null|
+---+----------+----+-----+
pyspark.sql.functions.expm1(col)
Computes the exponential of the given value minus one.
pyspark.sql.functions.expr(str) [source]
Parses the expression string into the column that it represents
>>> df.select(expr("length(name)")).collect()
[Row(length(name)=5), Row(length(name)=3)]
pyspark.sql.functions.factorial(col) [source]
Computes the factorial of the given value.
The function by default returns the first values it sees. It will return the first non-null value it sees
when ignoreNulls is set to true. If all values are null, then null is returned.
pyspark.sql.functions.floor(col)
Computes the floor of the given value.
pyspark.sql.functions.format_number(col, d) [source]
Formats the number X to a format like ‘#,--#,--#.--’, rounded to d decimal places with HALF_EVEN
round mode, and returns the result as a string.
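For example, a minimal sketch:
>>> spark.createDataFrame([(5,)], ['a']).select(format_number('a', 4).alias('v')).collect()
[Row(v='5.0000')]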
Note: Since Spark 2.3, the DDL-formatted string or a JSON format string is also supported for
schema.
>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = spark.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \
...                   get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
[Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]
pyspark.sql.functions.greatest(*cols) [source]
Returns the greatest value of the list of column names, skipping null values. This function takes at
least 2 parameters. It will return null iff all parameters are null.
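For example, a minimal sketch:
>>> df = spark.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect()
[Row(greatest=4)]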
pyspark.sql.functions.grouping(col) [source]
Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated or not,
returns 1 for aggregated or 0 for not aggregated in the result set.
pyspark.sql.functions.grouping_id(*cols) [source]
Aggregate function: returns the level of grouping, equal to
(grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + … + grouping(cn).
Note: The list of columns should match with grouping columns exactly, or empty (means all the
grouping columns).
pyspark.sql.functions.hash(*cols) [source]
Calculates the hash code of given columns, and returns the result as an int column.
pyspark.sql.functions.hex(col) [source]
Computes hex value of the given column, which could be pyspark.sql.types.StringType,
pyspark.sql.types.BinaryType, pyspark.sql.types.IntegerType or
pyspark.sql.types.LongType.
pyspark.sql.functions.hour(col) [source]
Extract the hours of a given date as integer.
pyspark.sql.functions.hypot(col1, col2)
Computes sqrt(a^2 + b^2) without intermediate overflow or underflow.
pyspark.sql.functions.initcap(col) [source]
Translate the first letter of each word to upper case in the sentence.
pyspark.sql.functions.input_file_name() [source]
Creates a string column for the file name of the current Spark task.
Note: The position is not zero-based, but a 1-based index. Returns 0 if substr could not be found
in str.
pyspark.sql.functions.isnan(col) [source]
An expression that returns true iff the column is NaN.
pyspark.sql.functions.isnull(col) [source]
An expression that returns true iff the column is null.
>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = spark.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect()
[Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]
pyspark.sql.functions.kurtosis(col)
Aggregate function: returns the kurtosis of the values in a group.
The function by default returns the last values it sees. It will return the last non-null value it sees
when ignoreNulls is set to true. If all values are null, then null is returned.
pyspark.sql.functions.last_day(date) [source]
Returns the last day of the month which the given date belongs to.
Window function: returns the value that is offset rows after the current row, and defaultValue if
there is less than offset rows after the current row. For example, an offset of one will return the
next row at any given point in the window partition.
pyspark.sql.functions.least(*cols) [source]
Returns the least value of the list of column names, skipping null values. This function takes at
least 2 parameters. It will return null iff all parameters are null.
pyspark.sql.functions.length(col) [source]
Computes the character length of string data or number of bytes of binary data. The length of
character data includes the trailing spaces. The length of binary data includes binary zeros.
pyspark.sql.functions.lit(col)
Creates a Column of literal value.
Note: The position is not zero-based, but a 1-based index. Returns 0 if substr could not be found
in str.
If there is only one argument, then this takes the natural logarithm of the argument.
pyspark.sql.functions.log10(col)
Computes the logarithm of the given value in Base 10.
pyspark.sql.functions.log1p(col)
Computes the natural logarithm of the given value plus one.
pyspark.sql.functions.log2(col) [source]
Returns the base-2 logarithm of the argument.
pyspark.sql.functions.lower(col)
Converts a string column to lower case.
pyspark.sql.functions.ltrim(col)
Trim the spaces from left end for the specified string value.
pyspark.sql.functions.map_keys(col) [source]
Collection function: Returns an unordered array containing the keys of the map.
pyspark.sql.functions.map_values(col) [source]
Collection function: Returns an unordered array containing the values of the map.
pyspark.sql.functions.max(col)
Aggregate function: returns the maximum value of the expression in a group.
pyspark.sql.functions.md5(col) [source]
Calculates the MD5 digest and returns the value as a 32 character hex string.
pyspark.sql.functions.mean(col)
Aggregate function: returns the average of the values in a group.
pyspark.sql.functions.min(col)
Aggregate function: returns the minimum value of the expression in a group.
pyspark.sql.functions.minute(col) [source]
Extract the minutes of a given date as integer.
pyspark.sql.functions.monotonically_increasing_id() [source]
A column that generates monotonically increasing 64-bit integers.
The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
The current implementation puts the partition ID in the upper 31 bits, and the record number within
each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion
partitions, and each partition has less than 8 billion records.
As an example, consider a DataFrame with two partitions, each with 3 records. This expression
would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
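A hedged sketch of attaching such IDs to an existing DataFrame df:
>>> from pyspark.sql.functions import monotonically_increasing_id
>>> df_with_id = df.withColumn("row_id", monotonically_increasing_id())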
pyspark.sql.functions.month(col) [source]
Extract the month of a given date as integer.
pyspark.sql.functions.ntile(n) [source]
Window function: returns the ntile group id (from 1 to n inclusive) in an ordered window partition.
For example, if n is 4, the first quarter of the rows will get value 1, the second quarter will get 2, the
third quarter will get 3, and the last quarter will get 4.
Parameters: n – an integer
Note: Experimental
1. SCALAR
A scalar UDF defines a transformation: one or more pandas.Series -> a pandas.Series. The
returnType should be a primitive data type, e.g., DoubleType(). The length of the returned
pandas.Series must be the same as that of the input pandas.Series.
Note: The length of pandas.Series within a scalar UDF is not that of the whole input
column, but is the length of an internal batch used for each call to the function. Therefore,
this can be used, for example, to ensure the length of each returned pandas.Series, and
can not be used as the column length.
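A hedged sketch of a scalar Pandas UDF (it assumes pyarrow is installed and a DataFrame df with a numeric column x):
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> @pandas_udf('double', PandasUDFType.SCALAR)
... def plus_one(s):
...     return s + 1
>>> df.select(plus_one(df.x).alias('x_plus_one'))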
2. GROUPED_MAP
Note: The user-defined functions are considered deterministic by default. Due to optimization,
duplicate invocations may be eliminated or the function may even be invoked more times than it
is present in the query. If your function is not deterministic, call asNondeterministic on the user
defined function. E.g.:
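A hedged sketch of marking a UDF as nondeterministic:
>>> import random
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import IntegerType
>>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()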
Note: The user-defined functions do not support conditional expressions or short circuiting in
boolean expressions, and the functions end up being executed on all rows internally. If the
functions can fail on special rows, the workaround is to incorporate the condition into the functions.
Note: The user-defined functions do not take keyword arguments on the calling side.
pyspark.sql.functions.percent_rank()
Window function: returns the relative rank (i.e. percentile) of rows within a window partition.
pyspark.sql.functions.posexplode(col) [source]
Returns a new row for each element with position in the given array or map.
>>> eDF.select(posexplode(eDF.mapfield)).show()
+---+---+-----+
|pos|key|value|
+---+---+-----+
| 0| a| b|
+---+---+-----+
pyspark.sql.functions.posexplode_outer(col) [source]
Returns a new row for each element with position in the given array or map. Unlike posexplode, if
the array/map is null or empty then the row (null, null) is produced.
>>> df = spark.createDataFrame(
... [(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)],
... ("id", "an_array", "a_map")
... )
>>> df.select("id", "an_array", posexplode_outer("a_map")).show()
+---+----------+----+----+-----+
| id| an_array| pos| key|value|
+---+----------+----+----+-----+
| 1|[foo, bar]| 0| x| 1.0|
| 2| []|null|null| null|
| 3| null|null|null| null|
+---+----------+----+----+-----+
>>> df.select("id", "a_map", posexplode_outer("an_array")).show()
+---+----------+----+----+
| id| a_map| pos| col|
+---+----------+----+----+
| 1|[x -> 1.0]| 0| foo|
| 1|[x -> 1.0]| 1| bar|
| 2| []|null|null|
| 3| null|null|null|
+---+----------+----+----+
pyspark.sql.functions.pow(col1, col2)
Returns the value of the first argument raised to the power of the second argument.
pyspark.sql.functions.quarter(col) [source]
Extract the quarter of a given date as integer.
pyspark.sql.functions.radians(col)
Converts an angle measured in degrees to an approximately equivalent angle measured in
radians. :param col: angle in degrees :return: angle in radians, as if computed by
java.lang.Math.toRadians()
pyspark.sql.functions.rand(seed=None) [source]
Generates a random column with independent and identically distributed (i.i.d.) samples from
U[0.0, 1.0].
pyspark.sql.functions.randn(seed=None) [source]
Generates a column with independent and identically distributed (i.i.d.) samples from the standard
normal distribution.
pyspark.sql.functions.rank()
Window function: returns the rank of rows within a window partition.
The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking
sequence when there are ties. That is, if you were ranking a competition using dense_rank and
had three people tie for second place, you would say that all three were in second place and that
the next person came in third. rank, by contrast, gives sequential numbers, so the person who
came in third place (after the ties) would register as coming in fifth.
Replace all substrings of the specified string value that match regexp with rep.
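This describes regexp_replace; a hedged sketch of its use:
>>> df = spark.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_replace('str', r'(\d+)', '--').alias('d')).collect()
[Row(d='-----')]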
pyspark.sql.functions.repeat(col, n) [source]
Repeats a string column n times, and returns it as a new string column.
pyspark.sql.functions.reverse(col)
Reverses the string column and returns it as a new string column.
pyspark.sql.functions.rint(col)
Returns the double value that is closest in value to the argument and is equal to a mathematical
integer.
pyspark.sql.functions.row_number()
Window function: returns a sequential number starting at 1 within a window partition.
pyspark.sql.functions.rtrim(col)
Trim the spaces from right end for the specified string value.
pyspark.sql.functions.second(col) [source]
Extract the seconds of a given date as integer.
pyspark.sql.functions.sha1(col) [source]
Returns the hex string result of SHA-1.
pyspark.sql.functions.signum(col)
Computes the signum of the given value.
pyspark.sql.functions.sin(col)
Parameters: col – angle in radians
Returns: sine of the angle, as if computed by java.lang.Math.sin()
pyspark.sql.functions.sinh(col)
Parameters: col – hyperbolic angle
Returns: hyperbolic sine of the given value, as if computed by java.lang.Math.sinh()
pyspark.sql.functions.size(col) [source]
Collection function: returns the length of the array or map stored in the column.
pyspark.sql.functions.skewness(col)
Aggregate function: returns the skewness of the values in a group.
pyspark.sql.functions.soundex(col) [source]
Returns the SoundEx encoding for a string
pyspark.sql.functions.spark_partition_id() [source]
A column for partition ID.
Note: This is indeterministic because it depends on data partitioning and task scheduling.
>>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]
pyspark.sql.functions.sqrt(col)
Computes the square root of the specified float value.
pyspark.sql.functions.stddev(col)
Aggregate function: returns the unbiased sample standard deviation of the expression in a group.
pyspark.sql.functions.stddev_pop(col)
Aggregate function: returns population standard deviation of the expression in a group.
pyspark.sql.functions.stddev_samp(col)
Aggregate function: returns the unbiased sample standard deviation of the expression in a group.
pyspark.sql.functions.struct(*cols) [source]
Creates a new struct column.
pyspark.sql.functions.sum(col)
Aggregate function: returns the sum of all values in the expression.
pyspark.sql.functions.sumDistinct(col)
Aggregate function: returns the sum of distinct values in the expression.
pyspark.sql.functions.tan(col)
Parameters: col – angle in radians
Returns: tangent of the given value, as if computed by java.lang.Math.tan()
pyspark.sql.functions.tanh(col)
Parameters: col – hyperbolic angle
Returns: hyperbolic tangent of the given value, as if computed by java.lang.Math.tanh()
pyspark.sql.functions.toDegrees(col)
pyspark.sql.functions.toRadians(col)
Parameters: col – name of column containing the struct, array of the structs, the map or
array of the maps.
options – options to control converting. accepts the same options as the json
datasource
A function that translates any character in srcCol to the character at the corresponding position in
replace, according to matching. Each character in replace corresponds to the character at the same
position in matching. The translation occurs whenever a character in the string matches a character
in matching.
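A hedged sketch of a translate call:
>>> spark.createDataFrame([('translate',)], ['a']).select(translate('a', "rnlt", "123").alias('r')).collect()
[Row(r='1a2s3ae')]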
pyspark.sql.functions.trim(col)
Trim the spaces from both ends for the specified string column.
Note: The user-defined functions are considered deterministic by default. Due to optimization,
duplicate invocations may be eliminated or the function may even be invoked more times than it
is present in the query. If your function is not deterministic, call asNondeterministic on the user
defined function. E.g.:
Note: The user-defined functions do not support conditional expressions or short circuiting in
boolean expressions, and the functions end up being executed on all rows internally. If the
functions can fail on special rows, the workaround is to incorporate the condition into the functions.
Note: The user-defined functions do not take keyword arguments on the calling side.
pyspark.sql.functions.unbase64(col)
Decodes a BASE64 encoded string column and returns it as a binary column.
pyspark.sql.functions.unhex(col) [source]
Inverse of hex. Interprets each pair of characters as a hexadecimal number and converts to the
byte representation of number.
pyspark.sql.functions.upper(col)
Converts a string column to upper case.
pyspark.sql.functions.var_pop(col)
Aggregate function: returns the population variance of the values in a group.
pyspark.sql.functions.var_samp(col)
Aggregate function: returns the unbiased variance of the values in a group.
pyspark.sql.functions.variance(col)
Aggregate function: returns the population variance of the values in a group.
pyspark.sql.functions.weekofyear(col) [source]
Extract the week number of a given date as integer.
Durations are provided as strings, e.g. ‘1 second’, ‘1 day 12 hours’, ‘2 minutes’. Valid interval
strings are ‘week’, ‘day’, ‘hour’, ‘minute’, ‘second’, ‘millisecond’, ‘microsecond’. If the
slideDuration is not provided, the windows will be tumbling windows.
The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window
intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the
hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes.
The output column will be a struct called ‘window’ by default with the nested columns ‘start’ and
‘end’, where ‘start’ and ‘end’ will be of pyspark.sql.types.TimestampType.
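A hedged sketch of a tumbling ten-minute window aggregation (the DataFrame df and its columns ts and val are assumptions for this sketch):
>>> from pyspark.sql.functions import window, sum as sum_
>>> agg = (df.groupBy(window(df.ts, "10 minutes"))
...          .agg(sum_("val").alias("total")))
>>> agg.select("window.start", "window.end", "total")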
pyspark.sql.functions.year(col) [source]
Extract the year of a given date as integer.
pyspark.sql.streaming module
class pyspark.sql.streaming.StreamingQuery(jsq) [source]
A handle to a query that is executing continuously in the background as new data arrives. All these
methods are thread-safe.
Note: Evolving
awaitTermination(timeout=None) [source]
Waits for the termination of this query, either by query.stop() or by an exception. If the query
has terminated with an exception, then the exception will be thrown. If timeout is set, it returns
whether the query has terminated or not within the timeout seconds.
If the query has terminated, then all subsequent calls to this method will either return
immediately (if the query was terminated by stop()), or throw the exception immediately (if
the query has terminated with exception).
exception() [source]
explain(extended=False) [source]
Prints the (logical and physical) plans to the console for debugging purpose.
Parameters: extended – boolean, default False. If False, prints only the physical plan.
>>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
>>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
>>> sq.explain()
== Physical Plan ==
...
>>> sq.explain(True)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...
>>> sq.stop()
id
Returns the unique id of this query that persists across restarts from checkpoint data. That is,
this id is generated when a query is started for the first time, and will be the same every time it
is restarted from checkpoint data. There can only be one query with the same id active in a
Spark cluster. Also see, runId.
isActive
Whether this streaming query is currently active or not.
lastProgress
Returns the most recent StreamingQueryProgress update of this streaming query as a map, or
None if there were no progress updates.
name
Returns the user-specified name of the query, or null if not specified. This name can be
specified in the org.apache.spark.sql.streaming.DataStreamWriter as
dataframe.writeStream.queryName(“query”).start(). This name, if set, must be unique across
all active queries.
processAllAvailable() [source]
Blocks until all available data in the source has been processed and committed to the sink.
This method is intended for testing.
Note: In the case of continually arriving data, this method may block forever. Additionally,
this method is only guaranteed to block until data that has been synchronously appended to a
stream source prior to invocation has been processed (i.e. getOffset must immediately reflect
the addition).
recentProgress
Returns an array of the most recent StreamingQueryProgress updates for this query. The
number of progress updates retained for each stream is configured by Spark session
configuration spark.sql.streaming.numRecentProgressUpdates.
runId
Returns the unique id of this query that does not persist across restarts. That is, every query
that is started (or restarted from checkpoint) will have a different runId.
status
Returns the current status of the query.
stop() [source]
Stop this streaming query.
Note: Evolving
active
Returns a list of active queries associated with this SQLContext
>>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
>>> sqm = spark.streams
>>> # get the list of active streaming queries
>>> [q.name for q in sqm.active]
['this_query']
>>> sq.stop()
awaitAnyTermination(timeout=None) [source]
Wait until any of the queries on the associated SQLContext has terminated since the creation
of the context, or since resetTerminated() was called. If any query was terminated with an
exception, then the exception will be thrown. If timeout is set, it returns whether the query has
terminated or not within the timeout seconds.
If a query has terminated, then subsequent calls to awaitAnyTermination() will either return
immediately (if the query was terminated by query.stop()), or throw the exception
immediately (if the query was terminated with exception). Use resetTerminated() to clear
past terminations and wait for new terminations.
In the case where multiple queries have terminated since resetTermination() was called, if
any query has terminated with an exception, then awaitAnyTermination() will throw any one of
the exceptions. To handle exceptions correctly across multiple queries, users need to stop all of
them after any of them terminates with an exception, and then check query.exception() for
each query.
get(id) [source]
Returns an active query from this SQLContext or throws exception if an active query with this
name doesn’t exist.
>>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
>>> sq.name
'this_query'
>>> sq = spark.streams.get(sq.id)
>>> sq.isActive
True
>>> sq = sqlContext.streams.get(sq.id)
>>> sq.isActive
True
>>> sq.stop()
resetTerminated() [source]
Forget about past terminated queries so that awaitAnyTermination() can be used again to
wait for new terminations.
>>> spark.streams.resetTerminated()
Note: Evolving.
This function will go through the input once to determine the input schema if inferSchema is
enabled. To avoid going through the entire data once, disable inferSchema option or specify
the schema explicitly using schema.
Note: Evolving.
mode – allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, PERMISSIVE.
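A hedged sketch of a streaming CSV read with an explicit schema (the input directory is a hypothetical placeholder):
>>> csv_sdf = (spark.readStream
...              .schema("col0 INT, col1 DOUBLE")
...              .csv('/tmp/streaming_csv_input'))
>>> csv_sdf.isStreaming
True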
format(source) [source]
Specifies the input data source format.
Note: Evolving.
Parameters: source – string, name of the data source, e.g. ‘json’, ‘parquet’.
>>> s = spark.readStream.format("text")
JSON Lines (newline-delimited JSON) is supported by default. For JSON (one record per file),
set the multiLine parameter to true.
If the schema parameter is not specified, this function goes through the input once to
determine the input schema.
Note: Evolving.
Parameters: path – string representing the path to the JSON dataset, or an RDD of Strings
storing JSON objects.
schema – an optional pyspark.sql.types.StructType for the input
schema or a DDL-formatted string (For example col0 INT, col1
DOUBLE).
primitivesAsString – infers all primitive values as a string type. If None is
set, it uses the default value, false.
prefersDecimal – infers all floating-point values as a decimal type. If the
values do not fit in decimal, then it infers them as doubles. If None is set, it
uses the default value, false.
allowComments – ignores Java/C++ style comment in JSON records. If
None is set, it uses the default value, false.
allowUnquotedFieldNames – allows unquoted JSON field names. If
None is set, it uses the default value, false.
mode – allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, PERMISSIVE.
Note: Evolving.
Note: Evolving.
>>> s = spark.readStream.option("x", 1)
options(**options) [source]
Adds input options for the underlying data source.
Note: Evolving.
orc(path) [source]
Note: Evolving.
parquet(path) [source]
Loads a Parquet file stream, returning the result as a DataFrame.
You can set the following Parquet-specific option(s) for reading Parquet files:
mergeSchema: sets whether we should merge schemas collected from all Parquet
part-files. This will override spark.sql.parquet.mergeSchema. The default value
is specified in spark.sql.parquet.mergeSchema.
Note: Evolving.
schema(schema) [source]
Specifies the input schema.
Some data sources (e.g. JSON) can infer the input schema automatically from data. By
specifying the schema here, the underlying data source can skip the schema inference step,
and thus speed up data loading.
Note: Evolving.
>>> s = spark.readStream.schema(sdf_schema)
>>> s = spark.readStream.schema("col0 INT, col1 DOUBLE")
text(path) [source]
Loads a text file stream and returns a DataFrame whose schema starts with a string column
named “value”, followed by partitioned columns, if there are any.
Each line in the text file is a new row in the resulting DataFrame.
Note: Evolving.
Note: Evolving.
format(source) [source]
Specifies the underlying output data source.
Note: Evolving.
Parameters: source – string, name of the data source, which for now can be ‘parquet’.
Note: Evolving.
options(**options) [source]
Note: Evolving.
outputMode(outputMode) [source]
Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
Options include:
append: Only the new rows in the streaming DataFrame/Dataset will be written to the sink.
complete: All the rows in the streaming DataFrame/Dataset will be written to the sink every time
there are some updates.
update: Only the rows that were updated in the streaming DataFrame/Dataset will be written to
the sink every time there are some updates. If the query doesn’t contain aggregations, it will be
equivalent to append mode.
Note: Evolving.
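For example, a minimal sketch assuming a streaming DataFrame sdf:
>>> writer = sdf.writeStream.outputMode('append')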
partitionBy(*cols) [source]
Partitions the output by the given columns on the file system.
If specified, the output is laid out on the file system similar to Hive’s partitioning scheme.
Note: Evolving.
queryName(queryName) [source]
Specifies the name of the StreamingQuery that can be started with start(). This name must
be unique among all the currently active queries in the associated SparkSession.
Note: Evolving.
The data source is specified by the format and a set of options. If format is not specified,
the default data source configured by spark.sql.sources.default will be used.
Note: Evolving.
>>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
>>> sq.isActive
True
>>> sq.name
'this_query'
>>> sq.stop()
>>> sq.isActive
False
>>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
... queryName='that_query', outputMode="append", format='memory')
>>> sq.name
'that_query'
>>> sq.isActive
True
>>> sq.stop()
Note: Evolving.