In this blog post, we'll look at how to use an AWS Simple Queue Service (SQS) Standard queue to achieve highly concurrent processing with no duplicates. We also compare it with other AWS services such as DynamoDB, SQS FIFO queues and Kinesis in terms of cost and performance.
A simple use case for the architecture below could be building an end-to-end messaging service or sending out transactional emails. Both use cases need highly concurrent processing with no duplicates.
We have a Lambda function that writes messages to the Standard queue in batches. This Writer function is invoked when a file is posted to S3. While there are messages in the queue, Lambda polls the queue, reads messages in batches and invokes the Processor function synchronously with an event that contains the queue messages. The Processor function is invoked once for each batch. When the function successfully processes a batch, Lambda deletes its messages from the queue. If the function fails (raises an error), the batch is not deleted and becomes visible in the queue again once the visibility timeout expires. The Standard queue is configured with a redrive policy that moves a message to a Dead Letter Queue (DLQ) once its receive count reaches the Maximum Receive Count (MRC). We set the MRC to 1 so that a failed message is never processed a second time from the processing queue: after its first receive it moves to the DLQ instead of being retried.
Setting up Standard Queue with Dead Letter Queue
We need two queues: one for processing and a second one to hold failed messages. Create the failed_messages queue first, as it is needed while creating the processing queue. Create a new queue, give it a name (failed_messages), select Standard as the type and choose Configure Queue.
Set the queue attributes, such as the visibility timeout and message retention period, according to your needs.
For processing messages, create a new queue, give it a name, select Standard as the type and choose Configure Queue.
Set the Default Visibility Timeout to 5 minutes, and use the Dead Letter Queue Settings to set up the redrive policy that moves failed messages into the failed_messages queue created earlier, with Maximum Receives set to 1.
From the SQS homepage, select the processing queue and open Redrive Policy. If everything is set up correctly, you should see the ARN of the failed_messages queue there.
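The same two queues can also be created from code. The following is a minimal sketch using boto3, not the setup used in this post: the helper names redrive_policy_attributes and create_queues are made up for illustration, the queue names are taken from the text above, and the sqs argument is assumed to be a boto3 SQS client such as boto3.client("sqs").

```python
import json


def redrive_policy_attributes(dlq_arn, max_receive_count=1, visibility_timeout_s=300):
    """Build the attributes for the processing queue (hypothetical helper).

    SQS expects every attribute value as a string, and the redrive policy
    as a JSON document.
    """
    return {
        "VisibilityTimeout": str(visibility_timeout_s),  # 5 minutes
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": max_receive_count,  # MRC of 1, as in this post
        }),
    }


def create_queues(sqs, processing_name="ToBeProcessed", dlq_name="failed_messages"):
    """Create the DLQ first, then the processing queue that points at it."""
    dlq_url = sqs.create_queue(QueueName=dlq_name)["QueueUrl"]
    dlq_arn = sqs.get_queue_attributes(
        QueueUrl=dlq_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    processing_url = sqs.create_queue(
        QueueName=processing_name,
        Attributes=redrive_policy_attributes(dlq_arn),
    )["QueueUrl"]
    return processing_url, dlq_url
```

Calling create_queues(boto3.client("sqs")) would return the two queue URLs; the processing queue URL is the value the Writer function needs.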
Creating the Writer and Processor lambda functions:
Writer.py
# Write batch messages to queue
import csv
import json
from urllib.parse import unquote_plus

import boto3

s3 = boto3.resource('s3')
sqs = boto3.client('sqs')

# Update this dummy URL
processing_queue_url = "https://sqs.us-west-2.amazonaws.com/85XXXXXXX205/ToBeProcessed"


def send_batch(messages):
    # Send up to 10 messages in a single SendMessageBatch request
    try:
        response = sqs.send_message_batch(
            QueueUrl=processing_queue_url,
            Entries=messages
        )
        print("response:", response)
        if 'Failed' in response:
            print('failed_count:', len(response['Failed']))
    except Exception as e:
        print("error:", e)


def lambda_handler(event, context):
    if 'Records' not in event:
        return
    bucket_name = event['Records'][0]['s3']['bucket']['name']
    # Object keys arrive URL-encoded in S3 event notifications
    key = unquote_plus(event['Records'][0]['s3']['object']['key'])
    bucket = s3.Bucket(bucket_name)
    obj = bucket.Object(key=key)
    # Get the object and split it into lines (the first line is the CSV header)
    lines = obj.get()['Body'].read().decode('utf-8').splitlines()
    record_count = 0
    messages = []
    # Write to SQS
    for row in csv.DictReader(lines):
        record_count += 1
        record = dict(row)
        # Replace below with appropriate column with all values as unique.
        # Batch entry Ids must be unique within a batch, at most 80 characters,
        # and contain only alphanumeric characters, hyphens and underscores.
        unique_id = record['ANY_COLUMN_WITH_ALL_VALUES_UNIQUE']
        messages.append(
            {
                'Id': unique_id,
                'MessageBody': json.dumps(record)
            })
        # SendMessageBatch accepts at most 10 entries per request
        if len(messages) == 10:
            send_batch(messages)
            messages = []
    # Handling last batch (skipped when the record count is a multiple of 10)
    if messages:
        print("batch size is :", len(messages))
        send_batch(messages)
    print('record count is :', record_count)
Processor.py
# Process queue messages
import json


def handler(event, context):
    if 'Records' not in event:
        return
    try:
        for message in event['Records']:
            print("message to be processed :", message)
            result = json.loads(message['body'])
            print("result:", result)
    except Exception as e:
        print(e)
        # Re-raise so that Lambda treats the batch as failed. Returning the
        # error instead would count as success and delete the messages.
        raise
    return {
        'statusCode': 200,
        'body': 'All messages processed successfully'
    }
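One way to check the failure path before deploying is to call a handler locally with a hand-built event. The sketch below is only an illustration: handler is a condensed stand-in for Processor.py that lets errors propagate (which is what the architecture above relies on), and make_sqs_event is a hypothetical helper that fills in only a small subset of the fields Lambda includes in a real SQS event.

```python
import json


def handler(event, context):
    # Condensed stand-in for Processor.py: parse every body, raise on failure
    for message in event.get("Records", []):
        json.loads(message["body"])
    return {"statusCode": 200, "body": "All messages processed successfully"}


def make_sqs_event(bodies):
    # Minimal subset of an SQS event; real events carry more fields
    return {
        "Records": [
            {"messageId": str(i), "body": body, "eventSource": "aws:sqs"}
            for i, body in enumerate(bodies)
        ]
    }


# A batch of valid JSON bodies is processed and reported as successful
good = make_sqs_event([json.dumps({"id": 1}), json.dumps({"id": 2})])
print(handler(good, None))

# A malformed body raises, which is what tells Lambda the batch failed
bad = make_sqs_event(["not json"])
try:
    handler(bad, None)
except json.JSONDecodeError as e:
    print("batch would not be deleted from the queue:", e)
```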
Setting up S3 as trigger to Writer lambda
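As an alternative to the console, the S3 trigger can be sketched with boto3. This is an assumption-laden example rather than the configuration used in this post: csv_upload_notification and attach_s3_trigger are hypothetical helper names, the .csv suffix filter is a choice made here, and s3_client is assumed to be boto3.client("s3").

```python
def csv_upload_notification(writer_lambda_arn, suffix=".csv"):
    """Notification configuration that invokes the Writer for new .csv objects."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": writer_lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}
                },
            }
        ]
    }


def attach_s3_trigger(s3_client, bucket_name, writer_lambda_arn):
    # Note: this call replaces the bucket's existing notification configuration
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=csv_upload_notification(writer_lambda_arn),
    )
```

S3 also needs permission to invoke the function, which the console adds for you and which this sketch does not cover.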
Setting up SQS trigger to processor Lambda
If set up properly, you should be able to see the Processor function in the Lambda Triggers section for the processing queue on the SQS homepage.
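The SQS trigger is an event source mapping on the Lambda side, so it can also be created with boto3. A minimal sketch, assuming lambda_client is boto3.client("lambda") and that the batch size of 10 suits your workload; sqs_trigger_kwargs and attach_sqs_trigger are hypothetical names used only here.

```python
def sqs_trigger_kwargs(queue_arn, function_name, batch_size=10):
    """Arguments for the event source mapping between the queue and the Processor."""
    return {
        "EventSourceArn": queue_arn,
        "FunctionName": function_name,
        "BatchSize": batch_size,  # messages handed to one invocation
        "Enabled": True,
    }


def attach_sqs_trigger(lambda_client, queue_arn, function_name):
    return lambda_client.create_event_source_mapping(
        **sqs_trigger_kwargs(queue_arn, function_name)
    )
```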
The setup is done. To test it, upload a .csv file to the S3 location.
SQS Standard Queue in comparison with FIFO queue
FIFO queues in SQS support deduplication in two ways:
- Content based deduplication while writing to SQS.
- Processing one record/batch at a time.
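Unlike a Standard queue, a FIFO queue does not process the messages of a message group concurrently, and at the time of writing it could not be used as a Lambda trigger (AWS has since added Lambda support for FIFO queues). On top of this, there is a limit on how many messages you can write to a FIFO queue per second. FIFO queues are best suited to cases where the order of processing is important.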
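To make content-based deduplication concrete: SQS derives the deduplication ID from a SHA-256 hash of the message body and ignores repeats of that ID within a 5-minute interval. The sketch below is a toy model of that behaviour for illustration only, not how SQS is implemented; DedupWindow and content_dedup_id are hypothetical names.

```python
import hashlib
import time


def content_dedup_id(body: str) -> str:
    # Same body, same ID: this is what makes the deduplication content based
    return hashlib.sha256(body.encode("utf-8")).hexdigest()


class DedupWindow:
    """Toy model of a deduplication interval (300 seconds by default)."""

    def __init__(self, window_s=300, clock=time.monotonic):
        self.window_s = window_s
        self.clock = clock
        self.first_seen = {}  # dedup ID -> time the message was first accepted

    def accept(self, body: str) -> bool:
        now = self.clock()
        dedup_id = content_dedup_id(body)
        seen_at = self.first_seen.get(dedup_id)
        if seen_at is not None and now - seen_at < self.window_s:
            return False  # duplicate inside the window: not delivered again
        self.first_seen[dedup_id] = now
        return True


window = DedupWindow()
print(window.accept('{"order": 42}'))  # first copy is delivered
print(window.accept('{"order": 42}'))  # immediate repeat is dropped
```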
Cost analysis:
First 1 million Amazon SQS requests are free each month.
Type | Cost per 1 million requests
Standard Queue | $0.40
FIFO Queue | $0.50
More on SQS pricing here.
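As a rough worked example of the figures above, the sketch below estimates a monthly bill from a request count. It assumes the flat per-million prices from the table and the 1 million free requests; actual SQS pricing has further details (volume tiers, and how batches and large payloads are counted as requests) that this ignores. monthly_cost is a hypothetical helper.

```python
FREE_REQUESTS_PER_MONTH = 1_000_000
PRICE_PER_MILLION = {"standard": 0.40, "fifo": 0.50}  # USD, from the table above


def monthly_cost(requests, queue_type="standard"):
    """Estimated monthly cost in USD after the free requests are deducted."""
    billable = max(0, requests - FREE_REQUESTS_PER_MONTH)
    return round(billable / 1_000_000 * PRICE_PER_MILLION[queue_type], 2)


# 11 million requests in a month leave 10 million billable requests
print(monthly_cost(11_000_000, "standard"))  # 4.0
print(monthly_cost(11_000_000, "fifo"))      # 5.0
```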
SQS Standard Queue in comparison with DynamoDB
DynamoDB Streams is slow compared with SQS, and DynamoDB incurs costs on several fronts, such as:
- Data Storage
- Writes
- Reads
- Provisioned throughput
- Reserved capacity
- Indexed data storage
- Streams and many more.
In a nutshell, DynamoDB’s monthly cost is dictated by data storage, writes and reads. The best use cases for DynamoDB are those that require a flexible data model, reliable performance, and the automatic scaling of throughput capacity.
SQS Standard Queue in comparison with Kinesis
The primary use case for Kinesis is collecting, storing and processing real-time continuous data streams. Kinesis is designed for large-scale data ingestion and processing, with the ability to maximise write throughput for large volumes of data.
A message queue, on the other hand, makes it easy to decouple and scale microservices, distributed systems and serverless applications. Using a queue, you can send, store and receive messages between software components at any volume, without losing messages or requiring other services to be always available. In a nutshell, serverless applications are built from microservices, and a message queue serves as the reliable plumbing between them.
Drawbacks of Kinesis:
- Shard management
- Limited Read Throughput
For a more detailed comparison of SQS and Kinesis, visit here.
Thanks for the read, I hope it was helpful.
This story is authored by Koushik. Koushik is a software engineer and a keen data science and machine learning enthusiast.
Comments
Why can't we use S3 triggers to put messages in SQS directly? Why do we have to use S3 -> Lambda -> SQS?
Hello Sudhakar,
There isn't any S3 put-file trigger that lets us read the CSV directly and insert its rows into the SQS queue.
S3 supports triggers at an object/file level and not at a record/row level.
Let us say you have a CSV file with 100 records. When this file is put into S3, the trigger invokes the Writer Lambda. The Writer Lambda reads the CSV file and writes the records as messages into the SQS queue in batches, making them available for processing.