コドモン Product Team Blog

株式会社コドモンの開発チームで運営しているブログです。エンジニアやPdMメンバーが、プロダクトや技術やチームについて発信します!

つつましくはじめるバッチ処理

こんにちは、プロダクト開発部の中田です。

新規プロダクトの開発においてバッチ処理を作る機会があったのですが、 設計時の考え方がこれまでとは少し違うところがあったので、今回はその話をします。

「つつましさ」とは

バッチ処理を「つつましく」はじめる、というのはつまり、以下の2点を重視する必要があったということです。

  • 最小限の構成要素とし、構築・運用コストを抑える
  • 周辺システムへの負荷や影響を最小限にする

では、なぜその必要があったのか、背景から説明していきます。

最小限の構成要素とし、構築・運用コストを抑える

新規プロダクトの開発にあたり、事業・開発体制ともに小規模からスタートしている状況でした。 うまくPMFに成功し、ユーザー数を伸ばして事業拡大していけるかは、まだわかりません。

その状態で、必要以上のスケーラビリティを実現するために、最初からシステムに複雑さを持たせてしまうのは得策ではありません。

初期開発コストが増えてリリースタイミング遅延による機会損失が発生する可能性があります。 また、運用のコスト・負担が大きくなり、その後の開発スピードの低下や利益圧迫につながる可能性もあります。

そのため、なるべくシンプルな構成で、必要最低限の機能・性能を早期に実現することが重要となります。

周辺システムへの負荷や影響を最小限にする

この新規プロダクトは、既存プロダクトのシステムと連携して動作する必要がありました。

既存プロダクトはユーザー数も多く安定稼働が重要であるため、新規プロダクトの処理が安易に既存プロダクトのシステムに負荷をかけたり影響を与えるわけにはいきません。

また、外部サービスのAPIを呼び出す必要もありました。 そういったAPIにはレートリミットが設定されていることが多いですよね。

たとえば生成AI系のAPIでは、利用量・利用額が増えるのに応じて、徐々にTierが上がってレートリミットが緩和されていくものがあります。 その場合、リリース直後からいきなり高いリクエストレートで利用することはできず、低いリクエストレートから始めて利用実績を積んだ上で、 プロダクトのスケールに応じてレートを上げていく必要があります。

バッチ処理の前提

前提として、今回作成したバッチ処理は、以下のような特性を持ちます。

  • プロダクトを利用する全ユーザーに対して日次や月次でユーザーデータ処理を行う
  • ユーザーデータ処理では、連携先システムや外部サービスのAPIを呼び出す
  • 複数のユーザーをまたいだ集約・集計のような処理は不要

つまり、ユーザー単位のmapやfilterだけで処理でき、reduceは不要なため、並列分散処理を行うのが比較的容易なものです。

また、前述のとおり、連携先システムや外部サービスのAPI呼び出しのリクエストレートは適切なレベルに抑える必要があります。

性能についての考え方

このようなバッチ処理では、性能評価の考え方もよくあるケースとは少し異なります。

想定するデータ量に対する動作状況を確認する点や、消費HWリソース量あたりの処理量を高める点は同じですが、 評価の方向性が以下のように違います。

  • よくあるバッチ処理
    • HWリソースをうまく使い切って、最大限のスループットで最小限の処理時間で完了することを目指す
  • つつましいバッチ処理
    • HWリソース消費や周辺システムへの負荷を最小限に抑えつつ、許容できる時間内に処理を完了させることを目指す

全力で一気に片付けられることよりも、省エネで細く長くこなせることの方が、評価されるわけです。

採用したアーキテクチャ

では、今回採用したアーキテクチャについてご紹介していきます。

アーキテクチャ概要

システムアーキテクチャ

単一のECSサービスが複数のSQSキューとメッセージを送受信しながら連鎖的に処理を進める構成となっています。

クラウドデザインパターンでいうQueuing Chainの典型ですが、 メッセージ送受信するのが単一のECSサービスである点が、最小限の構成としての特徴かと思います。

ECSサービスは、Spring Boot/Kotlinで実装した単一のアプリケーションプロセスのECSタスクで構成されており、 このアプリケーションは、バックエンドAPIサーバーとしての機能と、バッチ処理のスケジューラおよびSQSリスナーとしての機能を、両方持っています。

一般的なAPIサーバーの実装に加えて、以下のようなバッチ関連処理が相乗りしており、複数のSQSキューを介して処理間連携している状態です。 SQSキューからメッセージを受信して行う処理は、Spring Cloud AWSの@SqsListenerアノテーションを使用して実装しています。

  1. スケジューラ処理

    • Spring Bootの@Scheduledアノテーションを使用して日次や月次で定期起動する
    • 処理開始メッセージを開始キューに送信する
    • FIFOキュー(重複排除機能あり)を使用することにより、複数ECSタスクからのエンキューによる重複起動を防止する
  2. 処理対象ユーザー抽出処理

    • 開始キューからメッセージを受信して処理開始する
    • 内部DBからバッチ処理対象ユーザーのリストを抽出する(カーソルを利用したストリーミング)
    • リストを元に、ユーザー単位のメッセージをユーザーデータ処理キューに送信する
  3. ユーザーデータ処理

    • ユーザーデータ処理キューからメッセージを受信して処理開始する
    • 内部DBおよび連携先システムや外部サービスのAPIを利用してユーザーごとのデータ処理を行う

なお、ユーザーデータ処理は、必要に応じてさらに複数ステップに分割しており、後続ステップがある場合は対応するキューにメッセージを送信します。

システムアーキテクチャの採用理由

バッチ処理のもっともシンプルな構成と言われたら、どんなものを思い浮かべるでしょうか。

私なら単一プロセスでのループ処理を想像します。ちょっとした処理であれば、シェル等のスクリプトをcronで回すだけで十分なことも多いでしょう。

では、前述のとおり最小限の構成を目指す中で、わざわざSQSを加えてQueuing Chainパターンを採用したのはなぜか。

処理間の疎結合化や障害復旧の容易さなど、Queuing Chainパターンの一般的なメリットはもちろん採用理由に含むものとして、 今回のケースにおいて特に重視した点をいくつか挙げます。

(1) スループットの調整がしやすい

キューを挟むことにより、メッセージをバッファリングしつつ、処理の並列度を容易かつ柔軟に調整できる点が、一番のメリットです。

スループット調整

今回は特に、スループットを上げるだけでなく、あえて下げることにより周辺システムへの負荷を抑えるという調整も必要でした。

とはいえ、単純にスリープを挟んで調整するようなやり方では、処理時間も含めて無駄が大きく、パラメータ調整も直感的にはできません。

内部DBアクセス、既存システム連携、外部サービス連携では、それぞれ許容できるスループットもレイテンシも異なるので、 それらの特性に合わせてステップごとに処理を分割し、キューでつなぐことで、ステップごとに並列度を調整できるようにしています。

以下の例のようにapplication.ymlでキューごとにリスナーの並列度を設定しています。

aws:
  sqs:
    queues:
      daily-process-start: # 日次開始キュー(日次1回なので1並列のみ)
        max-messages-per-poll: 1
        max-concurrent-messages: 1
      list-target-user:    # 処理対象ユーザー抽出キュー(日次1回なので1並列のみ)
        max-messages-per-poll: 1
        max-concurrent-messages: 1
      user-data-process-1: # ユーザーデータ処理(既存システムAPIは低レイテンシだが高リクエストレートは控える)
        max-messages-per-poll: 10
        max-concurrent-messages: 50
      user-data-process-2: # ユーザーデータ処理(外部サービスAPIは高レイテンシのためリクエストレートをある程度上げる)
        max-messages-per-poll: 10
        max-concurrent-messages: 100
    queue-urls:
      daily-process-start: ${DAILY_PROCESS_START_QUEUE_URL}
      list-target-user: ${LIST_TARGET_USER_QUEUE_URL}
      user-data-process-1: ${USER_DATA_PROCESS_1_QUEUE_URL}
      user-data-process-2: ${USER_DATA_PROCESS_2_QUEUE_URL}

なお、キューを軸とした構成であっても注意が必要な点もあります。

例えば日次処理の実行開始直後は、並列度が100であれば、周辺システムに対して100件の初回APIリクエストがほぼ同時に発生する可能性があります。

その場合、APIのレートリミットとしては問題ないレベルであっても、コネクションの同時接続開始数のような別の制限に引っかかってしまうことがあります。 そのため、処理途中にJitterを入れたり、APIクライアントに流量制御の仕組みを入れたりする等により、接続開始契機を分散する必要がありました。

(2) バッチ処理用のHWリソース追加が不要

スケジューラもアプリケーション内で実装したことで、APIサーバーとバッチ処理を同一プロセスで実行しています。

APIサーバーとバッチ処理のHWリソース共有

コドモンでは現状、アプリケーションの実行基盤としてECSをメインに採用しているため、 バッチ処理を行う場合は、ECSタスクやEventBridgeスケジューラ等の構成要素を追加することが一般的となっています。

それらを構築・運用するための知見や仕組みの整備も進んではいるものの、 小規模な開発・運用体制では、やはりデプロイやモニタリングが必要となる要素は少ない方が望ましいです。

もとより必須であるAPIサーバーのアプリケーションプロセスとバッチ処理が相乗りできれば、運用が必要な追加要素がほぼなくなる点は大きなメリットです。

特にリリース時点でのAPIサーバーでは、本番稼働用ECSサービスの最小構成でもHWリソースに余裕がある想定でしたので、追加要素が不要な点はHWコストの面でもわずかながら有利です。

なお、相乗りするがゆえに、ECSタスクのHWリソース消費に関しても「つつましさ」が必要になってしまっている点は自覚しておくべきですが、 DBリソースに関しては(バッチ専用のレプリケーションでもしない限りは)APIサーバーと共有の想定だったため、いずれにせよ「つつましさ」は必要でした。

(3) 定期バッチ以外からも利用しやすい

最後の点は、副次的なメリットですが、SQSキューを軸にしたことにより、柔軟に処理を起動できるようになりました。

定期バッチ以外からの利用

例えば、以下のような対応が容易にできるようになっています。

  • 通常は日次起動である処理を、SQSキューにメッセージ送信するだけで、任意のタイミングで実行できる
  • 一部ユーザー分のみ処理したい場合や、途中の処理から実行したい場合も、対応するSQSキューにメッセージ送信するだけで容易に実行できる
  • バッチに限らず、APIリクエスト契機やイベント契機でも、同一の処理を呼び出すことができる

これらは、テスト時の手動実行や障害発生後の再実行などの運用を容易にするほか、 例えばAPIとして公開すれば、ユーザーからも任意のタイミングで利用可能にすることもできます。

ソフトウェアアーキテクチャ

アプリケーションのソフトウェアアーキテクチャにおいても、 最小限の構成ではじめつつも柔軟に拡張できるようにするために、重要なポイントがあります。

大事なのはやはりレイヤーの分離です。

処理のentrypointとなるAPIコントローラ、SQSリスナー、スケジューラを実装するレイヤーと、 具体的な処理を実装するレイヤーを、別のモジュールに分けています。

ソフトウェアアーキテクチャとその将来的な分割・変更イメージ

  • entrypoints
    • APIコントローラ
    • SQSリスナー
    • スケジューラ
  • core (usecase / domain / infrastructure)
    • 処理対象ユーザー抽出処理
    • ユーザーデータ処理

これにより、例えば以下のような方式に変更したくなった際でも影響を小さくすることができます。

  • APIリクエストやイベントトリガーで、単一ユーザー単位でも処理できるようにする
  • キューベースではなく、単一バッチプロセスで高スループットで処理する

また、将来的にECSタスクを分割したくなった際には、 スケジューラやSQSリスナーの起動要否を環境変数等で制御できるようにしておけば、同一のビルドイメージでAPIサーバーとバッチ処理を別々のECSサービスとして起動できます。 さらに、entrypointsのモジュールを分割してビルドイメージを分けることも可能です。

SQSリスナーのサンプルコード

参考までに、スケジューラおよびSQSリスナーのサンプルコードも載せておきます。

いずれも、定期起動およびSQS連携の仕組みと各処理のつなぎこみだけを行っているので、シンプルです。 具体的な処理の実装は、個別のServiceクラスやUseCaseクラスに移譲しています。

スケジューラ

@Component
class BatchScheduler(
    private val messageQueue: MessageQueue,
) {
    @Scheduled(cron = "0 0 1 * * *", zone = "JST")
    suspend fun dailyProcess() {
        messageQueue.sendMessage(
            DailyProcessStartMessage(
                targetDate = LocalDate.now().toString(),
            )
        )
    }
}

SQSリスナーメソッド定義

@Component
class DailyProcessStartListener(
    private val listTargetUserService: ListTargetUserService,
) {
    @SqsListener(
        value = ["\${aws.sqs.queue-urls.daily-process-start}"],
        factory = "dailyProcessStartListenerContainerFactory"
    )
    fun handle(message: DailyProcessStartMessage) {
        runBlocking {
            listTargetUserService.execute(
                ListTargetUserService.Input.from(
                    targetDateStr = message.targetDate,
                )
            )
        }
    }
}

SQSリスナー設定

@Configuration
@ConfigurationProperties(prefix = "aws.sqs")
class AwsSqsConfig(
    val queues: MutableMap<String, QueueConfig> = mutableMapOf()
) {
    @Bean
    fun dailyProcessStartListenerContainerFactory(sqsAsyncClient: SqsAsyncClient): SqsMessageListenerContainerFactory<Any> {
        return createFactory(sqsAsyncClient, "daily-process-start")
    }

    private fun createFactory(sqsAsyncClient: SqsAsyncClient, queueKey: String): SqsMessageListenerContainerFactory<Any> {
        val queueConfig = queues[queueKey] ?: QueueConfig()
        return SqsMessageListenerContainerFactory.builder<Any>()
            .sqsAsyncClient(sqsAsyncClient)
            .configure {
                it.maxConcurrentMessages(queueConfig.maxConcurrentMessages)
                it.maxMessagesPerPoll(queueConfig.maxMessagesPerPoll)
            }
            .build()
    }
}

スケール時に想定される課題

今回、新規プロダクトの立ち上げ時の開発においては、以下のような「つつましさ」を重視した構成としました。

  • 最小限の構成要素とし、構築・運用コストを抑える
  • 周辺システムへの負荷や影響を最小限にする

しかし、これらは事業とシステムが将来スケールしていく中では、課題になっていく可能性が高いことも忘れてはなりません。

最小限の構成ゆえのリスク

単一のECSサービス・アプリケーションプロセスにAPIサーバーと複数のバッチ処理を相乗りさせる構成は、すなわち、それがSPOF(単一障害点)になるということです。

また、問題発生時には、相乗りしている機能間で互いに影響が発生したり、HWリソースが共有ゆえに原因の切り分けが難しくなったりする可能性もあります。

これらは前述のメリットに対するトレードオフであることを理解し、状況の変化に応じて適切にアーキテクチャを見直していくことが重要です。

そして、ECSサービス・アプリケーションの分割や、バッチ処理の方式変更などが必要になった際に、 初期のアーキテクチャが足枷とならないよう気をつける必要があります。

今回のアーキテクチャの採用理由には、この観点も含まれています。

  • (1) スループットの調整がしやすい

    • → スケールが必要なときには、スループットを上げることもできるようにしておく
  • (2) バッチ処理用のHWリソース追加が不要

    • → バッチ処理用のHWリソースを追加する際に、切り出しやすいモジュール構成にしておく
  • (3) 定期バッチ以外からも利用しやすい

    • → バッチ処理の方式変更の際にも、既存の実装を活かせる選択肢を用意しておく

周辺システム起因のボトルネック

周辺システムへの負荷や影響を抑えて対応できているうちは問題ありません。 しかし、抑える必要があるということはつまり、スケールに伴いその部分がボトルネックになる可能性が高いということです。

事業が成功に近づくほど、それらのボトルネックにどう対処するかも、検討が必要になっていくでしょう。

例えば、以下のような対応が必要になるかもしれません。

  • 連携先システムとの連携を、新規プロダクト側からのポーリングではなく、連携先システムからのイベント連携にすることで、不要な通信を減らす
  • API連携ではなく、バッチベースのファイル連携にすることで、APIターンアラウンドのオーバーヘッドを減らす
  • 単一ユーザー分のAPIリクエストの同期呼び出しではなく、Batch APIを利用した複数ユーザー分の一括非同期呼び出しにすることで、レイテンシやリクエスト数の増加に対処する

おわりに

新規のプロダクトや機能の初期開発において、スケーラビリティよりもシンプルさを優先して構築することは多いと思います。 ただし、その中でも、いざスケールが必要になった場合にどのように改善していくかを想定しておくことは大事です。

アジャイルに開発していく中では、ベイビーステップでまずやってみることや、早くリリースしてフィードバックを得ることは、非常に重要です。

一方で、ことアーキテクチャに関しては、本番運用中に後から変更する場合の影響が大きくなりがちです。

また、そもそもアーキテクチャの良し悪しは、想定して備えていた変化が実際にどれほど発生したかや、想定外の変化にも柔軟に対応できたか、などによって後から評価されるものです。そのため、フィードバックが得られるまでに時間がかかることが多いです。

ドメイン駆動設計は、ドメインの理解を深めることにより、変化に対する想定の精度を高めるためのアプローチであるとも言えるでしょう。

生成AIの発展により、バイブコーディングで簡単に開発ができるようになってきました。プロトタイピングであればそれも十分に有効でしょう。

ですが、もし本番運用と事業化を見据えてシステム構築するのであれば、安易な設計でリリースしてしまう前に、思考実験を繰り返す方がコスパがよいかもしれません。

AIコーディングエージェントには、コードを書かせる前に壁打ち相手になってもらい、まずは設計について徹底的に議論してみるのはいかがでしょうか。