Monitoring Structured Streaming queries on Databricks

Databricks provides built-in monitoring for Structured Streaming applications through the Spark UI under the Streaming tab.

Distinguish Structured Streaming queries in the Spark UI

Give each stream a unique query name by adding .queryName(<query-name>) to your writeStream code. The name makes it easy to tell which metrics belong to which stream in the Spark UI.
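
As a minimal sketch of where the call goes in PySpark, the helper below names a stream before starting it. The function name, the checkpoint path, and the table name are hypothetical; df is assumed to be a streaming DataFrame.

```python
def start_named_stream(df, query_name, checkpoint_path, target_table):
    """Start a streaming write whose metrics are reported under query_name."""
    return (
        df.writeStream
        .queryName(query_name)  # shown in the Spark UI and in listener events
        .option("checkpointLocation", checkpoint_path)
        .toTable(target_table)
    )

# Hypothetical usage with a streaming DataFrame named events_df:
# query = start_named_stream(
#     events_df, "bronze_ingest", "/tmp/checkpoints/bronze_ingest", "bronze_events"
# )
```

The same name is reported in the name field of StreamingQueryListener events, so it also helps when correlating metrics outside the Spark UI.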

Push Structured Streaming metrics to external services

Streaming metrics can be pushed to external services for alerting or dashboarding by using Apache Spark's StreamingQueryListener interface. In Databricks Runtime 11.3 LTS and above, StreamingQueryListener is available in Python and Scala.

important

The following limitations apply to workloads using Unity Catalog-enabled compute access modes:

  • StreamingQueryListener requires Databricks Runtime 15.1 or above to use credentials or interact with objects managed by Unity Catalog on compute with dedicated access mode.
  • StreamingQueryListener requires Databricks Runtime 16.1 or above for Scala workloads configured with standard access mode (formerly shared access mode).
note

Processing latency in listeners can significantly slow down query processing. Keep the logic in listener methods to a minimum and write to low-latency systems such as Kafka.

The following code provides basic examples of the syntax for implementing a listener:

Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
   * Called when a query is started.
   * @note This is called synchronously with
   *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
   *       `onQueryStarted` is called on all listeners before
   *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
   *       Do not block this method, as it blocks your query.
   */
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
   * Called when there is some status update (ingestion rate updated, etc.)
   *
   * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
   *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
   *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
   *       terminates when processing `QueryProgressEvent`.
   */
  override def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
   * Called when the query is idle and waiting for new data to process.
   */
  override def onQueryIdle(event: QueryIdleEvent): Unit = {}

  /**
   * Called when a query is stopped, with or without error.
   */
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

// Register the listener so that it receives events from the queries in this Spark session
spark.streams.addListener(myListener)
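
The advice to keep listener logic minimal can be sketched in Python as a small buffer that hands events to a background thread, so that the listener callback returns immediately. ProgressForwarder and its send callback are hypothetical names. The sketch also assumes that a PySpark listener would call submit(event.progress.json) from its onQueryProgress method, and that wiring is not shown here.

```python
import queue
import threading


class ProgressForwarder:
    """Buffers progress events so that the listener callback never waits on I/O."""

    def __init__(self, send, max_buffered=1000):
        self._send = send  # slow call, for example a Kafka producer send
        self._events = queue.Queue(maxsize=max_buffered)
        threading.Thread(target=self._drain, daemon=True).start()

    def submit(self, event_json):
        """Enqueue without blocking. Returns False if the buffer is full."""
        try:
            self._events.put_nowait(event_json)
            return True
        except queue.Full:
            return False

    def _drain(self):
        # Runs on the background thread and forwards events one at a time.
        while True:
            event_json = self._events.get()
            try:
                self._send(event_json)
            except Exception:
                pass  # a failed send must not stop the worker; log as needed
            finally:
                self._events.task_done()

    def flush(self):
        """Block until every buffered event has been handed to send."""
        self._events.join()
```

Dropping events when the buffer is full is a design choice that favors query throughput over complete metrics. A deployment that cannot lose events would need a different policy.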

Defining observable metrics in Structured Streaming

Observable metrics are named arbitrary aggregate functions that can be defined on a query (DataFrame). As soon as the execution of a DataFrame reaches a completion point (that is, finishes a batch query or reaches a streaming epoch), a named event is emitted that contains the metrics for the data processed since the last completion point.

You can observe these metrics by attaching a listener to the Spark session. The listener depends on the execution mode:

  • Batch mode: Use QueryExecutionListener.

    QueryExecutionListener is called when the query completes. Access the metrics using the QueryExecution.observedMetrics map.

  • Streaming, or microbatch: Use StreamingQueryListener.

    StreamingQueryListener is called when the streaming query completes an epoch. Access the metrics using the StreamingQueryProgress.observedMetrics map. Databricks does not support continuous execution streaming.

For example:

Scala
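import scala.collection.JavaConverters._

import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import spark.implicits._

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  // These two methods are abstract and must be defined, even if empty
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is a java.util.Map, so convert it to get an Option back
    event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})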
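
The threshold check in the listener can be isolated into a small, testable function. The following Python sketch takes the two observed counts as plain integers. The function names are hypothetical, and how the counts are read from the observed metrics row is left to the listener.

```python
def error_ratio(rc, erc):
    """Return the share of error rows, or 0.0 for an empty microbatch."""
    return erc / rc if rc else 0.0


def should_alert(rc, erc, threshold=0.05):
    """True if the number of errors exceeds the threshold (5 percent by default)."""
    return error_ratio(rc, erc) > threshold
```

Treating an empty microbatch as a ratio of 0.0 avoids a division by zero when a trigger processes no rows.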

StreamingQueryListener object metrics

Metric | Description
id | A unique query ID that persists across restarts.
runId | A query ID that is unique for every start or restart. See StreamingQuery.runId().
name | The user-specified name of the query. The name is null if no name is specified.
timestamp | The timestamp for the execution of the microbatch.
batchId | A unique ID for the current batch of data being processed. In the case of retries after a failure, a given batch ID may be executed more than once. Similarly, when there is no data to be processed, the batch ID is not incremented.
numInputRows | The aggregate (across all sources) number of records processed in a trigger.
inputRowsPerSecond | The aggregate (across all sources) rate of arriving data.
processedRowsPerSecond | The aggregate (across all sources) rate at which Spark is processing data.
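
As an illustration of how these top-level fields might be used, the following Python sketch summarizes one progress event from its JSON form. The function name and the shape of the returned dictionary are hypothetical. Falling back to id when name is null follows from the table above.

```python
import json


def throughput_summary(progress_json):
    """Summarize one progress event given as a JSON string."""
    progress = json.loads(progress_json)
    arriving = progress["inputRowsPerSecond"]
    processed = progress["processedRowsPerSecond"]
    return {
        # name is null when the query was started without queryName
        "query": progress["name"] or progress["id"],
        "batchId": progress["batchId"],
        # data arrives faster than Spark processes it in this microbatch
        "fallingBehind": arriving > processed,
    }
```

A single microbatch with fallingBehind set to true is not necessarily a problem. A sustained run of such batches is a more useful alerting signal.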

durationMs object

Information about the time it takes to complete various stages of the microbatch execution process.

Metric | Description
durationMs.addBatch | The time taken to execute the microbatch. This excludes the time Spark takes to plan the microbatch.
durationMs.getBatch | The time it takes to retrieve the metadata about the offsets from the source.
durationMs.latestOffset | The time taken to retrieve the latest offset from the sources.
durationMs.queryPlanning | The time taken to generate the execution plan.
durationMs.triggerExecution | The time it takes to plan and execute the microbatch.
durationMs.walCommit | The time taken to commit the new available offsets.
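
To see which stage dominates a microbatch, each duration can be expressed as a share of triggerExecution. This is a minimal Python sketch with a hypothetical function name. It takes the durationMs object as a dictionary and does not assume that the individual stages add up to the total.

```python
def duration_shares(duration_ms):
    """Return each stage's duration as a fraction of triggerExecution."""
    total = duration_ms.get("triggerExecution", 0)
    if total <= 0:
        return {}
    return {
        stage: ms / total
        for stage, ms in duration_ms.items()
        if stage != "triggerExecution"
    }


# Values taken from the Kafka-to-Kafka example event in this article
shares = duration_shares({
    "addBatch": 18352,
    "getBatch": 0,
    "latestOffset": 31,
    "queryPlanning": 977,
    "triggerExecution": 20165,
    "walCommit": 342,
})
slowest_stage = max(shares, key=shares.get)  # "addBatch"
```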

eventTime object

Information about the event time value seen within the data being processed in the microbatch. This data is used by the watermark to figure out how to trim the state for processing stateful aggregations defined in the Structured Streaming job.

Metric | Description
eventTime.avg | The average event time seen in that trigger.
eventTime.max | The maximum event time seen in that trigger.
eventTime.min | The minimum event time seen in that trigger.
eventTime.watermark | The value of the watermark used in that trigger.
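
One way to read these values together is to measure how far the watermark trails the newest event time. The Python sketch below uses hypothetical function names and assumes that timestamps are UTC strings with fractional seconds and a trailing Z, as in the example events in this article.

```python
from datetime import datetime, timezone


def parse_event_time(value):
    """Parse a timestamp such as 2022-10-31T20:09:30.125Z."""
    parsed = datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


def watermark_lag_seconds(event_time):
    """Seconds between the maximum event time and the watermark, if both exist."""
    if "max" not in event_time or "watermark" not in event_time:
        return None  # for example, a trigger that reports no event time values
    newest = parse_event_time(event_time["max"])
    watermark = parse_event_time(event_time["watermark"])
    return (newest - watermark).total_seconds()
```

The function returns None rather than raising an error because some progress events, such as the Delta Lake-to-Delta Lake example, contain no eventTime object.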

stateOperators object

Information about the stateful operations that are defined in the Structured Streaming job and the aggregations that are produced from them.

Metric | Description
stateOperators.operatorName | The name of the stateful operator to which the metrics relate, such as symmetricHashJoin, dedupe, stateStoreSave.
stateOperators.numRowsTotal | The total number of rows in state as a result of a stateful operator or aggregation.
stateOperators.numRowsUpdated | The total number of rows updated in state as a result of a stateful operator or aggregation.
stateOperators.allUpdatesTimeMs | This metric is currently not measurable by Spark and is planned to be removed in future updates.
stateOperators.numRowsRemoved | The total number of rows removed from state as a result of a stateful operator or aggregation.
stateOperators.allRemovalsTimeMs | This metric is currently not measurable by Spark and is planned to be removed in future updates.
stateOperators.commitTimeMs | The time taken to commit all updates (puts and removes) and return a new version.
stateOperators.memoryUsedBytes | Memory used by the state store.
stateOperators.numRowsDroppedByWatermark | The number of rows that are considered too late to be included in a stateful aggregation. Streaming aggregations only: The number of rows dropped post-aggregation (not raw input rows). This number is not precise, but provides an indication that there is late data being dropped.
stateOperators.numShufflePartitions | The number of shuffle partitions for this stateful operator.
stateOperators.numStateStoreInstances | The actual number of state store instances that the operator has initialized and maintained. For many stateful operators, this is the same as the number of partitions. However, stream-stream joins initialize four state store instances per partition.
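
Because numRowsDroppedByWatermark indicates late data, a listener might report which operators dropped rows in a microbatch. The following Python sketch uses a hypothetical function name and takes the parsed progress event as a dictionary.

```python
def operators_dropping_late_rows(progress):
    """Return (operatorName, droppedRows) pairs for operators that dropped late rows."""
    return [
        (op["operatorName"], op["numRowsDroppedByWatermark"])
        for op in progress.get("stateOperators", [])
        if op.get("numRowsDroppedByWatermark", 0) > 0
    ]
```

The result is a list of pairs rather than a dictionary, because a query can contain more than one operator with the same name.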

stateOperators.customMetrics object

Information collected from RocksDB capturing metrics about its performance and operations with respect to the stateful values it maintains for the Structured Streaming job. For more information, see Configure RocksDB state store on Databricks.

Metric | Description
customMetrics.rocksdbBytesCopied | The number of bytes copied as tracked by the RocksDB File Manager.
customMetrics.rocksdbCommitCheckpointLatency | The time in milliseconds spent taking a snapshot of native RocksDB and writing it to a local directory.
customMetrics.rocksdbCommitCompactLatency | The time in milliseconds spent on the optional compaction during the checkpoint commit.
customMetrics.rocksdbCommitFileSyncLatencyMs | The time in milliseconds spent syncing the native RocksDB snapshot to external storage (the checkpoint location).
customMetrics.rocksdbCommitFlushLatency | The time in milliseconds spent flushing the RocksDB in-memory changes to the local disk.
customMetrics.rocksdbCommitPauseLatency | The time in milliseconds spent stopping the background worker threads as part of the checkpoint commit, such as for compaction.
customMetrics.rocksdbCommitWriteBatchLatency | The time in milliseconds spent applying the staged writes in the in-memory structure (WriteBatch) to native RocksDB.
customMetrics.rocksdbFilesCopied | The number of files copied as tracked by the RocksDB File Manager.
customMetrics.rocksdbFilesReused | The number of files reused as tracked by the RocksDB File Manager.
customMetrics.rocksdbGetCount | The number of get calls to the DB (does not include gets from WriteBatch, the in-memory batch used for staging writes).
customMetrics.rocksdbGetLatency | The average time in nanoseconds for the underlying native RocksDB::Get call.
customMetrics.rocksdbReadBlockCacheHitCount | The count of cache hits from the block cache in RocksDB that are useful in avoiding local disk reads.
customMetrics.rocksdbReadBlockCacheMissCount | The count of block cache misses in RocksDB, where the cache was not useful in avoiding a local disk read.
customMetrics.rocksdbSstFileSize | The size of all Static Sorted Table (SST) files, the tabular structure RocksDB uses to store data.
customMetrics.rocksdbTotalBytesRead | The number of uncompressed bytes read by get operations.
customMetrics.rocksdbTotalBytesReadByCompaction | The number of bytes that the compaction process reads from the disk.
customMetrics.rocksdbTotalBytesReadThroughIterator | The total number of bytes of uncompressed data read using an iterator. Some stateful operations (for example, timeout processing in FlatMapGroupsWithState and watermarking) require reading data in DB through an iterator.
customMetrics.rocksdbTotalBytesWritten | The total number of uncompressed bytes written by put operations.
customMetrics.rocksdbTotalBytesWrittenByCompaction | The total number of bytes the compaction process writes to the disk.
customMetrics.rocksdbTotalCompactionLatencyMs | The time in milliseconds for RocksDB compactions, including background compactions and the optional compaction initiated during the commit.
customMetrics.rocksdbTotalFlushLatencyMs | The total flush time, including background flushing. Flush operations are processes by which the MemTable is flushed to storage once it’s full. MemTables are the first level where data is stored in RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed | The size in bytes of the uncompressed zip files as reported by the File Manager. The File Manager manages the physical SST file disk space utilization and deletion.
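
The two block cache counters can be combined into a hit ratio, which shows how often RocksDB avoided a local disk read. This Python sketch uses a hypothetical function name and takes the customMetrics object of one state operator as a dictionary.

```python
def block_cache_hit_ratio(custom_metrics):
    """Return hits / (hits + misses), or None when no lookups were recorded."""
    hits = custom_metrics.get("rocksdbReadBlockCacheHitCount", 0)
    misses = custom_metrics.get("rocksdbReadBlockCacheMissCount", 0)
    lookups = hits + misses
    return hits / lookups if lookups else None
```

The function returns None when the counters are absent, which is the case for operators that do not report RocksDB metrics.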

sources object (Kafka)

Metric | Description
sources.description | A detailed description of the Kafka source, specifying the exact Kafka topic being read from. For example: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset object | The starting offset number within the Kafka topic at which the streaming job started.
sources.endOffset object | The last offset processed by the microbatch. This could be equal to latestOffset for an ongoing microbatch execution.
sources.latestOffset object | The latest offset determined by the microbatch. The microbatch might not process all offsets when there is throttling, which results in endOffset and latestOffset differing.
sources.numInputRows | The number of input rows processed from this source.
sources.inputRowsPerSecond | The rate at which data is arriving for processing from this source.
sources.processedRowsPerSecond | The rate at which Spark is processing data from this source.
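
When throttling causes endOffset and latestOffset to differ, the gap per partition shows how many offsets the microbatch left unprocessed. The following Python sketch uses a hypothetical function name and takes one entry of the sources array as a dictionary, with offsets keyed by topic and then by partition as in the example events in this article.

```python
def kafka_unprocessed_offsets(source):
    """Return {(topic, partition): latestOffset - endOffset} for one Kafka source."""
    end = source.get("endOffset") or {}
    latest = source.get("latestOffset") or {}
    gaps = {}
    for topic, partitions in latest.items():
        for partition, latest_offset in partitions.items():
            if partition not in end.get(topic, {}):
                continue  # no end offset reported for this partition
            gaps[(topic, partition)] = latest_offset - end[topic][partition]
    return gaps
```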

sources.metrics object (Kafka)

Metric | Description
sources.metrics.avgOffsetsBehindLatest | The average number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
sources.metrics.estimatedTotalBytesBehindLatest | The estimated number of bytes that the query process has not consumed from the subscribed topics.
sources.metrics.maxOffsetsBehindLatest | The maximum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.
sources.metrics.minOffsetsBehindLatest | The minimum number of offsets that the streaming query is behind the latest available offset among all the subscribed topics.

sink object (Kafka)

Metric | Description
sink.description | The description of the Kafka sink to which the streaming query is writing, detailing the specific Kafka sink implementation being used. For example: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows | The number of rows that were written to the output table or sink as part of the microbatch. For some situations, this value can be “-1” and generally can be interpreted as “unknown”.

sources object (Delta Lake)

Metric | Description
sources.description | The description of the source from which the streaming query is reading. For example: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion | The version of serialization with which this offset is encoded.
sources.[startOffset/endOffset].reservoirId | The ID of the table being read. This is used to detect misconfiguration when restarting a query.
sources.[startOffset/endOffset].reservoirVersion | The version of the table that is currently processing.
sources.[startOffset/endOffset].index | The index in the sequence of AddFiles in this version. This is used to break large commits into multiple batches. This index is created by sorting on modificationTimestamp and path.
sources.[startOffset/endOffset].isStartingVersion | Identifies whether the current offset marks the start of a new streaming query rather than the processing of changes that occurred after the initial data was processed. When starting a new query, all data present in the table at the start is processed first, and then any new data that arrives.
sources.latestOffset | The latest offset processed by the microbatch query.
sources.numInputRows | The number of input rows processed from this source.
sources.inputRowsPerSecond | The rate at which data is arriving for processing from this source.
sources.processedRowsPerSecond | The rate at which Spark is processing data from this source.
sources.metrics.numBytesOutstanding | The combined size of the outstanding files (files tracked by RocksDB). This is the backlog metric for Delta and Auto Loader as the streaming source.
sources.metrics.numFilesOutstanding | The number of outstanding files to be processed. This is the backlog metric for Delta and Auto Loader as the streaming source.
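
In the example events in this article, the values in the metrics object are serialized as strings, so they need converting before any comparison. The Python sketch below reads the two backlog metrics for one source. The function name and the shape of the result are hypothetical.

```python
def delta_backlog(source):
    """Return the outstanding file count and byte size for one source as integers."""
    metrics = source.get("metrics") or {}
    return {
        "files": int(metrics.get("numFilesOutstanding", 0)),
        "bytes": int(metrics.get("numBytesOutstanding", 0)),
    }
```

A backlog that keeps growing across microbatches suggests that the stream is not keeping up with new files in the source table.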

sink object (Delta Lake)

Metric | Description
sink.description | The description of the Delta sink, detailing the specific Delta sink implementation being used. For example: “DeltaSink[table]”.
sink.numOutputRows | The number of rows is always “-1” because Spark can’t infer output rows for DSv1 sinks, which is the classification for the Delta Lake sink.

sources object (Kinesis)

Metric | Description
description | The description of the Kinesis source, specifying the exact Kinesis stream from which the streaming query is reading. For example: “KinesisV2[stream]”.

sources metrics object (Kinesis)

Metric | Description
avgMsBehindLatest | The average number of milliseconds a consumer has fallen behind the tip (the latest record) of a stream.
maxMsBehindLatest | The maximum number of milliseconds a consumer has fallen behind the tip (the latest record) of a stream.
minMsBehindLatest | The minimum number of milliseconds a consumer has fallen behind the tip (the latest record) of a stream.
totalPrefetchedBytes | The number of bytes left to process. This is the backlog metric for Kinesis as a source.

For more information, see What metrics does Kinesis report?.

Examples

Example Kafka-to-Kafka StreamingQueryListener event

JSON
{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Example Delta Lake-to-Delta Lake StreamingQueryListener event

JSON
{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Example Kinesis-to-Delta Lake StreamingQueryListener event

JSON
{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Example Kafka+Delta Lake-to-Delta Lake StreamingQueryListener event

JSON
{
  "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "name" : null,
  "timestamp" : "2024-05-15T21:57:50.782Z",
  "batchId" : 0,
  "batchDuration" : 3601,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 5.55401277422938,
  "durationMs" : {
    "addBatch" : 1544,
    "commitBatch" : 686,
    "commitOffsets" : 27,
    "getBatch" : 12,
    "latestOffset" : 577,
    "queryPlanning" : 105,
    "triggerExecution" : 3600,
    "walCommit" : 34
  },
  "stateOperators" : [ {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 20,
    "numRowsUpdated" : 20,
    "allUpdatesTimeMs" : 473,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 277,
    "memoryUsedBytes" : 13120,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 5,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 0,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 5280
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic-1]]",
    "startOffset" : null,
    "endOffset" : {
      "topic-1" : {
        "1" : 5,
        "0" : 5
      }
    },
    "latestOffset" : {
      "topic-1" : {
        "1" : 5,
        "0" : 5
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 2.77700638711469,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
    "startOffset" : null,
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
      "reservoirVersion" : 1,
      "index" : -1,
      "isStartingVersion" : false
    },
    "latestOffset" : null,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 2.77700638711469,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
    "numOutputRows" : -1
  }
}

Example rate source to Delta Lake StreamingQueryListener event

JSON
{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}