
Integrating Amazon DynamoDB Streams with Momento, via Amazon EventBridge: Fully Automated via AWS CDK!

A full walkthrough of the Momento integration with Amazon EventBridge using the AWS Cloud Development Kit.

Rishti Gupta

Explore the project’s GitHub repository for step-by-step setup instructions. Check out this blog post to see the web application in action.

In this blog post, we’ll walk through the integration of Amazon EventBridge with Momento using AWS CDK (Cloud Development Kit). We’ll use a sample application that stores weather data in DynamoDB and propagates changes using a combination of AWS services and Momento.

Our focus will be on automating the deployment of this setup through a CDK stack. Specifically, we’ll demonstrate how to listen for changes in a DynamoDB table, and route these events to Momento caches and topics using Amazon EventBridge.

The CDK Stack

The provided CDK stack integrates DynamoDB, Amazon EventBridge, and Momento. Let’s break it down step-by-step.

1. Define the DynamoDB Table

We start by creating a DynamoDB table to store weather statistics. The table is configured with a stream to capture changes:

const weatherStatsDemoTable = new dynamodb.Table(this, "weather-stats-demo-table", {
  tableName: "weather-stats-demo",
  partitionKey: { name: "Location", type: dynamodb.AttributeType.STRING },
  stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
  removalPolicy: cdk.RemovalPolicy.DESTROY,
});
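
For reference, the snippets in this post assume the standard CDK v2 imports at the top of the stack file. A minimal sketch is shown below; the exact import list in the repository may differ slightly:

import * as cdk from "aws-cdk-lib";
import { Construct } from "constructs";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as events from "aws-cdk-lib/aws-events";
import * as pipes from "aws-cdk-lib/aws-pipes";
import * as sqs from "aws-cdk-lib/aws-sqs";
import * as iam from "aws-cdk-lib/aws-iam";
import * as logs from "aws-cdk-lib/aws-logs";
import { Secret } from "aws-cdk-lib/aws-secretsmanager";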

2. Set Up the Connection for EventBridge

Next, we create a connection to allow EventBridge to send requests to Momento. This connection uses an API key stored in AWS Secrets Manager for authorization:

const connection = new events.Connection(this, 'weather-stats-demo-connection', {
  connectionName: 'weather-stats-demo-connection',
  authorization: events.Authorization.apiKey('Authorization', cdk.SecretValue.secretsManager(apiKeySecret.secretName)),
  description: 'Connection with API Key Authorization',
});

This connection is what lets EventBridge authenticate requests to Momento securely. (The apiKeySecret it references is one of the utility resources created in step 7 below.)

Note: Momento is now featured as a partner API destination in Amazon EventBridge! 🎉 If you prefer to set it up manually through the AWS console rather than using the CDK code for deployment, you can follow this guide to get started.

[Screenshot: selecting Momento as a partner API destination in the AWS console]

3. Set Up API Destinations for Momento

API Destinations in EventBridge allow you to route events to external services. Here, we set up API Destinations for three different Momento operations:

  • Cache Put Operation: Sets data in Momento’s cache.
  • Topic Publish Operation: Publishes events to Momento topics.
  • Cache Delete Operation: Removes data from Momento’s cache.
    // Define the API destination for the cache put operation
    const cachePutApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-put-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-put-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Set API",
      httpMethod: events.HttpMethod.PUT,
    });

    // Define the API destination for the topic publish operation
    const topicPublishApiDestination = new events.ApiDestination(this, "weather-stats-demo-topic-publish-api-destination", {
      apiDestinationName: "weather-stats-demo-topic-publish-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/topics/*/*`,
      description: "Topic Publish API",
      httpMethod: events.HttpMethod.POST,
    });

    // Define the API destination for the cache delete operation
    const cacheDeleteApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-delete-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-delete-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Delete API",
      httpMethod: events.HttpMethod.DELETE,
    });
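
To make the wildcard endpoints concrete, here is a rough sketch of the HTTP requests these destinations end up issuing once the pipes (defined below) fill in the path and query parameters. The cache name comes from the prerequisites later in this post, while the topic name and values are placeholders; the exact request shapes are governed by the Momento HTTP API:

# Cache put: the wildcard in /cache/* becomes the cache name; key and ttl_seconds arrive as query parameters.
curl -X PUT "$MOMENTO_API_ENDPOINT/cache/momento-eventbridge-cache?key=Seattle&ttl_seconds=300" \
  -H "Authorization: $MOMENTO_API_KEY" \
  -d '{"Location": "Seattle", "MaxTemp": "32", "MinTemp": "20", "ChancesOfPrecipitation": "40"}'

# Topic publish: the two wildcards in /topics/*/* become the cache name and topic name.
curl -X POST "$MOMENTO_API_ENDPOINT/topics/momento-eventbridge-cache/example-topic" \
  -H "Authorization: $MOMENTO_API_KEY" \
  -d '{"EventType": "INSERT", "Location": "Seattle"}'

# Cache delete: same cache path, with only the key as a query parameter.
curl -X DELETE "$MOMENTO_API_ENDPOINT/cache/momento-eventbridge-cache?key=Seattle" \
  -H "Authorization: $MOMENTO_API_KEY"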

4. Implementing a Dead Letter Queue (DLQ)

A Dead Letter Queue (DLQ) ensures that events that fail to be processed are not lost. Instead of being discarded, these failed events are sent to a DLQ, where they can be reviewed and retried later.

In our stack, we configure a DLQ for each EventBridge Pipe to handle any events that couldn’t be successfully processed by the target API destinations:

const deadLetterQueue = new sqs.Queue(this, "DeadLetterQueue", {
  queueName: "weather-stats-demo-dlq",
  retentionPeriod: cdk.Duration.days(14),
});
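
If deliveries start failing, you can inspect the failed events with the AWS CLI (replace the queue URL with the one from your account):

# Pull one failed event off the DLQ for inspection.
aws sqs receive-message \
  --queue-url https://sqs.<your-region>.amazonaws.com/<your-account-id>/weather-stats-demo-dlq \
  --max-number-of-messages 1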

5. Create EventBridge Pipes

EventBridge Pipes connect event sources to targets. Here, three pipes are defined to handle different types of DynamoDB events:

  • Cache Put Pipe: Routes INSERT and MODIFY events to the cache put API.
  • Topic Publish Pipe: Routes events to the topic publish API.
  • Cache Delete Pipe: Routes REMOVE events to the cache delete API.
    // Define the pipe for the cache put operation
    const cachePutCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-put-pipe", {
      name: "weather-stats-demo-cache-put-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["INSERT", "MODIFY"]}',
          }],
        },
      },
      target: cachePutApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the topic publish operation
    const topicPublishCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-topic-publish-pipe", {
      name: "weather-stats-demo-topic-publish-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
      },
      target: topicPublishApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the cache delete operation
    const cacheDeleteCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-delete-pipe", {
      name: "weather-stats-demo-cache-delete-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["REMOVE"]}',
          }],
        },
      },
      target: cacheDeleteApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Add target parameters to the pipes
    cachePutCfnPipe.targetParameters = {
      inputTemplate: "{\n  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S",
          ttl_seconds: "$.dynamodb.NewImage.TTL.N"
        },
      },
    };

    topicPublishCfnPipe.targetParameters = {
      inputTemplate: "{\n \"EventType\": <$.eventName>,  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName, topicName],
      },
    };

    cacheDeleteCfnPipe.targetParameters = {
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S"
        },
      },
    };

The target parameters added above control how each stream record is shaped before being sent to Momento: the input templates build the request body, the path parameter values fill the wildcards in the API destination endpoints, and the query string parameters carry the cache key (plus a TTL for puts).
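
To illustrate what those templates operate on, here is a trimmed example of the DynamoDB stream record produced by an INSERT (attribute names match the table used in this demo; values are illustrative). The $.dynamodb JSONPath references in the input templates above resolve against this structure:

{
  "eventName": "INSERT",
  "dynamodb": {
    "Keys": { "Location": { "S": "Seattle" } },
    "NewImage": {
      "Location": { "S": "Seattle" },
      "MaxTemp": { "N": "32" },
      "MinTemp": { "N": "20" },
      "ChancesOfPrecipitation": { "N": "40" },
      "TTL": { "N": "300" }
    }
  }
}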

6. Define IAM Role and Policies

An IAM role is created with policies that allow EventBridge Pipes to access DynamoDB streams, API Destinations, and the Dead Letter Queue (DLQ) for handling failed events:

const role = new iam.Role(this, "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache", {
  roleName: "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache",
  assumedBy: new iam.ServicePrincipal("pipes.amazonaws.com"),
});
this.addPolicyForEventBridgeRole(role, cachePutApiDestination, cacheDeleteApiDestination, topicPublishApiDestination, weatherStatsDemoTable, deadLetterQueue);
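
The addPolicyForEventBridgeRole helper is not shown here, but a minimal sketch of the permissions it needs to grant looks roughly like this (the exact statements in the repository may differ):

// Sketch only: read the DynamoDB stream, invoke the API destinations, and send failed events to the DLQ.
role.addToPolicy(new iam.PolicyStatement({
  actions: ["dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams"],
  resources: [weatherStatsDemoTable.tableStreamArn!],
}));
role.addToPolicy(new iam.PolicyStatement({
  actions: ["events:InvokeApiDestination"],
  resources: [
    cachePutApiDestination.apiDestinationArn,
    topicPublishApiDestination.apiDestinationArn,
    cacheDeleteApiDestination.apiDestinationArn,
  ],
}));
role.addToPolicy(new iam.PolicyStatement({
  actions: ["sqs:SendMessage"],
  resources: [deadLetterQueue.queueArn],
}));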

7. Create Utility Resources

We create several utility resources like secrets for API keys, CloudWatch Log Groups, and parameterized values for the Momento API:

const apiKeySecret = new Secret(this, 'MomentoEventbridgeApiKey', {
  secretName: 'momento-eventbridge-api-key',
  secretStringValue: new cdk.SecretValue(momentoApiKeyParameter.valueAsString),
});
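
The momentoApiKeyParameter and momentoApiEndpointParameter referenced throughout the stack, along with the log group the pipes write to, are ordinary CloudFormation parameters and a CloudWatch Log Group. A minimal sketch (the logical IDs and log group name here are illustrative):

// Sketch only: parameterized Momento values and the log group used by the pipes.
const momentoApiKeyParameter = new cdk.CfnParameter(this, "momentoApiKey", {
  type: "String",
  noEcho: true,
});
const momentoApiEndpointParameter = new cdk.CfnParameter(this, "momentoApiEndpoint", {
  type: "String",
});
const logGroup = new logs.LogGroup(this, "weather-stats-demo-log-group", {
  logGroupName: "weather-stats-demo-log-group",
  removalPolicy: cdk.RemovalPolicy.DESTROY,
});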

Deploying the CDK code

First, clone the GitHub repository and cd into the EventBridge example:

git clone https://github.com/momentohq/client-sdk-javascript
cd client-sdk-javascript/examples/nodejs/aws/eventbridge

Before deploying, you’ll need the following:

  • Momento Cache: You will need a cache named momento-eventbridge-cache. You can create one using the Momento console (or via the CLI sketch shown just after this list).
  • Momento API Key: This can also be created using the Momento console. Make sure the API key is created in the same region as the cache!
  • HTTP API endpoint: You can copy this from the Momento console after creating the API key, or refer to the Regions section of the documentation.
  • AWS Credentials: An AccessKeyId and SecretAccessKey for your AWS account (and, optionally, a SessionToken if you are using temporary credentials).
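
If you have the Momento CLI installed, creating the cache from the first prerequisite is a one-liner along these lines (shown for convenience; the exact CLI syntax may vary by version, and the Momento console works just as well):

# Create the cache the demo expects (assumes the Momento CLI is installed and logged in).
momento cache create momento-eventbridge-cache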

Next, create a .env file in the root directory of the project with the following environment variables:

MOMENTO_API_KEY=<your-momento-api-key> 
MOMENTO_API_ENDPOINT=<your-momento-api-endpoint>
AWS_ACCESS_KEY_ID=<your-aws-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-aws-secret-access-key>
AWS_REGION=<your-aws-region>
AWS_SESSION_TOKEN=<your-aws-session-token> # Optional, if you are using temporary credentials

The infrastructure directory contains a CDK application that defines the DynamoDB table, DynamoDB Stream, and EventBridge resources as discussed above in this blog. These are essential for the rest of the demo code, so you’ll need to deploy the CDK stack before running any other part of the project.

To deploy the CDK stack, run the following script:

./deploy-stack.sh

This will provision all the necessary AWS resources, and you’ll be ready to start running the demo code.
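
Once the stack is up, a quick way to watch the pipeline fire is to write an item to the table and then check the cache and topic. Here is a hedged example using the AWS CLI (attribute names match the input templates shown earlier; values are illustrative):

# Insert a weather record; the pipes should propagate it to the Momento cache and topic.
aws dynamodb put-item \
  --table-name weather-stats-demo \
  --item '{"Location": {"S": "Seattle"}, "MaxTemp": {"N": "32"}, "MinTemp": {"N": "20"}, "ChancesOfPrecipitation": {"N": "40"}, "TTL": {"N": "300"}}'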

Conclusion

With this setup, you’ve now created an automated pipeline using AWS CDK that integrates DynamoDB, EventBridge, and Momento, allowing you to handle weather updates in real time. This stack is a powerful example of how to build event-driven applications with a caching layer that scales effortlessly with demand.

Additional Resources

For a deep dive, explore the full sample application here.

To learn more about integrating Amazon EventBridge with Momento, visit the following resources:
