コドモン Product Team Blog

株式会社コドモンの開発チームで運営しているブログです。エンジニアやPdMメンバーが、プロダクトや技術やチームについて発信します!

イベント連携基盤の技術選定をした話 〜EventBridgeさわってみた編〜

こんにちは、プロダクト開発部の中田です。

今回は、イベント連携基盤の技術選定の記事の続きです。 MessageBrokerの候補として挙げたAmazon EventBridge(以降、EventBridgeと記述)について、実際にさわってみて気づいた点をご紹介します。

  1. イベント連携基盤の技術選定をした話 〜背景・課題編〜
  2. イベント連携基盤の技術選定をした話 〜EventBridgeさわってみた編〜 ←今回はこの話
  3. イベント連携基盤の技術選定をした話 〜Axon + MSKさわってみた編〜
  4. イベント連携基盤の技術選定をした話 〜比較・結論編〜

本記事の概要

まず、イベント連携基盤のMessageBrokerの候補としてEventBridgeを挙げた理由と、EventBridgeを利用する場合の構成案を簡単にまとめます。

そして、構成を検討するために実際にEventBridgeをさわってみたうえで、気になった点や課題を整理していきます。

EventBridgeを候補にあげた理由

特に前回の記事で述べた保守性の観点から、利用・保守が容易なサーバレスのサービスを第一候補として検討し始めました。

AWSでサーバレスにPub/Subの仕組みを実現する構成としては、以下のような例が見つかりました。

  • SNSによるファンアウトと、SQSとLmabdaによるバッファリング・ポーリングを組み合わせた構成
  • Kinesisでバッファリングとファンアウトを行う構成
  • EventBridgeのEventBusでイベントをルーティングする構成

参考にさせていただいた資料:

AWSにおけるイベント駆動アーキテクチャ /event driven architecture on aws - Speaker Deck

SNS+SQS+Lambda

SNS+SQS+Lambdaの構成は、実現性やパフォーマンスの高い、手堅い案だと思います。

しかし、構成要素となるリソースが多く、SQS+Lambdaの部分はSubscriberを増やすたびにリソースの作成が必要になる点が、機能開発チームにとってハードルとなりえます。

Kinesis

Kinesisについても、イベントの種類やPublisherごとにストリームの作成が必要になります。代案として、1つのストリームで多種イベントを扱う場合は、拡張ファンアウト数の上限(最大20個)を気にする必要がありそうです。

また、課金単位となるデータサイズの単位が比較的大きい点(プロビジョンドで25KB、オンデマンドで1KB)が少量多種のイベントには不向きだったり、Consumerとなる既存アプリケーションに後からKinesis Consumer Libraryを組み込むのが難しそうな点も気になりました。

EventBridge

EventBridgeは、アプリケーションが発行する多種のイベントを1つのカスタムイベントバスに集約し、RuleやTargetの設定によって多数のSubscriberにルーティングできる点が、今回の要件にもっとも合致するのではと考えました。

作成・管理すべきリソースが少なく、料金体系としてもイベントのPUTが1.00USD/100万件/月と小さく安く使い始められる点が魅力です。

構成案

EventBridgeのカスタムイベントバスを利用する構成として以下のようなものを考えました。

EventBridgeを利用する構成案
Publisherとなるアプリケーションは、単一のEventBusにイベントをPutします。

Subscriberとなるアプリケーションの担当チームは、トリガーとして利用したいイベントをEventBusから抽出する条件をRuleとして作成し、該当イベントを自分たちのアプリケーションに送信します。

Subscriberとなるアプリケーションに送信する方法は、各アプリケーションの担当チームに委ねることが可能です。 特にAPIを基本的なインターフェースとしているアプリケーションにとっては、API Destinationsを利用すればEventBridgeからHTTPでAPIリクエストを送信することが可能なので、対応が容易になります。 例えば、既存のAPIと同様に、負荷分散はロードバランサーで対応できますし、テストもAPI単位の結合テストができます。

もちろん、要件次第では、直接API送信するのでなく、SQSを挟んでバッファリングしたり、StepFunctionsやECSでワークフローやバッチを起動することも可能です。

検討内容

上記の構成案の実現性判断のため、実際にローカル環境やAWSのsandbox用アカウントで動かしてみて確認を行いました。

確認した内容の概要としては以下になります。

  • 基本的な動作の確認

    • EventBusへのイベント送信
    • API DestinationによるAPI送信
      • 入力変換によるイベントデータのリクエストパス・ヘッダー・ボディへのマッピング
    • Ruleで設定可能なパターンの確認
    • 複数のRule,Targetによる送信先の振り分け
      • Rule対API Destinationの関係が、1対NおよびN対1のケース
  • 障害時の調査・リカバリ方法

    • デッドレターキュー(以降、DLQと記述)の設定方法
    • アーカイブと再生の設定・利用方法
    • イベントログのCloudWatchLogsへの出力
    • 取得可能なメトリクスの確認

リソース構築にはCDKを使用し、ローカル環境で構築する場合はLocalstackを使用しました。 作成・設定するものとしては以下があります。

  • EventBus: イベントを受信するルーター
  • Rule: 後述のTargetへの送信対象となるイベントを抽出する条件(イベントの値が指定したパターンにマッチするか)
  • API Destination: イベント送信先API(URL等)
    • Connection: イベント送信先APIの認証方式(Basic認証、APIキー、OAuth)
  • Target: RuleとAPI Destination等の送信先を紐づける設定
    • Input Transformer: 送信するイベントの内容を変換する設定(送信先API仕様に合わせた、リクエストパス、ヘッダー、ボディの変換等)

検証時のCDKのコードは以下のようなものです(ただし、あくまで抜粋なので、上記要素を全て含むもの、これだけでデプロイ成立するもの、ではない点はご了承ください)。

// EventBusの作成
const bus = new events.EventBus(this, 'bus', {
  eventBusName: 'testEventBus'
});

// アーカイブ設定
bus.archive('Archive', {
  archiveName: 'testArchive',
  eventPattern: {
    account: [accountId],
  },
  retention: cdk.Duration.days(1),
});

// イベントログをCloudWatch Logsに保存する設定
const logGroup = new cdk.aws_logs.LogGroup(this, 'logGroup', {
  logGroupName: 'testLogGroup',
  retention: cdk.aws_logs.RetentionDays.ONE_DAY,  
});
const logRule = new events.Rule(this, 'logRule', {
  eventBus: bus,
  eventPattern: {
    account: [accountId],
  },
  targets: [new targets.CloudWatchLogGroup(logGroup)],
});

// API Destinationの作成
const connection = new events.Connection(this, 'Connection', {
  authorization: events.Authorization.apiKey('x-api-key', SecretValue.unsafePlainText('API-key-sample-value')),
});    
const destination = new events.ApiDestination(this, 'Destination', {
  connection,
  endpoint: functionUrl.url,
});

// 特定の型のドメインイベントを抽出するルールと、API Destinationの紐付け
const target = new events.Rule(this, 'Rule', {
  eventBus: bus,
  eventPattern: {
    detail: {
      eventType: ['BillConfirmed']
    }
  },
  targets: [new targets.ApiDestination(destination)],
});

ただ、CDKからLocalstackに対してdeployした際、Connection作成やInput Transformer実行が上手くいかずに失敗したため、代わりにaws cliも併用して設定をしたりしていました。 (CDKまたはLocalstackのバージョンによる可能性もあるので、最新バージョンで確認されることをおすすめします。)

また、上記の確認をしたうえで、Publisher/Subscriberとなるアプリケーションで必要となる実装内容についても検討しましたが、この部分は後述します。

気になること

MessageBroker(EventBridge)

API Destinationsエラー時のリカバリ方法

保守性の観点の一部として、EventBridgeからSubscriberのAPIへの送信が失敗したときの挙動やリカバリ方法は気になるところです。

API DestinationsでAPI送信がエラーになると、最長24時間まではリトライが繰り返されますが、その後はイベントを破棄するかDLQに入れることができます。

DLQ

DLQの実態はSQSキューなので、別途作成したSQSキューをDLQとして指定する必要があります。 DLQの設定はTargetごとに行う必要があります。

つまり、DLQを使用する場合、イベントの種類とSubscriberの組み合わせごとにSQSキューを作成することになります。 また、一度DLQに入ったイベントをEventBusに自動でリドライブする機能はないようなので、自動でリカバリまでしたい場合は、DLQとして使うSQSキューからイベントを刈り取ってリカバリを行うLambdaなどを用意する必要があります。 こうなると、SNS+SQS+LambdaよりEventBridgeを選んだメリットが薄れてしまいます。

アーカイブ・再生

別のリカバリ手段として、EventBridgeにはアーカイブと再生という機能があります。 EventBusに入った全てのイベントをアーカイブに蓄積しておくことができ、アーカイブに入ったイベントを対象のRuleや時間範囲を指定して再生することができます。

再生については、Subscriberへの配信の部分だけをやり直すというよりも、アーカイブから取り出したイベントを再度EventBusにPutするイメージです。 再生されたイベントにはreplayNameという項目が追加されるので、オリジナルのイベントと再生されたイベントを区別することができます。

ただし、現状はアーカイブの中身を参照・検索することはできませんでした。 なので、アーカイブ設定に加えて、全てのイベントをS3にも蓄積しておいてAthena等で検索できるようにしておくと、調査やリカバリがしやすくなりそうです。

リカバリについては、基本的にはアーカイブ再生を利用し、特に迅速な自動リカバリが必要なユースケースでは個別にDLQを設定するのがよいかと思いました。

設定の管理方法

イベント連携基盤の共通リソースとなるEventBusやアーカイブ設定等については、共通インフラリソースを管理しているIaCのリポジトリで管理すればよいとして、 ユースケースごとに各機能開発チームで作成する部分(RuleやTarget)をどこでどう管理するかを決める必要があります。

中央集権的な統一管理を重視するか、機能開発チームごとの柔軟・スピーディーな管理を重視するかで、管理方法が変わってきそうです。 以下の2つの案が考えられました。

  • IaCのリポジトリ内で設定を管理し、IaCのデプロイワークフローで設定をデプロイする
    • インフラリソースと同じリポジトリならTerraformになるが、別リポジトリならアプリケーション開発者の志向に応じてCDKなどでもOK
  • アプリケーションのリポジトリ内で、JSON等で設定を管理し、アプリケーションのCI/CDワークフローで設定をデプロイする
    • ECSサービスのタスク定義をアプリケーションのリポジトリで管理してecspressoでデプロイしているのと同様のイメージ

検討結果としては、必ずしもアプリケーションのデプロイと同時にデプロイする必要はなく、設定の更新頻度もそれほど高くはないと思われたので、EventBus等の共通リソースと同じIaCリポジトリ内で管理するのがよいと考えました。 Subscriber側の機能開発チームがPullRequestを作成し、Publisher側などの関係チームやSREがレビューをしたうえで、マージ・デプロイするという運用がよさそうです。

アプリケーション実装

Publisher側の対応

保全性に関連して、イベント連携においてPublisherとなるアプリケーションでは、集約の更新・DB保存とそれによるドメインイベントのMessageBrokerへの送信の整合性に注意する必要があります。 つまり、DB保存とMessageBroker送信のどちらか一方だけが成功した不整合な状態が発生した場合、結果整合性を保つために、成功した方を取り消したり、失敗した方をリトライするといった対応を考えることになります。

この課題に対する解決パターンとして、Transactional outboxパターンというものがあります。

参考記事:https://microservices.io/patterns/data/transactional-outbox.html

この実装方法を考える必要があるのですが、大きく分けて2つの案があるので、それぞれ検討をしました。

  • ポーリング方式の案(上記の参考記事の記載だと「Polling publisher」)
  • CDCツールを利用する案(上記の参考記事の記載だと「Transaction log tailing」)
ポーリング方式の案

こちらは、Publisherアプリケーションがイベントを記録するoutboxテーブルに対して、定期的に参照して新規レコードがあればMessageBrokerに送信する方式です。この処理のことをMessageRelayと呼んでいます。

そこまで複雑な処理ではないのですが、いざ自分で実装しようとすると意外と考えることが多いと感じました。

  • outboxテーブルのテーブル設計
    • 多種のドメインイベントを汎用的に保存・管理できるテーブル構造
    • MessageBrokerへの送信完了状態を表現する方法(オフセット、送信時刻または送信済フラグ)
  • outboxテーブルへのポーリングのパフォーマンス・負荷
    • ポーリング間隔、検索条件、インデックス、バッチサイズ等
  • MessageRelayが複数台で並列動作する場合、ロックや重複の回避、順序保証
  • モノリス(PHP)の場合は、MessageRelayを別プロセスとして起動する必要がありそう
CDCツールを利用する案

こちらは、DBのトランザクションログを利用して、テーブルの更新をイベントとして扱ってMessageBrokerに送信する方式で、CDC(Change Data Capture)とも呼ばれます。

CDCを0から実装するコストは高そうなので、CDCを実現できるツールの選定が必要となります。

  • Dynamodb streamsが利用できるとよいですが、コドモンのDBは現状はほぼAuroraですし、Dynamodbが適さない用途も多いです。
  • debeziumが有名ですが、直接EventBridgeへの連携には対応していないようです。(http clientのsink connectorを使えば可能かもしれませんがが、利用例は多くなさそうです。)

Subscriber側の対応

SubscriberとなるアプリケーションがAPIで受信できることを期待したAPI Destinationsでしたが、現状はInternetからアクセス可能なPublicなAPIにしか対応していません。 今回は、システム内のノード間のイベント連携での利用を想定しているので、システム外からの意図せぬアクセスを制限する方法を考える必要があります。

PublicなAPIとしつつアクセス制限を行う方法としては、以下のようなものが考えられますが、それぞれに課題があります。

  • ソースIP制限
    • EventBridgeのIPアドレスレンジは広く、かつ変動する可能性もあるため、アクセス制限として有効でなく、運用も難しい
  • クライアント認証
    • API Destinationsが対応する方式は、APIキー、Basic認証、OAuthの3種類
      • OAuthに対応させるのは開発負担が比較的大きい
      • APIキーかBasic認証の場合、セキュリティのレベルが比較的低く、キーのローテーション等の運用も必要
  • Publisher側での署名(JWT等)付与と、Subscriber側での検証
    • システム内からのみ取得可能な共通鍵を使用した署名と検証を行い、通信経路というよりイベントデータ自体の有効性をチェックする
    • Subscriber側のAPI仕様に合わせて入力変換してしまうと署名の有効性が下がってしまう
    • 署名に有効期限を設定すると、期限超過後にアーカイブからの再生ができない(再生してもAPI側で無効なデータと扱われてしまう)
  • イベントデータの暗号化
    • 共通鍵でPublisher側での暗号化とSubscriber側での復号化を行う
    • イベントデータが暗号化されるため、Ruleによるイベント抽出条件の指定や調査等が難しくなる

この中ならクライアント認証か暗号化が現実的と思われます。

ただし、別の問題として、検証環境やステージング環境ではInternetからのアクセスを制限しています。 そのため、EventBridgeからのアクセスのみを許可する、または、EventBridgeから受信するAPIのみを公開する、といった方法を検討する必要があります。

システム内のイベント連携のために、こういった検討や対応が必要になるというのは納得感が低くなってしまいます。 本来のAPI Destinationsの想定用途(どちらかというと社外のシステム・サービスとの連携)からは外れているのかもしれません。

API Destinationsを使用せずに、VPC接続が可能なLambda等からAPI送信する方法も考えられます。 しかし、この場合は、作成・管理するリソースが増えるのに加え、API Destinationsであれば自動でやってくれたリトライやレートリミットなどの処理を自分で実装する必要があります。

まとめ

EventBridgeのカスタムイベントバスおよびAPI Destinationsについて、調査したり実際にさわってみたうえで、利用にあたって気になった点を挙げました。

  • よかった点

    • 期待通り、構築・設定が容易でPub/Subを実現でき、小さくはじめるのに最適
    • アーカイブ再生によるリカバリが使いやすい
  • 気になる点

    • 利用する機能開発チームは、アプリケーションに加えて、EventBridgeの設定を管理する必要がある
    • Publisherアプリケーションは、Transactional outboxパターンの実装が必要となる
      • モノリス(PHP)で利用する場合は新規プロセス(ECSサービスやLambda等)の作成が必要
    • Subscriberアプリケーションは、APIでイベント受信したい場合、クライアント認証や暗号化の仕組み、または、VPC経由でAPI受信する仕組みを、用意する必要がある

具体的に構成を検討してみて感じた点として、EventBridgeは、VPC内にEC2やECSで起動した既存アプリケーションに対して、後からイベント連携の仕組みとして追加するのには、あまり向いていないのではと思いました。

アプリケーション自体もLambda ベースのサーバレスな構成にしてはじめて、EventBridgeの真価が発揮できるのかもしれません。 その場合はAPI Destinationsのprivate接続が不要になりますし、順序保証が必要な部分に限ってはFIFOのSNSやSQSを使えばよさそうです。

当初の発想として、LambdaやSQSのリソースを個別に構築・運用することを避けるための共通基盤を求めていました。 しかし、サーバレスな構成を考えるうえでは、各機能開発チームが多数の小さなリソースを構築・運用すること自体のハードルを下げるアプローチ(例えば、CDKカスタムConstructやTerraformモジュール、Observabilityを高める仕組みの提供など)を考えた方がよさそうに感じました。

次回は、MSK(Axon Frameworkとともに)について記載予定です。