こんにちは、プロダクト開発部の中田です。
今回は、イベント連携基盤の技術選定において、前回のEventBridge編に引き続き、MessageBrokerの候補として挙げたAmazon Managed Streaming for Apache Kafka(以降、MSKと記述)およびAxon Framework(以降、Axonと記述)について、実際にさわってみて気づいた点をご紹介します。
- イベント連携基盤の技術選定をした話 〜背景・課題編〜
- イベント連携基盤の技術選定をした話 〜EventBridgeさわってみた編〜
- イベント連携基盤の技術選定をした話 〜Axon + MSKさわってみた編〜 ←今回はこの話
- イベント連携基盤の技術選定をした話 〜比較・結論編〜
本記事の概要
前回記事ではEventBridgeをさわってみて気づいたことを記載しましたが、今回はMSKについて、同様に、イベント連携基盤のMessageBrokerの候補として挙げた理由や構成案と、さわってみて得た気づきをまとめます。
詳細は後述しますが、MSKはAxonと組み合わせて利用することを前提として候補に挙げたので、MSKやKafkaそのものというよりも、Axonの使い勝手に関する記述が多くなっている点はご了承ください。
Axon+MSKを候補にあげた理由
きっかけは、成瀬允宣さんの以下の発表や資料を見たことです。
これらの資料は、Axonを知るきっかけになっただけでなく、最初の記事で記載したイベント連携基盤の要件整理においても非常に参考になりました。
Axon
Axonは、DDDおよびCQRS/ESを実現するフレームワークですが、前回記事でも記載した、PublisherのTransactional outboxパターンの実装をはじめ、Subscriber側でのリトライやDLQや再生など、イベント連携において必要とされる機能も提供してくれるので、CQRS/ESは行わずにマイクロサービス間のイベント連携を行うだけの場合でも利用できます。
SpringBootとの統合も可能なため、AutoConfigurationにより少ない設定項目で簡単に導入することができますし、Configurationクラスを実装することで部分的にカスタマイズすることも可能です。
また、AxonをKafkaと統合するためのextensionも提供されているので、抽象化されたインターフェースでアプリケーションからほとんど意識することなくKafkaとのイベント送受信をすることができます。
MSK(Apache Kafka)
Kafkaを単体で候補に挙げたというよりは、Axonと統合されているMessageBrokerとしてKafkaを選んだという形です。
Kafka自体も、メッセージング基盤として広く利用されているものなので、信頼性やスケーラビリティが期待でき、Kafka Connectによる他リソース連携をはじめ、拡張性があります。
とはいえ、Kafkaのクラスターを構築・運用するには、Kafkaについて一定の知識が必要な点は気がかりです。 AWSの場合、KafkaのフルマネージドサービスとしてMSKがあるので、それを利用することで、構築・運用の負担を軽減したいところです。
MSKの場合、AWSの他サービスであるLambda、Firehose、EventBridgePipes等との統合が可能な点もメリットになります。
構成案
AxonとMSKを利用する構成として以下のようなものを考えました。
もともとKotlin/SpringBootで実装されているマイクロサービスについては、Axonを導入するだけで直接Kafkaとイベント送受信を行うことができます。
PHPで作られたモノリスについては、後述します。
検証内容
要件を実現するためのAxonでの実装方法を理解することを主な目的とし、実際にAxonを利用したサンプルアプリケーションを実装してみながら理解を進めました。
確認した内容の概要としては以下になります。
- 基本的な動作の確認
- Kafkaへのイベント送信
- イベントの型(クラス)と送信先Kafkaトピックのマッピング
- 複数ノードからの送信時の挙動確認
- Kafkaからのイベント受信
- KafkaMessageSourceの比較
- SubscribableKafkaMessageSource
- StreamableKafkaMessageSource
- 複数ノードでの負荷分散方法
- KafkaMessageSourceの比較
- Kafkaへのイベント送信
- 障害時の調査・リカバリ方法
- Producer
- event-processor-modeとconfirmation-modeの設定ごとの挙動確認
- acks設定ごとの挙動確認
- Consumer
- EventHandlerメソッド内で例外スロー時の挙動確認とカスタマイズ方法確認
- DLQの登録およびリドライブの方法の確認
- Replay方法の確認
- Producer
- gaugeでの結合テスト方法
- API単位の結合テスト相当のテストを、Kafkaのイベント送受信を含めて行う方法
ローカル環境では、docker composeでKafkaおよびProducer/Consumerとなるアプリケーションを起動し、動作確認を行いました。
AWS上では、MSKクラスターをマネジメントコンソールから作成し、Producer/ConsumerとなるアプリケーションをECSで起動して、MSKに接続しました。
Axonでイベントを送受信するときのKotlinのコードのイメージは以下のようなものです。
Producer側
suspend fun handleEvent(event: DomainEvent) { when (event) { is ConveniencePaymentCompleted -> eventGateway.publish(event.toSharedEvent()) } }
Consumer側
@EventHandler fun on(event: ConveniencePaymentCompleted) { if (event.paymentSubjectId == facilityStoreSubjectId) { ConvenienceUseCase.CompleteInput( paymentId = event.paymentId, completedAt = event.paymentDate, processedAt = event.processDate, ).let { input -> convenienceUseCase.complete(input).getOrElse { it.handle() } } } }
大変だったのはAxonに関する情報量がまだ少ないことでした。特に日本語の情報は少なく、英語が得意でない私は公式ドキュメントを翻訳しながら読むしかない状況でした。 ChatGPTなどに聞いてみても、公式ドキュメントで見たことがある情報以上は出てこないことが多かった印象です。(プロンプトが良くなかっただけかもしれませんが)
結局参考になったのは、以下の3点でした。
ちなみに、IntellijがJavaのコードをコピペすると自動でKotlinに変換してくれるのは地味に便利でした。
また、やはりKafka自体を理解することも避けることはできませんでした。 コドモンではオライリー学習プラットフォームが導入されているので、Kafka: The Definitive Guideなど、まだ日本語版が出ていない書籍をブラウザで翻訳しながら読むことができたのはありがたかったです。
気になること
MessageBroker(MSK)
クラスター構築
やはりKafkaクラスターの構築・運用の難易度がもっとも気になるところです。
AWSのマネジメントコンソールからMSKクラスターを構築してみたところ、設定項目はそれほど多くはなく、難しくない印象でした。 KafkaのバージョンやBroker台数や認証方法と、VPC設定や通信・保存の暗号化ぐらいです。 Kafkaクラスター自体の設定項目は多くありますが、MSK用のデフォルト設定一式が用意されているので、少なくとも最初はそれをそのまま使うのが無難そうです。
ただ、作成してから起動してアクティブな状態になるまでにはかなり時間がかかったので、待っている間はうまくいくか不安になりました。だいたい30分ぐらいかかったかと思います。
バージョンアップ
KafkaおよびMSKのバージョンアップ頻度の傾向を見ると、およそ1〜2年ごとに新バージョンがリリースされているようです。
互換性ポリシーについてはKafkaのドキュメント内でも言及はされていますが、AxonおよびKafkaクライアントのライブラリは最新に保っておいた方がよさそうです。 コドモンではDependabotの運用が定着化してきているので、クライアント側のバージョンアップについては比較的安心できそうです。
Kafkaクラスター側のバージョンアップは、マネジメントコンソールから無停止で容易に実行できそうです。 ただし、ローリングアップデートとなり、かなり時間はかかりそうです。バージョンアップではないですが、クラスターの設定変更を行なった際も、作成時と同様に30分ぐらいかかったので、成功するまで待つ時間はちょっと精神的につらかったです。
また、バージョンダウンはできないようなので、事前に新バージョンのクラスターを別途立てて検証するなど、慎重に行う必要がありそうです。
料金
MSKは、EventBridgeとは異なり、EC2等のように稼働時間に応じて課金されるモデルになっています。 プロビジョンドタイプだけでなくサーバレスタイプでの起動も可能ですが、時間単位で料金が発生する点は同じです。
本番稼働想定で、kraftモードや階層型ストレージが使用できるインスタンスタイプでの最小構成だと、ストレージを含め月あたり700USD前後かかる計算になります。 ちなみに、サーバレスタイプだと、上記のプロビジョンドタイプの最小構成よりも高くなったうえで、パーティション作成ごとに追加料金が発生するので、選択しませんでした。
検証環境やステージング環境にも同様のクラスターを構築するとなると、2倍、3倍のコストがかかることになります。
EventBridge(API Destinations使用の場合)が、イベント100万件あたり1.24USDであるのと比べると、かなり高くつきます。特に導入初期は利用がまだ少ないので、料金差が顕著です。
将来性
AWSのドキュメント等を見る限り、MSKは、主にオンプレからの移行用途や、Kafka Connect等の周辺ツールを利用したいケースで紹介されており、それ以外ではそれほど推奨はされていないようです。 そのため、今後の機能改善のペースやサービス継続については、若干の不安があるのが正直なところです。
アプリケーション実装
Axon(kafka-extension含む)の設定の検討
AxonのKafka連携に関する設定内容を理解し、ユースケースに応じて選択する必要があります。
Publisher(Kafkaの用語ではProducer)側
AxonからKafkaにイベント送信するうえでの主な設定が、event-processor-modeとconfirmation-modeです。
詳細はAxonのドキュメントを参照いただくのがよいと思いますが、簡単に言ってしまうと以下のようなものと理解しています。
- event-processor-mode:発行したイベントのDB保存とKafka送信を
- 同一スレッドで行うか(SUBSCRIBING)
- 別スレッドで非同期に行うか(TRACKING)
- confirmation-mode:イベント発行処理を
- Kafka送信成功してはじめて完了扱いとするか(WAIT_FOR_ACK or TRANSACTIONAL)
- Kakfa送信成功を待たずに完了扱いとするか(NONE)
この2つの設定の組み合わせにより、Kafka送信が失敗した場合の挙動が変わってきます。 なお、いずれの設定でも、Kafka送信が失敗した後は、Axonが自動でリトライを続けます。
例えば、検証時に想定していたユースケースでは、Kafka送信エラー発生時点でエラーログ出力して検知はしたいが、Kafka送信が失敗しただけであれば、処理は継続したい(APIトリガの処理なので、APIの成功レスポンスは返したい)と考えていました。 この場合は、event-processor-modeはSUBSCRIBING、confirmation-modeはWAIT_FOR_ACKに設定するのがよいと思われます。
機能・処理におけるKafka送信の失敗時の影響に応じて、即時検知やロールバックの要否などを考慮して設定を選択する必要があります。
また、AxonというよりKafka送信における設定ですが、acksの設定も考えた方がよさそうです。 こちらも簡単に言うと、Kafka側でイベントのレプリケーションが完了してはじめて送信完了扱いとするか(ALL)、それを待たずに送信完了扱いとするか(0 or 1)、だと理解しています。
保全性とパフォーマンスのトレードオフとなりそうですが、いまのところ性能要件はさほど高くないので、パフォーマンスより欠損を避けることを優先し、ALLにするのがよさそうです。
Subscriber(Kafkaの用語ではConsumer)側
AxonでKafkaからイベントを受信(取得)するうえでの主な設定は、KafkaMessageSourceの種類の選択です。
大きく分けて以下の2種類があるのですが(Axonのドキュメントはこちら)、 この2つの違いは、オフセット(Consumerがトピック内のどのメッセージまでを処理したか)の管理方法だと理解しています。
- SubscribableKafkaMessageSource:Kafka側で、Consumer Group機能を使用して管理する
- StreamableKafkaMessageSource:Consumerアプリケーション(Axon)側で、DB(token_entryテーブル)を使用して管理する
主なメリット・デメリットとして以下などがあります。
- SubscribableKafkaMessageSource
- 複数ノード分散をKafkaがやってくれる
- 1パーティションの全てのデータを1スレッドで処理
- AxonのReplay機能が使えない
- Consumerのイベント処理のエラー時のリトライにはKafkaへのcommitを自分で制御する必要がある
- StreamableKafkaMessageSource
- 複数ノード分散をアプリケーション側で制御する部分を自分で実装する必要がある
- 1パーティションのデータを複数スレッドで並列処理できる
- AxonのReplay機能が使える
- Consumerのイベント処理のエラー時のリトライやDLQ使用をAxonがやってくれる
Axonが推奨しているのはStreamableKafkaMessageSourceなので、Consumer側でDBを使いたくないなどの要件がなければ、そちらを選ぶのがよさそうです。
ただし、複数ノード分散とReplay機能については、実装するうえで以下のような課題も見つかりました。
- 複数ノード分散:複数ノード間で負荷を平準化する処理を定期的に実行する必要がある。
- Replay機能:複数ノードで動いている場合、Consumer処理を全てのノードで停止してからReplayを開始する必要がある。
当初は、APIサーバとしてロードバランサーを利用して複数ノードで負荷分散しているアプリケーションに対して、Axonを導入する想定だったので、Consumerとしての処理も複数ノードで分散させようと考えていました。
ですが、上記の手間を考えると、Consumer専用のサーバを別途立てるか、負荷(イベント流量)が少なければ複数ノードのうち1ノードのみで処理した方がシンプルでよいかもしれません。 その場合でも、ノードが障害やリリースで停止した際には、自動で別ノードが引き継いで処理を継続してくれます。
モノリス(PHP)からどう利用するか
Axonは現状はJavaやKotlinにしか公式に対応していないため、PHPで作られたモノリスやGoのマイクロサービスについては、Axonとのアダプターとなるプロセスを用意する必要がありそうです。
PHPおよびGoから直接Kafkaに接続するためのライブラリも存在しますが、できればイベント連携をするうえでAxonの機能を活かしたいところです。
Publisher(Kafkaの用語ではProducer)側
モノリスがPublisher側となる場合は、Debeziumを利用したCDCで、トランザクションログ(MySQLなのでbinlog)をKafkaに送信する案があります。 DebeziumはKafka Connectのプラグインとして提供されているため、MSKの場合はMSK Connectを利用して起動することができます。
DBの更新をイベントとして扱うことができますが、課題もあります。
まず、1つの集約の更新が複数テーブルの更新となる場合、複数テーブルの更新ログから1つのドメインイベントを解釈して生成するのが難しい可能性があります。 そもそもモノリスの中にはまだDDDにもとづく実装になっていない部分も多いので、集約やドメインイベントの概念に沿ったテーブル更新となるようにするためには、まずリファクタリングが必要と思われます。
また、Axonは、Axonのイベントとして生成される形式のKafkaメッセージしか処理しないため、DebeziumのメッセージをAxonのイベントに変換する処理が必要となります。 モノリス側のリファクタリングが難しい場合は、このタイミングで複数テーブルの更新ログを中間テーブルに整形・蓄積しつつ、ドメインイベント(Axonのイベント)として変換してKafkaに入れ直す方法も考えられます。
Subscriber(Kafkaの用語ではConsumer)側
モノリスがSubscriber側となる場合は、KafkaからAxonのイベントを受信した契機でモノリスのAPIを呼び出す必要があります。
Kafkaのメッセージを直接取得して処理するのでなく、Axon経由で受け取ることで、Axonのリトライ、DLQ、Replay等の機能を利用することができます。 この場合、このAxonを利用するアダプタープロセスをSubscriberとなる複数チームで共有することになりそうなので、依存や競合に注意が必要となる点が懸念です。
Axonの機能が不要だったり代替できる場合は、MSKトリガーを使用したLambdaからAPIを呼ぶこともできます。 この場合は、トピックとSubscriberの組み合わせごとにLambda作成が必要になりそうです。このあたりは前回記事に記載したEventBrige利用かつAPI Destinationsを利用しない場合と同様です。
その他
他にも、イベントのスキーマをどう管理するか、Kafkaを介した結合テストをCIでどう行うか、といった課題もありましたが、比較的容易に解決できたので、記事の分量の都合もあり割愛させていただきます。
まとめ
MSKおよびAxonに対して、調査や検討が必要となった部分について記載しました。
よかった点
- Axonの機能のカバー範囲が広く、自分で実装する必要がある部分が少ない
- KotlinとSpringBootで実装しているマイクロサービスへの組み込みが用意
- private subnet内でECSサービスとMSKが直接通信できるので、セキュリティやNW構成の考慮・対応が比較的少ない
気になる点
- モノリスとの連携するための検討・構築要素が多い
- MSKの保守の難易度や将来性に不安がある
- 特にスモールスタート時点では料金が高め
感想として、やはりKafkaはメッセージング基盤として成熟していますし、Axonも利用が容易でDDDを志向しつつイベント連携をしたいコドモンの現状にフィットすると感じました。イベント連携基盤として利用するうえでの実現性は高いと思います。
ただ、どちらも、使い始めるうえでは、それなりに学習が必要な点はハードルにはなってしまいそうです。学習コストという言葉はあまり使いたくないですが、投資としてエンジニアに前向きに取り組んでもらえるよう働きかけは必要かもしれません。
少なくともKafkaについて学ぶことは、メッセージングや分散処理について勉強になることが多いですし、Axonも今後どこまで広まるかは未知ですが、CQRS/ESにチャレンジするきっかけにできればと思っています。
次は最終回として、EventBridgeとMSKの比較のまとめと、技術選定の結論をお伝えする予定です。