このエントリは、はてなエンジニアAdvent Calendarの9日目の記事としてかかれました。
AWS S3にはバッチオペレーションというマネージドサービスがあって、これは指定したバケット/オブジェクトに対して一括で何かしらの操作ができる。例えば「バケット内のすべてのオブジェクトを別バケットにコピーしたい」とかそういう時に使うと便利。
その一括操作ではLambdaを利用することもできる。Lambdaを使うとかなり柔軟な操作ができるようになるが、ドキュメントを見ただけでは最初どうしたらいいかわからなかった上に、利用する機会もそんなに無いので覚えられない。その他にも最初に知ってたらよかったみたいなのが細々とあるので、そういうのを少しまとめておく。
なお、このエントリではS3 バッチオペレーション自体のジョブの登録のやり方自体は割愛する。まずS3バッチオペレーションがどういうものか知りたいという人には、以下のエントリが雰囲気を掴むのに良いと思う。
マニフェストファイル
バッチオペレーションのジョブを登録するにはどのオブジェクトを操作するのかを指定するためのマニフェストファイルが必要になる。マニフェストファイルにはS3インベントリレポートというのを使うと良いというのをよく見るけど、実は自分で作ったCSVを使うこともできる。フォーマットとしては以下のような感じ。
{bucket},{object_key}
つまりバケットと操作したいオブジェクトのキーの一覧だけでよい。最初から操作したいオブジェクトがわかっている場合はインベントリレポートを出力せずとも上のフォーマットのCSVを作ればよいし、インベントリレポートを全部チェックした上で、一部の操作対象を出力する、とかでもよい。初回のインベントリレポートの出力には少し時間がかかる(24h程度?)ので、操作対象が完全にわかっている場合は自分でCSVファイルを作ったほうが実は手早いなどはありそう。
S3 バッチオペレーションからLambdaへのリクエスト
Lambdaへは以下のようなJSONがリクエストされる(ドキュメント から引用)
{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1" } ] }
tasksは配列になっているが、実際には要素は一つしかない。Lambdaでどういう操作をするかにもよるが、操作するにあたって重要なのはs3Key
となる。これを利用して、例えばObjectをGetしてきて加工して別のバケットにPUTしたりする。Lambdaからのレスポンスは以下のようなものを返す(こちらもドキュメントから引用)
{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }
こちらもresults
は配列となっているが、実際には要素を一つだけ返せばよい。results内のresultCode
にはSucceeded
/TemporaryFailure
/PermanentFailure
のどれかを指定する。ジョブのレポートを出力して、オブジェクトごとに処理の成否を確認したいような場合は成否に合わせてSucceeded
/PermanentFailure
を指定し、resultString
に成功した場合は成功メッセージを、失敗した場合は失敗理由がわかるようなメッセージを埋めておき、レポートを精査するのが良い。
個人的にはresultString
にはJSON文字列を埋めておくのがよいと考えている。大量のオブジェクトを操作した結果について精査する場合、レポートのCSVを何かしらのスクリプトで処理した上でresultString
を確認することになるはずで、その時にJSONであれば扱いやすいと思う。
また、レスポンスの以下のフィールドにはリクエストのJSONに含まれる値をそのまま利用するとよい。あと特に詳細には触れなかったけど、バッチオペレーションからLambdaを実行するとか、LambdaからS3にアクセスするための権限設定はそれぞれ必要になる。
- invocationSchemaVersion
- invocationId
- results.taskId
特にresults.taskId
はバッチオペレーションのジョブで一意になっていないと失敗してしまうので、リクエストされたJSONに含まれている値をそのまま使うのが一番安全。
これ以外にtreatMissingKeysAs
フィールドがあり、これは特にドキュメントに記載されていないが、例えば先程のresults.taskId
がジョブで一意になっておらず失敗した場合、ここで指定されたレスポンスコードが利用される。下のメッセージはresults.taskId
が一意ではなかった場合にレポートに出力されていたもの。
"Lambda function didn't return the tasks/keys in the function response, using ""treatMissingKeysAs"" (default to TemporaryFailure) as error code: PERMANENT_FAILURE"
ここにあるように、デフォルトだとTemporaryFailure
が使われるけど、ここにはPermanentFailure
を指定しておけば良いと思う。
任意の値をLambdaに渡したい
任意の値をLambdaに渡す方法も用意されている。バッチオペレーションのジョブを登録する時に操作するオブジェクトのバケットとキーを指定するためのCSVファイルを渡すことになるのだけど、そのCSVのキーの位置にURLエンコードしたJSON文字列を入れるとよい。以下のドキュメントに記載されている。
例えば次のようなJSONをLambdaに渡したいとする。
{ "s3Key": "1aaaa.txt", "newKey": "new-1aaaa.txt" }
この場合、バッチオペレーションのジョブに渡すCSVファイルは以下のような感じになる。
tkzwtks-s3,%7B%22s3Key%22%3A%221aaaa.txt%22%2C%22newKey%22%3A%22new-1aaaa.txt%22%7D tkzwtks-s3,%7B%22s3Key%22%3A%222aaaa.txt%22%2C%22newKey%22%3A%22new-2aaaa.txt%22%7D tkzwtks-s3,%7B%22s3Key%22%3A%223aaaa.txt%22%2C%22newKey%22%3A%22new-3aaaa.txt%22%7D tkzwtks-s3,%7B%22s3Key%22%3A%224aaaa.txt%22%2C%22newKey%22%3A%22new-4aaaa.txt%22%7D tkzwtks-s3,%7B%22s3Key%22%3A%229aaaa.txt%22%2C%22newKey%22%3A%22new-9aaaa.txt%22%7D
このCSVは1つ目のフィールドにバケット名、二つ目のフィールドにURLエンコードしたJSON文字列が入っている。これをジョブの登録時に渡すと、Lambdaには以下のようなJSONがリクエストされる。
{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "%7B%22s3Key%22%3A%221aaaa.txt%22%2C%22newKey%22%3A%22new-1aaaa.txt%22%7D", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1" } ] }
JSON内のs3Key
にはCSVファイルの2つ目のフィールドのURLエンコードされたJSON文字列が入っているため、これをデコードすることで外から任意の値を渡すことができる。実際のコードは以下の通り(このコードではオブジェクトに対して実際に何か操作をしているわけではない点には注意)。
console.log('Loading function'); const _sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); exports.handler = async (event, context) => { var date = new Date(); console.log('invoke: ' + date.toISOString()); const task = event.tasks[0]; const payload = JSON.parse(decodeURIComponent(task.s3Key)); await _sleep(3000); const message = payload.newKey; return { invocationSchemaVersion: event.invocationSchemaVersion, treatMissingKeysAs: 'PermanentFailure', invocationId: event.invocationId, results: [{ taskId: task.taskId, resultCode: 'Succeeded', resultString: message }] }; };
使い所が難しいが、例えばオブジェクトを全然違う名前にリネームしたい場合に、元のkeyと新しいkeyを渡してリネームする、みたいな場合に使うと良いと思う(実際そういう場面で使った)。リネームに近いけど、オブジェクトを整理するために、移動先のディレクトリを指定する、みたいな使い方もできると思う。
余談
ここまで試していて気づいたのだけど、実はS3 バッチオペレーションでLambdaを使うような場合、(このエントリを書いている時点では)実際にオブジェクトが存在しなくてもよく、マニフェストファイルさえ作れたらよい。つまりS3オブジェクトとは全く関係のない、任意のJSONをURLエンコードしてマニフェストファイルを作ってバッチオペレーションのジョブを登録するだけで大量にLambdaを実行させまくることができる。どういう場面で使えるかは全然わからないけど・・・
Lambdaの同時実行数
実際にLambdaを使うようなジョブを実行する場合、Lambdaの同時実行数には注意が必要。Lambdaからレスポンスが返って来たら次のを実行するという感じではなく、レスポンスが返る前に次から次にLambdaを実行していくため、油断していたらLambdaの同時実行数の制限に引っかかってしまう、ということも起こりそう。特にS3 バッチオペレーション以外でもLambdaを普通に使っているようなサービスの場合、通常のサービスに影響しうるので注意したほうが良いと思う。ちなみに上のコードで実際に10000件のオブジェクトに対してバッチオペレーションを実行した場合、LambdaのConcurrent executionsがMAX250くらいまで行った。
バッチオペレーションの使用感
「大量のオブジェクト操作をできるだけ素早く終わらせたい」というのがこれを使う一番のモチベーションだったため、実際に使ったときの処理時間をざっくり書く。
- 27000件のオブジェクトコピーはほぼ一瞬で終わる
- 2000万件のオブジェクトコピーは7時間弱で終わる
- 500万件のLambdaを利用したオブジェクトのリネームは2時間程度で終わる
ということで、何をやるにも結構早く終わらせることができる。大量のオブジェクトを操作する時に自分でバッチ処理を書いたりすると考えることが多い。並列実行の仕組みを用意しない場合、量が多ければ多いほど処理時間は増えていくだし、失敗した場合のリカバー方法も検討する必要がある。S3バッチオペレーションを利用する場合、そういう心配が完全には消えないものの、バッチ処理を書かなくてもよくなるため、だいぶ緩和されると思う。特に処理時間が短くなれば精神的にもだいぶ楽になる。レポートを出力しておけば、後からは失敗したものだけ再実行する、というのも簡単にできる。標準でできる処理は少ないが、Lambdaを使って様々操作できるのはいい。準備は若干面倒な部分もあるし、常用するのには向いていないが、いざというときには検討してみてもいいと思う。
まとめ
ということでS3バッチオペレーションを使う上で最初に知ってたらよかったみたいなポイントをまとめた。S3バッチオペレーション自体は大技って感じで使うタイミングはなかなか無いけど、大量のS3オブジェクトに何かしたい場合にはぜひ検討してみてほしい。
明日のはてなエンジニアアドベントカレンダーの担当はid:stefafafanさんです。よろしくおねがいします!