Amazon EventBridge経由でAmazon DynamoDBのストリームをMomentoと連携 ~ AWS CDKで完全自動化!
AWS CDKを使用したAmazon EventBridgeとMomentoの統合の完全なウォークスルー。
プロジェクトの GitHubリポジトリで、ステップ・バイ・ステップのセットアップ手順をご覧ください。ウェブ・アプリケーションの動作については、このブログ記事 をご覧ください。
このブログでは、AWS CDK (Cloud Development Kit) を使って Amazon EventBridge と Momento の統合を説明します。気象データをDynamoDBに保存し、AWSサービスとMomentoを組み合わせて変更を伝播するサンプルアプリケーションを使用します。
CDKスタックを使ったデプロイの自動化に焦点を当てます。具体的には、DynamoDBテーブルの変更をリッスンし、Amazon EventBridgeを使用してこれらのイベントをMomento キャッシュとトピックスにルーティングする方法を示します。
CDKスタック
提供されているCDKスタックは、DynamoDB, Amazon EventBridge, Momentoを統合している。順を追って説明します。
1.DynamoDBテーブルの定義
まず、気象統計を格納するDynamoDBテーブルを作成する。このテーブルには、変更をキャプチャするためのストリームが設定されています:
const weatherStatsDemoTable = new dynamodb.Table(this, "weather-stats-demo-table", {
tableName: "weather-stats-demo",
partitionKey: { name: "Location", type: dynamodb.AttributeType.STRING },
stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
2.EventBridgeの接続を設定する
次に、EventBridgeがMomentoにリクエストを送信するための接続を作成します。この接続では、AWS Secrets Manager に保存されている API キーを使用して認証を行います:
const connection = new events.Connection(this, 'weather-stats-demo-connection', { connectionName: 'weather-stats-demo-connection', authorization: events.Authorization.apiKey('Authorization', cdk.SecretValue.secretsManager(apiKeySecret.secretName)), description: 'Connection with API Key Authorization',});
この接続は、EventBridgeがMomentoへのリクエストを安全に認証するために重要です。
注:Momentoは現在、Amazon EventBridgeのパートナーAPIデスティネーションとしてフィーチャーされています! もしCDKのコードを使ってデプロイするのではなく、AWSコンソールから手動でセットアップしたい場合は、このガイドに従って始めることができます。
3. MomentoのAPI Destinationsを設定する
EventBridgeのAPI Destinationsは、イベントを外部サービスにルーティングするためのものです。ここでは、Momentoの操作別にAPI Destinationsを設定する:
- キャッシュプット操作: Momentoのキャッシュにデータをセットする。
- トピック公開操作: イベントをMomentoトピックに公開します。
- キャッシュ削除操作: Momentoのキャッシュからデータを削除します。
// Define the API destination for the cache put operation.
const cachePutApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-put-api-destination", {
apiDestinationName: "weather-stats-demo-cache-put-api-destination",
connection,
endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
description: "Cache Set API",
httpMethod: events.HttpMethod.PUT,
});
// Define the API destination for the topic publish operation
const topicPublishApiDestination = new events.ApiDestination(this, "weather-stats-demo-topic-publish-api-destination", {
apiDestinationName: "weather-stats-demo-topic-publish-api-destination",
connection,
endpoint: `${momentoApiEndpointParameter.valueAsString}/topics/*/*`,
description: "Topic Publish API",
httpMethod: events.HttpMethod.POST,
});
// Define the API destination for the cache delete operation
const cacheDeleteApiDestination = new events.ApiDestination(this, "weather-stats-demo-cache-delete-api-destination", {
apiDestinationName: "weather-stats-demo-cache-delete-api-destination",
connection,
endpoint: `${momentoApiEndpointParameter.valueAsString}/cache/*`,
description: "Cache Delete API",
httpMethod: events.HttpMethod.DELETE,
});
4.デッドレターキュー(DLQ)の実装
デッドレターキュー(DLQ)は、処理に失敗したイベントが失われないようにするものである。処理に失敗したイベントは破棄されるのではなく、DLQに送られ、そこで後で再試行することができます。
我々のスタックでは、各EventBridge Pipeに対してDLQを構成し、ターゲットAPIデスティネーションで正常に処理できなかったイベントを処理する:
const deadLetterQueue = new sqs.Queue(this, "DeadLetterQueue", {
queueName: "weather-stats-demo-dlq",
retentionPeriod: cdk.Duration.days(14),
});
5. EventBridgeパイプの作成
EventBridgeパイプは、イベントソースとターゲットを接続します。ここでは、異なるタイプのDynamoDBイベントを処理するために、3つのパイプを定義しています:
- キャッシュ・プット・パイプ:
INSERT
およびMODIFY
イベントをキャッシュ put API にルーティングします。 - トピック・パブリッシュ・パイプ: イベントをトピック・パブリッシュ API にルーティングします。
- キャッシュ削除パイプ:
REMOVE
イベントをキャッシュ削除 API にルーティングします。
// Define the pipe for the cache put operation
const cachePutCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-put-pipe", {
name: "weather-stats-demo-cache-put-pipe",
desiredState: "RUNNING",
source: weatherStatsDemoTable.tableStreamArn!,
sourceParameters: {
dynamoDbStreamParameters: {
batchSize: 1,
startingPosition: "LATEST",
maximumRetryAttempts: 0,
deadLetterConfig: {
arn: deadLetterQueue.queueArn,
},
},
filterCriteria: {
filters: [{
pattern: '{"eventName": ["INSERT", "MODIFY"]}',
}],
},
},
target: cachePutApiDestination.apiDestinationArn!,
roleArn: role.roleArn,
logConfiguration: {
cloudwatchLogsLogDestination: {
logGroupArn: logGroup.logGroupArn,
},
level: "INFO",
includeExecutionData: ["ALL"],
},
});
// Define the pipe for the topic publish operation
const topicPublishCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-topic-publish-pipe", {
name: "weather-stats-demo-topic-publish-pipe",
desiredState: "RUNNING",
source: weatherStatsDemoTable.tableStreamArn!,
sourceParameters: {
dynamoDbStreamParameters: {
batchSize: 1,
startingPosition: "LATEST",
maximumRetryAttempts: 0,
deadLetterConfig: {
arn: deadLetterQueue.queueArn,
},
},
},
target: topicPublishApiDestination.apiDestinationArn!,
roleArn: role.roleArn,
logConfiguration: {
cloudwatchLogsLogDestination: {
logGroupArn: logGroup.logGroupArn,
},
level: "INFO",
includeExecutionData: ["ALL"],
},
});
// Define the pipe for the cache delete operation
const cacheDeleteCfnPipe = new pipes.CfnPipe(this, "weather-stats-demo-cache-delete-pipe", {
name: "weather-stats-demo-cache-delete-pipe",
desiredState: "RUNNING",
source: weatherStatsDemoTable.tableStreamArn!,
sourceParameters: {
dynamoDbStreamParameters: {
batchSize: 1,
startingPosition: "LATEST",
maximumRetryAttempts: 0,
deadLetterConfig: {
arn: deadLetterQueue.queueArn,
},
},
filterCriteria: {
filters: [{
pattern: '{"eventName": ["REMOVE"]}',
}],
},
},
target: cacheDeleteApiDestination.apiDestinationArn!,
roleArn: role.roleArn,
logConfiguration: {
cloudwatchLogsLogDestination: {
logGroupArn: logGroup.logGroupArn,
},
level: "INFO",
includeExecutionData: ["ALL"],
},
});
// Add target parameters to the pipes
cachePutCfnPipe.targetParameters = {
inputTemplate: "{\n \"Location\": <$.dynamodb.Keys.Location.S>, \n \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
httpParameters: {
pathParameterValues: [cacheName],
queryStringParameters: {
key: "$.dynamodb.Keys.Location.S",
ttl_seconds: "$.dynamodb.NewImage.TTL.N"
},
},
};
topicPublishCfnPipe.targetParameters = {
inputTemplate: "{\n \"EventType\": <$.eventName>, \"Location\": <$.dynamodb.Keys.Location.S>, \n \"MaxTemp\": <$.dynamodb.NewImage.MaxTemp.N>,\n \"MinTemp\": <$.dynamodb.NewImage.MinTemp.N>, \n \"ChancesOfPrecipitation\": <$.dynamodb.NewImage.ChancesOfPrecipitation.N>\n}",
httpParameters: {
pathParameterValues: [cacheName, topicName],
},
};
cacheDeleteCfnPipe.targetParameters = {
httpParameters: {
pathParameterValues: [cacheName],
queryStringParameters: {
key: "$.dynamodb.Keys.Location.S"
},
},
};
また、これらのパイプにターゲット・パラメータを追加して、Momento に送信する前にデータをどのようにフォーマットするかを指定する。
6. IAMロールとポリシーの定義
IAMロールは、EventBridge PipesがDynamoDBストリーム、API Destinations、および失敗したイベントを処理するためのDead Letter Queue (DLQ) にアクセスすることを許可するポリシーとともに作成されます:
const role = new iam.Role(this, "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache", {
roleName: "AmazonEventBridgePipeWeatherStatsDemoEventToMomentoCache",
assumedBy: new iam.ServicePrincipal("pipes.amazonaws.com"),
});
this.addPolicyForEventBridgeRole(role, cachePutApiDestination, cacheDeleteApiDestination, topicPublishApiDestination, weatherStatsDemoTable, deadLetterQueue);
7.ユーティリティリソースの作成
APIキーのシークレット、CloudWatchロググループ、Momento APIのパラメータ化された値など、いくつかのユーティリティリソースを作成します:
const apiKeySecret = new Secret(this, 'MomentoEventbridgeApiKey', {
secretName: 'momento-eventbridge-api-key',
secretStringValue: new cdk.SecretValue(momentoApiKeyParameter.valueAsString),
});
CDKコードのデプロイ
まず、GitHubのリポジトリをクローンし、EventBridgeのサンプルにcdします:
git clone https://github.com/momentohq/client-sdk-javascript
cd examples/nodejs/aws/eventbridge
デプロイする前に、以下のものが必要です:
- Momentoキャッシュ:
momento-eventbridge-cache
というキャッシュが必要です。Momento コンソール.で作成できます。 - Momento APIキー: これもMomento コンソール.で作成できます。APIキーがキャッシュと同じリージョンに作成されていることを確認してください!
- HTTP API エンドポイント: API Key を作成した後、Momento コンソールからコピーするか、 ドキュメントのリージョンセクションを参照してください。.
- AWS 認証情報: AWS アカウントの AccessKeyId、SecretAccessKey (および、一時的な認証情報を使用している場合はオプションで SessionToken)。
次に、プロジェクトのルート・ディレクトリに.env
ファイルを作成し、以下の環境変数を設定する:
MOMENTO_API_KEY=<your-momento-api-key>
MOMENTO_API_ENDPOINT=<your-momento-api-endpoint>
AWS_ACCESS_KEY_ID=<your-aws-access-key-id>
AWS_SECRET_ACCESS_KEY=<your-aws-secret-access-key>
AWS_REGION=<your-aws-region>
AWS_SESSION_TOKEN=<your-aws-session-token> # Optional, if you are using temporary credentials
infrastructure
ディレクトリには、このブログで前述したDynamoDBテーブル、DynamoDB Stream、EventBridgeリソースを定義したCDKアプリケーションが含まれています。これらは、残りのデモコードに不可欠なものなので、プロジェクトの他の部分を実行する前に、CDKスタックをデプロイする必要があります。
CDKスタックをデプロイするには、以下のスクリプトを実行します:
./deploy-stack.sh
これで必要なAWSリソースがすべてプロビジョニングされ、デモコードの実行を開始する準備が整う。
結論
このセットアップにより、DynamoDB、EventBridge、Momentoを統合したAWS CDKを使用した自動化パイプラインが作成され、天候の更新をリアルタイムで処理できるようになりました。このスタックは、需要に応じて無理なくスケールするキャッシュレイヤーを備えたイベント駆動型アプリケーションを構築する方法の強力な例です。
その他のリソース
サンプル・アプリケーションの詳細については、こちらをご覧ください。
AWS EventBridgeとMomentoの統合の詳細については、以下のリソースを参照してください: