read_kinesis streaming table-valued function

Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above

Returns a table with records read from one or more Kinesis streams.

Syntax

read_kinesis ( { parameter => value } [, ...] )

Arguments

read_kinesis requires named parameter invocation.

The only required argument is streamName. All other arguments are optional.
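Since streamName is the only required argument, a minimal invocation can rely on defaults for everything else. The sketch below illustrates the named parameter syntax; the stream name is a placeholder.

SQL
-- Minimal sketch: only streamName is supplied; all other arguments use their defaults.
-- 'my_stream' is a placeholder stream name.
> SELECT * FROM STREAM read_kinesis (
  streamName => 'my_stream');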

The descriptions of the arguments are brief here. For more details, see the Amazon Kinesis documentation.

There are several options for connecting and authenticating with AWS. awsAccessKey and awsSecretKey can be specified in the function arguments, either manually or by using the secret function, or configured as environment variables as indicated below. roleArn, roleExternalId, and roleSessionName can also be used to authenticate with AWS by using instance profiles. If none of these are specified, the default AWS credential provider chain is used.

For more information about authentication, see the Amazon Kinesis documentation.

| Parameter | Type | Description |
| --- | --- | --- |
| streamName | STRING | Required. A comma-separated list of one or more Kinesis streams. |
| awsAccessKey | STRING | The AWS access key, if any. Can also be specified through the various options supported by the AWS default credential provider chain, including environment variables (AWS_ACCESS_KEY_ID) and a credential profiles file. |
| awsSecretKey | STRING | The secret key that corresponds to the access key. Can be specified either in the arguments or through the various options supported by the AWS default credential provider chain, including environment variables (AWS_SECRET_KEY or AWS_SECRET_ACCESS_KEY) and a credential profiles file. |
| roleArn | STRING | The Amazon Resource Name of the role to assume when accessing Kinesis. |
| roleExternalId | STRING | Used when delegating access to the AWS account. |
| roleSessionName | STRING | The AWS role session name. |
| stsEndpoint | STRING | An endpoint for requesting temporary access credentials. |
| region | STRING | The region of the streams to read from. The default is the locally resolved region. |
| endpoint | STRING | The regional endpoint for Kinesis Data Streams. The default is the locally resolved region. |
| initialPosition | STRING | The position in the stream to start reading from. One of: 'latest' (default), 'trim_horizon', 'earliest', 'at_timestamp'. |
| consumerMode | STRING | One of: 'polling' (default) or 'EFO' (enhanced fan-out). |
| consumerName | STRING | The name of the consumer. All consumers are prefixed with 'databricks_'. The default is an empty string. |
| registerConsumerTimeoutInterval | STRING | The maximum timeout to wait for the Kinesis EFO consumer to be registered with the Kinesis stream before throwing an error. The default is '300s'. |
| requireConsumerDeregistration | BOOLEAN | true to deregister the EFO consumer on query termination. The default is false. |
| deregisterConsumerTimeoutInterval | STRING | The maximum timeout to wait for the Kinesis EFO consumer to be deregistered from the Kinesis stream before throwing an error. The default is '300s'. |
| consumerRefreshInterval | STRING | The interval at which the consumer is checked and refreshed. The default is '300s'. |
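For illustration, the enhanced-fan-out options from the table above can be combined as in the following sketch; the stream and consumer names are placeholders.

SQL
-- Sketch: read with an enhanced-fan-out (EFO) consumer and deregister it on query termination.
-- 'my_stream' and 'my_consumer' are placeholder names.
> SELECT * FROM STREAM read_kinesis (
  streamName => 'my_stream',
  consumerMode => 'EFO',
  consumerName => 'my_consumer',
  requireConsumerDeregistration => true);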

The following arguments control read throughput and latency for Kinesis:

| Parameter | Type | Description |
| --- | --- | --- |
| maxRecordsPerFetch | INTEGER (>0) | Optional. The number of records to read per API request to Kinesis. The default is 10,000. |
| maxFetchRate | STRING | How fast to prefetch data per shard, as a value between '1.0' and '2.0' measured in MB/s. The default is '1.0'. |
| minFetchPeriod | STRING | The maximum wait time between consecutive prefetch attempts. The default is '400ms'. |
| maxFetchDuration | STRING | The maximum duration to buffer prefetched new data. The default is '10s'. |
| fetchBufferSize | STRING | The amount of data for the next trigger. The default is '20gb'. |
| shardsPerTask | INTEGER (>0) | The number of Kinesis shards to prefetch from in parallel per Spark task. The default is 5. |
| shardFetchInterval | STRING | How often to poll for resharding. The default is '1s'. |
| coalesceThresholdBlockSize | INTEGER (>0) | The threshold at which automatic coalescing occurs. The default is 10,000,000. |
| coalesce | BOOLEAN | true to coalesce prefetched requests. The default is true. |
| coalesceBinSize | INTEGER (>0) | The approximate block size after coalescing. The default is 128,000,000. |
| reuseKinesisClient | BOOLEAN | true to reuse the Kinesis client stored in the cache. The default is true, except on a PE cluster. |
| clientRetries | INTEGER (>0) | The number of retries in the retry scenario. The default is 5. |
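As a rough illustration of how these options fit together, the following sketch adjusts a few of them; the stream name and the specific values are placeholders, not tuning recommendations.

SQL
-- Sketch: adjust prefetch volume and buffering; values are placeholders, not recommendations.
> SELECT * FROM STREAM read_kinesis (
  streamName => 'my_stream',
  maxRecordsPerFetch => 5000,
  maxFetchDuration => '5s',
  shardsPerTask => 10);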

Returns

A table of Kinesis records with the following schema:

| Name | Data type | Nullable | Description |
| --- | --- | --- | --- |
| partitionKey | STRING | No | A key used to distribute data among the shards of a stream. All data records with the same partition key are read from the same shard. |
| data | BINARY | No | The Kinesis data payload, base-64 encoded. |
| stream | STRING | No | The name of the stream the data was read from. |
| shardId | STRING | No | A unique identifier for the shard the data was read from. |
| sequenceNumber | BIGINT | No | The unique identifier of the record within its shard. |
| approximateArrivalTimestamp | TIMESTAMP | No | The approximate time that the record was inserted into the stream. |

The columns (stream, shardId, sequenceNumber) constitute a primary key.
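Because data is returned as BINARY, downstream queries typically cast or parse it. The sketch below assumes the payload is UTF-8 text and uses a placeholder stream name.

SQL
-- Sketch: expose the payload as text, assuming records are UTF-8 encoded.
-- 'my_stream' is a placeholder stream name.
> SELECT
  partitionKey,
  CAST(data AS STRING) AS payload,
  approximateArrivalTimestamp
FROM STREAM read_kinesis (streamName => 'my_stream');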

Examples

SQL
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret('test-databricks', 'awsAccessKey'),
awsSecretKey => secret('test-databricks', 'awsSecretKey'),
initialPosition => 'earliest');

-- The data can now be queried from testing.streaming_table

-- A streaming query when the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are already configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');

-- A streaming query that authenticates by assuming a role using roleArn and roleSessionName
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');