Spark Streaming + Kinesis Integration
Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL). The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, and checkpointing through the concepts of Workers, Checkpoints, and Shard Leases. Here we explain how to configure Spark Streaming to receive data from Kinesis.
Configuring Kinesis
A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards per the following guide.
Configuring Spark Streaming Application
-
Linking: For Scala/Java applications using SBT/Maven project definitions, link your streaming application against the following artifact (see Linking section in the main programming guide for further information).
groupId = org.apache.spark
artifactId = spark-streaming-kinesis-asl_2.12
version = 3.2.1
For Python applications, you will have to add the above library and its dependencies when deploying your application. See the Deploying subsection below. Note that by linking to this library, you will include ASL-licensed code in your application.
-
Programming: In the streaming application code, import KinesisInputDStream and create the input DStream of byte arrays as follows:
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions

val kinesisStream = KinesisInputDStream.builder
    .streamingContext(streamingContext)
    .endpointUrl([endpoint URL])
    .regionName([region name])
    .streamName([streamName])
    .initialPosition([initial position])
    .checkpointAppName([Kinesis app name])
    .checkpointInterval([checkpoint interval])
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .build()
See the API docs and the example. Refer to the Running the Example subsection for instructions on how to run the example.
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
    .streamingContext(streamingContext)
    .endpointUrl([endpoint URL])
    .regionName([region name])
    .streamName([streamName])
    .initialPosition([initial position])
    .checkpointAppName([Kinesis app name])
    .checkpointInterval([checkpoint interval])
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .build();
See the API docs and the example. Refer to the Running the Example subsection for instructions to run the example.
from pyspark import StorageLevel
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

kinesisStream = KinesisUtils.createStream(
    streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
    [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
See the API docs and the example. Refer to the Running the Example subsection for instructions to run the example.
You may also provide the following settings. These are currently only supported in Scala and Java.
-
A “message handler function” that takes a Kinesis Record and returns a generic object T, in case you would like to use other data included in a Record, such as the partition key.
-
CloudWatch metrics level and dimensions. See the AWS documentation about monitoring KCL for details.
import collection.JavaConverters._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

val kinesisStream = KinesisInputDStream.builder
    .streamingContext(streamingContext)
    .endpointUrl([endpoint URL])
    .regionName([region name])
    .streamName([streamName])
    .initialPosition([initial position])
    .checkpointAppName([Kinesis app name])
    .checkpointInterval([checkpoint interval])
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
    .buildWithMessageHandler([message handler])
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import scala.collection.JavaConverters;

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
    .streamingContext(streamingContext)
    .endpointUrl([endpoint URL])
    .regionName([region name])
    .streamName([streamName])
    .initialPosition([initial position])
    .checkpointAppName([Kinesis app name])
    .checkpointInterval([checkpoint interval])
    .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
    .metricsLevel(MetricsLevel.DETAILED)
    .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
    .buildWithMessageHandler([message handler]);
-
streamingContext: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream.
-
[Kinesis app name]: The application name that will be used to checkpoint the Kinesis sequence numbers in a DynamoDB table.
- The application name must be unique for a given account and region.
- If the table exists but has incorrect checkpoint information (for a different stream, or old expired sequence numbers), then there may be temporary errors.
-
[Kinesis stream name]: The Kinesis stream that this streaming application will pull data from.
-
[endpoint URL]: Valid Kinesis endpoint URLs can be found here.
-
[region name]: Valid Kinesis region names can be found here.
-
[checkpoint interval]: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.
-
[initial position]: Can be either KinesisInitialPositions.TrimHorizon, KinesisInitialPositions.Latest, or KinesisInitialPositions.AtTimestamp (see the Kinesis Checkpointing section and the Amazon Kinesis API documentation for more details).
-
[message handler]: A function that takes a Kinesis Record and outputs generic T.
In other versions of the API, you can also specify the AWS access key and secret key directly.
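The [endpoint URL] and [region name] values normally have to agree with each other. As a minimal sketch, assuming the standard AWS partition naming pattern kinesis.<region>.amazonaws.com (other partitions, such as the China regions, use a different suffix), the hypothetical helpers below derive one value from the other. Neither function is part of Spark or the AWS SDK.

```python
from urllib.parse import urlparse


def endpoint_for_region(region_name: str) -> str:
    """Build a Kinesis endpoint URL for a region (standard AWS partition only)."""
    return f"https://kinesis.{region_name}.amazonaws.com"


def region_from_endpoint(endpoint_url: str) -> str:
    """Extract the region name from a URL shaped like the one built above."""
    host = urlparse(endpoint_url).hostname or ""
    parts = host.split(".")
    # Expected shape: kinesis.<region>.amazonaws.com
    if len(parts) < 4 or parts[0] != "kinesis":
        raise ValueError(f"unrecognized Kinesis endpoint: {endpoint_url!r}")
    return parts[1]


endpoint = endpoint_for_region("us-west-2")
region = region_from_endpoint(endpoint)
print(endpoint, region)
```

Deriving both values from a single input is one way to avoid passing an endpoint for one region together with the name of another.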
-
Deploying: As with any Spark application, spark-submit is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.
For Scala and Java applications, if you are using SBT or Maven for project management, then package spark-streaming-kinesis-asl_2.12 and its dependencies into the application JAR. Make sure spark-core_2.12 and spark-streaming_2.12 are marked as provided dependencies, as those are already present in a Spark installation. Then use spark-submit to launch your application (see Deploying section in the main programming guide).
For Python applications, which lack SBT/Maven project management, spark-streaming-kinesis-asl_2.12 and its dependencies can be directly added to spark-submit using --packages (see Application Submission Guide). That is,
./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.2.1 ...
Alternatively, you can also download the JAR of the Maven artifact spark-streaming-kinesis-asl-assembly from the Maven repository and add it to spark-submit with --jars.
Points to remember at runtime:
-
Kinesis data processing is ordered per partition and occurs at least once per message.
-
Multiple applications can read from the same Kinesis stream. Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.
-
A single Kinesis stream shard is processed by one input DStream at a time.
-
A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.
-
Multiple input DStreams running in separate processes/instances can read from a Kinesis stream.
-
You never need more Kinesis input DStreams than the number of Kinesis stream shards as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard.
-
Horizontal scaling is achieved by adding/removing Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point.
-
The Kinesis input DStream will balance the load between all DStreams - even across processes/instances.
-
The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load.
-
As a best practice, it’s recommended that you avoid re-shard jitter by over-provisioning when possible.
-
Each Kinesis input DStream maintains its own checkpoint info. See the Kinesis Checkpointing section for more details.
-
There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing. These are 2 independent partitioning schemes.
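The scaling points above can be illustrated with a toy model. This is only a sketch: the real distribution of shards is handled by the KCL through shard leases, not by a function like the hypothetical assign_shards below, and the round-robin policy is an assumption chosen for simplicity. It shows two of the stated properties: each shard has exactly one owner at a time, and input DStreams beyond the shard count have nothing to read.

```python
def assign_shards(shard_ids, num_dstreams):
    """Spread shards over input DStreams round-robin; each shard gets one owner."""
    if num_dstreams < 1:
        raise ValueError("need at least one input DStream")
    assignment = {dstream: [] for dstream in range(num_dstreams)}
    for index, shard_id in enumerate(sorted(shard_ids)):
        assignment[index % num_dstreams].append(shard_id)
    return assignment


shards = ["shardId-000000000000", "shardId-000000000001", "shardId-000000000002"]

# Two DStreams share three shards: one of them reads from two shards.
two = assign_shards(shards, 2)

# Five DStreams for three shards: the extra two own no shard at all.
five = assign_shards(shards, 5)
idle = [dstream for dstream, owned in five.items() if not owned]

print(two)
print(idle)
```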
Running the Example
To run the example,
-
Download a Spark binary from the download site.
-
Set up a Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created.
-
Set up the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with your AWS credentials.
-
In the Spark root directory, run the example as
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.2.1 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.2.1 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/spark-submit --jars 'external/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
    external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
    [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
This will wait for data to be received from the Kinesis stream.
-
To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream. This data should then be received and processed by the running example.
Record De-aggregation
When data is generated using the Kinesis Producer Library (KPL), messages may be aggregated for cost savings. Spark Streaming will automatically de-aggregate records during consumption.
Kinesis Checkpointing
-
Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table. This allows the system to recover from failures and continue processing where the DStream left off.
-
Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.
-
If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (KinesisInitialPositions.TrimHorizon), or from the latest tip (KinesisInitialPositions.Latest), or (except Python) from the position denoted by the provided UTC timestamp (KinesisInitialPositions.AtTimestamp(Date timestamp)). This is configurable.
- KinesisInitialPositions.Latest could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
- KinesisInitialPositions.TrimHorizon may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
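The interaction between checkpoints and the initial position can be sketched with a small in-memory simulation. Everything here is a simplification made for illustration: the stream is a plain list, positions are list indexes rather than Kinesis sequence numbers, the strings "TRIM_HORIZON" and "LATEST" stand in for the real constants, and the functions start_index and run are hypothetical, not Spark or KCL APIs.

```python
def start_index(records, checkpoint, initial_position):
    """Pick where a consumer starts reading.

    checkpoint is the index of the last checkpointed record, or None.
    """
    if checkpoint is not None:
        return checkpoint + 1      # resume right after the last checkpoint
    if initial_position == "TRIM_HORIZON":
        return 0                   # oldest record still available
    if initial_position == "LATEST":
        return len(records)        # only records added after start-up
    raise ValueError(f"unknown initial position: {initial_position}")


def run(records, start, stop, checkpoint, every):
    """Process records[start:stop], checkpointing after every `every` records."""
    processed = []
    for i in range(start, stop):
        processed.append(records[i])
        if (i - start + 1) % every == 0:
            checkpoint = i
    return processed, checkpoint


records = [f"r{i}" for i in range(10)]

# First run: no checkpoint yet, start from the oldest record, fail after 7 records.
first_start = start_index(records, None, "TRIM_HORIZON")
first_run, checkpoint = run(records, first_start, 7, None, every=3)

# Restart: the stored checkpoint takes precedence over the initial position.
second_start = start_index(records, checkpoint, "LATEST")
second_run, _ = run(records, second_start, len(records), checkpoint, every=3)

# Records processed after the last checkpoint are seen again after the restart.
replayed = sorted(set(first_run) & set(second_run))

# With no checkpoint and "LATEST", records already in the stream are skipped.
skipped_start = start_index(records, None, "LATEST")

print(checkpoint, second_start, replayed, skipped_start)
```

In this model a shorter checkpoint interval shrinks the replayed range after a failure, at the cost of more checkpoint writes, which is the trade-off described in the points above.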
Kinesis retry configuration
spark.streaming.kinesis.retry.waitTime: Wait time between Kinesis retries, as a duration string. When reading from Amazon Kinesis, users may hit ProvisionedThroughputExceededException errors when consuming faster than 5 transactions/second or exceeding the maximum read rate of 2 MiB/second. This configuration can be tuned to increase the sleep between fetches when a fetch fails, which reduces these exceptions. Default is “100ms”.
spark.streaming.kinesis.retry.maxAttempts: Max number of retries for Kinesis fetches. This config can also be used to handle Kinesis ProvisionedThroughputExceededException errors in the scenarios mentioned above. It can be increased to allow more retries for Kinesis reads. Default is 3.
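To make the roles of the two settings concrete, here is a generic retry loop written as a sketch. It is not Spark's implementation: the function fetch_with_retry, the ThroughputExceeded exception, and the constant wait between attempts are all assumptions made for illustration, and the text above does not say whether Spark counts the first attempt or grows the wait over time. The defaults mirror the documented values of 3 attempts and 100 ms.

```python
import time


class ThroughputExceeded(Exception):
    """Stand-in for ProvisionedThroughputExceededException."""


def fetch_with_retry(fetch, max_attempts=3, wait_seconds=0.1, sleep=time.sleep):
    """Call fetch(), retrying on throttling up to max_attempts total attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except ThroughputExceeded as error:
            last_error = error
            if attempt < max_attempts:
                sleep(wait_seconds)  # back off before the next attempt
    raise last_error


# A simulated fetch that is throttled twice and then succeeds.
calls = {"count": 0}


def flaky_fetch():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ThroughputExceeded("simulated throttling")
    return ["record-a", "record-b"]


waits = []  # record the waits instead of actually sleeping
result = fetch_with_retry(flaky_fetch, max_attempts=3, wait_seconds=0.1, sleep=waits.append)
print(result, waits)
```

In this model, raising the wait reduces how often a throttled shard is hit again, while raising the attempt limit gives a throttled read more chances to succeed before the error is surfaced.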