
read_pulsar streaming table-valued function

Applies to: Databricks SQL, Databricks Runtime 14.1 and above

Preview

This feature is in Public Preview.

Returns a table with records read from Pulsar.

This table-valued function supports only streaming queries, not batch queries.

Syntax

read_pulsar ( { option_key => option_value } [, ...] )

Arguments

This function requires named parameter invocation: pass each option as option_key => option_value.

The options serviceUrl and topic are mandatory.

The argument descriptions here are brief. See the Structured Streaming Pulsar documentation for extended descriptions.

Option | Type | Default | Description
serviceUrl | STRING | Mandatory | The URI of the Pulsar service.
topic | STRING | Mandatory | The topic to read from.
predefinedSubscription | STRING | None | The predefined subscription name that the connector uses to track Spark application progress.
subscriptionPrefix | STRING | None | A prefix that the connector uses to generate a random subscription name for tracking Spark application progress.
pollTimeoutMs | LONG | 120000 | The timeout, in milliseconds, for reading messages from Pulsar.
failOnDataLoss | BOOLEAN | true | Whether to fail the query when data is lost, for example because topics are deleted or messages are deleted by a retention policy.
startingOffsets | STRING | latest | The position from which a new query starts reading: earliest, latest, or a JSON string that specifies a specific offset. With latest, the reader reads only records that arrive after the query starts. With earliest, the reader starts from the earliest available offset.
startingTime | STRING | None | When specified, the Pulsar source reads messages starting from the position of the specified startingTime.
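The following is a minimal sketch that combines several of the optional arguments above. The subscription name my-spark-subscription and the target table testing.pulsar_events are hypothetical. Setting failOnDataLoss to false tells the query not to fail when messages are no longer available, for example after a retention policy removes them.

```sql
-- Sketch only: my-spark-subscription and testing.pulsar_events are hypothetical names.
CREATE STREAMING TABLE testing.pulsar_events AS
SELECT * FROM STREAM read_pulsar(
  serviceUrl => 'pulsar://broker.example.com:6650',
  topic => 'my-topic',
  -- Track progress with a fixed subscription instead of a generated one.
  predefinedSubscription => 'my-spark-subscription',
  -- Read only records that arrive after the query starts.
  startingOffsets => 'latest',
  -- Wait up to 60 seconds when polling Pulsar for messages.
  pollTimeoutMs => 60000,
  -- Do not fail the query if messages were deleted before they were read.
  failOnDataLoss => false
);
```

A fixed predefinedSubscription makes the subscription easier to find in Pulsar than a randomly generated one derived from subscriptionPrefix.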

The following arguments configure authentication for the Pulsar client:

Option | Type | Default | Description
pulsarClientAuthPluginClassName | STRING | None | Name of the authentication plugin class.
pulsarClientAuthParams | STRING | None | Parameters for the authentication plugin.
pulsarClientUseKeyStoreTls | STRING | None | Whether to use a KeyStore for TLS authentication.
pulsarClientTlsTrustStoreType | STRING | None | TrustStore file type for TLS authentication.
pulsarClientTlsTrustStorePath | STRING | None | TrustStore file path for TLS authentication.
pulsarClientTlsTrustStorePassword | STRING | None | TrustStore password for TLS authentication.
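As a sketch, the trust store options can be combined to connect to a broker over TLS. This assumes the broker exposes a TLS endpoint at pulsar+ssl://broker.example.com:6651 (6651 is Pulsar's conventional TLS port). The table name, trust store path, and password are placeholders, not defaults.

```sql
-- Sketch only: the endpoint, path, password, and table name are placeholders.
CREATE STREAMING TABLE testing.pulsar_tls_events AS
SELECT * FROM STREAM read_pulsar(
  -- pulsar+ssl:// selects a TLS connection to the broker.
  serviceUrl => 'pulsar+ssl://broker.example.com:6651',
  topic => 'my-topic',
  -- STRING-typed option, so the flag is passed as a string literal.
  pulsarClientUseKeyStoreTls => 'true',
  -- Trust store used to validate the broker certificate.
  pulsarClientTlsTrustStoreType => 'JKS',
  pulsarClientTlsTrustStorePath => '/var/private/tls/client.truststore.jks',
  pulsarClientTlsTrustStorePassword => 'truststorepw'
);
```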

The following arguments configure Pulsar admission control and authentication for the Pulsar admin client. The admin configuration is required only when admission control is enabled, that is, when maxBytesPerTrigger is set:

Option | Type | Default | Description
maxBytesPerTrigger | BIGINT | None | A soft limit on the maximum number of bytes to process per micro-batch. If this is specified, adminUrl must also be specified.
adminUrl | STRING | None | The Pulsar serviceHttpUrl configuration. Required only when maxBytesPerTrigger is specified.
pulsarAdminAuthPlugin | STRING | None | Name of the authentication plugin.
pulsarAdminAuthParams | STRING | None | Parameters for the authentication plugin.
pulsarClientUseKeyStoreTls | STRING | None | Whether to use a KeyStore for TLS authentication.
pulsarAdminTlsTrustStoreType | STRING | None | TrustStore file type for TLS authentication.
pulsarAdminTlsTrustStorePath | STRING | None | TrustStore file path for TLS authentication.
pulsarAdminTlsTrustStorePassword | STRING | None | TrustStore password for TLS authentication.
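The following is a minimal sketch of enabling admission control. It caps each micro-batch at roughly 64 MiB (67108864 bytes, a soft limit), so adminUrl must also be set. The sketch assumes the broker's HTTP service listens at http://broker.example.com:8080 (8080 is Pulsar's default web service port). The table name is hypothetical.

```sql
-- Sketch only: the admin URL and table name are assumptions.
CREATE STREAMING TABLE testing.pulsar_rate_limited AS
SELECT * FROM STREAM read_pulsar(
  serviceUrl => 'pulsar://broker.example.com:6650',
  topic => 'my-topic',
  -- Soft cap of 64 * 1024 * 1024 bytes per micro-batch.
  maxBytesPerTrigger => 67108864,
  -- Required whenever maxBytesPerTrigger is set.
  adminUrl => 'http://broker.example.com:8080'
);
```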

Returns

A table of Pulsar records with the following schema:

  • __key STRING NOT NULL: Pulsar message key.

  • value BINARY NOT NULL: Pulsar message value.

    Note: For topics with an Avro or JSON schema, the content is not loaded into a binary value field. Instead, it is expanded into columns that preserve the field names and field types of the Pulsar topic schema.

  • __topic STRING NOT NULL: Pulsar topic name.

  • __messageId BINARY NOT NULL: Pulsar message ID.

  • __publishTime TIMESTAMP NOT NULL: Pulsar message publish time.

  • __eventTime TIMESTAMP NOT NULL: Pulsar message event time.

  • __messageProperties MAP<STRING, STRING>: Pulsar message properties.
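As a sketch, for a topic without an Avro or JSON schema (where value is BINARY), the payload can be decoded as UTF-8 text while keeping selected metadata columns. The sketch assumes the payload is UTF-8 encoded. The table name testing.pulsar_decoded is hypothetical.

```sql
-- Sketch only: assumes a topic without an Avro or JSON schema and UTF-8 payloads.
CREATE STREAMING TABLE testing.pulsar_decoded AS
SELECT
  __key,
  -- CAST from BINARY to STRING decodes the bytes as UTF-8.
  CAST(value AS STRING) AS value_text,
  __topic,
  __publishTime,
  __messageProperties
FROM STREAM read_pulsar(
  serviceUrl => 'pulsar://broker.example.com:6650',
  topic => 'my-topic'
);
```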

Examples

SQL
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
    serviceUrl => 'pulsar://broker.example.com:6650',
    startingOffsets => 'earliest',
    topic => 'my-topic');

-- Streaming ingestion from Pulsar with TLS authentication (requires a pulsar+ssl:// service URL)
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
    serviceUrl => 'pulsar+ssl://broker.example.com:6651',
    startingOffsets => 'earliest',
    topic => 'my-topic',
    pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
    pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
  );

The data can now be queried from testing.streaming_table for further analysis.
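For example, the following sketch inspects the ten most recent records from the last hour of testing.streaming_table. It assumes the topic has no Avro or JSON schema, so the table has a BINARY value column, and that the payload is UTF-8 text.

```sql
-- Sketch only: assumes a BINARY value column containing UTF-8 text.
SELECT
  __key,
  CAST(value AS STRING) AS value_text,
  __publishTime
FROM testing.streaming_table
-- Keep only messages published within the last hour.
WHERE __publishTime >= current_timestamp() - INTERVAL 1 HOUR
ORDER BY __publishTime DESC
LIMIT 10;
```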