read_pubsub streaming table-valued function

Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above

Returns a table with records read from a Pub/Sub topic. Supports streaming queries only.

Syntax

read_pubsub( { parameter => value } [, ...])

Arguments

read_pubsub requires named parameter invocation.

The only required arguments are subscriptionId, projectId, and topicId. All other arguments are optional.

For full argument descriptions, see Configure options for Pub/Sub streaming read.

Databricks recommends using secrets when providing authorization options. See secret function.

For details on configuring access to Pub/Sub, see Configure access to Pub/Sub.

| Parameter | Type | Description |
|---|---|---|
| subscriptionId | STRING | Required. The unique identifier assigned to a Pub/Sub subscription. |
| projectId | STRING | Required. The Google Cloud project ID associated with the Pub/Sub topic. |
| topicId | STRING | Required. The ID or name of the Pub/Sub topic to subscribe to. |
| clientEmail | STRING | The email address associated with a service account for authentication. |
| clientId | STRING | The client ID associated with the service account for authentication. |
| privateKeyId | STRING | The ID of the private key associated with the service account. |
| privateKey | STRING | The private key associated with the service account for authentication. |

These arguments are used for further fine-tuning when reading from Pub/Sub:

| Parameter | Type | Description |
|---|---|---|
| numFetchPartitions | STRING | Optional. Defaults to the number of executors. The number of parallel Spark tasks that fetch records from a subscription. |
| deleteSubscriptionOnStreamStop | BOOLEAN | Optional. Defaults to false. If set to true, the subscription passed to the stream is deleted when the streaming job ends. |
| maxBytesPerTrigger | STRING | A soft limit on the batch size processed during each triggered micro-batch. The default is 'none'. |
| maxRecordsPerFetch | STRING | The number of records to fetch per task before processing records. The default is '1000'. |
| maxFetchPeriod | STRING | The time duration for each task to fetch before processing records. The default is '10s'. |
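
As a minimal sketch of passing these options alongside the required arguments (the subscription, project, and topic IDs are hypothetical, and the option values are illustrative rather than recommendations):

SQL
-- Sketch: a streaming read with fine-tuning options.
-- IDs are hypothetical; option values are illustrative only.
> CREATE STREAMING TABLE testing.tuned_streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    numFetchPartitions => '8',                -- parallel Spark fetch tasks
    maxRecordsPerFetch => '5000',             -- records per task before processing
    maxFetchPeriod => '30s',                  -- fetch duration per task
    deleteSubscriptionOnStreamStop => false   -- keep the subscription after the stream stops
  );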

Returns

A table of Pub/Sub records with the following schema. The attributes column can be null; all other columns are not null.

| Name | Data type | Nullable | Description |
|---|---|---|---|
| messageId | STRING | No | Unique identifier for the Pub/Sub message. |
| payload | BINARY | No | The content of the Pub/Sub message. |
| attributes | STRING | Yes | Key-value pairs representing the attributes of the Pub/Sub message. This is a JSON-encoded string. |
| publishTimestampInMillis | BIGINT | No | The timestamp when the message was published, in milliseconds. |
| sequenceNumber | BIGINT | No | The unique identifier of the record within its shard. |
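
Because payload is binary and attributes is a JSON-encoded string, both typically need decoding downstream. As a sketch, assuming string-valued attributes and the testing.streaming_table created in the examples below:

SQL
-- Sketch: decode the binary payload as UTF-8 text, parse the
-- JSON-encoded attributes into a map, and convert the publish
-- timestamp from milliseconds to a timestamp.
> SELECT
    messageId,
    CAST(payload AS STRING) AS payload_text,
    from_json(attributes, 'map<string,string>') AS attribute_map,
    timestamp_millis(publishTimestampInMillis) AS published_at
  FROM testing.streaming_table;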

Examples

SQL
-- Streaming ingestion from Pub/Sub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    clientEmail => secret('app-events', 'clientEmail'),
    clientId => secret('app-events', 'clientId'),
    privateKeyId => secret('app-events', 'privateKeyId'),
    privateKey => secret('app-events', 'privateKey')
  );

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic'
  );

The data can then be queried from testing.streaming_table for further analysis.
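
For instance, a sketch of a simple downstream aggregation (the query is illustrative, not part of the original example):

SQL
-- Sketch: count ingested messages per minute of publish time.
> SELECT
    date_trunc('MINUTE', timestamp_millis(publishTimestampInMillis)) AS publish_minute,
    count(*) AS message_count
  FROM testing.streaming_table
  GROUP BY 1
  ORDER BY 1;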

Erroneous queries:

SQL
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project'
  );

-- Limit is too high for an option; fails with MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    maxRecordsPerFetch => 1000001
  );