コドモン Product Team Blog

株式会社コドモンの開発チームで運営しているブログです。エンジニアやPdMメンバーが、プロダクトや技術やチームについて発信します!

複数の集約を更新するユースケースで結果整合性を実現する方法を検討しました

こんにちは、プロダクト開発部の中田です。

今回はドメイン駆動設計をもとに実装する際に、個人的によく悩む課題について、対応方法を調べたので共有します。

はじめに

ドメイン駆動設計を意識していて、実装するときによく悩むのは、集約の単位・境界です。

特に、以下のような、マイクロサービス(境界づけられたコンテキスト)の中でもメインとなる集約は、開発が進むにつれ利用するユースケースや責務が増えて大きくなりがちなため、意識的に分解していく必要があると思います。

  • 例1:用品販売サービスの 「注文」
  • 例2:決済基盤サービスの 「決済」

ユースケースや目的、関連する情報などに応じて、ドメインモデルを精査することで、適切なサイズの集約にうまく分解できるとよいのですが、集約を扱う際、特に注意が必要なのが、トランザクションや整合性の境界です。

ドメイン駆動設計における「集約」は、不変条件が成立した状態であることが保証される一連のデータのまとまりであり、永続化する際のトランザクション単位となるもの、と理解しています。

なので、集約を小さく分解していく際には、

「この2つの集約に対する更新は、必ず同時に(1トランザクションで)行われる必要があるか」

をよく考えます。

更新タイミングが多少ズレてもサービス上大きな問題がない、つまり結果整合性で十分な場合は、分けてOK!となるのですが、いざ分けてみると、実装する際にそのズレをどうハンドリングすべきか悩むことになりがちです。

複数の集約をまたぐ整合性が必要となった例

例として、現在開発中の保護者向けアカウント基盤サービスにおける集約の話をします。

ここでは 「ファミリー」「個人」 という2つの集約を設計しました。 ユビキタス言語による説明例をいくつかご紹介します。

  • 1つの 「ファミリー」 には複数の保護者または子どもが所属します
  • 保護者も子どもも1人の 「個人」 として存在します
  • 保護者は必ずアカウントを持ち、ログインしてサービスを利用します
  • 子どもも大きくなると、アカウントを持つことができ、ログインしてサービスを利用できるようになります(塾での打刻等)
  • 大人はアカウントを持つ1人の 「個人」 として、複数の 「ファミリー」 に所属することができます
  • 子どもも大人になると、自身が親となって新たな 「ファミリー」 を作って保護者として所属することができます

また、集約に対して更新を行うユースケースとして以下の図のようなものがあります。

ユースケースとドメインモデル

「ファミリー」「個人」 は、多対多の関係にあり、ライフサイクルも異なるため、別の集約として分けることは適切に思えます。

ただし、以下の例のように、「ファミリー」 にメンバーとして所属する子どもは、1人の 「個人」 でありつつ 「ファミリー」 とも密に結びついています。

  • 子どもは必ず1つの 「ファミリー」 の中で保護者によって登録されます。いきなり単独で生まれることはありません。 (一方で保護者は、自分でサインアップすれば 「ファミリー」 がない状態でも存在できます。)

  • 「ファミリー」 が削除された場合、そこに所属する子どもの 「個人」 としての情報も合わせて削除する必要があります。 (一方で保護者は、その 「ファミリー」 に所属しなくなるだけで、「個人」 としては存続できます。)

そのため、これらのユースケースでは、 「ファミリー」「個人」 の2つの集約を更新する必要があります。

整合性の実現方法の選択肢

上記のような背景において、整合性を実現する方法はいくつか考えられます。

検討においては、以下の記事などを参考にさせていただきました。昔からよくある課題なんですね。

DDDで複数集約間の整合性を確保する方法(サンプルコードあり)[ドメイン駆動設計] - little hands' lab

「DDDで複数集約間の整合性を確保する方法 Rev2」に対する考察 - かとじゅんの技術日誌

DDDにおいて、なぜ複数の集約にまたがってトランザクションをかけてはいけないのか(multiple aggregates in one transaction) - pospomeのプログラミング日記

案1. モデルを見直して複数の集約をまたぐ更新を不要にする

例えば、子どもの情報(氏名等)については、保護者と同等の「個人」ではなく、あくまで「ファミリー」を構成する情報として、「ファミリー」の集約内で扱う設計も考えられます。

子どもが大きくなってアカウントを持ったり、保護者となって新しいファミリーを作った際は、元のファミリーから情報をコピーして新たに「個人」として登録するという方法もあるかもしれません。

このようなモデルの見直しによって、うまく問題を解決できれば、それが一番よいと思います。

ただし、トランザクション範囲を理由にした見直しのせいで、モデルが実態と乖離してしまったり、集約が大きくなってしまっては、本末転倒です。

今回は、子どもを「ファミリー」の集約に含めてしまうと、以下の懸念があります。

  • ファミリーの責務がさらに増えてしまう、
  • 子どもがアカウントを持ったり保護者になった際の対応が複雑になってしまう

案2. そのまま1つのユースケースで複数の集約を更新する [同期 & 結果整合]

単純に、1つのユースケース内で複数の集約を更新してしまいます。 ただし、トランザクション単位は、あくまで集約単位であり、リポジトリメソッド単位です。

この場合、実装都合で無理にモデルを見直すこともなく、ユースケースの実装としてもシンプルに書けます。

しかし、1つ目の集約の更新と2つ目の集約の更新との間で、処理が失敗したりアプリケーションが落ちた場合、 片方の集約だけが更新された不整合な状態が発生する可能性があるので、その場合の対応方法を考える必要があります。

即時に不整合を解消しなくても影響が小さい前提であれば、不整合発生時のエラー監視通知や、定期的なリコンサイル処理などで、不整合を検知できるようにさえしていれば、手動でデータパッチする等のリカバリをすれば十分かもしれません。

そもそも、同一のRDBに対する処理途中の刹那のタイミングで問題が起きる確率自体も低いと考えられる場合は、発生頻度が低い前提で、手動対応でなんとかなることも多いかと思います。

案3. ユースケース単位のトランザクションにする [同期 & トランザクション整合]

複数の集約の更新をまたいで、ユースケース単位で1つのトランザクションにしてしまう案です。

実装もSpringであれば@Transactionalを使う等でシンプルに書けますし、案2のような不整合状態の発生やリカバリを気にしなくてよくなるのは楽です。

ただやはり、本来は結果整合で十分な複数の集約を、実装都合で1トランザクションにまとめてしまっており、概念とコード上の表現がずれてしまうので、できる限り避けたいところです。

案4. ドメインイベントを契機に別の集約を事後更新する [非同期 & 結果整合]

集約の単位が(少なくともいま考えられる限りは)適切で、かつ、複数の集約間で結果整合性を実現したい前提とします。

案2では、障害による不整合発生時に手動でのリカバリが必要という課題に対して、ドメインイベントを用いた非同期化かつ自動リトライの仕組みを入れることで耐障害性を強めることができるとよさそうです。

とはいえ、その仕組みを用意して適切に活用すること自体に、難しさ、不安、工数といったハードルを感じるのも、よくあることではないでしょうか。

「できればそうしたいけど、今そこまでやる余裕や必要性はあるかな。。。」

と言って他の案に流されたことが、これまでに何度もある気がします。

とりあえずの現実解、だがしかし

結局、今回もプロジェクト状況的に、いったん上記の案2で実装しておいて、他部分の開発を進めることになりました。

「ファミリーに子どもを登録する」ユースケースの実装としては、以下のような感じです。

class AddChildUseCase(
    override val individualRepository: IndividualRepository,
    override val familyRepository: FamilyRepository,
) {
    // 案2. そのまま1つのユースケース内で複数の集約を更新する
    // トランザクション単位は、各集約リポジトリのsaveメソッド単位であり、ユースケース単位ではない
    suspend fun execute(input: Input): Either<UseCaseError, Output> = either {
        // 認可処理等は省略

        val family = familyRepository.findById(input.familyId)
            .mapLeft { it.toUseCaseError() }.bind()

        // 1つ目の集約「Individual」の更新を行う
        val child = IndividualNoAccount.create(
            firstName = input.fistName,
            lastName = input.lastName,
        )
        individualRepository.save(child)

        // ここで落ちるとFamilyに紐づかないIndividualが残ってしまう
        // ユーザーが再実行した場合は新規IDのIndividualが作られるため、
        // 失敗分が残っていても影響はない
        // ただし、残ったIndividualのゴミデータは削除する必要がある

        // 2つ目の集約「Family」の更新を行う
        val updatedFamily = family.addChild(child)
            .mapLeft { it.toUseCaseError() }.bind()
        familyRepository.save(updatedFamily)

        Output.from(child)
    }
    // Input,Outputの定義は省略
}

だけど、やっぱりモヤモヤする。。。

上記の案4の場合も、実際やるとしたらどうなるのか?

将来に向けて、実際に試してみたうえで、設計の選択肢として持っておきたいと思ったので、スキマでちょっとだけ試してみました。

ドメインイベントを用いた非同期処理を試してみる

というわけで、上記のユースケースのコードからスタートして、非同期化してみます。

SpringBootアプリケーションなので、SpringのEventListnerを使用することもできそうではありますが、 せっかく「nrs使いたい放題プラン」に加入できたので、Axon Framework(以降、Axonと略記)でやってみようと思います。

アプリケーションへのAxonの導入

Axonの導入は、過去記事でも触れた通り、以前に検証したので慣れたものです。詳細は省きます。

ちなみに、今回は1つのマイクロサービスアプリケーション内に閉じた対応であり、Kafkaは使わないので、依存の追加は最小限です。

// build.gradle.kts抜粋
dependencies {
    // 既存の記述は省略
    implementation("org.springframework.boot:spring-boot-starter-data-jpa:3.4.4")
    implementation("org.axonframework:axon-spring-boot-starter:4.11.2")
}

ユースケースメソッドの分割

ユースケースのメソッドを、コントローラから直接呼ぶ処理とドメインイベント契機で非同期に呼ぶ処理に分けます。 これで、1つのメソッドが1つの集約だけを更新するようになります。

class AddChildUseCase(
    override val individualRepository: IndividualRepository,
    override val familyRepository: FamilyRepository,
) : FamilyGroupUseCase {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    // Controllerから直接呼ばれる処理
    suspend fun execute(input: Input): Either<UseCaseError, Output> = either {
        logger.info("execute called")
        // 認可処理等は省略

        // 1つ目の集約「Individual」の更新を行う
        val child = IndividualNoAccount.create(
            firstName = input.fistName,
            lastName = input.lastName,
        )
        // ドメインイベント
        val childCreated = ChildCreated.create(
            familyId = input.familyId,
            childId = child.individualId,
        )
        individualRepository.save(child, childCreated)

        Output.from(child)
    }
    // Input,Outputの定義は省略

    // ドメインイベント契機で非同期で呼ばれる処理
    suspend fun onChildCreated(event: ChildCreated): Either<UseCaseError, Unit> = either {
        logger.info("onChildCreated called")
        
        val family = familyRepository.findById(event.familyId)
            .mapLeft { it.toUseCaseError() }.bind()
        ensureNotNull(family) { familyNotFoundError(event.familyId) }

        val child = individualRepository.findById(event.childId)
            .mapLeft { it.toUseCaseError() }.bind()
        ensureNotNull(child) { childNotFoundError(event.childId) }

        // 2つ目の集約「Family」の更新を行う
        val updatedFamily = family.addChild(child)
            .mapLeft { it.toUseCaseError() }.bind()
        familyRepository.save(updatedFamily)
    }
}

リポジトリで集約のデータとドメインイベントを永続化

リポジトリメソッドで、集約のデータとドメインイベントを、同一トランザクション内で更新・保存します。 イベントの永続化にはAxonのEventGatewayを使用します。

このあたりは、イベントソーシングであれば、イベントだけを保存すればよいので、もっとシンプルになるはずです。 今回はステートソーシングの後付けでイベントを使っているので微妙ですが、勉強用なのでいったん妥協してしまいます。

class FamilyRepositoryImpl(
    private val driver: FamilyDriver,
    private val dslContext: DSLContext,
) : FamilyRepository {
    // 集約データとドメインイベントを同一トランザクションで永続化する
    @Transactional
    override suspend fun save(family: Family, event: DomainEvent?) {
        val existingChildIds = driver.fetchChildrenByFamilyId(family.familyId).map { it.individualId }
        val removedChildIds = existingChildIds - family.childIds
        // 集約データの永続化
        driver.insertOrUpdateFamily(family.toFamiliesRecord(), dslContext)
        driver.insertOrUpdateChildren(family.toChildrenRecords(), dslContext)
        if (removedChildIds.isNotEmpty()) {
            driver.deleteChildren(removedChildIds, dslContext)
        }
        // イベントの永続化
        event?.let { eventGateway.publish(event) }
    }
}

ドメインイベント契機で後続処理を実行

AxonのEventHandlerを使用し、イベント契機で別集約の更新処理を呼び出すハンドラーメソッドを追加します。

class AddChildStep(
    private val addChildUseCase: AddChildUseCase,
) {
    private val logger = LoggerFactory.getLogger(this.javaClass)

    @EventHandler
    fun on(event: ChildCreated) = runBlocking {
        logger.info("Processing event: $event")
        addChildUseCase.onChildCreated(event).getOrElse {
            logger.error("Failed to add child [ childId = ${event.childId} ]")
            throw it.cause
        }
    }
}

フロントエンドの考慮事項

バックエンドAPIの非同期処理としては、上記の実装で、意外とあっさり期待通りに動いてくれました。

これで、AxonのEventHandlerとして呼び出される非同期処理は、もし一時的な問題で失敗しても、Axonが成功するまでリトライしてくれるので、手動でリカバリしなくても結果整合性を保ちやすくなりました。

しかし、フロントエンドとの結合テストを実行したところ、テストが失敗してしまいました。

フロントエンドでは、子どもを登録するAPIが成功レスポンスを返却すると、子どもの一覧を取得できるAPIを再取得します。

今回の失敗原因を確認したところ、再取得した時点ではまだ非同期処理が完了していないために、登録したはずの子どものデータが取得されず、表示できていませんでした。

(ちなみフロントエンドのReactアプリからのバックエンドAPI呼び出しとキャッシュにはtanstack queryを使用しています。)

登録した子どもが表示されない原因

バックエンドだけの話だと思い込んでいたのが甘かったです。。。

非同期処理の完了に対する待ち合わせを、フロントエンドとバックエンドのどちらでどう行うか、を考える必要がありそうです。

案1. フロントエンドで待ち合わせ

一番単純なのは、一定時間(例えば1〜3秒ぐらい)待ってからAPI再取得する方法です。 ただし、余裕を持って決め打ちした待ち時間にすると長めになってUXが悪くなってしまいますし、かといって短めにすると、非同期処理の完了が間に合わずに結局取得できない可能性が高くなります。これでは柔軟性に乏しいです。

取得できるまで短い間隔でポーリングすることもできそうです。 ただこれはこれで、「登録したはずの子どものデータ」を状態として保持したうえで、それが期待通り取得できるまでポーリングする必要があり、実装が複雑になってしまいます。 また、ポーリング用のAPIをバックエンドで別途用意する必要もありそうです。

楽観的UI更新等により、待ち時間に対するUXを改善する方法もありそうですが、いずれにせよ複雑さは増しそうです。

今回のケースでは、非同期処理の完了を待ったとしてもプラス数百ms程度なので、フロントエンドとしては、元の通り同期的に更新できるのが一番シンプルです。

案2. バックエンドで待ち合わせ

バックエンドで非同期処理の完了を待ち合わせてからAPIレスポンスを返すようにすれば、フロントエンドとしては同期のときと同じになります。

ユースケース以降の部分はドメインイベントを用いて非同期に実行しつつも、コントローラ側で非同期処理の完了を待ってからAPIレスポンスを返すように実装することはできそうです。

ユースケースより内側の関心事がコントローラに滲み出てしまう気がするのが若干気にはなりつつも、依存の方向的には許容できますし、AkkaやAxonでもsend-and-waitのような仕組みがあった気もするので、アリなのかもしれません。

案3. フロントエンドも含めReactiveに

WebSocketやServer-Sent Eventsを使って、バックエンドからフロントエンドにリアルタイムに通知することにより、フロントエンドも含めてReactiveにする方法もあるようですが、

「できればそうしたいけど、今そこまでやる余裕や必要性はあるかな。。。」

と、また言ってしまいそうです。

興味はありますが、話が広がりすぎるのでいったんやめておきます。

バックエンドでの待ち合わせの実装

というわけで、案2のバックエンドでの待ち合わせ処理を実装してみることにしました。

待ち合わせの方法については、最初はAxonのSubscription queriesという機能を使い、 実装してみてローカルで実行した限りでは、期待通り待ち合わせ処理が動作しました。 しかし、Axon Serverを使用せずに複数ノード構成で動かすと、呼び出し元のコントローラが非同期処理から通知を受け取れないことがあることがわかり、見送りました。

参考: https://discuss.axoniq.io/t/distributed-subscription-query-updates-in-multi-node-environments/6033/6

そこで、コントローラに独自にポーリング処理をいれることにしました。

案2. バックエンドで待ち合わせ

なお、もともとコントローラは、Spring WebFluxかつKotlinのcoRouterを使用して実装しています。 今回追加するポーリング処理も含め、できる限りノンブロッキングとなるように実装したいところです。

試行錯誤の結果として、以下のようなコードとなりました。 ちなみに、どのようなスレッド構成で動作したのかを確認するために、各所でログ出力しています。

class ChildrenApiControllerImpl(
    private val addChildUseCase: AddChildUseCase,
    private val childQueryService: ChildQueryService,
) : ChildrenApiController, AccountController {

    override val logger: Logger = LoggerFactory.getLogger(this.javaClass)

    override suspend fun postChild(request: ServerRequest): ServerResponse {
        logger.info("Handler method started")

        return request.bodyToMono<PostChildRequest>()
            .doOnNext { logger.info("Request body received") }
            .flatMap { requestBody ->
                // monoブロック内でsuspend関数であるUseCaseメソッドを実行
                mono {
                    logger.info("Inside mono block")
                    requestBody.toAddChildInput()
                        .flatMap { input -> addChildUseCase.execute(input) }
                        .map { output -> output.toResponseBody() }
                        .mapLeft { error ->
                            error.log()
                            error.toResponseBodyAndStatus()
                        }
                }
            }
            .flatMap { result ->
                logger.info("Before blocking call")
                // UseCase内で発行されたイベント契機で実行される非同期処理の完了を待ち合わせ
                Mono.fromCallable {
                    logger.info("Inside fromCallable for blocking operation")
                    result.fold(
                        // UseCaseがエラーの場合は、
                        // すぐエラーレスポンスを返す(onErrorResumeでcatchさせる)
                        { error -> throw UseCaseErrorException(error) },
                        // UseCaseが成功した場合は、
                        // 追加された子どものレコードがテーブルに存在することを確認する
                        { child ->
                            logger.info("waiting for add child step to complete")
                            val child = childQueryService.find(child.individualId)
                            logger.info("Child added: [ ${child.memberId} ]")
                        }
                    )
                }
                    // ブロッキングIO処理を専用スレッドで実行
                    .subscribeOn(Schedulers.boundedElastic())
                    // レコードが追加されたことが確認できるまでポーリング
                    .retryWhen(
                        Retry
                            .backoff(5, Duration.ofMillis(100))
                            .multiplier(2.0)
                            .filter { ex -> ex !is UseCaseErrorException }
                            .doBeforeRetry { retrySignal ->
                                val attempt = retrySignal.totalRetries() + 1
                                logger.info("Retrying, attempt: $attempt")
                            }
                            .onRetryExhaustedThrow { _, retrySignal ->
                                TimeoutException("Add Child step has not completed")
                            }
                    )
                    // レコード追加が確認できたら、成功レスポンスを返す
                    .map { result.toResponse(successStatus = HttpStatus.CREATED) }
            }
            .doOnSuccess { logger.info("Processing finished") }
            .onErrorResume { ex ->
                when (ex) {
                    // UseCaseError発生時は内容に応じたエラーレスポンスを返す
                    is UseCaseErrorException -> Mono.just(error.toResponse())
                    // その他の例外は、別途ExceptionHandlerで処理する
                    else -> throw ex
                }
            }
            .awaitSingle() // Monoから結果を取得し、suspend関数の戻り値として返す
    }
}

実行すると、以下のようなログが出力されました。

# リクエスト受信直後は、WebFluxにより別スレッドで処理されている
05:29:21.144 INFO [parallel-8] xxx.api.controller.ChildrenApiControllerImpl: Handler method started
05:29:21.145 INFO [reactor-http-nio-7] xxx.api.controller.ChildrenApiControllerImpl: Request body received

# UseCaseのexecuteメソッド(suspend関数)が、コルーチンにより別スレッドで処理されている
05:29:21.146 INFO [DefaultDispatcher-worker-1] xxx.api.controller.ChildrenApiControllerImpl$postChild$3$1: Inside mono block
05:29:21.146 INFO [DefaultDispatcher-worker-1] xxx.core.usecase.AddChildUseCase: execute called

# UseCase実行後のポーリング処理が、WebFluxのブロッキングIO用のスレッドプールで実行されている
05:29:21.166 INFO [DefaultDispatcher-worker-1] xxx.api.controller.ChildrenApiControllerImpl: Before blocking call
05:29:21.166 INFO [boundedElastic-1] xxx.api.controller.ChildrenApiControllerImpl: Inside fromCallable for blocking operation
05:29:21.167 INFO [boundedElastic-1] xxx.api.controller.ChildrenApiControllerImpl: waiting for add child step to complete
05:29:21.168 INFO [boundedElastic-1] xxx.api.controller.ChildrenApiControllerImpl: Retrying, attempt: 1

# イベントハンドラー処理が、Axonにより別スレッドで実行されている
05:29:21.172 INFO [EventProcessor[xxx.core.process]-0] xxx.core.process.AddChildStep$on$1: Processing event: xxx.core.domain.event.ChildCreated
05:29:21.173 INFO [EventProcessor[xxx.core.process]-0] xxx.core.usecase.AddChildUseCase: onChildCreated called

# イベントハンドラー処理の完了後に、ポーリング処理の2回目で完了を検知し、成功レスポンスを返している
05:29:21.288 INFO [boundedElastic-1] xxx.api.controller.ChildrenApiControllerImpl: Inside fromCallable for blocking operation
05:29:21.288 INFO [boundedElastic-1] xxx.api.controller.ChildrenApiControllerImpl: waiting for child add step to complete
05:29:21.289 INFO [boundedElastic-1] xxx.api.controller.ChildrenApiControllerImpl: Child added: [ xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx ]
05:29:21.290 INFO [boundedElastic-1] xxx.api.controller.ChildrenApiControllerImpl: Processing finished

期待通りに、スレッドを切り替えながら、待ち合わせ処理が実行されているようです。

ただ、思った以上にコードがややこしくなってしまいました。。。

MonoのfromCallableretryWhenなどの機能を利用して、ポーリング処理を比較的容易に実装できた点はよかったのですが、 そもそも最初にcoRouterによりWebFluxからKotlinコルーチンの世界に切り替えているので、コルーチンのみで実装した方がよりKotlinらしく書けた可能性もあります。

まだまだリファクタリングの余地はあるとは思いますが、いずれにせよ多少の複雑さを追加することにはなりますし、負荷試験によるパフォーマンス影響の確認も必要です。 特に本番向けプロダクトで取り入れるべきかどうかは、状況に応じて慎重に検討する必要がありそうです。

まとめ

SpringBoot/Kotlinで書かれたWebアプリケーションにおいて、複数の集約にまたがる更新処理を、ドメインイベントを用いて結果整合となるよう実装してみました。

1つ目の集約の更新処理の後に、ドメインイベントを介して、別の集約の更新処理を非同期に呼び出すことは、Axonを使用して比較的容易に実現することができました。

ただし、フロントエンドにおいて、バックエンドで非同期に実行された処理結果を反映する方法についても、考える必要があることがわかりました。

今後、機会があれば、Axon等を利用したCQRS/ESによるバックエンド実装に加えて、Server-Sent Events等も活用してフロントエンドも含めてReactiveなシステムの構築にもチャレンジしてみたいと思いました。