read_pubsub streaming table-valued function
Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above
Returns a table with records read from a Pub/Sub topic. Only supports streaming queries.
Syntax
read_pubsub( { parameter => value } [, ...])
Arguments
read_pubsub requires named parameter invocation. The only required arguments are subscriptionId, projectId, and topicId. All other arguments are optional.
For full argument descriptions, see Configure options for Pub/Sub streaming read.
Databricks recommends using secrets when providing authorization options. See the secret function.
For details on configuring access to Pub/Sub, see Configure access to Pub/Sub.
Parameter | Type | Description |
---|---|---|
subscriptionId | STRING | Required, the unique identifier assigned to a Pub/Sub subscription. |
projectId | STRING | Required, the Google Cloud project ID associated with the Pub/Sub topic. |
topicId | STRING | Required, the ID or name of the Pub/Sub topic to subscribe to. |
clientEmail | STRING | The email address associated with a service account for authentication. |
clientId | STRING | The client ID associated with the service account for authentication. |
privateKeyId | STRING | The ID of the private key associated with the service account. |
privateKey | STRING | The private key associated with the service account for authentication. |
These arguments are used for further fine-tuning when reading from Pub/Sub:
Parameter | Type | Description |
---|---|---|
numFetchPartitions | STRING | Optional. The number of parallel Spark tasks that fetch records from a subscription. Defaults to the number of executors. |
deleteSubscriptionOnStreamStop | BOOLEAN | Optional with default false. If set to true, the subscription passed to the stream is deleted when the streaming job ends. |
maxBytesPerTrigger | STRING | A soft limit for the batch size to be processed during each triggered micro-batch. The default is 'none'. |
maxRecordsPerFetch | STRING | The number of records to fetch per task before processing records. The default is '1000'. |
maxFetchPeriod | STRING | The time duration for each task to fetch before processing records. The default is '10s'. |
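For example, a read that tunes fetch parallelism and batching might look like the following sketch. The option values, the table name testing.tuned_streaming_table, and the subscription, project, and topic IDs are illustrative; choose values that match your workload.

-- Streaming read with fine-tuning options (illustrative values)
> CREATE STREAMING TABLE testing.tuned_streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    numFetchPartitions => '4',
    maxRecordsPerFetch => '5000',
    maxFetchPeriod => '30s',
    deleteSubscriptionOnStreamStop => true
  );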
Returns
A table of Pub/Sub records with the following schema. The attributes column can be null; all other columns are not null.
Name | Data type | Nullable | Description |
---|---|---|---|
messageId | STRING | No | Unique identifier for the Pub/Sub message. |
payload | BINARY | No | The content of the Pub/Sub message. |
attributes | STRING | Yes | Key-value pairs representing the attributes of the Pub/Sub message. This is a JSON-encoded string. |
publishTimestampInMillis | BIGINT | No | The timestamp when the message was published, in milliseconds. |
sequenceNumber | BIGINT | No | The unique identifier of the record within its shard. |
Examples
-- Streaming ingestion from Pub/Sub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    clientEmail => secret('app-events', 'clientEmail'),
    clientId => secret('app-events', 'clientId'),
    privateKeyId => secret('app-events', 'privateKeyId'),
    privateKey => secret('app-events', 'privateKey')
  );
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic'
  );
The data can now be queried from testing.streaming_table for further analysis, for example:
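Because payload is BINARY and attributes is a JSON-encoded string, downstream queries typically decode both. A minimal sketch, assuming payloads are UTF-8 text; the eventType attribute key is hypothetical:

-- Decode the payload, extract a (hypothetical) attribute, and convert the publish time
> SELECT
    messageId,
    CAST(payload AS STRING) AS payload_text,
    attributes:eventType AS event_type,
    timestamp_millis(publishTimestampInMillis) AS published_at
  FROM testing.streaming_table;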
Erroneous queries:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project'
  );
-- maxRecordsPerFetch is set above its limit (MAX_RECORDS_PER_FETCH_LIMIT)
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    maxRecordsPerFetch => '1000001'
  );
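The second query succeeds once maxRecordsPerFetch is kept within range, for example at its documented default:

-- Same ingestion with maxRecordsPerFetch at its default value
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
    subscriptionId => 'app-events-1234',
    projectId => 'app-events-project',
    topicId => 'app-events-topic',
    maxRecordsPerFetch => '1000'
  );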