コドモン Product Team Blog

株式会社コドモンの開発チームで運営しているブログです。エンジニアやPdMメンバーが、プロダクトや技術やチームについて発信します!

S3のIntelligent-TieringでArchiveされたファイルをScala CLIでRestoreする

こんにちは!プロダクト開発部のjunです。好きなアプリはFlightradar24です。

コドモンではアクセスログやアプリケーションログなど、様々なログファイルを出力し、Amazon S3で保管し利用しています。
ここで、一部のログファイルはコスト効率を上げるためにファイルの保存時のストレージクラスとしてIntelligent-Tiering ストレージクラスを採用しています。Intelligent-Tieringストレージクラスは、ユーザが保存したファイルのうち、AWSでアクセスが少なかったり古いと判定されたものについて、自動的にArchive階層へとファイルを移動しコストを圧縮してくれるものです。 そして、Intelligent-Tieringで保存されたファイルのうち、Archive階層にあるファイルはrestore-object操作を行ってからでなければ参照することができなくなります。

1年ほど前の話になるのですが、Archive階層にあるファイルを対象としたログ調査を行いたい業務があり、スクリプトを組んでArchiveされたファイルの一覧化→restore-objectを行ったため、その内容を紹介いたします。また、スクリプトはコドモンの標準技術スタックではないのですがScala言語およびScala CLIで書いており、とても便利だったのでその点についても触れたいと思います。

なぜスクリプトを書く必要があったのか

ログフォルダ構成

以下のような構成で日付ごとにディレクトリが作成されており、ログファイルがディレクトリ内に複数配置されているものとします。

awesome-bucket-dev
└ dt=YYYY-MM-DD-HH
  ├ awesome_1.log.gz
  ├ awesome_2.log.gz
  ├ ...
  └ awesome_999.log.gz

Intelligent-Tieringストレージクラスにおいて、任意の日数が経過していたら階層が変更されるわけではありません。ゆえに、同じ日付のディレクトリ内であっても異なる階層にファイルが存在することが多々あります。そのためRestoreは日付配下のファイルすべてではなくArchiveされたファイルを特定して実行しなければならないのです。

結果、対象となるログファイル百万件程度に対して階層を特定する必要があることが発覚しました。

試したこと

(※ 実装した2024年夏頃の情報です. 最新のaws sdkを利用すればlist-objectコマンドでストレージクラスを取れるかもしれないので要調査でです)

ArchiveされたファイルをRestoreするために一覧を作成することを考えるとき、パッと思いつく手段としてはlist-objcetコマンドでしょう。

最初はこのようにs3apiコマンドでファイルの一覧をとり、jqコマンドなどで対象ファイルを流せばTierを特定できる想定でした。

# ダウンロード
aws s3api list-objects-v2 --bucket awesome-bucket-dev --prefix dt=2023-12-31-00/ > ./file_lists/2023-12-31-00.json

# 解析
cat ./file_lists/2023-12-31-00.json | jq -r '.Contents[] | hogehoge'

しかしながら、list-objectコマンドだけではストレージクラスがIntelligent-Tieringであると特定することはできますが、そのうえでどの階層にファイルがあるのかは分かりませんでした。
したがってlist-objectして得られたファイルの一覧に対してhead-objectコマンドを実行することになるのですが、1回のhead-objectコマンドに0.2秒ほどかかったので、すべてを直列にやろうとすると2日近くかかることになります。

これは現実的ではないということで、並列に実行することで時間を短縮する方法を模索することになりました。
そうなると、非同期処理をサクッと書きたいという要件が出てくると、KotlinやScalaで書くモチベーションが生まれてきますね♪

Archiveファイル一覧化スクリプト

まずArchive階層にいるファイルの一覧を出力した際のスクリプトについて説明していきます。

処理を図解するとこのようになっており、list-objectを行ってから複数のhead-objectを非同期で実行しています。
実際には、AsyncS3Clientの内部でNettyにより並列にAPIコールが行われます。そして、一度に実行する数を上げすぎるとAPIコールが大量に発生してしまいrate-limitに引っかかってしまうので、適切に制御する必要がありました。

Archiveファイル一覧化スクリプトの処理

先述の通り今回はScalaとScala CLIを利用しました。作ったコードはgistに載せてあります。

https://gist.github.com/jsoizo/4f2d576447c5baa46e8b013ea249dd3c#file-list_s3_archived_logs-sc

ここから、いくつか工夫した箇所を説明します。

list-object結果をFuture[List[S3Object]]に変換する

AWS SDK JavaのAsync S3 Clientクラスには listObjectsV2Paginator という、list-objectの結果をReactive StreamのPublisherとして返す関数があります。
通常の listObjects は Javaの CompletableFuture[ListObjectsV2Response] を返すのですが、list-objectしたい対象が1,000件以上の場合はレスポンスに含まれる次の1,000件を取得するためのトークンを取得してサイドリクエストする必要があるなど処理が煩雑になります。その点、 listObjectsV2Paginator はSubscribeする処理さえ書いてしまえばいいので実装が簡単です。

今回はSubscribeした結果をListBufferに詰め込み、Publisherがすべてを流しきったらpromiseがcompleteし、Subscribe中に1回でもエラーが発生したらpromiseをfailireさせます。つまり、この関数内で一度ストリームを流しきってしまいList[S3Object]]に詰め込んでいるということです。

なお、もう少し雑に書くならばsubscriptionを定数Promise[Subscrioption]ではなく変数Subscriptionで宣言して、onSubscribe時に代入するということもできたかもしれないですが、好みの問題で厳格に定数で書きました。とはいえミュータブルなListBufferを利用しているくらいなのでそのくらいは妥協したほうがコードの可読性が高い可能性はあります。

def listObjects(bucket: String, prefix: String): Future[List[S3Object]] = {
  val request = ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build()
  val promise = Promise[List[S3Object]]()
  val subscriber = new Subscriber[ListObjectsV2Response] {
    val subscriptionPromise = Promise[Subscription]()
    val buffer = ListBuffer[S3Object]()
    override def onSubscribe(s: Subscription): Unit = {
      subscriptionPromise.success(s)
      s.request(1)
    }
    override def onNext(t: ListObjectsV2Response): Unit = {
      buffer ++= t.contents().asScala
      subscriptionPromise.future.value match {
        case Some(Success(s)) => s.request(1)
        case _                => promise.failure(new IllegalStateException("Subscription is not ready"))
      }
    }
    override def onError(t: Throwable): Unit = promise.failure(t)
    override def onComplete(): Unit = promise.success(buffer.toList)
  }
  val publisher = s3.listObjectsV2Paginator(request)
  publisher.subscribe(subscriber)
  promise.future
}

head-objectを非同期に実行する

list-objectの結果を引数で受け取り、s3上のファイルパスのListのすべ全てに対してhead-objectを行います。

ここで、AsyncS3Clientを利用して大量のhead-objectを行うことになりますが、head-objectのリクエストにrate-limitもあるため、セマフォを利用して同時実行数の制限をかけることにしました。
具体的にはhead-objectする関数の先頭でsemaphore.acquire()で処理をブロックし、完了したらreleaseしています。

これで対象のファイルすべての情報が取得できるので、あとはストレージの階層をチェックしてファイルに書き出すだけです。

// ファイルのメタデータを取得
def headObject(
    bucket: String,
    key: String,
    semaphore: Semaphore
): Future[KeyAnd[HeadObjectResponse]] = {
  semaphore.acquire()
  s3.headObject(_.bucket(bucket).key(key))
    .asScala
    .map(response => KeyAnd[HeadObjectResponse](key, response))
    .andThen(_ => semaphore.release())
}

// headObjectを複数のファイルに対して非同期に実行
// Nettyにより並列にAPIコールされる
def headObjects(
    bucket: String,
    keys: List[String]
): Future[List[KeyAnd[HeadObjectResponse]]] = {
  val semaphore = new Semaphore(maxConcurrency)
  val count = AtomicInteger(0)
  val futures = keys.map(key =>
    headObject(bucket, key, semaphore).andThen(_ => {
      val newCount = count.incrementAndGet()
      if (newCount % 1000 == 0) println("HeadObject count: " + count)
    })
  )
  Future.sequence(futures)
}

Restoreスクリプト

前述の一覧化のスクリプトでRestore対象がファイルとして生成できているので、Restoreするスクリプトではこれを読み込んでrestore-objectコマンドを実行していけばいいだけです。
しかしながらrestore-objectも同時実行数を上げすぎると同じようにrate-limitに引っかかるので同じように制御をかけます。

こちらは特筆すべきことはなく、ファイルに書かれた内容をrate limitに気をつけながら淡々とrestoreしていくだけです。
よく見ると実装が微妙だなと感じるところはありますが書き捨てスクリプトだったので容赦ください。

#!/usr/bin/env -S scala-cli shebang -S 3 --quiet

//> using option -Wunused:all
//> using dep "org.apache.logging.log4j:log4j-slf4j2-impl:2.23.1"
//> using dep "software.amazon.awssdk:s3:2.26.29"
//> using dep "software.amazon.awssdk:sso:2.26.29"
//> using dep "software.amazon.awssdk:ssooidc:2.26.29"
//> using dep "software.amazon.awssdk:netty-nio-client:2.26.29"
//> using dep "org.scala-lang.modules::scala-java8-compat:1.0.2"

import java.util.concurrent.Semaphore
import scala.util.Failure
import scala.util.Success
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.services.s3.S3AsyncClient
import scala.util.Try
import scala.concurrent.Await
import scala.concurrent.Future
import software.amazon.awssdk.services.s3.model.RestoreObjectResponse
import scala.jdk.FutureConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider
import software.amazon.awssdk.regions.Region

// コマンドラインオプションのパース
// --max-concurrencyオプションを追加
val maxConcurrencyIdx = args.indexOf("--max-concurrency")
val maxConcurrencyOpt =
  Option.when(maxConcurrencyIdx >= 0)(args(maxConcurrencyIdx + 1).toInt)
// headObject大量に叩きすぎるとrate limitに引っかかるので、maxConcurrencyを調整
val maxConcurrency = maxConcurrencyOpt.getOrElse(30)
println("Max concurrency: " + maxConcurrencyOpt)

// 引数のチェック
val requiredArgs = if (maxConcurrencyOpt.isDefined) 4 else 2
if (args.length < requiredArgs) {
  println(
    "Usage: restore_s3_archived_logs.sc [--max-concurrency <max-concurrency>] <env> <list_file_path>"
  )
  sys.exit(1)
}
val env = Try(args(args.length - 2)).getOrElse("dev")
val targeFilePath = args(args.length - 1)

// バケットを指定
val bucket = s"awesome-bucket-${env}"
println("Bucket: " + bucket)

// S3 clientを作成
val profile = "awesome-profile"
println("Profile: " + profile)
val credentialsProvider = ProfileCredentialsProvider.create(profile)
val nettyHttpClient =
  NettyNioAsyncHttpClient.builder().maxConcurrency(maxConcurrency).build()
val s3 = S3AsyncClient
  .builder()
  .httpClient(nettyHttpClient)
  .region(Region.AP_NORTHEAST_1)
  .credentialsProvider(credentialsProvider)
  .build()

case class KeyAnd[+A](key: String, value: A) {
  def to[B](value: B): KeyAnd[B] = KeyAnd(key, value)
}

// 与えたkeyのオブジェクトをrestoreする
// 大量に実行されるとrate-limitに引っかかるのでセマフォで同時に実行できる件数を制御する
def restoreObject(
    bucket: String,
    key: String,
    semaphore: Semaphore
): Future[KeyAnd[Try[RestoreObjectResponse]]] = {
  val R = KeyAnd[Try[RestoreObjectResponse]]
  semaphore.acquire()
  s3
    .restoreObject(_.bucket(bucket).key(key).restoreRequest(identity(_)))
    .asScala
    .map(response => R(key, Success(response)))
    .recover { case e: Throwable =>
      println(s"Failed to restore: ${key}, ${e.getMessage}")
      R(key, Failure(e))
    }
    .andThen(_ => semaphore.release())
}

// restoreObjectを複数実行する
def restoreObjects(
    bucket: String,
    keys: List[String]
): Future[List[KeyAnd[Try[RestoreObjectResponse]]]] = {
  val semaphore = new java.util.concurrent.Semaphore(maxConcurrency)
  val futures = keys.map(key => restoreObject(bucket, key, semaphore))
  Future.sequence(futures)
}

// ファイルを読み込む
println("Reading file: " + targeFilePath)
val list = scala.io.Source.fromFile(targeFilePath).getLines.toList
val keys = list.map(_.split('\t').head)
println("Keys to restore: " + keys.length)

// restoreを実行
println("Start restoring...")
val restoredF = restoreObjects(bucket, keys)
val restored = Await.result(restoredF, scala.concurrent.duration.Duration.Inf)
val (failures, successes) = restored.partitionMap(ka =>
  ka.value match
    case Success(r) => Right(ka.to(r))
    case Failure(e) => Left(ka.to(e))
)
println("Restored: " + successes.length)
println("Failed: " + failures.length)

// restoreの結果をtsvでファイルに出力
println("[Successes] Start to write file")
val dirName = "restored"
val dir = new java.io.File(dirName)
if (!dir.exists()) dir.mkdirs()
val fileName = "restored_" + targeFilePath.split('/').last
val writer = new java.io.PrintWriter(new java.io.File(s"${dirName}/${fileName}"))
successes.foreach(r => writer.println(r.key))
writer.close()
println("[Successes] End to write file")

// restoreに失敗したものを出力
println("[Failures] Start to write file")
val failureDirName = "restore_failures"
val failureDir = new java.io.File(failureDirName)
if (!failureDir.exists()) failureDir.mkdirs()
val failureFileName = "restore_failures_" + targeFilePath.split('/').last
val failureWriter = new java.io.PrintWriter(new java.io.File(s"${failureDirName}/${failureFileName}"))
failures.foreach(f => failureWriter.println(f.key + "\t" + f.value.getMessage.split("\n").head))
failureWriter.close()
println("[Failures] End to write file")

// 終了処理
s3.close(); nettyHttpClient.close()

println("Complete")

Scala CLI

今回のスクリプトの開発においては、Scala CLIを初めて利用しました。

Scala CLI自体の説明は以下のブログが非常にわかりやすいのでご参照ください。
blog.3qe.us

ScalaはJava資産を利用しながらも関数型プログラミングのパラダイムを活用しながら保守性の高いプログラムを簡潔に書くことができることが魅力のプログラム言語です。コドモンではサーバサイドの言語としてkotlinを採用していますが、たまに「Scalaでいうアレがほしいな」と思うことがあります。Arrow.ktのEither型を利用しているのもそういった背景があります。

Kotlin Scriptという手もありかなと考えたのですが、どうしてもIntelliJを使って開発することを半ば強制されるので*1、シュッとスクリプトを書く程度で使うにはやや腰が重いです。その点Scala CLIはMetals + VSCodeでの開発体験が優れているという噂を聞き試してみたところなかなかいい感じでした。

また、Kotlin Scriptはパッケージして配布することができないのですが、Scala CLIは scala-cli package コマンドでコマンドを実行可能ファイルとしてパッケージすることができます。今回のケースでは、私が実装しつつも会議などで不在の時に他のメンバに渡して実行していただくことがあったのですが、他のメンバのPCにはJDKだけインストールされていればよくScala CLIのインストールは不要だったので、素晴らしい体験でした。Graal VMのNative Imageを利用すればJDKのインストールすら不要です。

2025年現在において、簡単なスクリプトを書くという要件に加えて非同期処理を行いたいという場合、おそらく一般的にはGo言語などを利用するケースが多いと思われます。しかしながらScalaおよびScala CLIもJVM言語の利用者にとってはよい選択肢だと感じました。

余談ですが、先述の通り

AWS SDK JavaのAsync S3 Clientには listObjectsV2Paginator という、list-objectの結果をReactive StreamのPublisherとして返すものがあります。

という事情からAkka(Pekko) Streamを使う選択肢も考えました。技術的にはよい選択肢だと思いましたが、ストリーム処理やアクターシステムを知っていないとコードレビューが難しくなるという観点から見送っています。
なにかストリーム処理をしたいときには利用することを検討してみます。

さいごに

今回紹介したように、Archive階層にあるファイルの復元は少々手間がかかりますが、スクリプトを作成することで効率的に対応できました。
やや自分の好みで突っ走った部分もあったため、慣れない言語のレビューをしてくれた当時のチームメンバには感謝しています。

また、Scala CLIを使ったスクリプトの実装は、大量の非同期処理や既存のJava資産を活用する場面で非常に有用であることを実感しました。ご興味あれば試してみてください。

*1:公式やコミュニティベースのLSPもあるので無理ではないです