Skip to content

Commit eca401e

Browse files
brkyvzzsxwing
authored andcommitted
[SPARK-11985][STREAMING][KINESIS][DOCS] Update Kinesis docs
- Provide example on `message handler` - Provide bit on KPL record de-aggregation - Fix typos Author: Burak Yavuz <brkyvz@gmail.com> Closes apache#9970 from brkyvz/kinesis-docs. (cherry picked from commit 2377b70) Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
1 parent bd33d4e commit eca401e

File tree

1 file changed

+45
-9
lines changed

1 file changed

+45
-9
lines changed

docs/streaming-kinesis-integration.md

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
2323

2424
**Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
2525

26-
2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream as follows:
26+
2. **Programming:** In the streaming application code, import `KinesisUtils` and create the input DStream of byte array as follows:
2727

2828
<div class="codetabs">
2929
<div data-lang="scala" markdown="1">
@@ -36,7 +36,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
3636
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
3737

3838
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kinesis.KinesisUtils$)
39-
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the Running the Example section for instructions on how to run the example.
39+
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.
4040

4141
</div>
4242
<div data-lang="java" markdown="1">
@@ -49,7 +49,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
4949
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2);
5050

5151
See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisUtils.html)
52-
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the next subsection for instructions to run the example.
52+
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
5353

5454
</div>
5555
<div data-lang="python" markdown="1">
@@ -60,18 +60,47 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
6060
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
6161

6262
See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
63-
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the next subsection for instructions to run the example.
63+
and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
6464

6565
</div>
6666
</div>
6767

68-
- `streamingContext`: StreamingContext containg an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
68+
You may also provide a "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key. This is currently only supported in Scala and Java.
6969

70-
- `[Kineiss app name]`: The application name that will be used to checkpoint the Kinesis
70+
<div class="codetabs">
71+
<div data-lang="scala" markdown="1">
72+
73+
import org.apache.spark.streaming.Duration
74+
import org.apache.spark.streaming.kinesis._
75+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
76+
77+
val kinesisStream = KinesisUtils.createStream[T](
78+
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
79+
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
80+
[message handler])
81+
82+
</div>
83+
<div data-lang="java" markdown="1">
84+
85+
import org.apache.spark.streaming.Duration;
86+
import org.apache.spark.streaming.kinesis.*;
87+
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
88+
89+
JavaReceiverInputDStream<T> kinesisStream = KinesisUtils.createStream(
90+
streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
91+
[region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2,
92+
[message handler], [class T]);
93+
94+
</div>
95+
</div>
96+
97+
- `streamingContext`: StreamingContext containg an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
98+
99+
- `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis
71100
sequence numbers in DynamoDB table.
72101
- The application name must be unique for a given account and region.
73102
- If the table exists but has incorrect checkpoint information (for a different stream, or
74-
old expired sequenced numbers), then there may be temporary errors.
103+
old expired sequenced numbers), then there may be temporary errors.
75104

76105
- `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from.
77106

@@ -83,6 +112,8 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
83112

84113
- `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see Kinesis Checkpointing section and Amazon Kinesis API documentation for more details).
85114

115+
- `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`.
116+
86117
In other versions of the API, you can also specify the AWS access key and secret key directly.
87118

88119
3. **Deploying:** Package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
@@ -99,7 +130,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
99130
<img src="img/streaming-kinesis-arch.png"
100131
title="Spark Streaming Kinesis Architecture"
101132
alt="Spark Streaming Kinesis Architecture"
102-
width="60%"
133+
width="60%"
103134
/>
104135
<!-- Images are downsized intentionally to improve quality on retina displays -->
105136
</p>
@@ -165,11 +196,16 @@ To run the example,
165196

166197
This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.
167198

199+
#### Record De-aggregation
200+
201+
When data is generated using the [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html), messages may be aggregated for cost savings. Spark Streaming will automatically
202+
de-aggregate records during consumption.
203+
168204
#### Kinesis Checkpointing
169205
- Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table. This allows the system to recover from failures and continue processing where the DStream left off.
170206

171207
- Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.
172208

173209
- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the latest tip (InitialPostitionInStream.LATEST). This is configurable.
174-
- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
210+
- InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
175211
- InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.

0 commit comments

Comments
 (0)