Dynamodb Stream s Lambda, SQS a Lambda

Integrace Amazon DynamoDB s AWS Lambda a Amazon Simple Queue Service (SQS) poskytuje robustní řešení pro zpracování změn dat v reálném čase. Pokud používáte dynamodb, obvykle používáte Stream pro zachycení změn v tabulce. Ale v reálném světě můžete použít pouze dva limity souběžnosti pro lambda funkci. Tato architektura je obzvláště užitečná při zpracování velkých objemů datových změn a zajištění efektivního škálování downstream zpracování. Použití AWS Lambda funkcí umožňuje událostmi řízené provádění, zatímco SQS nabízí spolehlivou službu fronty zpráv pro oddělení komponent a správu zátěže při zpracování. Toto nastavení je dále vylepšeno nasazením na platformě ARM pro zvýšení efektivity a úsporu nákladů a využitím knihovny AWS Lambda PowerTools pro lepší pozorovatelnost a provozní excelenci.
Typickými zákazníky, kteří často používají tento případ, jsou ti, kteří mají mnoho změn dat ve své dynamodb tabulce a chtějí je zpracovávat v reálném čase. Jedná se o běžný případ použití pro zákazníky, kteří chtějí zpracovávat změny dat v reálném čase, jako je aktualizace vyhledávacího indexu, odesílání oznámení nebo spouštění jiných pracovních postupů. Například se jedná o zdravotnické společnosti, finanční služby a e-commerce společnosti.

Obsah

  • Vytvoření DynamoDB tabulky
  • Vytvoření Lambda funkce
  • Vytvoření SQS fronty
  • Vytvoření Lambda funkce pro zpracování SQS fronty
  • Příklad Python kódu pro Lambda

Architektura demo stacku

Tento malý stack obsahuje DynamoDB tabulku, Lambda funkci, která zpracovává DynamoDB stream, SQS frontu a Lambda funkci, která zpracovává SQS frontu.

Dynamodb Stream s Lambda, SQS a Lambda

AWS CDK skript (TypeScript)

Nejprve musíme vytvořit nový CDK stack. Následující příklad ukazuje, jak vytvořit nový CDK stack s DynamoDB tabulkou, SQS frontou a dvěma Lambda funkcemi. První Lambda funkce zpracovává DynamoDB stream a zapisuje zprávy do SQS fronty, zatímco druhá Lambda funkce zpracovává SQS frontu.

import * as cdk from '@aws-cdk/core';
import * as dynamodb from '@aws-cdk/aws-dynamodb';
import * as lambda from '@aws-cdk/aws-lambda';
import * as sqs from '@aws-cdk/aws-sqs';
import * as sources from '@aws-cdk/aws-lambda-event-sources';

export class MyStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Vytvoření DynamoDB tabulky
    const table = new dynamodb.Table(this, 'MyTable', {
      partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING },
      stream: dynamodb.StreamViewType.PYTHON_3_12,
    });

    // Vytvoření SQS fronty
    const queue = new sqs.Queue(this, 'MyQueue');

    // Vytvoření první Lambda funkce (DynamoDB Stream procesor)
    const dynamoLambda = new lambda.Function(this, 'DynamoLambda', {
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'app.handler',
      code: lambda.Code.fromAsset('lambda'),
      memorySize: 256,
      environment: {
        QUEUE_URL: queue.queueUrl,
      },
    });

    // Udělení oprávnění Lambda funkci pro zápis do SQS fronty
    queue.grantSendMessages(dynamoLambda);

    // Nastavení DynamoDB streamu jako zdroje událostí
    dynamoLambda.addEventSource(new sources.DynamoEventSource(table, {
      startingPosition: lambda.StartingPosition.TRIM_HORIZON,
    }));

    // Vytvoření druhé Lambda funkce (SQS procesor)
    const sqsLambda = new lambda.Function(this, 'SqsLambda', {
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'app.handler',
      code: lambda.Code.fromAsset('lambda'),
      memorySize: 256,
    });

    // Nastavení SQS fronty jako zdroje událostí
    sqsLambda.addEventSource(new sources.SqsEventSource(queue));
  }
}

Lambda funkce (Python)

AWS Lambda PowerTools je sada nástrojů pro AWS Lambda funkce, která usnadňuje implementaci osvědčených postupů pro logování, monitorování a trasování v serverless aplikacích. Je navržena tak, aby pomohla vývojářům s různými aspekty vývoje, nasazení a údržby aplikací založených na Lambda, zjednodušuje úkoly jako strukturované logování, sběr metrik a trasování napříč mikroslužbami. Tato část popisuje, jak AWS Lambda PowerTools může prospět vašim serverless aplikacím, zejména v kontextu architektury DynamoDB-Lambda-SQS-Lambda.

app.py

import os
import boto3
import json

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response  # type: ignore
from aws_lambda_powertools.utilities.typing import LambdaContext  # type: ignore
from aws_lambda_powertools.logging.logger import Logger  # type: ignore

from lib.queue import MediaQueue

ENV = os.environ.get("ENV", "dev")
ssm = boto3.client("ssm")

processor = BatchProcessor(event_type=EventType.DynamoDBStreams)

logger = Logger(service="spaces_data_table_stream", level="DEBUG")

media_queue = MediaQueue(logger, ssm.get_parameter(
    Name=f"/{ENV}/media-table-process-queue/url")["Parameter"]["Value"])


def record_handler(record: dict):
    return media_queue.send_stream_data(record)


@logger.inject_lambda_context(clear_state=True)
def handler(event, context: LambdaContext):
    print(json.dumps(event))
    return process_partial_response(event=event, record_handler=record_handler, processor=processor, context=context)

/lib/queue.py

import boto3
import traceback
import uuid
import json
import time
from botocore.exceptions import ClientError

sqs = boto3.resource("sqs")

class MediaQueue:
    def __init__(self, logger, media_queue_name: str) -> None:
        self.logger = logger
        self.media_queue = sqs.Queue(media_queue_name)

    def send_message(self, body: dict) -> tuple[int, str]:
        try:
            self.media_queue.send_message(
                MessageBody=json.dumps(body),
                MessageGroupId=f"media_{str(uuid.uuid4())}",
                MessageDeduplicationId=str(uuid.uuid4())
            )
            return 200, f"Zpráva odeslána do fronty"
        
        except ClientError as e:
            if e.response['Error']['Code'] == 'ThrottlingException':
                time.sleep(1)
                self.send_message(body)
        
        except Exception as e:
            traceback.print_exc()
            self.logger.error(e)
            return 500, f"{e.__dict__} {e.with_traceback(None)}"

    def send_stream_data(self, record: dict) -> tuple[int, str]:
        message = {
            "action": record.get("eventName", "unknown"),
            "body": record.get("dynamodb", {})
        }
        return self.send_message(message)

Copyright © 2024. All rights reserved.