We’re sorry we missed you at re:Invent, but we can still meet!

Amazon EventBridge経由でAmazon DynamoDBのストリームをMomentoと連携 ~ AWS CDKで完全自動化!

AWS CDKを使用したAmazon EventBridgeとMomentoの統合の完全なウォークスルー。

リシュティ・グプタ
著者

Share

プロジェクトの GitHubリポジトリで、ステップ・バイ・ステップのセットアップ手順をご覧ください。ウェブ・アプリケーションの動作については、このブログ記事 をご覧ください。

このブログでは、AWS CDK (Cloud Development Kit) を使って Amazon EventBridge と Momento の統合を説明します。気象データをDynamoDBに保存し、AWSサービスとMomentoを組み合わせて変更を伝播するサンプルアプリケーションを使用します。

CDKスタックを使ったデプロイの自動化に焦点を当てます。具体的には、DynamoDBテーブルの変更をリッスンし、Amazon EventBridgeを使用してこれらのイベントをMomento キャッシュトピックスにルーティングする方法を示します。

CDKスタック

提供されているCDKスタックは、DynamoDBAmazon EventBridge, Momentoを統合している。順を追って説明します。

1.DynamoDBテーブルの定義

まず、気象統計を格納するDynamoDBテーブルを作成する。このテーブルには、変更をキャプチャするためのストリームが設定されています:

const weatherStatsDemoTable = new dynamodb.Table(this, "weather-stats-demo-table", {
  tableName: "weather-stats-demo",
  partitionKey: { name: "Location", type: dynamodb.AttributeType.STRING },
  stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
  removalPolicy: cdk.RemovalPolicy.DESTROY,
});

2.EventBridgeの接続を設定する

次に、EventBridgeがMomentoにリクエストを送信するための接続を作成します。この接続では、AWS Secrets Manager に保存されている API キーを使用して認証を行います:

const connection = new events.Connection(this, 'weather-stats-demo-connection', {  connectionName: 'weather-stats-demo-connection',  authorization: events.Authorization.apiKey('Authorization', cdk.SecretValue.secretsManager(apiKeySecret.secretName)),  description: 'Connection with API Key Authorization',});

この接続は、EventBridgeがMomentoへのリクエストを安全に認証するために重要です。

注:Momentoは現在、Amazon EventBridgeのパートナーAPIデスティネーションとしてフィーチャーされています! もしCDKのコードを使ってデプロイするのではなく、AWSコンソールから手動でセットアップしたい場合は、このガイドに従って始めることができます。

3. MomentoのAPI Destinationsを設定する

EventBridgeのAPI Destinationsは、イベントを外部サービスにルーティングするためのものです。ここでは、Momentoの操作別にAPI Destinationsを設定する:

  • キャッシュプット操作: Momentoのキャッシュにデータをセットする。
  • トピック公開操作: イベントをMomentoトピックに公開します。
  • キャッシュ削除操作: Momentoのキャッシュからデータを削除します。
// Define the API destination for the cache put operation.
    const cachePutApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-put-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-put-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Set API",
      httpMethod: events.HttpMethod.PUT,
    });

    // Define the API destination for the topic publish operation
    const topicPublishApiDestination = new events.ApiDestination(this, "weather-stats-demo-topic-publish-api-destination", {
      apiDestinationName: "weather-stats-demo-topic-publish-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/topics/*/*`,
      description: "Topic Publish API",
      httpMethod: events.HttpMethod.POST,
    });

    // Define the API destination for the cache delete operation
    const cacheDeleteApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-delete-api-destination", {
      apiDestinationName: "weather-stats-demo-cache-delete-api-destination",
      connection,
      endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
      description: "Cache Delete API",
      httpMethod: events.HttpMethod.DELETE,
    });

4.デッドレターキュー(DLQ)の実装

デッドレターキュー(DLQ)は、処理に失敗したイベントが失われないようにするものである。処理に失敗したイベントは破棄されるのではなく、DLQに送られ、そこで後で再試行することができます。

我々のスタックでは、各EventBridge Pipeに対してDLQを構成し、ターゲットAPIデスティネーションで正常に処理できなかったイベントを処理する:

const deadLetterQueue = new sqs.Queue(this, "DeadLetterQueue", {
      queueName: "weather-stats-demo-dlq",
      retentionPeriod: cdk.Duration.days(14),
    });

5. EventBridgeパイプの作成

EventBridgeパイプは、イベントソースとターゲットを接続します。ここでは、異なるタイプのDynamoDBイベントを処理するために、3つのパイプを定義しています:

  • キャッシュ・プット・パイプ:  INSERT  および  MODIFY イベントをキャッシュ put API にルーティングします。
  • トピック・パブリッシュ・パイプ: イベントをトピック・パブリッシュ API にルーティングします。
  • キャッシュ削除パイプ:  REMOVEイベントをキャッシュ削除 API にルーティングします。
// Define the pipe for the cache put operation
    const cachePutCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-put-pipe", {
      name: "weather-stats-demo-cache-put-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["INSERT", "MODIFY"]}',
          }],
        },
      },
      target: cachePutApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the topic publish operation
    const topicPublishCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-topic-publish-pipe", {
      name: "weather-stats-demo-topic-publish-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
      },
      target: topicPublishApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Define the pipe for the cache delete operation
    const cacheDeleteCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-delete-pipe", {
      name: "weather-stats-demo-cache-delete-pipe",
      desiredState: "RUNNING",
      source: weatherStatsDemoTable.tableStreamArn!,
      sourceParameters: {
        dynamoDbStreamParameters: {
          batchSize: 1,
          startingPosition: "LATEST",
          maximumRetryAttempts: 0,
          deadLetterConfig: {
            arn: deadLetterQueue.queueArn,
          },
        },
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["REMOVE"]}',
          }],
        },
      },
      target: cacheDeleteApiDestination.apiDestinationArn!,
      roleArn: role.roleArn,
      logConfiguration: {
        cloudwatchLogsLogDestination: {
          logGroupArn: logGroup.logGroupArn,
        },
        level: "INFO",
        includeExecutionData: ["ALL"],
      },
    });

    // Add target parameters to the pipes
    cachePutCfnPipe.targetParameters = {
      inputTemplate: "{\n  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S",
          ttl_seconds: "$.dynamodb.NewImage.TTL.N"
        },
      },
    };

    topicPublishCfnPipe.targetParameters = {
      inputTemplate: "{\n \"EventType\": <$.eventName>,  \"Location\": <$.dynamodb.Keys.Location.S>, \n  \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n  \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n  \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
      httpParameters: {
        pathParameterValues: [cacheName, topicName],
      },
    };

    cacheDeleteCfnPipe.targetParameters = {
      httpParameters: {
        pathParameterValues: [cacheName],
        queryStringParameters: {
          key: "$.dynamodb.Keys.Location.S"
        },
      },
    };

また、これらのパイプにターゲット・パラメータを追加して、Momento に送信する前にデータをどのようにフォーマットするかを指定する。

6. IAMロールとポリシーの定義

IAMロールは、EventBridge PipesがDynamoDBストリーム、API Destinations、および失敗したイベントを処理するためのDead Letter Queue (DLQ) にアクセスすることを許可するポリシーとともに作成されます:

const role = new iam.Role(this, "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache", {
  roleName: "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache",
  assumedBy: new iam.ServicePrincipal("pipes.amazonaws.com"),
});
this.addPolicyForEventBridgeRole(role, cachePutApiDestination, cacheDeleteApiDestination, topicPublishApiDestination, weatherStatsDemoTable, deadLetterQueue);

7.ユーティリティリソースの作成

APIキーのシークレット、CloudWatchロググループ、Momento APIのパラメータ化された値など、いくつかのユーティリティリソースを作成します:

const apiKeySecret = new Secret(this, 'MomentoEventbridgeApiKey', {
  secretName: 'momento-eventbridge-api-key',
  secretStringValue: new cdk.SecretValue(momentoApiKeyParameter.valueAsString),
});

CDKコードのデプロイ

まず、GitHubのリポジトリをクローンし、EventBridgeのサンプルにcdします:

git clone https://github.com/momentohq/client-sdk-javascript
cd examples/nodejs/aws/eventbridge

デプロイする前に、以下のものが必要です:

  • Momentoキャッシュ: momento-eventbridge-cacheというキャッシュが必要です。Momento コンソール.で作成できます。
  • Momento APIキー: これもMomento コンソール.で作成できます。APIキーがキャッシュと同じリージョンに作成されていることを確認してください! 
  • HTTP API エンドポイント: API Key を作成した後、Momento コンソールからコピーするか、 ドキュメントのリージョンセクションを参照してください。.
  • AWS 認証情報: AWS アカウントの AccessKeyId、SecretAccessKey (および、一時的な認証情報を使用している場合はオプションで SessionToken)。

次に、プロジェクトのルート・ディレクトリに.envファイルを作成し、以下の環境変数を設定する:

MOMENTO_API_KEY=<your-momento-api-key> 
MOMENTO_API_ENDPOINT=<your-momento-api-endpoint>
AWS_ACCESS_KEY_ID=<your-aws-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-aws-secret-access-key>
AWS_REGION=<your-aws-region>
AWS_SESSION_TOKEN=<your-aws-session-token> # Optional, if you are using temporary credentials

infrastructure ディレクトリには、このブログで前述したDynamoDBテーブル、DynamoDB Stream、EventBridgeリソースを定義したCDKアプリケーションが含まれています。これらは、残りのデモコードに不可欠なものなので、プロジェクトの他の部分を実行する前に、CDKスタックをデプロイする必要があります。

CDKスタックをデプロイするには、以下のスクリプトを実行します:

./deploy-stack.sh

これで必要なAWSリソースがすべてプロビジョニングされ、デモコードの実行を開始する準備が整う。 

結論

このセットアップにより、DynamoDB、EventBridge、Momentoを統合したAWS CDKを使用した自動化パイプラインが作成され、天候の更新をリアルタイムで処理できるようになりました。このスタックは、需要に応じて無理なくスケールするキャッシュレイヤーを備えたイベント駆動型アプリケーションを構築する方法の強力な例です。

その他のリソース

サンプル・アプリケーションの詳細については、こちらをご覧ください。 

AWS EventBridgeとMomentoの統合の詳細については、以下のリソースを参照してください:

Share