Optimized S3 File Source with SQS

The Databricks S3-SQS connector uses Amazon Simple Queue Service (SQS) to provide an optimized S3 source that lets you find new files written to an S3 bucket without repeatedly listing all of the files. This provides two major advantages:

  • Lower latency: no need to list large buckets on S3, which is slow and resource intensive.
  • Lower costs: no more costly LIST API requests made to S3.

Warning

The S3-SQS source deletes messages from the SQS queue as events are acknowledged. If you would like to have other pipelines consuming from the same queue, set up a separate SQS queue for the optimized reader. You can use SNS to publish messages to multiple SQS queues.
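For example, here is a minimal boto3 sketch of that fan-out, where S3 event notifications go to an SNS topic and each pipeline gets its own subscribed queue. The region, topic ARN, and queue ARNs below are hypothetical placeholders, and each queue also needs an access policy that allows the topic to send messages to it:

    import boto3

    sns = boto3.client("sns", region_name="us-west-2")  # hypothetical region

    topic_arn = "arn:aws:sns:us-west-2:123456789012:s3-events"  # hypothetical
    queue_arns = [
        "arn:aws:sqs:us-west-2:123456789012:optimized-reader-queue",
        "arn:aws:sqs:us-west-2:123456789012:other-pipeline-queue",
    ]

    # Subscribe each SQS queue to the SNS topic so every pipeline
    # receives its own copy of each S3 event notification.
    for queue_arn in queue_arns:
        sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)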

Use the S3-SQS file source

To use the S3-SQS file source you must:

  • Set up event notifications and route them to SQS. See Configuring Amazon S3 Event Notifications.

  • Specify the fileFormat and queueUrl options and a schema. For example:

    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", ...) \
      .schema(...) \
      .load()
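
For illustration, here is the same read with the elided values filled in by hypothetical placeholders. The queue URL and schema below are examples only, not values you can use as-is:

    from pyspark.sql.types import StructType, StructField, StringType, TimestampType

    # Hypothetical schema matching the JSON files written to the bucket
    schema = StructType([
        StructField("userId", StringType(), True),
        StructField("action", StringType(), True),
        StructField("eventTime", TimestampType(), True),
    ])

    # Hypothetical queue URL
    df = spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/s3-events-queue") \
      .schema(schema) \
      .load()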
    

Note

Trigger.Once() is supported with this source since Databricks Runtime 4.2.
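
For example, a sketch of a one-shot run, where df is the stream defined above and the output and checkpoint paths are hypothetical:

    df.writeStream \
      .format("parquet") \
      .option("checkpointLocation", "/mnt/checkpoints/s3-sqs-demo") \
      .option("path", "/mnt/output/s3-sqs-demo") \
      .trigger(once=True) \
      .start()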

Authenticate with Amazon SQS and S3

Databricks uses Amazon’s default credential provider chain for authentication to SQS. We recommend that you launch your Databricks clusters with an IAM role that can access SQS and your S3 bucket.

This source requires sqs:ReceiveMessage, sqs:DeleteMessage, and s3:GetObject permissions. If you experience Amazon: Access Denied exceptions, check that your user or profile has these permissions. See Using Identity-Based (IAM) Policies for Amazon SQS and Bucket Policy Examples for details.
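
As a sketch, an identity-based policy granting these permissions might look like the following. The queue ARN and bucket name are placeholders:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"],
          "Resource": "arn:aws:sqs:us-west-2:123456789012:s3-events-queue"
        },
        {
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::my-bucket/*"
        }
      ]
    }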

Configuration

The source supports the following options.

queueUrl
  Value: The URL string for the queue.
  Default: None (required)
  Description: The URL of the SQS queue.

fileFormat
  Value: A supported file data source.
  Default: None (required)
  Description: The format of the files, such as parquet, json, csv, text, and so on.

region
  Value: The region for the SQS queue.
  Default: The locally resolved region.
  Description: The region the queue is defined in.

sqsFetchInterval
  Value: A duration string, for example, 2m for 2 minutes.
  Default: "5s"
  Description: How long to wait between fetches if the queue is empty. AWS charges per API request to SQS, so if data isn't arriving frequently, you can set this value to a long duration. SQS supports long polling with a maximum duration of 20 seconds; if this value is set longer than 20 seconds, the source sleeps for the remaining duration. As long as the queue is not empty, the source fetches continuously. If new files are created only every 5 minutes, as with Kinesis Firehose or CloudTrail logs, consider setting a high sqsFetchInterval to reduce SQS costs.

pathRewrites
  Value: A JSON string.
  Default: "{}"
  Description: If you use mount points, you can rewrite the prefix of the bucket/key path with the mount point. Only prefixes can be rewritten. For example, with the configuration {"<databricks-mounted-bucket>/path": "/mnt/data-warehouse"}, the path <databricks-mounted-bucket>/path/2017/08/fileA.json is rewritten to /mnt/data-warehouse/2017/08/fileA.json.

ignoreFileDeletion
  Value: Boolean.
  Default: false
  Description: If you have lifecycle configurations or you delete the source files manually, you must set this option to true.

maxFileAge
  Value: Integer (seconds).
  Default: 604800
  Description: How long (in seconds) file notifications are stored as state to prevent duplicate processing.

allowOverwrites
  Value: Boolean.
  Default: true
  Description: Whether a blob that is overwritten should be reprocessed.
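For instance, a sketch combining several of these options. It assumes schema is defined as in the earlier example; the queue URL, bucket, and mount point are hypothetical:

    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", "https://sqs.us-west-2.amazonaws.com/123456789012/s3-events-queue") \
      .option("sqsFetchInterval", "10m") \
      .option("pathRewrites", '{"my-bucket/landing": "/mnt/data-warehouse"}') \
      .option("ignoreFileDeletion", "true") \
      .schema(schema) \
      .load()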

If the driver logs contain many messages like Fetched 0 new events and 3 old events., and you consistently see far more old events than new ones, either reduce the trigger interval of your stream or increase the visibility timeout of your SQS queue. Old events are messages that SQS redelivers because they were not processed and deleted within the visibility timeout.
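
For example, you can raise the queue's visibility timeout with a boto3 sketch like the following; the region, queue URL, and timeout value are hypothetical:

    import boto3

    sqs = boto3.client("sqs", region_name="us-west-2")  # hypothetical region
    sqs.set_queue_attributes(
        QueueUrl="https://sqs.us-west-2.amazonaws.com/123456789012/s3-events-queue",
        Attributes={"VisibilityTimeout": "120"},  # seconds; hypothetical value
    )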

Frequently asked questions (FAQ)

An SQS queue URL already contains the region endpoint, so the region option doesn't need to be set, correct?
You must explicitly set the region if your SQS queue is not in the same region as your Spark cluster.
sqsFetchInterval
  • If the value is less than 20 seconds, do we set the SQS long polling timeout to be that specific value? Yes.
  • If the value is greater than 20 seconds and if there’s data in the queue, do we keep creating SQS long polling requests with 20 second timeouts? We will make requests with long polling set to 20 seconds, but SQS will return immediately. You won’t wait 20 seconds.
  • If the value is greater than 20 seconds and if the queue is empty, do we create long polling requests with a 20 second timeout after the specified interval? We will make requests with long polling set to 20 seconds. If SQS doesn’t return anything, we will sleep the rest of the interval. We will not make any more requests to SQS for the duration of the interval, since SQS charges per REST API call.
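To illustrate the behavior described above, here is a rough Python sketch of the fetch loop. This illustrates the described semantics only; it is not the actual connector implementation, and the region, queue URL, and handler are hypothetical:

    import time
    import boto3

    sqs = boto3.client("sqs", region_name="us-west-2")  # hypothetical region
    queue_url = "https://sqs.us-west-2.amazonaws.com/123456789012/s3-events-queue"
    fetch_interval = 60  # sqsFetchInterval in seconds, e.g. "1m"

    def process(messages):
        # Placeholder for turning notifications into file reads and
        # deleting the messages once they are acknowledged.
        pass

    while True:
        # Long poll for up to 20 seconds (the SQS maximum).
        resp = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=min(fetch_interval, 20),
        )
        messages = resp.get("Messages", [])
        if messages:
            process(messages)  # queue not empty: fetch again immediately
        elif fetch_interval > 20:
            # Queue is empty: sleep out the rest of the interval to avoid
            # paying for additional SQS API calls.
            time.sleep(fetch_interval - 20)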
If ignoreFileDeletion is false (the default) and the object has been deleted, will it fail the whole pipeline?
Yes, if we receive an event stating that the file was deleted, it will fail the whole pipeline.
How should I set maxFileAge?
SQS provides at-least-once message delivery semantics, so we need to keep state for deduplication. The default setting for maxFileAge is 7 days (604800 seconds), which is greater than the default message retention period in SQS of 4 days. If you set a higher message retention period on your queue, increase this configuration accordingly.
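
For example, if you raise the queue's message retention period to 14 days, you might set maxFileAge to match. This sketch assumes queue_url and schema are defined as in the earlier examples:

    # 14 days expressed in seconds (the unit maxFileAge expects)
    max_file_age = str(14 * 24 * 3600)

    spark.readStream \
      .format("s3-sqs") \
      .option("fileFormat", "json") \
      .option("queueUrl", queue_url) \
      .option("maxFileAge", max_file_age) \
      .schema(schema) \
      .load()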