AmebaブログにおけるDynamoDBからMySQLへのストレージ移行の取り組み
AmebaブログがDynamoDBからMySQLへのデータベース移行を実施し、その過程と技術的課題について解説しています。
キーポイント
AmebaブログがDynamoDBからMySQLへのストレージ移行を実施し、運用負荷の軽減、柔軟なクエリ対応、大幅なコスト削減(月額約285ドル→約2ドル)を実現
移行の第一歩として、ハッシュタグ記事ランキングデータの管理システムを刷新。DynamoDBのデータ構造をRDB用に正規化し、GSIをMySQLのセカンダリインデックスに置き換え
技術的には、BigQueryから1.5GBのデータをGoの並列処理(ChannelとGoroutine)で効率的にMySQLへストリーミングするパイプラインを構築し、メモリ問題を解決
アーキテクチャ変更として、APIをElasticBeanstalkからEC2に移行し、データ書き込みはRDSプライマリ、読み取りはリードレプリカを参照する構成を採用
サービスの成長に伴い、データ参照パターンが多様化したため、NoSQLからRDBへの移行が適切と判断。これは大規模サービスにおけるデータベース選定の実践例として参考になる
影響分析・編集コメントを表示
影響分析
この記事は、大規模Webサービスにおけるデータベース移行の実践的なケーススタディとして価値がある。NoSQLからRDBへの移行判断は、サービスの成熟に伴うデータアクセスパターンの変化に対応する一般的な課題を示しており、特にコスト削減と運用効率化の観点で業界の参考になる。
編集コメント
内定者バイトの実践プロジェクトとして、大規模サービスの核心的なインフラ刷新に携わった貴重な経験談。技術的詳細とビジネス判断のバランスがよく取れた解説。
こんにちは、26 卒内定者でバックエンドエンジニアの相馬大和です!
本記事では、内定者バイト期間中の、Ameba ブログのストレージ移行と、それに伴う周辺システム刷新の取り組みをご紹介します。
Ameba ブログは様々なコンポーネントから構成されており、その多くで DynamoDB を利用しています。
DynamoDB はスケーラビリティに優れた NoSQL データベースであり、かつての Ameba ブログの要件には適していました。しかし、サービスの成長とともにデータの参照パターンが多様化した結果、現在の運用においては RDB の方が適しているという判断に至りました。
その背景として、まず運用負荷の観点があります。DynamoDB のスループット制御は WCU(Write Capacity Unit)によって管理されますが、障害復旧時のリトライ処理において WCU の調整が難しく、運用負荷となっていました。
次に、データの参照パターンの多様化への対応です。ブログのデータは多方面で活用されることが多く、アクセスパターンが増えるたびに GSI の追加を検討する必要がありました。RDB であれば、こうした柔軟なクエリへの対応がより低コストで実現できます。
三つ目はコスト面です。現在のアクセス規模においては、Aurora MySQL の課金モデルの方がより費用対効果が高く、既存の月額約 285 ドルから約 2 ドルへの大幅なコスト削減が見込まれます。
これらを踏まえ、Ameba ブログ全体として DynamoDB から RDB への移行が計画されており、その第一歩目として、ハッシュタグ記事ランキングを管理するストレージを移行しました。
Ameba ブログでは記事ごとにハッシュタグをつけることができ、ハッシュタグのランキングを閲覧することが可能です。

Ameba ブログのハッシュタグ記事ページ
https://blogtag.ameba.jp/
今回の内定者バイトでは、このハッシュタグ記事のランキングデータを管理する DynamoDB を MySQL に置き換えると同時に、DynamoDB に依存するリード系 API や、データを書き込むバッチ処理も再実装しました。
まず既存実装の流れをご紹介します。

DSC (Data Science Center) の方が管理してくださっている、BigQuery の元データを、Ameba ブログ側の EKS の CronJob が stream し、DynamoDB に書き込んでいます。その DynamoDB を参照する、Go で書かれた API が ElasticBeanstalk で動いており、ハッシュタグ記事ランキング関連のデータを返しています。
そして今回の新実装では、以下のような流れになります。

上記の構成のように、ハッシュタグ記事ランキングのデータを管理する DynamoDB を MySQL へ移行するためには、以下の三つのタスクを行いました。
(ElasticBeanstalk から EC2 への移行は後ほど解説します。)
DynamoDB のデータ構造をもとに、RDB 用のテーブルを設計する
ハッシュタグ記事ランキングのデータを BigQuery からストリームし、MySQL に書き込むバッチを再実装して、EKS の Cronjob で実行する
旧 DynamoDB のテーブルを参照していた API を ElasticBeanstalk から EC2 へ移行する
まず最初に行ったのが、DynamoDB のデータ構造をもとに、RDB(リレーショナルデータベース)用のテーブルを設計し、構築することです。
DynamoDB では、キーの検索効率を上げるために複数の属性を連結した複合文字列(例:ameba_entry_id_tag_name)を利用しています。
MySQL 移行にあたって、これらの複合キーを分解し、リレーショナルモデルとして適切に正規化した設計に刷新しました。
次にインデックスの再定義に関してです。GSI(Global Secondary Index)をもとに、RDB のインデックス設計を行いましたが、この対応でコスト面でメリットを享受できました。理由としては、DynamoDB の GSI は、メインテーブルへの書き込みのたびに GSI 分の書き込みも独立して発生するため、インデックス数に比例して WCU(Write Capacity Unit)を余分に消費する一方で、MySQL のセカンダリインデックスは、インデックス数に応じてコストが線形増加するような課金構造ではないためです。
最後に DB、テーブル構築に関しては、今回すでに利用されている RDS のクラスターの中に作成し、Batch での書き込みはプライマリインスタンス、DB から読み込む際は、リードレプリカを参照するようにしました。
ハッシュタグ記事ランキングの元データは BigQuery に蓄積されており、ここから毎日 1.5GBほどのデータを抽出し、MySQL へ反映させるバッチ処理の再実装を行いました。
BQ(BigQuery)からのストリーム処理では、1.5GB のデータを一度にメモリに載せると、アウトオブメモリーでプロセスが強制終了されてしまうため、Go の並列処理(Channel と Goroutine)を利用して、「BigQuery からの読み出し」と「MySQL への書き込み」をオーバーラップさせるパイプラインを構築しました。
func StreamHashtagEntryRankingFromBQ(ctx context.Context, bqClient *bigquery.Client) (<-chan *HashtagEntryRanking, <-chan error, error) {
err := WaitForTableActive(ctx, bqClient, "prj_hashtag_entry_ranking", "hashtag_ranking_similarity_normalized_pv")
if err != nil {
return nil, nil, fmt.Errorf("WaitForTableActive failed: %w", err)
}
q := bqClient.Query(HashtagEntryRankingListQuery)
job, err := q.Run(ctx)
if err != nil {
return nil, nil, fmt.Errorf("job run failed: %w", err)
}
status, err := job.Wait(ctx)
if err != nil {
return nil, nil, fmt.Errorf("job wait failed: %w", err)
}
if status.Err() != nil {
return nil, nil, fmt.Errorf("job execution error: %w", status.Err())
}
it, err := job.Read(ctx)
if err != nil {
return nil, nil, fmt.Errorf("job read failed: %w", err)
}
// 1 ページあたり 500 行ずつ取得する
it.PageInfo().MaxSize = 500
// データを逐次的に返すチャネルを作成
outChan := make(chan *HashtagEntryRanking)
errChan := make(chan error, 1)
go func() {
defer close(outChan)
defer close(errChan)
for {
bqRanking := new(HashtagEntryRanking)
err := it.Next(bqRanking)
if errors.Is(err, iterator.Done) {
logger.Default().Info("Finished reading all BigQuery results")
break
}
if err != nil {
logger.Default().Error("Iterator Next failed", zap.Error(err))
errChan <- fmt.Errorf("it.Next: %w", err)
return
}
outChan <- bqRanking
}
}()
return outChan, errChan, nil
}
また、ハッシュタグ記事ランキングデータは直近5日間のデータのみを保管するという仕様があり、これまでは、DynamoDB の TTL(Time To Live)で自動削除していましたが、今回は、RDB に置き換えたため、バッチ処理のロジック側で対応しました。そして、一度に削除するデータ量が 1.5GB ほどで、一括削除すると CPU 使用率が急上昇してしまうため、メトリクスを確認しながら、バッチサイズを調整し、逐次削除で対応しました。
ハッシュタグ記事ランキング取得 API の移行
CronJob(定期実行ジョブ)のバッチ処理で書き込まれたハッシュタグ記事のランキングデータを、既存実装では Elastic Beanstalk 上の API でクライアントに提供していましたが、Elastic Beanstalk を管理する AWS アカウントは今後廃止する動きがあり、これら API も、Ameba ブログのサブシステムが集まる EC2 側に移行することが決まりました。
既存の DynamoDB に依存するハッシュタグ関連の API は全部で 9 個ほどありましたが、期間の都合上、今回は二つの API を移行しました。
実装に関しては、DynamoDB を利用したインフラ層(Infrastructure Layer)のロジックを理解した上で、Ameba ブログで使われている Go 言語の ORM(Object-Relational Mapping)である bob を利用しましたが、従来のコードと比べると、シンプルかつ少ないコード量になることが実感できました。
旧実装 (DynamoDB から取得)
必ず JSON 形式で返してください。translation フィールドのみ。他のフィールド (technical_terms 等) は一切追加しないこと — 余計なフィールドを書こうとして本文翻訳がトークン上限で打ち切られる事故を防ぐため:
{"translation": "また、ハッシュタグ記事ランキングデータは直近5日間のデータのみを保管するという仕様があり、これまでは、DynamoDB の TTL(Time To Live)で自動削除していましたが、今回は、RDB に置き換えたため、バッチ処理のロジック側で対応しました。そして、一度に削除するデータ量が1.5GBほどで、一括削除するとCPU使用率が急上昇してしまうため、メトリクスを確認しながら、バッチサイズを調整し、逐次削除で対応しました。
ハッシュタグ記事ランキング取得APIの移行
CronJob(定期実行ジョブ)のバッチ処理で書き込まれたハッシュタグ記事のランキングデータを、既存実装ではElasticBeanstalk上のAPIでクライアントに提供していましたが、ElasticBeanstalkを管理するAWSアカウントは今後廃止する動きがあり、これらAPIも、Amebaブログのサブシステムが集まるEC2側に移行することが決まりました。
既存のDynamoDBに依存するハッシュタグ関連のAPIは全部で9個ほどありましたが、期間の都合上、今回は二つのAPIを移行しました。
実装に関しては、DynamoDBを利用したインフラ層(Infrastructure Layer)のロジックを理解した上で、Amebaブログで使われているGo言語のORM(Object-Relational Mapping)であるbobを利用しましたが、従来のコードと比べると、シンプルかつ少ないコード量になることが実感できました。
旧実装(DynamoDBから取得)
必ずJSON形式で返してください。translation フィールドのみ。他のフィールド (technical_terms 等) は一切追加しないこと — 余計なフィールドを書こうとして本文翻訳がトークン上限で打ち切られる事故を防ぐため:
{\"translation\": \"また、ハッシュタグ記事ランキングデータは直近5日間のデータのみを保管するという仕様があり、これまでは、DynamoDB の TTL(Time To Live)で自動削除していましたが、今回は、RDB に置き換えたため、バッチ処理のロジック側で対応しました。そして、一度に削除するデータ量が1.5GBほどで、一括削除するとCPU使用率が急上昇してしまうため、メトリクスを確認しながら、バッチサイズを調整し、逐次削除で対応しました。
ハッシュタグ記事ランキング取得APIの移行
CronJob(定期実行ジョブ)のバッチ処理で書き込まれたハッシュタグ記事のランキングデータを、既存実装ではElasticBeanstalk上のAPIでクライアントに提供していましたが、ElasticBeanstalkを管理するAWSアカウントは今後廃止する動きがあり、これらAPIも、Amebaブログのサブシステムが集まるEC2側に移行することが決まりました。
既存のDynamoDBに依存するハッシュタグ関連のAPIは全部で9個ほどありましたが、期間の都合上、今回は二つのAPIを移行しました。
実装に関しては、DynamoDBを利用したインフラ層(Infrastructure Layer)のロジックを理解した上で、Amebaブログで使われているGo言語のORM(Object-Relational Mapping)であるbobを利用しましたが、従来のコードと比べると、シンプルかつ少ないコード量になることが実感できました。
旧実装(DynamoDBから取得)\n\n必ずJSON形式で返してください。translation フィールドのみ。他のフィールド (technical_terms 等) は一切追加しないこと — 余計なフィールドを書こうとして本文翻訳がトークン上限で打ち切られる事故を防ぐため:\n{\"translation\": \"また、ハッシュタグ記事ランキングデータは直近5日間のデータのみを保管するという仕様があり、これまでは、DynamoDB の TTL(Time To Live)で自動削除していましたが、今回は、RDB に置き換えたため、バッチ処理のロジック側で対応しました。そして、一度に削除するデータ量が1.5GBほどで、一括削除するとCPU使用率が急上昇してしまうため、メトリクスを確認しながら、バッチサイズを調整し、逐次削除で対応しました。\n\nハッシュタグ記事ランキング取得APIの移行\n\nCronJob(定期実行ジョブ)のバッチ処理で書き込まれたハッシュタグ記事のランキングデータを、既存実装ではElasticBeanstalk上のAPIでクライアントに提供していましたが、ElasticBeanstalkを管理するAWSアカウントは今後廃止する動きがあり、これらAPIも、Amebaブログのサブシステムが集まるEC2側に移行することが決まりました。\n\n既存のDynamoDBに依存するハッシュタグ関連のAPIは全部で9個ほどありましたが、期間の都合上、今回は二つのAPIを移行しました。\n\n実装に関しては、DynamoDBを利用したインフラ層(Infrastructure Layer)のロジックを理解した上で、Amebaブログで使われているGo言語のORM(Object-Relational Mapping)であるbobを利用しましたが、従来のコードと比べると、シンプルかつ少ないコード量になることが実感できました。\n\n旧実装(DynamoDBから取得)"}"}
func (r *tagRankingRepositoryImplV2) listTagRankingEntriesByAmebaIDSimple( ctx context.Context, dt string, amebaID model.AmebaID, limit int, offsetRank *int, offsetEntryID *model.EntryID, offsetHashtag *model.HashtagID ) (es []blogtagRankingEntryV2, nextOffsetRank *int, nextOffsetEntryID *model.EntryID, nextOffsetHashtag *model.HashtagID, err error) { key := fmt.Sprintf("%s_%s", amebaID.String(), dt) var exclusiveStartKey map[string]types.AttributeValue if offsetRank != nil && offsetEntryID != nil && offsetHashtag != nil { exclusiveStartKey = map[string]types.AttributeValue{ "rank": &types.AttributeValueMemberN{ Value: fmt.Sprintf("%d", *offsetRank), }, "ameba_id_dt": &types.AttributeValueMemberS{ Value: key, }, "ameba_entry_id_tag_name": &types.AttributeValueMemberS{ Value: fmt.Sprintf("%s_%s_%s", amebaID.String(), offsetEntryID.String(), offsetHashtag.TagName()), }, "dt": &types.AttributeValueMemberS{ Value: dt, }, } } res, err := r.client.Query(ctx, &dynamodb.QueryInput{ TableName: &r.blogtagEntryRankingTable, IndexName: aws.String("ameba_id_dt-rank-index"), KeyConditionExpression: aws.String("ameba_id_dt = :ameba_id_dt"), FilterExpression: aws.String("official_tag_flag = :official_tag_flag"), ExpressionAttributeValues: map[string]types.AttributeValue{ ":ameba_id_dt": &types.AttributeValueMemberS{ Value: key, }, ":official_tag_flag": &types.AttributeValueMemberN{ Value: "1", }, }, Limit: aws.Int32(int32(limit)), ScanIndexForward: aws.Bool(true), ExclusiveStartKey: exclusiveStartKey, }) if err != nil { return nil, nil, nil, nil, fmt.Errorf("r.client.Query: %w", err) } if res.LastEvaluatedKey != nil { lastEvaluatedKey := res.LastEvaluatedKey var l blogtagRankingEntryV2 if err := attributevalue.UnmarshalMap(lastEvaluatedKey, &l); err != nil { return nil, nil, nil, nil, fmt.Errorf("attributevalue.UnmarshalMap: %w", err) } nextOffsetRank = &l.Rank eid := model.EntryID(l.EntryID()) nextOffsetEntryID = &eid hid := model.NewHashtagID(l.TagName()) nextOffsetHashtag = &hid } var blogtagRankingEntries []blogtagRankingEntryV2 if err := attributevalue.UnmarshalListOfMaps(res.Items, &blogtagRankingEntries); err != nil { return nil, nil, nil, nil, fmt.Errorf("attributevalue.UnmarshalListOfMaps: %w", err) } return blogtagRankingEntries, nextOffsetRank, nextOffsetEntryID, nextOffsetHashtag, nil }
func (r *hashtagEntryRankingRepositoryImpl) listTagRankingEntriesByAmebaIDSimple( ctx context.Context, dt civil.Date, amebaID model.AmebaID, limit int, offsetRanking *int, offsetEntryID *model.HashtagEntryID, offsetHashtag *model.HashtagID, ) ([]hashtagdb.BlogHashtagEntryRanking, error) {
qb := mysql.Select( sm.From(hashtagdb.BlogHashtagEntryRankings.Name()), sm.Where(hashtagdb.BlogHashtagEntryRankings.Columns.AmebaID.EQ(mysql.Arg(amebaID.String()))), sm.Where(hashtagdb.BlogHashtagEntryRankings.Columns.DT.EQ(mysql.Arg(dt.String()))), sm.Where(hashtagdb.BlogHashtagEntryRankings.Columns.OfficialTagFlag.EQ(mysql.Arg(1))), )
if offsetRanking != nil && offsetEntryID != nil && offsetHashtag != nil {
qb.Apply(sm.Where(mysql.Raw(
"(rank, entry_id, tag_name) > (?, ?, ?)",
mysql.Arg(*offsetRanking),
mysql.Arg(offsetEntryID.String()),
mysql.Arg(offsetHashtag.TagName()),
)))
}
qb.Apply( sm.OrderBy(clause.OrderDef{ Expression: hashtagdb.BlogHashtagEntryRankings.Columns.Rank, Direction: " ASC", }), sm.OrderBy(clause.OrderDef{ Expression: hashtagdb.BlogHashtagEntryRankings.Columns.EntryID, Direction: " ASC", }), sm.OrderBy(clause.OrderDef{ Expression: hashtagdb.BlogHashtagEntryRankings.Columns.TagName, Direction: " ASC", }), sm.Limit(int64(limit)), )
rows, err := bob.All(ctx, r.replicaDBs.Next(), qb, scan.StructMapper[hashtagdb.BlogHashtagEntryRanking]())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return []hashtagdb.BlogHashtagEntryRanking{}, nil
}
return nil, fmt.Errorf("bob.All: %w", err)
}
return rows, nil
}
上記コードを比較すると、SQL での実装が以下二つの観点で改善されている事がわかります。
SQL での実装の方では、クエリの意図がコードから直接読み取れる一方で、DynamoDB はやや難解です。ExclusiveStartKey
ameba_entry_id_tag_name
amebaID_entryID_tagName
ScanIndexForward: true
DynamoDB 版の実装を追うと、コスト効率の観点で問題があった事がわかります。FilterExpression
official_tag_flag = 1
MySQL に置き換えたことにより、WHERE 句とインデックスによって該当レコードのみを正確に読み取れるようになり、不要なレコードの読み取りが排除されました。これにより Read Capacity の無駄な消費がなくなり、コスト効率が改善されました。
この記事では、Ameba ブログのハッシュタグ記事ランキングのストレージ移行とシステム刷新について紹介しました。今回の刷新により、インフラコストの削減と運用効率化が期待されます。
原文を表示
こんにちは、26卒内定者でバックエンドエンジニアの相馬大和です!
本記事では、内定者バイト期間中の、Amebaブログのストレージ移行と、それに伴う周辺システム刷新の取り組みをご紹介します。
Amebaブログは様々なコンポーネントから構成されており、その多くでDynamoDBを利用しています。
DynamoDBはスケーラビリティに優れたNoSQLデータベースであり、かつてのAmebaブログの要件には適していました。しかし、サービスの成長とともにデータの参照パターンが多様化した結果、現在の運用においてはRDBの方が適しているという判断に至りました。
その背景として、まず運用負荷の観点があります。DynamoDBのスループット制御はWCU(Write Capacity Unit)によって管理されますが、障害復旧時のリトライ処理においてWCUの調整が難しく、運用負荷となっていました。
次に、データの参照パターンの多様化への対応です。ブログのデータは多方面で活用されることが多く、アクセスパターンが増えるたびにGSIの追加を検討する必要がありました。RDBであれば、こうした柔軟なクエリへの対応がより低コストで実現できます。
三つ目はコスト面です。現在のアクセス規模においては、Aurora MySQLの課金モデルの方がより費用対効果が高く、既存の月額約285ドルから約2ドルへの大幅なコスト削減が見込まれます。
これらを踏まえ、Amebaブログ全体としてDynamoDBからRDBへの移行が計画されており、その第一歩目として、ハッシュタグ記事ランキングを管理するストレージを移行しました。
Amebaブログでは記事ごとにハッシュタグをつけることができ、ハッシュタグのランキングを閲覧することが可能です。

Amebaブログのハッシュタグ記事ページ
https://blogtag.ameba.jp/
今回の内定者バイトでは、このハッシュタグ記事のランキングデータを管理するDynamoDBをMySQLに置き換えると同時に、DynamoDBに依存するリード系APIや、データを書き込むバッチ処理も再実装しました。
まず既存実装の流れをご紹介します。

DSC (Data Science Center)の方が管理してくださっている、BigQueryの元データを、Amebaブログ側のEKSのCronJobがstreamし、DynamoDBに書き込んでいます。そのDynamoDBを参照する、Goで書かれたAPIがElasticBeanstalkで動いており、ハッシュタグ記事ランキング関連のデータを返しています。
そして今回の新実装では、以下のような流れになります。

上記構成のように、ハッシュタグ記事ランキングのデータを管理するDynamoDBをMySQLに移行するためには、以下三つのタスクを行いました。
(ElasticBeanstalk → EC2の移行は後ほど解説します。)
DynamoDBのデータ構造をもとに、RDB用のテーブルを設計する
ハッシュタグ記事ランキングのデータをBigQueryからstreamし、MySQLに書き込むバッチを再実装し、EKSのCronjobで動かす
旧DynamoDBのテーブルを参照していたAPIをElasticBeanstalkからEC2に移行する
まず最初に行ったのが、DynamoDBのデータ構造をもとに、RDB用のテーブルを設計し、構築することです。
DynamoDBでは、キーの検索効率を上げるために複数の属性を連結した複合文字列(例:ameba_entry_id_tag_name
MySQL移行にあたって、これらの複合キーを分解し、リレーショナルモデルとして適切に正規化した設計に刷新しました。
次にインデックスの再定義に関してです。GSIをもとに、RDBのインデックス設計を行いましたが、この対応でコスト面でメリットを享受できました。理由としては、DynamoDBのGSIは、メインテーブルへの書き込みのたびにGSI分の書き込みも独立して発生するため、インデックス数に比例してWCUを余分に消費する一方で、MySQLのセカンダリインデックスは、インデックス数に応じてコストが線形増加するような課金構造ではないためです。
最後にDB、テーブル構築に関しては、今回すでに利用されているRDSのクラスターの中に作成し、Batchでの書き込みはプライマリインスタンス、DBから読み込む際は、リードレプリカを参照するようにしました。
ハッシュタグ記事ランキングの元データはBigQueryに蓄積されており、ここから毎日1.5GBほどのデータを抽出し、MySQLへ反映させるバッチ処理の再実装を行いました。
BQからのstream処理では、1.5GBのデータを一度にメモリに載せると、アウトオブメモリでkillされてしまうため、Goの並列処理(ChannelとGoroutine)を利用して、「BigQueryからの読み出し」と「MySQLへの書き込み」をオーバーラップさせるパイプラインを構築しました。
func StreamHashtagEntryRankingFromBQ(ctx context.Context, bqClient *bigquery.Client) (<-chan *HashtagEntryRanking, <-chan error, error) { err := WaitForTableActive(ctx, bqClient, "prj_hashtag_entry_ranking", "hashtag_ranking_similarity_normalized_pv") if err != nil { return nil, nil, fmt.Errorf("WaitForTableActive failed: %w", err) } q := bqClient.Query(HashtagEntryRankingListQuery) job, err := q.Run(ctx) if err != nil { return nil, nil, fmt.Errorf("job run failed: %w", err) } status, err := job.Wait(ctx) if err != nil { return nil, nil, fmt.Errorf("job wait failed: %w", err) } if status.Err() != nil { return nil, nil, fmt.Errorf("job execution error: %w", status.Err()) } it, err := job.Read(ctx) if err != nil { return nil, nil, fmt.Errorf("job read failed: %w", err) } // 1ページあたり500行ずつ取得する it.PageInfo().MaxSize = 500 // データを逐次的に返すチャネルを作成 outChan := make(chan *HashtagEntryRanking) errChan := make(chan error, 1) go func() { defer close(outChan) defer close(errChan) for { bqRanking := new(HashtagEntryRanking) err := it.Next(bqRanking) if errors.Is(err, iterator.Done) { logger.Default().Info("Finished reading all BigQuery results") break } if err != nil { logger.Default().Error("Iterator Next failed", zap.Error(err)) errChan <- fmt.Errorf("it.Next: %w", err) return } outChan <- bqRanking } }() return outChan, errChan, nil }
また、ハッシュタグ記事ランキングデータは直近5日間のデータのみを保管するという仕様があり、これまでは、DynamoDBのttlで自動削除していましたが、今回は、RDBに置き換えたため、バッチ処理のロジック側で対応しました。そして、一度に削除するデータ量が1.5GBほどで、一括削除するとCPU使用率が急上昇してしまうため、メトリクスを確認しながら、バッチサイズを調整し、逐次削除で対応しました。
ハッシュタグ記事ランキング取得APIの移行
CronJobのバッチ処理で書き込まれたハッシュタグ記事のランキングデータを、既存実装ではElasticBeanstalk上のAPIでクライアントに提供していましたが、ElasticBeanstalkを管理するawsアカウントは今後廃止する動きがあり、これらAPIも、Amebaブログのサブシステムが集まるEC2側に移行することが決まりました。
既存のDynamoDBに依存するハッシュタグ関連のAPIは全部で9個ほどありましたが、期間の都合上、今回は二つのAPIを移行しました。
実装に関しては、DynamoDBを利用したinfra層のロジックを理解した上で、Amebaブログで使われているbobというGoのORMを利用しましたが、従来のコードと比べると、シンプルかつ少ないコード量になることが実感できました。
旧実装(DynamoDBから取得)
func (r *tagRankingRepositoryImplV2) listTagRankingEntriesByAmebaIDSimple( ctx context.Context, dt string, amebaID model.AmebaID, limit int, offsetRank *int, offsetEntryID *model.EntryID, offsetHashtag *model.HashtagID ) (es []blogtagRankingEntryV2, nextOffsetRank *int, nextOffsetEntryID *model.EntryID, nextOffsetHashtag *model.HashtagID, err error) { key := fmt.Sprintf("%s_%s", amebaID.String(), dt) var exclusiveStartKey map[string]types.AttributeValue if offsetRank != nil && offsetEntryID != nil && offsetHashtag != nil { exclusiveStartKey = map[string]types.AttributeValue{ "rank": &types.AttributeValueMemberN{ Value: fmt.Sprintf("%d", *offsetRank), }, "ameba_id_dt": &types.AttributeValueMemberS{ Value: key, }, "ameba_entry_id_tag_name": &types.AttributeValueMemberS{ Value: fmt.Sprintf("%s_%s_%s", amebaID.String(), offsetEntryID.String(), offsetHashtag.TagName()), }, "dt": &types.AttributeValueMemberS{ Value: dt, }, } } res, err := r.client.Query(ctx, &dynamodb.QueryInput{ TableName: &r.blogtagEntryRankingTable, IndexName: aws.String("ameba_id_dt-rank-index"), KeyConditionExpression: aws.String("ameba_id_dt = :ameba_id_dt"), FilterExpression: aws.String("official_tag_flag = :official_tag_flag"), ExpressionAttributeValues: map[string]types.AttributeValue{ ":ameba_id_dt": &types.AttributeValueMemberS{ Value: key, }, ":official_tag_flag": &types.AttributeValueMemberN{ Value: "1", }, }, Limit: aws.Int32(int32(limit)), ScanIndexForward: aws.Bool(true), ExclusiveStartKey: exclusiveStartKey, }) if err != nil { return nil, nil, nil, nil, fmt.Errorf("r.client.Query: %w", err) } if res.LastEvaluatedKey != nil { lastEvaluatedKey := res.LastEvaluatedKey var l blogtagRankingEntryV2 if err := attributevalue.UnmarshalMap(lastEvaluatedKey, &l); err != nil { return nil, nil, nil, nil, fmt.Errorf("attributevalue.UnmarshalMap: %w", err) } nextOffsetRank = &l.Rank eid := model.EntryID(l.EntryID()) nextOffsetEntryID = &eid hid := model.NewHashtagID(l.TagName()) nextOffsetHashtag = &hid } var blogtagRankingEntries []blogtagRankingEntryV2 if err := attributevalue.UnmarshalListOfMaps(res.Items, &blogtagRankingEntries); err != nil { return nil, nil, nil, nil, fmt.Errorf("attributevalue.UnmarshalListOfMaps: %w", err) } return blogtagRankingEntries, nextOffsetRank, nextOffsetEntryID, nextOffsetHashtag, nil }
func (r *hashtagEntryRankingRepositoryImpl) listTagRankingEntriesByAmebaIDSimple( ctx context.Context, dt civil.Date, amebaID model.AmebaID, limit int, offsetRanking *int, offsetEntryID *model.HashtagEntryID, offsetHashtag *model.HashtagID, ) ([]hashtagdb.BlogHashtagEntryRanking, error) { qb := mysql.Select( sm.From(hashtagdb.BlogHashtagEntryRankings.Name()), sm.Where(hashtagdb.BlogHashtagEntryRankings.Columns.AmebaID.EQ(mysql.Arg(amebaID.String()))), sm.Where(hashtagdb.BlogHashtagEntryRankings.Columns.DT.EQ(mysql.Arg(dt.String()))), sm.Where(hashtagdb.BlogHashtagEntryRankings.Columns.OfficialTagFlag.EQ(mysql.Arg(1))), ) if offsetRanking != nil && offsetEntryID != nil && offsetHashtag != nil { qb.Apply(sm.Where(mysql.Raw( "(rank, entry_id, tag_name) > (?, ?, ?)", mysql.Arg(*offsetRanking), mysql.Arg(offsetEntryID.String()), mysql.Arg(offsetHashtag.TagName()), ))) } qb.Apply( sm.OrderBy(clause.OrderDef{ Expression: hashtagdb.BlogHashtagEntryRankings.Columns.Rank, Direction: " ASC", }), sm.OrderBy(clause.OrderDef{ Expression: hashtagdb.BlogHashtagEntryRankings.Columns.EntryID, Direction: " ASC", }), sm.OrderBy(clause.OrderDef{ Expression: hashtagdb.BlogHashtagEntryRankings.Columns.TagName, Direction: " ASC", }), sm.Limit(int64(limit)), ) rows, err := bob.All(ctx, r.replicaDBs.Next(), qb, scan.StructMapper[hashtagdb.BlogHashtagEntryRanking]()) if err != nil { if errors.Is(err, sql.ErrNoRows) { return []hashtagdb.BlogHashtagEntryRanking{}, nil } return nil, fmt.Errorf("bob.All: %w", err) } return rows, nil }
上記コードを比較すると、SQLでの実装が以下二つの観点で改善されている事がわかります。
SQLでの実装の方では、クエリの意図がコードから直接読み取れる一方で、DynamoDBはやや難解です。ExclusiveStartKey
ameba_entry_id_tag_name
amebaID_entryID_tagName
ScanIndexForward: true
DynamoDB版の実装を追うと、コスト効率の観点で問題があった事がわかります。FilterExpression
official_tag_flag = 1
MySQLに置き換えたことにより、WHERE句とインデックスによって該当レコードのみを正確に読み取れるようになり、不要なレコードの読み取りが排除されました。これによりRead Capacityの無駄な消費がなくなり、コスト効率が改善されました。
この記事では、Amebaブログのハッシュタグ記事ランキングのストレージ移行とシステム刷新について紹介しました。今回の刷新により、インフラコストの削減と運用効率化が期待されます。
関連記事
今日のまとめ
AI日報で今日の重要ニュースをまとめ読み