Dynamodb Stream s Lambda, SQS a Lambda
Integrace Amazon DynamoDB s AWS Lambda a Amazon Simple Queue Service (SQS) poskytuje robustní řešení pro zpracování změn dat v reálném čase. Pokud používáte dynamodb, obvykle používáte Stream pro zachycení změn v tabulce. Ale v reálném světě můžete použít pouze dva limity souběžnosti pro lambda funkci. Tato architektura je obzvláště užitečná při zpracování velkých objemů datových změn a zajištění efektivního škálování downstream zpracování. Použití AWS Lambda funkcí umožňuje událostmi řízené provádění, zatímco SQS nabízí spolehlivou službu fronty zpráv pro oddělení komponent a správu zátěže při zpracování. Toto nastavení je dále vylepšeno nasazením na platformě ARM pro zvýšení efektivity a úsporu nákladů a využitím knihovny AWS Lambda PowerTools pro lepší pozorovatelnost a provozní excelenci.
Typickými zákazníky, kteří často používají tento případ, jsou ti, kteří mají mnoho změn dat ve své dynamodb tabulce a chtějí je zpracovávat v reálném čase. Jedná se o běžný případ použití pro zákazníky, kteří chtějí zpracovávat změny dat v reálném čase, jako je aktualizace vyhledávacího indexu, odesílání oznámení nebo spouštění jiných pracovních postupů. Například se jedná o zdravotnické společnosti, finanční služby a e-commerce společnosti.
Obsah
- Vytvoření DynamoDB tabulky
- Vytvoření Lambda funkce
- Vytvoření SQS fronty
- Vytvoření Lambda funkce pro zpracování SQS fronty
- Příklad Python kódu pro Lambda
Architektura demo stacku
Tento malý stack obsahuje DynamoDB tabulku, Lambda funkci, která zpracovává DynamoDB stream, SQS frontu a Lambda funkci, která zpracovává SQS frontu.
AWS CDK skript (TypeScript)
Nejprve musíme vytvořit nový CDK stack. Následující příklad ukazuje, jak vytvořit nový CDK stack s DynamoDB tabulkou, SQS frontou a dvěma Lambda funkcemi. První Lambda funkce zpracovává DynamoDB stream a zapisuje zprávy do SQS fronty, zatímco druhá Lambda funkce zpracovává SQS frontu.
import * as cdk from '@aws-cdk/core';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import * as sources from '@aws-cdk/aws-lambda-event-sources';
export class MyStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// Vytvoření DynamoDB tabulky
const table = new dynamodb.Table(this, 'MyTable', {
partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
stream: dynamodb.StreamViewType.PYTHON_3_12,
});
// Vytvoření SQS fronty
const queue = new sqs.Queue(this, 'MyQueue');
// Vytvoření první Lambda funkce (DynamoDB Stream procesor)
const dynamoLambda = new lambda.Function(this, 'DynamoLambda', {
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'app.handler',
code: lambda.Code.fromAsset('lambda'),
memorySize: 256,
environment: {
QUEUE_URL: queue.queueUrl,
},
});
// Udělení oprávnění Lambda funkci pro zápis do SQS fronty
queue.grantSendMessages(dynamoLambda);
// Nastavení DynamoDB streamu jako zdroje událostí
dynamoLambda.addEventSource(new sources.DynamoEventSource(table, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
}));
// Vytvoření druhé Lambda funkce (SQS procesor)
const sqsLambda = new lambda.Function(this, 'SqsLambda', {
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'app.handler',
code: lambda.Code.fromAsset('lambda'),
memorySize: 256,
});
// Nastavení SQS fronty jako zdroje událostí
sqsLambda.addEventSource(new sources.SqsEventSource(queue));
}
}
Lambda funkce (Python)
AWS Lambda PowerTools je sada nástrojů pro AWS Lambda funkce, která usnadňuje implementaci osvědčených postupů pro logování, monitorování a trasování v serverless aplikacích. Je navržena tak, aby pomohla vývojářům s různými aspekty vývoje, nasazení a údržby aplikací založených na Lambda, zjednodušuje úkoly jako strukturované logování, sběr metrik a trasování napříč mikroslužbami. Tato část popisuje, jak AWS Lambda PowerTools může prospět vašim serverless aplikacím, zejména v kontextu architektury DynamoDB-Lambda-SQS-Lambda.
app.py
import os
import boto3
import json
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response # type: ignore
from aws_lambda_powertools.utilities.typing import LambdaContext # type: ignore
from aws_lambda_powertools.logging.logger import Logger # type: ignore
from lib.queue import MediaQueue
ENV = os.environ.get("ENV", "dev")
ssm = boto3.client("ssm")
processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
logger = Logger(service="spaces_data_table_stream", level="DEBUG")
media_queue = MediaQueue(logger, ssm.get_parameter(
Name=f"/{ENV}/media-table-process-queue/url")["Parameter"]["Value"])
def record_handler(record: dict):
return media_queue.send_stream_data(record)
@logger.inject_lambda_context(clear_state=True)
def handler(event, context: LambdaContext):
print(json.dumps(event))
return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)
/lib/queue.py
import boto3
import traceback
import uuid
import json
import time
from botocore.exceptions import ClientError
sqs = boto3.resource("sqs")
class MediaQueue:
def __init__(self, logger, media_queue_name: str) -> None:
self.logger = logger
self.media_queue = sqs.Queue(media_queue_name)
def send_message(self, body: dict) -> tuple[int, str]:
try:
self.media_queue.send_message(
MessageBody=json.dumps(body),
MessageGroupId=f"media_{str(uuid.uuid4())}",
MessageDeduplicationId=str(uuid.uuid4())
)
return 200, f"Zpráva odeslána do fronty"
except ClientError as e:
if e.response['Error']['Code'] == 'ThrottlingException':
time.sleep(1)
self.send_message(body)
except Exception as e:
traceback.print_exc()
self.logger.error(e)
return 500, f"{e.__dict__} {e.with_traceback(None)}"
def send_stream_data(self, record: dict) -> tuple[int, str]:
message = {
"action": record.get("eventName", "unknown"),
"body": record.get("dynamodb", {})
}
return self.send_message(message)