Scheduling each follow-up task as an SQS message with a delivery delay gives one-second delay granularity. It also scales horizontally with little effort, because every task is processed independently.
Service Limitations
The next task can be scheduled at most 15 minutes ahead. This limit comes from the SQS message DelaySeconds parameter, whose maximum value is 15 minutes (900 seconds).
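A minimal sketch of how this limit could be checked before anything is enqueued. It assumes follow-up tasks are queued up front with cumulative delays, as in the StartNotification sample in the Usage section. The helper names `finalTaskDelay` and `fitsSqsDelayLimit` are hypothetical and not part of the service.

```typescript
// Maximum value SQS accepts for DelaySeconds (15 minutes).
const SQS_MAX_DELAY_SECONDS = 900;

// Delay of the final follow-up message when attempts are spaced
// responseTimeout seconds apart (hypothetical helper).
function finalTaskDelay(
  recipientCount: number,
  maxRetries: number,
  responseTimeout: number,
): number {
  return recipientCount * maxRetries * responseTimeout;
}

// True when every message of the notification fits the SQS delay limit.
function fitsSqsDelayLimit(
  recipientCount: number,
  maxRetries: number,
  responseTimeout: number,
): boolean {
  return (
    finalTaskDelay(recipientCount, maxRetries, responseTimeout) <=
    SQS_MAX_DELAY_SECONDS
  );
}

console.log(fitsSqsDelayLimit(2, 2, 10)); // final delay is 40 seconds
console.log(fitsSqsDelayLimit(5, 3, 120)); // final delay is 1800 seconds
```

A request that fails this check would need a different approach, for example having each task enqueue the next one rather than queuing everything up front.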
Service Configuration
Sample CDK resources:
// Fragment from a Stack constructor. Queue, Function, Code, Runtime,
// SqsEventSource and Duration come from the CDK sqs, lambda,
// lambda-event-sources and core modules; this.sharedLayer is defined elsewhere.
const taskQueue = new Queue(this, 'TaskQueue', {
  // Default delay between task executions
  deliveryDelay: Duration.seconds(1),
  // Default timeout derived from the Lambda max execution time
  visibilityTimeout: Duration.minutes(15),
});

const eventsAsset = Code.fromAsset('../lambda/lib/function/events');

const startNotification = new Function(this, 'StartNotification', {
  code: eventsAsset,
  runtime: Runtime.NODEJS_12_X,
  handler: 'index.startNotification',
  layers: [this.sharedLayer],
  environment: {
    TASK_QUEUE_URL: taskQueue.queueUrl,
  },
});
taskQueue.grantSendMessages(startNotification);

const processNotification = new Function(this, 'ProcessNotification', {
  code: eventsAsset,
  handler: 'index.processNotification',
  runtime: Runtime.NODEJS_12_X,
  layers: [this.sharedLayer],
  // Deliver one task per invocation
  events: [new SqsEventSource(taskQueue, { batchSize: 1 })],
});
taskQueue.grantConsumeMessages(processNotification);
Deploy
cdk deploy
Usage
Sample code to schedule follow-up tasks with delays (StartNotification lambda):
import { SQS } from 'aws-sdk';

type NotificationRequest = {
  notificationId: string;
  message: string;
  recipients: string[];
  responseTimeout: number; // seconds between attempts
  maxRetries: number;
};

type NotificationTask = {
  request: NotificationRequest;
  currentRecipient: number | null;
  currentRetries: number | null;
};

export async function startNotification(request: NotificationRequest): Promise<void> {
  const sqs = new SQS();
  const queueUrl = process.env.TASK_QUEUE_URL!;
  const task: NotificationTask = {
    request: request,
    currentRecipient: 0,
    currentRetries: 0,
  };

  // Message template
  const params = {
    QueueUrl: queueUrl,
    MessageAttributes: {
      // Put notification ID in attributes to avoid redundant body parsing
      notificationId: {
        DataType: 'String',
        StringValue: request.notificationId,
      },
    },
  };

  // Cumulative delay in seconds; SQS rejects values above 900
  let messageDelay = 0;

  // Iterate by index so duplicate recipients keep distinct positions
  for (let i = 0; i < request.recipients.length; i++) {
    task.currentRecipient = i;
    task.currentRetries = 0;
    while (task.currentRetries < request.maxRetries) {
      const message: SQS.Types.SendMessageRequest = {
        ...params,
        DelaySeconds: messageDelay,
        MessageBody: JSON.stringify(task),
      };
      await sqs.sendMessage(message).promise();
      messageDelay += request.responseTimeout;
      task.currentRetries++;
    }
  }

  // Last follow-up task will set correct notification status in case of timeout
  task.currentRecipient = null;
  task.currentRetries = null;
  const message: SQS.Types.SendMessageRequest = {
    ...params,
    DelaySeconds: messageDelay,
    MessageBody: JSON.stringify(task),
  };
  await sqs.sendMessage(message).promise();
}
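To make the resulting schedule easier to see, the loop above can be mirrored by a pure function that only computes which task is queued with which delay, without calling SQS. This is an illustrative sketch; `PlannedTask` and `planSchedule` are hypothetical names that do not appear in the service code.

```typescript
// One entry per message the StartNotification sample would send.
type PlannedTask = {
  currentRecipient: number | null;
  currentRetries: number | null;
  delaySeconds: number;
};

// Computes the same sequence of delays as the scheduling loop,
// without touching SQS (hypothetical helper for illustration).
function planSchedule(
  recipientCount: number,
  maxRetries: number,
  responseTimeout: number,
): PlannedTask[] {
  const plan: PlannedTask[] = [];
  let delaySeconds = 0;
  for (let recipient = 0; recipient < recipientCount; recipient++) {
    for (let retry = 0; retry < maxRetries; retry++) {
      plan.push({ currentRecipient: recipient, currentRetries: retry, delaySeconds });
      delaySeconds += responseTimeout;
    }
  }
  // Final task: marks the notification as timed out if nobody responded.
  plan.push({ currentRecipient: null, currentRetries: null, delaySeconds });
  return plan;
}

// Two recipients, two attempts each, ten seconds apart.
for (const task of planSchedule(2, 2, 10)) {
  console.log(task);
}
```

For two recipients, two retries and a ten-second timeout this yields five messages with delays of 0, 10, 20, 30 and 40 seconds, the last one being the terminating task.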
Sample code for task processing (ProcessNotification lambda):
import { Context, SQSEvent } from 'aws-lambda';

// notificationInProgress() queries the persistent store and is defined elsewhere.
export async function processNotification(event: SQSEvent, context: Context): Promise<void> {
  for (const record of event.Records) {
    // Use notification ID from SQS message attributes instead of parsing JSON body
    const notificationId = record.messageAttributes.notificationId.stringValue!;

    // Query persistent store to check if notification is still in progress
    if (!notificationInProgress(notificationId)) {
      continue;
    }

    // Continue with parsing message body
    const task = JSON.parse(record.body) as NotificationTask;

    // Check if it is the last follow-up task
    if (task.currentRecipient === null) {
      console.debug('Terminating notification: no recipients left', task);
      // Set notification status as terminated in persistent store
      // terminateNotification(id: notificationId, reason: 'No response')
      continue;
    }

    // Send notification to current recipient
    const email = task.request.recipients[task.currentRecipient];
    const subject = `Notification attempt #${task.currentRetries! + 1}`;
    console.debug('Sending email to recipient', email, subject, task.request.message);
  }
}
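The handler makes a three-way decision for every task: skip it, terminate the notification, or send the next attempt. A possible way to isolate that decision for unit testing is sketched below. The `TaskState` and `Action` types and the `decideAction` function are hypothetical, and the in-progress flag is assumed to have been read from the persistent store beforehand.

```typescript
// Only the fields of the task that the decision depends on.
type TaskState = {
  currentRecipient: number | null;
  currentRetries: number | null;
};

type Action =
  | { kind: 'skip' }
  | { kind: 'terminate' }
  | { kind: 'send'; recipientIndex: number; attempt: number };

// Pure decision logic mirroring the branches of the handler
// (hypothetical helper for illustration).
function decideAction(inProgress: boolean, task: TaskState): Action {
  if (!inProgress) {
    // Someone already responded or the notification was closed.
    return { kind: 'skip' };
  }
  if (task.currentRecipient === null) {
    // Last follow-up task: nobody responded in time.
    return { kind: 'terminate' };
  }
  return {
    kind: 'send',
    recipientIndex: task.currentRecipient,
    attempt: (task.currentRetries ?? 0) + 1,
  };
}

console.log(decideAction(true, { currentRecipient: 1, currentRetries: 0 }));
console.log(decideAction(true, { currentRecipient: null, currentRetries: null }));
console.log(decideAction(false, { currentRecipient: 0, currentRetries: 1 }));
```

Keeping the decision free of SQS and store access means it can be tested without deploying the lambda.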
Testing
Manual invocation:
aws lambda invoke --cli-binary-format raw-in-base64-out --function-name <LAMBDA_FUNCTION_NAME> --payload '<JSON_PAYLOAD>' response.json
LAMBDA_FUNCTION_NAME - the full name or ARN of the StartNotification lambda
JSON_PAYLOAD - a sample JSON object of the NotificationRequest type:
{"notificationId": "1", "maxRetries": 2, "responseTimeout": 10, "message": "sample message", "recipients": ["one@example.com", "two@example.com"] }
Logging
Use the standard CloudWatch log group of the Lambda function:
arn:aws:logs:<...>:log-group:/aws/lambda/<...>-ProcessNotification<...>