サンフランシスコで開催されるQCon 2024にお出かけですか? ミーティングを予約する!

Momento でリアルタイム通知システムを構築する方法: ステップバイステップガイド

リアルタイム通知は、現代のアプリケーションで大きな役割を果たしています。リアルタイム通知により、アプリケーションは更新情報をユーザーに即座にプッシュすることができ、次のようなシナリオでは不可欠です。

ヤン・キュイ ヘッドショット
ヤン・クイ
著者

Share

リアルタイム通知は、現代のアプリケーションで大きな役割を果たしています。リアルタイム通知により、アプリケーションは更新情報をユーザーに即座にプッシュすることができ、次のようなシナリオでは不可欠です。

  • メッセージング アプリ: チャット メッセージを配信します。
  • 金融サービス: 株価のライブ更新。
  • ソーシャル メディア: いいね、コメント、共有についてユーザーに通知します。
  • オンライン ゲーム: マルチプレイヤー ゲームでのゲーム状態の更新。

リアルタイム機能を実装すると、ユーザー エンゲージメントが向上し、競争上の優位性が得られます。これは、WebSocket の最も一般的な使用例の 1 つです。

ただし、数百万の同時ユーザーに対応できる WebSocket システムを構築するのは簡単なことではありません。

幸いなことに、最近では多くの SaaS プロバイダーがマネージド WebSocket ソリューションを提供しています。たとえば、AWS は API Gateway、AppSync、IoT Core を通じて WebSocket をサポートしています。これらのオプションはすべて、メッセージと接続時間の両方に対して料金が発生します。

残念ながら、これはアイドル接続に対して料金を支払うことを意味し、更新が頻繁に行われない多くのアプリケーションにとってはコストがかかる可能性があります。

たとえば、ソーシャル ネットワークでは、10% のユーザーがコンテンツの 90% を作成している可能性が高く、中間のユーザーのフォロワーはわずか数人です。そのため、ほとんどのユーザーにとって、プッシュ更新をほとんどまたはまったく受信しないため、接続時間のコストが無駄になります。

WebSocket のコストが実際の使用量、つまり送信するメッセージの数にもっと合ったらいいと思いませんか?

さて、ここでMomentoトピック[1]の出番です!

WebSocket の難しい部分を抽象化し、接続に対して課金されないオンデマンドの価格設定を採用しています。

このガイドでは、Momento トピックを使用してリアルタイム通知システムを構築します。その過程で、ユーザーが自分の更新のみをサブスクライブできるように、きめ細かい認証を実装する方法を説明します。

このデモのすべてのコードはこのリポジトリ[2]で入手できます。

前提条件

このガイドの前提条件は次のとおりです。

Momentoトピックに関する注意事項

Momento トピックについて注目すべき重要な点は、それらが仮想的であるということです。つまり、トピックを使用する前に明示的に作成する必要はありません。

このステップバイステップのガイドを読み進める際には、この点に留意してください。

使い捨てトークンを理解する

Momento には、認証と承認のための 2 つの別個のメカニズム (API キーと使い捨てトークン) があります。

バックエンドプロセスの場合は、API キーを使用します。

フロントエンドでは、Momento SDK [4]を使用して使い捨てトークンを生成することができます。これらのトークンは、特定のリソースに対する特定のアクションに限定することができます。

たとえば、トークンは特定のトピックへのサブスクライブのみの権限を付与できます。すべてのユーザーが独自のトピックを持っている場合は、使い捨てトークンを使用して、ユーザーが自分のトピックのみをサブスクライブできるようにすることができます。

アーキテクチャの概要

このデモでは、次のようなフルスタック アプリケーションを構築します。

  • フロントエンドは API に対してタスクをキューに入れます。
  • タスクは SQS キューにプッシュされ、非同期で処理されます。
  • バックエンドはタスクを処理し、タスクが完了するとフロントエンドに通知します (Momento トピック経由)。

API には 2 つのルートがあります。

  • POST /task : 処理するタスクをキューに追加します。
  • GET /token : フロントエンドが Momento トピックをサブスクライブできるように、使い捨てトークンを生成します。

全体的なアーキテクチャは次のとおりです。

ステップ1: 新しいキャッシュを作成する

Momento トピックは、Momento キャッシュと同じインフラストラクチャを共有します。したがって、奇妙に聞こえるかもしれませんが、トピックを使用するには、まずキャッシュを作成する必要があります。

Momento コンソールにログインし、「キャッシュ」に移動します。

「キャッシュを作成」をクリックします。

この新しいキャッシュを「 notification 」と呼び、「us-east-1」リージョンに作成します。

これはMomento CLI [5]を通じて行うこともできます。

ステップ2: APIキーを生成する

Momento には 2 種類の API キーがあります。

  • スーパーユーザーキー: キャッシュとトピックを管理し、使い捨てトークンを生成するために使用されます。
  • きめ細かいアクセス キー: これは、Moment のキャッシュ/トピックとやり取りするためのものです。

使い捨てトークンを生成するにはスーパーユーザー キーが必要です。ただし、トピックにメッセージを公開するには、きめ細かいアクセス キーのみが必要です。

実際には、エンドポイントごとに 1 つずつ、合計 2 つのキーを作成する必要があります。ただし、簡潔にするために、スーパー ユーザー キーのみを作成します。

Momento コンソールの「API キー」に移動し、「us-east-1 」リージョンに新しい「スーパーユーザー キー」を生成します。

APIキーの生成」をクリックします。

ステップ3: APIキーを保護する

API キーを安全に保つために、SSM パラメータ ストアに保存します。

AWSの「AWS Systems Manager」コンソールに移動し、「Parameter Store」をクリックします。

新しいパラメータを作成し、その新しいパラメータを「/notification-api/dev/momento-api-key」と呼びます。これは、SSM パラメータに通常使用する命名規則です/{service-name}/{environment}/{parameter-name}

パラメータ タイプが「SecureString」であることを確認します。これにより、API キーが保存時に暗号化されます。

ステップ4: CDKアプリを作成する

このデモでは、CDK を使用します。ただし、他の Infrastructure-as-Code ツールも使用でき、同様に機能します。

CDK アプリでは、次のことを行います。

  1. サーバーレス開発において最も影響力のあるプラクティスの1つであるエフェメラル環境[6]をサポートします。
  2. 一時環境でメイン環境の1つ(例:dev)のSSMパラメータ[7]を再利用できるようにします。

stageNameしたがって、と という2 つのコンテキスト変数を取りますssmStageName

stageName名前の競合を避けるために、作成するすべての AWS リソースの名前に が含まれます。

参照するすべての SSM パラメータでは、ssmStageNameの代わりに が使用されます。stageName

これらを念頭に置いて、CDK アプリを紹介します。

#!/usr/bin/env node

const cdk = require('aws-cdk-lib');
const { NotificationApiStack } = require('./constructs/notification-api-stack');

const app = new cdk.App();

let stageName = app.node.tryGetContext('stageName');
let ssmStageName = app.node.tryGetContext('ssmStageName');

if (!stageName) {
  console.log('Defaulting stage name to dev');
  stageName = 'dev';
}

if (!ssmStageName) {
  console.log(`Defaulting SSM stage name to "stageName": ${stageName}`);
  ssmStageName = stageName;
}

const serviceName = 'notification-api';

new NotificationApiStack(app, `NotificationApiStack-${stageName}`, {
  serviceName,
  stageName,
  ssmStageName,
});

そしてここにNotificationApiStackがあります:

const { Stack, Duration, CfnOutput } = require('aws-cdk-lib');
const { Runtime } = require('aws-cdk-lib/aws-lambda');
const { NodejsFunction } = require('aws-cdk-lib/aws-lambda-nodejs');
const { RestApi, LambdaIntegration, CfnAuthorizer, AuthorizationType } = require('aws-cdk-lib/aws-apigateway');
const iam = require('aws-cdk-lib/aws-iam');
const sqs = require('aws-cdk-lib/aws-sqs');
const { SqsEventSource } = require('aws-cdk-lib/aws-lambda-event-sources');
const { UserPool, UserPoolClient } = require('aws-cdk-lib/aws-cognito');

const MOMENTO_CACHE_NAME = 'notifications';

class NotificationApiStack extends Stack {
  constructor(scope, id, props) {
    super(scope, id, props);

    const api = new RestApi(this, `${props.stageName}-NotificationApi`, {
      deployOptions: {
        stageName: props.stageName,
        tracingEnabled: true
      }
    });

    const userPool = new UserPool(this, 'CognitoUserPool', {
      userPoolName: `${props.serviceName}-${props.stageName}-UserPool`,
      selfSignUpEnabled: true,
      signInAliases: { email: true }
    });

    const webUserPoolClient = new UserPoolClient(this, 'WebUserPoolClient', {
      userPool,
      authFlows: {
        userSrp: true
      },
      preventUserExistenceErrors: true
    });

    new CfnOutput(this, 'UserPoolId', { value: userPool.userPoolId });
    new CfnOutput(this, 'UserPoolClientId', { value: webUserPoolClient.userPoolClientId });

    this.momentoApiKeyParamName = `/${props.serviceName}/${props.ssmStageName}/momento-api-key`;
    this.momentoApiKeyParamArn = `arn:aws:ssm:${this.region}:${this.account}:parameter${this.momentoApiKeyParamName}`;

    this.taskQueue = this.createTaskQueue(props);

    this.createProcessTaskFunction(props);

    const queueTaskFunction = this.createQueueTaskFunction(props);
    const tokenVendingMachineFunction = this.createTokenVendingMachineFunction(props);

    this.createApiEndpoints(api, userPool, {
      queueTask: queueTaskFunction,
      tokenVendingMachine: tokenVendingMachineFunction
    });
  }

  createTaskQueue(props) {
    return new sqs.Queue(this, 'TaskQueue', {
      queueName: `${props.serviceName}-${props.stageName}-tasks`      
    });
  }

  createQueueTaskFunction(props) {
    const func = new NodejsFunction(this, 'QueueTaskFunction', {
      runtime: Runtime.NODEJS_20_X,
      handler: 'handler',
      entry: 'functions/queue-task.js',
      memorySize: 1024,
      environment: {
        SERVICE_NAME: props.serviceName,
        STAGE_NAME: props.stageName,
        POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG',
        TASK_QUEUE_URL: this.taskQueue.queueUrl
      }
    });

    this.taskQueue.grantSendMessages(func);

    return func;
  }

  createProcessTaskFunction(props) {
    const func = new NodejsFunction(this, 'ProcessTaskFunction', {
      runtime: Runtime.NODEJS_20_X,
      handler: 'handler',
      entry: 'functions/process-task.js',
      memorySize: 1024,
      timeout: Duration.seconds(10),
      environment: {
        SERVICE_NAME: props.serviceName,
        STAGE_NAME: props.stageName,
        MOMENTO_API_KEY_PARAM_NAME: this.momentoApiKeyParamName,
        MOMENTO_CACHE_NAME,
        POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG'
      }
    });

    func.addEventSource(new SqsEventSource(this.taskQueue, {
      reportBatchItemFailures: true
    }));

    func.role.attachInlinePolicy(new iam.Policy(this, 'ProcessTaskFunctionSsmPolicy', {
      statements: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: [ 'ssm:GetParameter*' ],
          resources: [ this.momentoApiKeyParamArn ]
        })
      ]
    }));

    return func;
  }

  createTokenVendingMachineFunction(props) {
    const func = new NodejsFunction(this, 'TokenVendingMachineFunction', {
      runtime: Runtime.NODEJS_20_X,
      handler: 'handler',
      entry: 'functions/token-vending-machine.js',
      memorySize: 1024,      
      environment: {
        SERVICE_NAME: props.serviceName,
        STAGE_NAME: props.stageName,
        MOMENTO_API_KEY_PARAM_NAME: this.momentoApiKeyParamName,
        MOMENTO_CACHE_NAME,
        POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG'
      }
    });

    func.role.attachInlinePolicy(new iam.Policy(this, 'TokenVendingMachineFunctionSsmPolicy', {
      statements: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: [ 'ssm:GetParameter*' ],
          resources: [ this.momentoApiKeyParamArn ]
        })
      ]
    }));

    return func;
  }

  /**
   * 
   * @param {RestApi} api
   * @param {UserPool} userPool
   */
  createApiEndpoints(api, userPool, functions) {
    const authorizer = new CfnAuthorizer(this, 'CognitoAuthorizer', {
      name: 'CognitoAuthorizer',
      type: 'COGNITO_USER_POOLS',
      identitySource: 'method.request.header.Authorization',
      providerArns: [userPool.userPoolArn],
      restApiId: api.restApiId,
    });    

    // POST /task
    const taskResource = api.root.addResource('task');
    taskResource.addMethod('POST', new LambdaIntegration(functions.queueTask), {
      authorizer: {
        authorizationType: AuthorizationType.COGNITO,
        authorizerId: authorizer.ref
      }
    });

    taskResource.addCorsPreflight({
      allowHeaders: ['*'],
      allowMethods: ['OPTIONS', 'POST'],
      allowCredentials: true,
      allowOrigins: ['*']
    });

    // GET /token
    const tokenResource = api.root.addResource('token');
    tokenResource.addMethod('GET', new LambdaIntegration(functions.tokenVendingMachine), {
      authorizer: {
        authorizationType: AuthorizationType.COGNITO,
        authorizerId: authorizer.ref
      }
    });

    tokenResource.addCorsPreflight({
      allowHeaders: ['*'],
      allowMethods: ['OPTIONS', 'POST'],
      allowCredentials: true,
      allowOrigins: ['*']
    });
  }
}

module.exports = { NotificationApiStack }

ここでは、API Gateway に API を作成し、前述のルートを実装するための 2 つの Lambda 関数を作成しました。

  • トークン自動販売機機能
  • キュータスク関数

SQS メッセージ処理

は、 POST /taskQueueTaskFunctionエンドポイントへのリクエストを処理し、タスクを SQS キューにエンキューします。これは、10 個のバッチで処理されます (SQS イベント ソース マッピングのデフォルト)。taskQueueProcessTaskFunction

部分的なバッチ失敗を処理reportBatchItemFailuresできるようになっていることに注意してください[8]。

func.addEventSource(new SqsEventSource(this.taskQueue, {
  reportBatchItemFailures: true
}));

認証と承認

API は Cognito 認証によって保護されます。

const authorizer = new  CfnAuthorizer ( this、'CognitoAuthorizer'、 { 
name : 'CognitoAuthorizer'、
type : 'COGNITO_USER_POOLS'、
identitySource : 'method.request.header.Authorization'、
providerArns : [userPool. userPoolArn ]、
restApiId : api. restApiId、
});

したがって、API Gateway によってが呼び出されると、呼び出しイベントでQueueTaskFunctionユーザーの Cognito を見つけることができます。sub

これをユーザーの一意の ID とユーザー固有のトピックの名前として使用します。SQS メッセージをキューに入れるときに、ProcessTaskFunction結果をどのトピックに公開するかを認識できるように、この情報を含める必要があります。

SSMパラメータを安全に読み込む

関数には環境変数として Momento API キーが含まれていないことに注意してください。代わりに、パラメータの名前を渡します。

environment: {
  SERVICE_NAME: props.serviceName,
  STAGE_NAME: props.stageName,
  MOMENTO_API_KEY_PARAM_NAME: this.momentoApiKeyParamName,
  MOMENTO_CACHE_NAME,
  POWERTOOLS_LOG_LEVEL: props.stageName === 'prod' ? 'INFO' : 'DEBUG'
}

そして、実行時にパラメータを取得するための IAM 権限を関数に付与します。

func.role.attachInlinePolicy(new iam.Policy(this, 'ProcessTaskFunctionSsmPolicy', {
  statements: [
    new iam.PolicyStatement({
      effect: iam.Effect.ALLOW,
      actions: [ 'ssm:GetParameter*' ],
      resources: [ this.momentoApiKeyParamArn ]
    })
  ]
}));

これは次の理由によるものです:

  • 環境変数から情報を盗む、侵害された依存関係から身を守ります。
  • API キーの有効期限を短く設定し、アプリケーションを再デプロイせずにキーをローテーションすることができます。

コールド スタート中、関数は SSM パラメータを取得し、それを復号化して、その値を数分間キャッシュします。キャッシュの有効期限が切れると、次の呼び出しで SSM パラメータ ストアから更新された値を取得しようとします。

この方法により、呼び出しごとに SSM を呼び出す必要がなくなります。API キーをバックグラウンドで (cron ジョブを使用して) ローテーションすると、関数はキャッシュの有効期限が切れた後に新しいキーを自動的に取得します。

幸いなことに、Middyのssmミドルウェア[9]は、このフローをすぐにサポートしています。このミドルウェアに重い処理を任せますが、これについては後で詳しく説明します。

ワークフローの例

JIRA チケット「ABP-1734」の作業を開始するときは、次のことを行います。

  1. 機能ブランチを作成しますABP-1734
  2. を実行して一時的な環境を作成しますcdk deploy --context stageName=FEAT-ABP-1734 --context ssmStageName=dev。これにより、通知サービスの新しいインスタンスが作成され、変更を個別に処理できるようになります。この新しい環境ではdevSSM パラメータが使用されますが、そのすべてのリソースにはサフィックスが付きますFEAT-ABP-1734
  3. 変更を加えてテストし、PR を作成します。
  4. を実行して一時的な環境を削除しますcdk destroy --context stageName=FEAT-ABP-1734 --context ssmStageName=dev

これらの短命な環境は、機能開発や CI/CD パイプラインでのテスト実行に役立ちます。従量課金制のおかげで、追加コストをかけずに、必要な数の環境を持つことができます。

理想的には、環境ごとに 1 つの Momento キャッシュを用意します。その場合、キャッシュの名前の先頭または末尾に を付ける必要がありますstageName

ステップ5: TokenVendingMachine関数を実装する

TokenVendingMachineルートの背後にある関数のコードは次のとおりですGET /token

const { initAuthClient, generateToken } = require('../lib/momento');
const middy = require('@middy/core');
const cors = require('@middy/http-cors');
const ssm = require('@middy/ssm');

const handler = async (event, context) => {  
  await initAuthClient(context.MOMENTO_API_KEY);

  const userId = event.requestContext.authorizer.claims.sub;
  const tokenResult = await generateToken(userId);

  return {
    statusCode: 200,
    body: JSON.stringify(tokenResult)
  }
}

module.exports.handler = middy(handler)
.use(cors())
.use(ssm({
  cache: true,
  cacheExpiry: 5 * 60 * 1000,
  setToContext: true,
  fetchData: {
    MOMENTO_API_KEY: process.env.MOMENTO_API_KEY_PARAM_NAME
  }
}));

ここでは、Middyのssmミドルウェア[9]を使用して、SSMパラメータストアからMomento APIキーを取得してキャッシュします。

.use(ssm({
  cache: true,
  cacheExpiry: 5 * 60 * 1000,
  setToContext: true,
  fetchData: {
    MOMENTO_API_KEY: process.env.MOMENTO_API_KEY_PARAM_NAME
  }
}));

デフォルトでは、ミドルウェアは取得したデータを環境変数に挿入します。ただし、前述のように、暗号化されていない API キーを Lambda 関数の環境変数に配置することは避けてください。攻撃者は多くの場合、環境変数をスキャンして機密情報を探します。

そこで、取得したデータを Lambda の呼び出しオブジェクトに設定するようにミドルウェアに依頼しますcontext。そのため、Momento クライアントを初期化するときに、 から Momento API キーを取得する必要がありますcontext.MOMENTO_API_KEY

await initAuthClient(context.MOMENTO_API_KEY);

共有ロジックのカプセル化

前のスニペットに従って、すべての Momento 関連の操作を共有momento.jsモジュールにカプセル化しました。

これには、Momento 認証クライアントの初期化などの共有ロジックが含まれます。

const {
  CredentialProvider,
  AuthClient,
  DisposableTokenScopes,
  ExpiresIn,
  GenerateDisposableTokenResponse
} = require('@gomomento/sdk');

const { Logger } = require('@aws-lambda-powertools/logger');
const logger = new Logger({ serviceName: 'notification-api' });

const { MOMENTO_CACHE_NAME } = global.process.env;

let authClient;

async function initAuthClient(apiKey) {
  if (!authClient) {
    logger.info('Initializing Momento auth client');
    
    authClient = new AuthClient({
      credentialProvider: CredentialProvider.fromString(apiKey)
    });

    logger.info('Initialized Momento auth client');
  }
};

async function generateToken(userId) {
  const result = await authClient.generateDisposableToken(
    DisposableTokenScopes.topicSubscribeOnly(MOMENTO_CACHE_NAME, userId),
    ExpiresIn.minutes(30)
  );

  return {
    endpoint: result.endpoint,
    token: result.authToken,
    cacheName: MOMENTO_CACHE_NAME,
    expiresAt: result.expiresAt
  };
}

ここでは、Lambda 実行環境が再利用されるという事実を活用しています。

新しい実行環境が作成されると (コールド スタート中)、authClient変数が設定されます。同じ実行環境での後続の呼び出しでは、initAuthClient関数は短絡してすぐに戻ります。

きめ細かな承認

この行に注目してください:

const result = await authClient.generateDisposableToken(
  DisposableTokenScopes.topicSubscribeOnly(MOMENTO_CACHE_NAME, userId),
  ExpiresIn.minutes(30)
);

ここでは、権限が非常に制限された短命のトークンを生成しています。このトークンは、ユーザーの Cognito サブと名前が一致するトピックのみをサブスクライブできます。

これにより、ユーザーが他のユーザーの更新を購読するのを防止します。

ステップ6: QueueTask関数を実装する

QueueTaskルートの背後にある関数のコードは次のとおりですPOST /task

const { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');
const sqsClient = new SQSClient();
const middy = require('@middy/core');
const cors = require('@middy/http-cors');

const handler = async (event) => {
  await sqsClient.send(new SendMessageCommand({
    QueueUrl: process.env.TASK_QUEUE_URL,
    MessageBody: JSON.stringify({
      userId: event.requestContext.authorizer.claims.sub,
      payload: event.body
    })
  }));

  return {
    statusCode: 202,
  };
};

module.exports.handler = middy(handler)
.use(cors());

この関数は POST 本文を受け取り、処理のために SQS のキューに入れます。注目すべき重要な点は、ユーザーの Cognito サブもuserIdSQS メッセージに含めたことです。

MessageBody: JSON.stringify({
  userId: event.requestContext.authorizer.claims.sub,
  payload: event.body
})

ステップ7: ProcessTask関数を実装する

SQS キューのもう一方の端では、ProcessTask関数がメッセージを処理し、タスクが完了したときにユーザーに通知します。

長時間実行されるタスクをシミュレートするには (そのため、HTTP ハンドラーで処理するのではなく、キューにオフロードします)、5 秒間スリープします。

const { initClient, publish } = require('../lib/momento');
const middy = require('@middy/core');
const ssm = require('@middy/ssm');

const handler = async (event, context) => {
  await initClient(context.MOMENTO_API_KEY);

  for (const record of event.Records) {
    const { body } = record;
    const task = JSON.parse(body);
    const payload = JSON.parse(task.payload);

    await sleep(5000);

    await publish(task.userId, `Finished task: ${payload.data}`);
  }
};

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

module.exports.handler = middy(handler)
.use(ssm({
  cache: true,
  cacheExpiry: 5 * 60 * 1000,
  setToContext: true,
  fetchData: {
    MOMENTO_API_KEY: process.env.MOMENTO_API_KEY_PARAM_NAME
  }
}));

これは、ユーザーのトピックにメッセージを公開する重要な行です。

await publish(task.userId, `Finished task: ${payload.data}`);

以下は共有モジュール内の対応するコードですmomento.js

let topicClient;

async function initClient(apiKey) {
  if (!topicClient) {
    logger.info('Initializing Momento topic client');
    
    topicClient = new TopicClient({
      configuration: TopicConfigurations.Lambda.latest(),
      credentialProvider: CredentialProvider.fromString(apiKey)
    });

    logger.info('Initialized Momento topic client');
  }
}

async function publish(topicName, value) {
  const result = await topicClient.publish(MOMENTO_CACHE_NAME, topicName, value);
}

ステップ8: フロントエンドでトピックを購読する

フロントエンドコードのほとんどはリアルタイムメッセージングの処理とは関係がないため、詳細に説明するつもりはありません。すべてのコードは、ディレクトリの下のデモリポジトリ[2]にありますfrontend

ここに重要な部分があります。

ユーザーがサインインすると、次の処理が行われます。

  1. GET /tokenユーザーの ID トークンを使用してエンドポイントを呼び出し、使い捨てトークンを取得します。
  2. トークンを使用して、Cognitoユーザー名(つまりsub)をトピック名として使用してMomentoトピックをサブスクライブします。




import { Auth } from 'aws-amplify'
import { subscribeToTopic } from '@/lib/momento'
import { createToaster } from '@meforma/vue-toaster'

...

const session = await Auth.currentSession()
const jwtToken = session.getIdToken().getJwtToken()

const getTokenResp = await axios.get(apiConfig.apiUrl + '/token', {
  headers: {
    Authorization: `Bearer ${jwtToken}`
  }
})

const { token, cacheName } = getTokenResp.data

await subscribeToTopic(token, cacheName, user.username, (message) => {
  const toaster = createToaster()
  toaster.show(message, { type: 'success' })
})

関数は次のとおりですsubscribeToTopic:





async function subscribeToTopic(authToken, cacheName, userId, onMessage) {
  const topicClient = new TopicClient({
    configuration: TopicConfigurations.Browser.latest(),
    credentialProvider: CredentialProvider.fromString({
      authToken
    })
  })

  console.log('Initialized Momento topic client')
  console.log('Topic name:', userId)

  await topicClient.subscribe(cacheName, userId, {
    onItem: (item => onMessage(item.value()))
  })
}

呼び出すときには、topicClient.subscribe着信メッセージを処理するためのコールバック関数を渡す必要があることに注意してください。

この場合、メッセージを取得して、隅にトースター ウィジェットをポップアップ表示します。

await subscribeToTopic(token, cacheName, user.username, (message) => {
  const toaster = createToaster()
  toaster.show(message, { type: 'success' })
}

このように:

ステップ9: デプロイとテスト

最後に、実行しcdk deployてテストしてください。

実際にどのように動作するかを確認したい場合は、ここで完全なコースを確認してください[10]。

まとめ

Momento トピックは完全にサーバーレスで、アップタイム コストはかかりません。インフラストラクチャを管理する必要がなく、拡張性を考慮して設計されています。

わずか数行のコードで、WebSocket アプリケーションを数百万の同時ユーザーに拡張できます。

使い捨てトークンを使用したきめ細かいアクセス制御が可能です。

安全、堅牢、拡張性、コスト効率に優れています。

これが WebSocket ソリューションに求めるすべてです。

リンク

[1] Momentoトピックについてさらに詳しく

[2]デモリポジトリ

[3]モメントホームページ

[4]モメントSDK

[5]モメントCLI

[6]サーバーレスエフェメラル環境の説明

[7]一時環境:SSMパラメータの再利用方法

[8] LambdaにおけるSQSイベントソースのエラー処理

[9] MiddyのSSMミドルウェア

[10] [フルコース] Momentoを使ったリアルタイムアプリの構築

Share