AIニュース最前線
最新ニュースAI日報Hacker日報週報動画AIツールトレンド企業

AIニュース最前線

世界中のAI最新情報を日本語で毎時更新

最新ニュース日報トレンド企業プレミアムRSS
© 2026 ainew.jp特定商取引法に基づく表記
ニュース一覧元記事を開く
CyberAgent Developers Blog·2026年4月9日 09:23·約4分で読める

KubernetesでCDC基盤を構築した話 ~ Strimzi(Kafka)・Debezium・Snowflakeを使ったMySQLデータの自動同期基盤 ~

#データエンジニアリング#CDC#Kubernetes#オープンソース#クラウドネイティブ#データ同期
TL;DR

CyberAgentのDynalystは、コスト・運用負荷・制御性・整合性の要件を満たすため、Strimzi(Kafka)・Debezium・Snowflakeを組み合わせたKubernetesベースのCDC基盤を構築し、Aurora MySQLからSnowflakeへのデータ連携課題を解決した。

AI深層分析2026年4月9日 10:42
3
注目/ 5段階
深度40%
4
関連度30%
3
実用性20%
5
革新性10%
2

キーポイント

1

従来構成の課題と要件定義

SnowflakeのExternal Function経由でAWS LambdaからAurora MySQLデータを取得する従来構成では、データ量が多いテーブルでLambdaのレスポンスサイズ制限(6MB)に引っかかりデータ取得が不可能となる課題が発生した。これを解決するため、コスト抑制・運用負荷軽減・同期頻度の制御性・データ整合性の4要件を満たすCDC基盤の構築が必要となった。

2

複数ソリューションの検討と比較

AWS DMS+S3+Snowpipe、Amazon MSK+MSK Connect+Debezium、Snowflake Openflow(BYOC/SPCS)、SaaS(Fivetran・Airbyte・Troccoなど)を検討したが、いずれも常時稼働コストが高い、細かい制御に制約があるなどの理由で採用を見送った。

3

採用ソリューションの選定理由

Debezium + Kafka on Kubernetes(Strimzi)を採用した理由は、既存のKubernetes環境(AKE)を活用できるためインフラコストを最小限に抑えられ、Strimziによる宣言的運用で負荷が軽減され、テーブル単位での同期頻度制御とデータ整合性が確保できるため、4つの要件をすべて満たしたためである。

4

技術構成と実現方法

StrimziでKubernetes上にKafkaを構築し、DebeziumがAurora MySQLのバイナリログを読み取って変更イベントをKafkaトピックに配信し、Snowflake側でStream + TaskによるMERGE INTO処理でINSERT/UPDATE/DELETEを正確に反映する構成を実現した。

5

総合評価と将来展望

要件を満たすことに加え、OSSによるベンダー非依存性、円安の影響を受けにくさ、将来的なpub/subアーキテクチャへの横展開可能性を総合的に評価してこのソリューションを採用した。

6

Snowflakeでの更新・削除処理の実装方法

Snowflake Sink Connectorは追記のみ対応のため、ステージングテーブルでイベントを受け取り、StreamとTaskによるMERGE INTO処理でINSERT/UPDATE/DELETEを本テーブルに反映する構成を採用している。

7

事前作成が必要なSnowflakeリソース

ステージングテーブルと本テーブルはKafkaConnector起動前に事前作成する必要があり、Streamも同時に作成することで初回スナップショットのINSERTが確実に反映されるようにしている。

影響分析・編集コメントを表示

影響分析

この事例は、クラウドネイティブ環境におけるデータ連携の実践的課題を、オープンソース技術を組み合わせて解決した成功例として、特にコスト敏感な企業や既存Kubernetes環境を活用したい組織にとって参考価値が高い。AI/データ分析分野において、基盤データのリアルタイム連携の重要性が高まる中、実装パターンの一つとして認知される可能性がある。

編集コメント

技術選定の判断プロセスが詳細に記述されており、実務での意思決定の参考になる。ただし、AI技術そのものの進展ではなく、AI/データ分析を支える基盤構築の事例という位置付け。

apiVersion: kafka.strimzi.io/v1

kind: KafkaConnector

metadata:

name: snowflake-sink-connector

namespace: debezium-kafka

labels:

strimzi.io/cluster: debezium-kafka-connect

spec:

class: com.snowflake.kafka.connector.SnowflakeSinkConnector

tasksMax: 3

config:

# Snowflake接続設定(Kubernetes Secret参照)

snowflake.url.name: ${secrets:debezium-kafka/snowflake-credentials:url}

snowflake.user.name: ${secrets:debezium-kafka/snowflake-credentials:user}

snowflake.private.key: ${secrets:debezium-kafka/snowflake-credentials:private_key}

snowflake.private.key.passphrase: ${secrets:debezium-kafka/snowflake-credentials:private_key_passphrase}

snowflake.database.name: ${secrets:debezium-kafka/snowflake-credentials:database}

snowflake.schema.name: ${secrets:debezium-kafka/snowflake-credentials:schema}

snowflake.role.name: ${secrets:debezium-kafka/snowflake-credentials:role}

# データ転送設定

topics.regex: "mysql-server\\.sample_db\\.*"

snowflake.topic2table.map: "mysql-server.sample_db.sample_table_1:SAMPLE_TABLE_1_STAGING,..."

snowflake.ingestion.method: SNOWPIPE_STREAMING

snowflake.enable.schematization: false

# メッセージのkey/valueのシリアライズ形式指定

key.converter: org.apache.kafka.connect.json.JsonConverter

value.converter: org.apache.kafka.connect.json.JsonConverter

# エラーハンドリング

errors.tolerance: all

errors.log.enable: true

errors.log.include.messages: true

errors.deadletterqueue.topic.name: dlq-snowflake

errors.deadletterqueue.context.headers.enable: true

以下のコマンドでKafkaConnectorのステータスを確認します。

bash
$ kubectl get kafkaconnector
NAME                        CLUSTER                 CONNECTOR CLASS                              MAX TASKS   READY
debezium-mysql-connector    debezium-kafka-connect  io.debezium.connector.mysql.MySqlConnector              True
snowflake-sink-connector    debezium-kafka-connect  com.snowflake.kafka.connector.SnowflakeSinkConnector  3  True

構築した環境でMySQLへの変更がSnowflakeに反映されることを確認します。

1. INSERTの確認

MySQLでレコードを挿入します。

sql
INSERT INTO sample_table (id, name) VALUES (1, 'test');

ステージングテーブルのRECORD_CONTENTカラムには、以下のJSON形式のデータが格納されます(主要なキーと値以外は省略)。INSERTイベントではbeforeはnullになります。

json
{
  "after": { "created_at": null, "id": 1, "name": "test", "updated_at": null },
  "before": null,
  "op": "c",
  "source": { "connector": "mysql", ... },
  ...
}

Snowflakeのステージングテーブルにレコードが挿入され、Taskの実行が完了すると、本テーブルにもレコードが挿入されていることが確認できます。

2. UPDATEの確認

MySQLでレコードを更新します。

sql
UPDATE sample_table SET name = 'test_updated' WHERE id = 1;

ステージングテーブルのRECORD_CONTENTカラムには、以下のJSON形式のデータが格納されます。UPDATEイベントではbeforeとafterの両方に値が含まれます。

json
{
  "after": { "created_at": null, "id": 1, "name": "test_updated", "updated_at": null },
  "before": { "created_at": null, "id": 1, "name": "test", "updated_at": null },
  "op": "u",
  "source": { "connector": "mysql", ... },
  ...
}

Snowflakeのステージングテーブルにレコードが挿入され、Taskの実行が完了すると、本テーブルのレコードが更新されていることが確認できます。

3. DELETEの確認

MySQLでレコードを削除します。

sql
DELETE FROM sample_table WHERE id = 1;

ステージングテーブルのRECORD_CONTENTカラムには、以下のJSON形式のデータが格納されます。DELETEイベントではafterはnullになります。

json
{
  "after": null,
  "before": { "created_at": null, "id": 1, "name": "test_updated", "updated_at": null },
  "op": "d",
  "source": { "connector": "mysql", ... },
  ...
}

Snowflakeのステージングテーブルにレコードが挿入され、Taskの実行が完了すると、本テーブルのレコードが削除されていることが確認できます。

INSERT・UPDATE・DELETEがすべて正しく本テーブルに反映されていれば動作確認完了です。なお、TaskはStreamにデータが存在する場合のみ実行されるため、Taskに設定したスケジュール間隔分の遅延が発生します。

CDC対象テーブルを追加する手順

新たにsample_table_newを追加する場合を例に説明します。

#### Incremental Snapshot(増分スナップショット)について

テーブルを追加する際にはIncremental Snapshotの実行が必要です。DebeziumのInitial Snapshot(初期スナップショット)はConnectorの初回起動時に一度だけ実行される仕組みのため、後からtable.include.listにテーブルを追加しても自動的にはスナップショットが実行されません。

Incremental Snapshotのトリガーには、構築手順で設定した以下の3点が関わっています。

  1. GTID有効化: 通常のIncremental SnapshotはDBへの書き込み権限を必要とします。read.only: "true"を設定しているため、GTIDベースのIncremental Snapshotを使用します。
  2. KafkaConnectorのシグナル関連設定: Incremental SnapshotはKafkaのシグナルトピックにメッセージを送ることでトリガーします。signal.enabled.channels: kafkaとsignal.kafka.topic: debezium-signalを設定しています。
  3. シグナルトピック: KafkaTopicで作成したdebezium-signal

#### 追加手順

  1. Snowflakeリソースの作成

構築手順のSnowflake設定と同様に、ステージングテーブル・Stream・Task・本テーブルをTerraformで追加します。

  1. KafkaConnectorの更新
  • 新テーブル用のKafkaTopicリソースを追加します。
  • あわせてDebezium MySQL ConnectorのKafkaConnectorリソースのtable.include.listを更新します。
yaml
        table.include.list: sample_db.sample_table_1,sample_db.sample_table_2,sample_db.sample_table_new
  • Snowflake Sink ConnectorのKafkaConnectorリソースのsnowflake.topic2table.mapを更新します。
yaml
        snowflake.topic2table.map: "mysql-server.sample_db.sample_table_1:SAMPLE_TABLE_1_STAGING,...,mysql-server.sample_db.sample_table_new:SAMPLE_TABLE_NEW_STAGING"
  1. Incremental Snapshotの実行

以下のコマンドでシグナルトピックにメッセージを送信してIncremental Snapshotをトリガーします。なお、メッセージのキーはtopic.prefixで指定した値(例: mysql-server)にする必要があります。

bash
    kubectl exec -it debezium-kafka-kafka-0 -n debezium-kafka -- \
    bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic debezium-signal \
    --property "parse.key=true" \
    --property "key.separator=:" <<EOF
    mysql-server:{"type":"execute-snapshot","data":{"data-collections":["sample_db.sample_t
原文を表示

AI事業本部 アドテクカンパニー Dynalystに所属している平田聡一朗と申します。本記事ではStrimziを用いてKubernetes上でKafkaを構築し、MySQLからSnowflakeへのデータ連携を実現したCDC(Change Data Capture)基盤の導入事例を共有します。

DynalystではアプリケーションログをSnowflakeに送信し分析業務で活用していましたが、Aurora MySQLに保存されたデータも分析に使いたい要望がありました。これまでは下図のようにSnowflakeのExternal Function経由でAWS Lambdaを読み出し、Aurora MySQLのデータを取得する構成でした。この構成ではSnowflakeのViewからAurora MySQLのデータをTableのように扱うことができました。しかしAurora MySQL側のデータ量が多いテーブルではLambdaのレスポンスサイズ制限(6MB)に引っかかり、データを取得できない問題が発生しました。この問題を解決するため、リアルタイムに近い形でデータ量が多いテーブルをSnowflakeに同期するCDC基盤の構築が必要となりました。

コスト: CDC基盤は常時稼働が前提となるため、定常的なコストを抑えられる構成を要件としました。

運用負荷: リソースが限られているため、運用作業の負担が少ないことを要件としました。

同期頻度の制御性: テーブルごとに同期頻度の要件が異なるため、テーブル単位で制御できることを要件としました。

データ整合性: INSERT/UPDATE/DELETEのすべての操作をSnowflakeに正確に反映できることを要件としました。

2-2.検討したソリューション

AWS DMS + S3 + Snowpipe

AWSのフルマネージドで完結できるため運用負荷は低く、コミュニティでの実績事例で参考になる情報が多いです。しかしレプリケーションインスタンスの料金は同等スペックのEC2インスタンスの料金と比較して割高な料金体系であり、CDC基盤において常時稼働のコストがかかり円安の影響を受けやすいと判断し、採用を見送りました。

Amazon MSK + MSK Connect + Debezium

AWSが提供するマネージドKafkaサービスであるAmazon MSKとMSK Connectを組み合わせることで、Debeziumをコネクタープラグインとして利用し、Aurora MySQLのバイナリログを読み取るCDC基盤を構築できます(参考)。しかしMSKプロビジョンドのブローカーインスタンスとMSK ConnectのMCU(MSK Connect Unit)単位の課金が常時発生するため、AWS DMS同様に常時稼働のコストがかかり円安の影響を受けやすいと判断し、採用を見送りました。

Snowflake Openflow(BYOC/SPCS)

Snowflake OpenflowとはSnowflakeが提供するCDC機能で、BYOC(Bring Your Own Cloud)とSPCS(Snowpark Container Services)の方式があります。Snowflakeの機能だけでCDC基盤を構築できる点は魅力ですが常時稼働のコストがかかります。具体的にBYOCはSnowflakeが用意したCloudFormationを利用して自身のAWS環境にEKSを構築し、そこからAurora MySQL等のデータソースに接続する方式ですが、EKSやNAT Gatewayなどのリソースは自前で管理する必要があり、データ量が少ない場合でも起動時間に応じたコストが発生してしまいます(参考)。 SPCSはそのインフラ管理をSnowflakeに委ねる方式で運用負荷はさらに低くなりますが、Openflow制御プール(Openflowのデプロイメントの管理を担うリソース)がアクティブな間はコストが発生し、ランタイム(コネクタが実際に動作してデータを処理する実行環境)利用時にはその分のコストも加算されてしまいます(参考)。また、Aurora MySQLをプライベートサブネットに配置している場合はNLBやプロキシサーバーのコストも発生します(参考)。以上の理由からどちらの方式でもコストがかかりやすいと判断し採用を見送りました。

SaaS(Fivetran・Airbyte・Troccoなど)

環境構築が不要でサポートも充実していますが、いずれも利用規模や必要機能に応じてコストが増加しやすいです(Fivetran料金・Airbyte料金・Trocco料金)。また標準プランでも基本的なスケジュール設定は可能ですが、Fivetranは1分間隔の同期にEnterprise以上のプランが必要(参考)、AirbyteのCloud版は最短1時間が下限(参考)、TroccoはCRON式のような時刻指定ができず4パターンに限定されるなど(参考)、細かい制御には制約が生じてしまうため採用を見送りました。

Debezium + Kafka on Kubernetes(Strimzi)

管理するコンポーネントが多く構成の理解にキャッチアップが必要ですが、今回検討したどのソリューションもチームにとって新規導入となるため、キャッチアップコストに大きな差はないと判断しました。コストに関して、bidやadサーバーなどの主要アプリケーションはサイバーエージェントのプライベートクラウドであるCycloudが提供するKubernetes環境(AKE)でデプロイされており、そのインフラをそのまま活用できるためインフラコストを最小限に抑えられます。運用負荷に関して、KafkaをCNCFのIncubatingプロジェクトであるStrimziを利用することで、Kubernetesのマニフェストで宣言的に管理できるため、バージョンアップや設定変更もマニフェストで対応でき、運用がシンプルに保つことができます。同期頻度の制御性に関して、DebeziumがDBのバイナリログを読み取ることで変更を検知しKafkaトピックに配信し、SnowflakeではテーブルごとにTaskのスケジュールを設定できるため、同期頻度をテーブル単位で制御できます。データ整合性に関して、DebeziumはINSERT/UPDATE/DELETEの操作を変更イベントとしてKafkaトピックに配信し、SnowflakeではStream + TaskによるMERGE INTO処理でこれらの操作を正確に反映できます。

このように4つの判断軸において要件を満たしていることに加え、OSSによるベンダー非依存、円安の影響を受けにくい、将来的にpub/subアーキテクチャへ横展開できる拡張性を総合的に評価し、Debezium + Kafka on Kubernetesを採用しました。

今回構築したアーキテクチャは下図のようにDebeziumがMySQLのバイナリログを読み取り、KafkaとSnowflake Kafka Connectorを経由してSnowflakeにデータを同期する構成です。

シークレット管理: AWS Secrets ManagerにMySQL・Snowflakeの認証情報を保管し、External SecretsがKubernetesのSecretとして同期します。これによりマニフェストに認証情報を記述することなく管理しています。

MySQL構成: Aurora MySQLをプライマリ、Cycloud上のMySQLをレプリカとしたレプリケーション構成を組んでいます。KafkaはAKE上で動作しているため、Cycloud内のMySQLのバイナリログを参照する構成としています。

Kafka構成: KafkaはStrimziによってAKE上で構築します。主なコンポーネントは以下の通りです。 Kafka: Kafkaクラスター全体を管理する中心的なリソースです。レプリケーション係数やリスナー設定などクラスター全体の設定を定義します。

KafkaNodePool: ノードグループを定義するリソースです。各ノードはBroker(メッセージの受け渡し)とController(クラスターのメタデータ管理)を共存させており、Affinityにより異なるノードに分散配置することで耐障害性を確保しています。

KafkaConnect: MySQLおよびSnowflakeとの接続を担うプラグインの実行環境です。Debezium MySQL ConnectorとSnowflake Sink Connectorをビルド機能でカスタムイメージに組み込みPod上で動作させています。

KafkaConnector: Kafka Connect上で動作するコネクタの設定を定義するリソースです。Debezium MySQL Connectorはバイナリログから読み取ったイベントをKafkaトピックに配信し、Snowflake Sink ConnectorはそのイベントをSnowflakeのステージングテーブルに書き込みます。

KafkaTopic: トピックを定義するリソースです。パーティション数・レプリカ数・保持期間などの設定を管理します。CDCデータトピックのほか、Connectorのオフセットやステータスなど内部管理に必要なトピックもすべてマニフェストで管理しています。

Snowflake側構成: Snowflake Sink ConnectorはトピックのメッセージをそのままSnowflakeテーブルに追記するのみで、更新・削除の反映には対応していません(参考)。そこでステージングテーブルでイベントをいったん受け取り、Stream + TaskによるMERGE INTO処理でINSERT/UPDATE/DELETEを本テーブルに反映する構成としています。Taskのスケジュールはテーブルごとに設定できるため、同期タイミングをテーブル単位で制御できます。

KafkaConnectのビルド機能でカスタムイメージをプッシュするためのECRリポジトリ(kafka-connect-debezium

resource "aws_ecr_repository" "kafka_connect_debezium" { name = "kafka-connect-debezium" image_scanning_configuration { scan_on_push = true } }

Debezium専用ユーザー作成

Debezium用ユーザーを作成します。SELECT

REPLICATION SLAVE

REPLICATION CLIENT

-- ユーザー作成 CREATE USER 'debezium'@'%' IDENTIFIED BY '<password>'; -- 必須権限の付与 GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';

Debeziumはバイナリログを読み取る必要があるため、log_bin

binlog_format = ROW

binlog_row_image = FULL

SHOW VARIABLES LIKE 'log_bin'; -- ON SHOW VARIABLES LIKE 'binlog_format'; -- ROW SHOW VARIABLES LIKE 'binlog_row_image'; -- FULL

GTIDも有効化しておきます。Debeziumにおいて必要な設定ではありませんが、動作確認の項目で説明するIncremental Snapshotの実行で必要になります(参考)。

SHOW VARIABLES LIKE 'gtid_mode'; -- ON SHOW VARIABLES LIKE 'enforce_gtid_consistency'; -- ON

4-3.Snowflake設定

Snowflake Sink ConnectorはKafkaトピックへの追記のみ対応しており、更新・削除をSnowflakeテーブルに直接反映できません。そのためステージングテーブルでイベントをいったん受け取り、Stream + TaskによるMERGE INTO処理でINSERT/UPDATE/DELETEを本テーブルに反映する構成としています。ステージングテーブルのカラム構成やVARIANT型を採用した理由については後述のSnowflake Sink Connectorの設定箇所で説明します。各リソースはTerraformで管理しています。

ステージングテーブルと本テーブルはKafkaConnector起動前に事前作成する必要があります。Streamはソーステーブルへの変更をオフセットで管理しており、Streamの作成時点以降に発生した変更のみを検知します(参考)。そのためKafkaConnectorでステージングテーブルを自動作成した場合、テーブル作成後にStreamを作成することになり、KafkaConnector起動直後に実行される初回スナップショットのINSERTがStreamに検知されず、本テーブルへ反映できません。これを防ぐためにConnector起動前にステージングテーブルとStreamを事前作成しています。

ステージングテーブルにはRECORD_METADATA

resource "snowflake_table" "sample_table_staging" { database = var.database schema = var.schema name = "SAMPLE_TABLE_STAGING" change_tracking = true # Streamの作成に必要 column { name = "RECORD_METADATA" type = "VARIANT" nullable = true } column { name = "RECORD_CONTENT" type = "VARIANT" nullable = true } } resource "snowflake_table" "sample_table" { database = var.database schema = var.schema name = "SAMPLE_TABLE" column { name = "ID" type = "NUMBER(38,0)" nullable = false } column { name = "NAME" type = "VARCHAR(100)" nullable = true } column { name = "CREATED_AT" type = "TIMESTAMP_NTZ" nullable = true } column { name = "UPDATED_AT" type = "TIMESTAMP_NTZ" nullable = true } } resource "snowflake_stream_on_table" "sample_table_stream" { database = var.database schema = var.schema name = "SAMPLE_TABLE_STREAM" table = snowflake_table.sample_table_staging.fully_qualified_name append_only = true # ステージングテーブルへの書き込みはINSERTのみのためappend_onlyモードで効率化 } resource "snowflake_task" "sync_sample_table" { database = var.database schema = var.schema name = "SYNC_SAMPLE_TABLE" user_task_managed_initial_warehouse_size = "XSMALL" # サーバーレスTaskとして動作 started = true schedule { minutes = 5 } when = "SYSTEM$STREAM_HAS_DATA('${snowflake_stream_on_table.sample_table_stream.name}')" sql_statement = templatefile("${path.module}/task/sync_sample_table.sql.tpl", { target_table = snowflake_table.sample_table.fully_qualified_name stream_name = snowflake_stream_on_table.sample_table_stream.fully_qualified_name }) }

MERGE処理ではDebeziumのイベントに含まれるop

COALESCE(after:id, before:id)

TO_TIMESTAMP_NTZ(... / 1000)

QUALIFY ROW_NUMBER()

-- sync_sample_table.sql.tpl MERGE INTO ${target_table} target USING ( SELECT COALESCE(RECORD_CONTENT:after:id, RECORD_CONTENT:before:id)::NUMBER(38,0) AS ID, RECORD_CONTENT:after:name::VARCHAR(100) AS NAME, TO_TIMESTAMP_NTZ(RECORD_CONTENT:after:created_at::NUMBER / 1000) AS CREATED_AT, TO_TIMESTAMP_NTZ(RECORD_CONTENT:after:updated_at::NUMBER / 1000) AS UPDATED_AT, RECORD_CONTENT:op::VARCHAR(1) AS OP, RECORD_CONTENT:ts_ms::NUMBER AS TS_MS, RECORD_CONTENT:source:pos::NUMBER AS BINLOG_POS FROM ${stream_name} QUALIFY ROW_NUMBER() OVER ( PARTITION BY COALESCE(RECORD_CONTENT:after:id, RECORD_CONTENT:before:id) ORDER BY RECORD_CONTENT:ts_ms DESC, RECORD_CONTENT:source:pos DESC ) = 1 ) source ON target.ID = source.ID WHEN MATCHED AND source.OP = 'd' THEN DELETE WHEN MATCHED AND source.OP IN ('c', 'u', 'r') THEN UPDATE SET target.NAME = source.NAME, target.CREATED_AT = source.CREATED_AT, target.UPDATED_AT = source.UPDATED_AT WHEN NOT MATCHED AND source.OP IN ('c', 'u', 'r') THEN INSERT ( ID, NAME, CREATED_AT, UPDATED_AT ) VALUES ( source.ID, source.NAME, source.CREATED_AT, source.UPDATED_AT );

4-1で作成したECRへの権限を付与したIAMユーザーのアクセスキーとシークレットキーをSecrets Managerに登録し、ExternalSecretでKubernetesのSecretとして同期します。ECRの認証トークンは12時間で失効するため、トークンを5分ごとに再生成するCronJobを設定しています。CronJobのマニフェストは詳細を割愛しますが、ここで作成されるSecret(ecr-token

apiVersion: external-secrets.io/v1beta1 kind: ExternalSecret metadata: name: ecr-aws-secret spec: secretStoreRef: kind: ClusterSecretStore name: aws-secret-store data: - remoteRef: key: ecr-aws-access-keys # Secrets ManagerのキーID property: access-key-id # 取得するプロパティ secretKey: AWS_ACCESS_KEY_ID # Kubernetes Secret上のキー名 # AWS_SECRET_ACCESS_KEYも同様に同期

MySQL・Snowflake認証情報

MySQLはホスト・ポート・ユーザー名・パスワード、Snowflakeは接続URL・ユーザー名・秘密鍵・パスフレーズ・データベース・スキーマ・ロールをそれぞれSecrets Managerに登録しています。ECR認証情報と同様にExternalSecretでKubernetes Secretとして同期しており、それぞれmysql-credentials

snowflake-credentials

Strimzi Kafka Operatorのインストール

Kafkaクラスターを構築する前にStrimzi Kafka Operatorをインストールする必要があります。チームではすでにArgoCDを導入しているため、OperatorもArgoCDのApplicationリソースとして管理しています。Helmチャートからデプロイするように定義しており、watchNamespaces

kubectl create namespace debezium-kafka

次にArgoCDのApplicationリソースを作成してOperatorをデプロイします。必要に応じてspec.source.helm.valuesObject

apiVersion: argoproj.io/v1alpha1 kind: Application metadata: name: strimzi-kafka-operator namespace: argocd spec: source: repoURL: https://strimzi.io/charts/ chart: strimzi-kafka-operator targetRevision: 0.50.1 helm: valuesObject: watchNamespaces: - debezium-kafka replicas: 3 destination: server: https://kubernetes.default.svc namespace: debezium-kafka

以下コマンドでoperatorが起動したことを確認します。

$ kubectl get pods -l name=strimzi-cluster-operator NAME READY STATUS RESTARTS AGE strimzi-cluster-operator-7668c9b9bb-5tvnf 3/3 Running 0 16d

Strimziではロギング設定やメトリクス設定などをConfigMapで管理できます。今回はその中でもメトリクス設定をConfigMapで管理しており、KafkaおよびKafka ConnectリソースからこのConfigMapを参照することでPrometheus JMX Exporterによるメトリクス収集を有効化しています。収集対象のメトリクスの詳細は後述の監視設定の項目で説明します。

apiVersion: v1 kind: ConfigMap metadata: name: kafka-metrics namespace: debezium-kafka data: kafka-metrics-config.yml: | lowercaseOutputName: true rules: # 収集するメトリクスを定義

KafkaリソースはStrimziがKafkaクラスター全体を管理するための中心的なリソースで、ブローカーの設定やリスナー、メトリクス設定などを定義します。replication.factor

min.insync.replicas

KafkaTopicリソースで全トピックの設定をマニフェストで管理できます。Topic OperatorがKafkaTopicリソースとKafka内部のトピック状態を常に同期するため、マニフェスト以外での自動作成を防止するためにauto.create.topics.enable: false

なおKafka 4.0以降はZooKeeperが廃止されKRaftモードのみがサポートされています。後述のKafkaNodePoolでbrokerとcontrollerのロールを定義することでクラスターが構成されます。

apiVersion: kafka.strimzi.io/v1 kind: Kafka metadata: name: debezium-kafka namespace: debezium-kafka spec: kafka: version: 4.1.1 jmxOptions: {} listeners: - name: plain port: 9092 type: internal # クラスター内部からのみアクセス可能なリスナー tls: false config: default.replication.factor: 3 # トピック作成時のデフォルトレプリカ数 min.insync.replicas: 2 # 書き込み成功に必要な最低同期レプリカ数 auto.create.topics.enable: false metricsConfig: # Prometheus JMX ExporterによるメトリクスのConfigMapを参照 type: jmxPrometheusExporter valueFrom: configMapKeyRef: name: kafka-metrics key: kafka-metrics-config.yml entityOperator: topicOperator: {} # KafkaTopicリソースとKafka内部トピックの状態を同期

KafkaNodePoolの作成

KafkaNodePoolはKafkaクラスターを構成するノードグループを定義するリソースで、各ノードのロール・ストレージ・配置ルールを設定します。KafkaNodePoolで定義されていない設定はKafkaリソースから継承され、KafkaNodePool側で上書き可能です。

brokerとcontrollerは同一ノードに共存させています。本来は別ノードへの分離が推奨されますが、現時点ではCDC対象のテーブル数が限られているため同一ノードへの共存で問題ないと判断しています。Kafkaのメッセージデータを永続化するためPersistentVolumeClaimを使用しています。

nodeAffinityとpodAntiAffinityの2つを組み合わせてPodの配置を制御しています。nodeAffinityで専用ノードプールにのみPodがスケジュールされるよう制御し、podAntiAffinityでPodが同一のノードに重複配置されないようにしています。これにより1台のノードが停止しても残り2台でクラスターを維持できます。

チームの監視ツールにはDatadogを利用しており、Prometheus JMX Exporterが9404番ポートでメトリクスを公開し、Datadog Agentはannotationsに設定したエンドポイントからメトリクスを収集します。これにより上記のConfigMapで定義したメトリクスをDatadogで収集できます。

apiVersion: kafka.strimzi.io/v1 kind: KafkaNodePool metadata: name: kafka-pool namespace: debezium-kafka labels: strimzi.io/cluster: debezium-kafka spec: replicas: 3 roles: - broker - controller storage: type: persistent-claim class: {storageclass name} size: 30Gi deleteClaim: false resources: # KafkaNodePoolのリソース設定 requests: memory: 6Gi cpu: "3" limits: memory: 8Gi template: pod: metadata: annotations: ad.datadoghq.com/kafka.check_names: '["openmetrics"]' ad.datadoghq.com/kafka.init_configs: '[{}]' ad.datadoghq.com/kafka.instances: | [{ "openmetrics_endpoint": "http://%%host%%:9404/metrics", "namespace": "kafka", "metrics": [".*"] }] affinity: nodeAffinity: # Kafka専用ノードプールへのスケジューリングを制御 ... podAntiAffinity: # 同一物理ノードへの重複配置を禁止 ...

以下コマンドでKafkaとKafkaNodePoolが起動したことを確認します。

$ kubectl get kafka debezium-kafka NAME READY WARNINGS KAFKA VERSION METADATA VERSION debezium-kafka True 4.1.1 4.1-IV1 $ kubectl get pods -l app.kubernetes.io/name=kafka NAME READY STATUS RESTARTS AGE debezium-kafka-kafka-pool-az01-0 1/1 Running 0 16d debezium-kafka-kafka-pool-az02-1 1/1 Running 0 16d debezium-kafka-kafka-pool-az03-2 1/1 Running 0 16d

KafkaTopicリソースはトピックを管理するリソースです。今回の構成では以下の用途ごとにトピックを作成しています。

Kafka Connect内部管理トピック: connect-configs

connect-offsets

cleanup.policy: compact

シグナルトピック: debezium-signal

retention.ms: 86400000

DLQトピック: dlq-snowflake

retention.ms: 2592000000

スキーマ変更履歴トピック: schema-changes.<mysql schema name>

schema.history.internal.kafka.topic

retention.ms: -1

CDCデータトピック: mysql-server.<mysql schema name>.*

cleanup.policy: compact,delete

KafkaTopicリソースの設定例を以下に示します。他トピックも同様の構造で、用途に応じて上述のパーティション数などを変更しています。

apiVersion: kafka.strimzi.io/v1 kind: KafkaTopic metadata: name: connect-configs namespace: debezium-kafka labels: strimzi.io/cluster: debezium-kafka app.kubernetes.io/name: kafka-topic app.kubernetes.io/part-of: debezium-kafka spec: topicName: connect-configs partitions: 1 replicas: 3 config: min.insync.replicas: 2 cleanup.policy: compact

以下コマンドでKafkaTopicが作成されたことを確認します。

$ kubectl get kafkatopic NAME CLUSTER PARTITIONS REPLICATION FACTOR READY connect-configs debezium-kafka 1 3 True

KafkaConnectの作成

KafkaConnectリソースはKafkaと外部システム間の連携を担うプラグインを動作させるための実行環境を定義します。これらのプラグインはKafka Connect Podの起動時にあらかじめPod内に含める必要があるため、ビルド機能を使うことでプラグインを含むカスタムイメージを自動ビルドしECRにプッシュ、それをKafka Connect Podのイメージとして使用できます。今回は以下の3つのプラグインを含むイメージをビルドしています。利用可能なプラグインはConfluent Hubで検索でき、各プラグインのJARのダウンロードURLはMaven Centralで確認できます。

debezium-mysql-connector

snowflake-kafka-connector

kafka-connect-secret-provider

exactly.once.source.support: enabled

apiVersion: kafka.strimzi.io/v1 kind: KafkaConnect metadata: name: debezium-kafka-connect namespace: debezium-kafka annotations: strimzi.io/use-connector-resources: "true" # KafkaConnectorリソースでコネクタを管理 spec: version: 4.1.1 replicas: 3 bootstrapServers: debezium-kafka-kafka-bootstrap:9092 # Kafkaクラスターへの接続先のDNS名 offsetStorageTopic: connect-offsets # KafkaTopicリソースで作成したリソースを定義 configStorageTopic: connect-configs statusStorageTopic: connect-status build: output: type: docker image: 118102704242.dkr.ecr.ap-northeast-1.amazonaws.com/kafka-connect-debezium:latest pushSecret: ecr-token # ECR認証トークンが格納されたSecret名 plugins: - name: debezium-mysql-connector artifacts: - type: tgz url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.4.1.Final/debezium-connector-mysql-3.4.1.Final-plugin.tar.gz - name: snowflake-kafka-connector artifacts: - type: jar url: https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/3.5.3/snowflake-kafka-connector-3.5.3.jar - type: jar url: https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/2.1.2/bc-fips-2.1.2.jar # 秘密鍵認証に必要 - type: jar url: https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/2.1.9/bcpkix-fips-2.1.9.jar # 秘密鍵認証に必要 - name: kafka-connect-secret-provider artifacts: - type: jar url: https://repo1.maven.org/maven2/io/strimzi/kafka-kubernetes-config-provider/1.2.2/kafka-kubernetes-config-provider-1.2.2.jar config: exactly.once.source.support: enabled # DebeziumソースコネクタのExactly-Onceを有効化 config.providers: secrets config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider # KubernetesシークレットからConnector設定を読み込む metricsConfig: type: jmxPrometheusExporter valueFrom: configMapKeyRef: name: kafka-connect-metrics key: kafka-connect-metrics-config.yml template: pod: metadata: annotations: # メトリクス収集設定 ad.datadoghq.com/debezium-kafka-connect-connect.check_names: '["openmetrics"]' ad.datadoghq.com/debezium-kafka-connect-connect.init_configs: '[{}]' ad.datadoghq.com/debezium-kafka-connect-connect.instances: | [{ "openmetrics_endpoint": "http://%%host%%:9404/metrics", "namespace": "kafka_connect", "metrics": [".*"] }] affinity: # KafkaNodePoolと同様に分散配置で耐障害性を確保 ... imagePullSecrets: - name: ecr-token # ECR認証トークンが格納されたSecret名

またKafkaConnectリソースをデプロイすると、StrimziはKafka Connect Podが使用するServiceAccount(debezium-kafka-connect-connect

kafka-connect-secret-provider

apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: debezium-kafka-connect-secret-reader namespace: debezium-kafka labels: app.kubernetes.io/name: kafka-connect app.kubernetes.io/part-of: debezium-kafka rules: - apiGroups: [""] resources: ["secrets"] verbs: ["get", "list"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: debezium-kafka-connect-secret-reader namespace: debezium-kafka labels: app.kubernetes.io/name: kafka-connect app.kubernetes.io/part-of: debezium-kafka roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: debezium-kafka-connect-secret-reader subjects: - kind: ServiceAccount name: debezium-kafka-connect-connect namespace: debezium-kafka

以下のコマンドでKafka Connect Podが起動していることを確認します。

$ kubectl get pods -l strimzi.io/cluster=debezium-kafka-connect NAME READY STATUS RESTARTS AGE debezium-kafka-connect-connect-0 1/1 Running 0 16d debezium-kafka-connect-connect-1 1/1 Running 0 16d debezium-kafka-connect-connect-2 1/1 Running 0 16d

Debezium MySQL Connectorの作成

KafkaConnectorリソースはKafka Connect上で動作するコネクタの設定を定義します。Debezium MySQL Connectorはバイナリログを読み取り、変更イベントをKafkaトピックに配信します。topic.prefix

database.include.list

table.include.list

schema.history.internal.kafka.topic

read.only: "true"

apiVersion: kafka.strimzi.io/v1 kind: KafkaConnector metadata: name: debezium-mysql-connector namespace: debezium-kafka labels: strimzi.io/cluster: debezium-kafka-connect spec: class: io.debezium.connector.mysql.MySqlConnector # 使用するコネクタプラグインのJavaクラス名 config: # MySQL接続設定(Kubernetes Secret参照) database.hostname: ${secrets:debezium-kafka/mysql-credentials:host} database.port: ${secrets:debezium-kafka/mysql-credentials:port} database.user: ${secrets:debezium-kafka/mysql-credentials:username} database.password: ${secrets:debezium-kafka/mysql-credentials:password} database.server.id: "1001" read.only: "true" # テーブルロックを使わずGTIDベースでスナップショットを取得 # CDC対象の設定 topic.prefix: mysql-server database.include.list: sample_db table.include.list: sample_db.sample_table_1,sample_db.sample_table_2 # スキーマ履歴の保存設定 schema.history.internal.kafka.bootstrap.servers: debezium-kafka-kafka-bootstrap:9092 schema.history.internal.kafka.topic: schema-changes.sample_db include.schema.changes: "false" # DDL変更イベント配信の無効化 # Incremental Snapshot用シグナル設定 signal.enabled.channels: kafka signal.kafka.topic: debezium-signal signal.kafka.bootstrap.servers: debezium-kafka-kafka-bootstrap:9092 signal.kafka.groupId: debezium-mysql-signal-consumer

Snowflake Sink Connectorの作成

Snowflake Sink Connectorは4-3で作成したステージングテーブルに対してKafkaトピックをマッピングしメッセージを書き込みます。メッセージはステージングテーブルのRECORD_METADATA

RECORD_METADATA

snowflake.enable.schematization

{"id": 1, "name": "foo"}

snowflake.enable.schematization: false

value.converter

snowflake.enable.schematization: false

snowflake.ingestion.method

apiVersion: kafka.strimzi.io/v1 kind: KafkaConnector metadata: name: snowflake-sink-connector namespace: debezium-kafka labels: strimzi.io/cluster: debezium-kafka-connect spec: class: com.snowflake.kafka.connector.SnowflakeSinkConnector # 使用するコネクタプラグインのJavaクラス名 tasksMax: 3 # Kafka Connectのreplicas数およびパーティション数に合わせている config: # Snowflake接続設定(Kubernetes Secret参照) snowflake.url.name: ${secrets:debezium-kafka/snowflake-credentials:url} snowflake.user.name: ${secrets:debezium-kafka/snowflake-credentials:user} snowflake.private.key: ${secrets:debezium-kafka/snowflake-credentials:private_key} snowflake.private.key.passphrase: ${secrets:debezium-kafka/snowflake-credentials:private_key_passphrase} snowflake.database.name: ${secrets:debezium-kafka/snowflake-credentials:database} snowflake.schema.name: ${secrets:debezium-kafka/snowflake-credentials:schema} snowflake.role.name: ${secrets:debezium-kafka/snowflake-credentials:role} # データ転送の設定 topics.regex: "mysql-server\\.sample_db\\..*" # 転送対象トピックを正規表現で指定 snowflake.topic2table.map: "mysql-server.sample_db.sample_table_1:SAMPLE_TABLE_1_STAGING,..." # トピックとSnowflakeテーブルのマッピング snowflake.ingestion.method: SNOWPIPE_STREAMING snowflake.enable.schematization: false # CDCイベントをVARIANT型のままステージングテーブルに格納 # メッセージのkey/valueのシリアライズ形式を指定 key.converter: org.apache.kafka.connect.json.JsonConverter value.converter: org.apache.kafka.connect.json.JsonConverter # エラーハンドリング errors.tolerance: all errors.log.enable: true errors.log.include.messages: true errors.deadletterqueue.topic.name: dlq-snowflake # DLQトピックの指定 errors.deadletterqueue.context.headers.enable: true

以下のコマンドでKafkaConnectorのステータスを確認します。

$ kubectl get kafkaconnector NAME CLUSTER CONNECTOR CLASS MAX TASKS READY debezium-mysql-connector debezium-kafka-connect io.debezium.connector.mysql.MySqlConnector True snowflake-sink-connector debezium-kafka-connect com.snowflake.kafka.connector.SnowflakeSinkConnector 3 True

構築した環境でMySQLへの変更がSnowflakeに反映されることを確認します。

MySQLでレコードを挿入します。

INSERT INTO sample_table (id, name) VALUES (1, 'test');

ステージングテーブルのRECORD_CONTENTカラムには、以下のjson形式のデータが格納されます(主要なキーと値以外は省略)。INSERTイベントではbefore

{ "after": { "created_at": null, "id": 1, "name": "test", "updated_at": null }, "before": null, "op": "c", "source": { "connector": "mysql", ... }, ... }

Snowflakeのステージングテーブルにレコードが挿入されタスクの実行が完了すると、本テーブルにもレコードが挿入されていることが確認できます。

MySQLでレコードを更新します。

UPDATE sample_table SET name = 'test_updated' WHERE id = 1;

ステージングテーブルのRECORD_CONTENTカラムには、以下のjson形式のデータが格納されます。UPDATEイベントではbefore

{ "after": { "created_at": null, "id": 1, "name": "test_updated", "updated_at": null }, "before": { "created_at": null, "id": 1, "name": "test", "updated_at": null }, "op": "u", "source": { "connector": "mysql", ... }, ... }

Snowflakeのステージングテーブルにレコードが挿入されタスクの実行が完了すると、本テーブルのレコードが更新されていることが確認できます。

MySQLでレコードを削除します。

DELETE FROM sample_table WHERE id = 1;

ステージングテーブルのRECORD_CONTENTカラムには、以下のjson形式のデータが格納されます。DELETEイベントではafter

COALESCE(RECORD_CONTENT:after:id, RECORD_CONTENT:before:id)

{ "after": null, "before": { "created_at": null, "id": 1, "name": "test_updated", "updated_at": null }, "op": "d", "source": { "connector": "mysql", ... }, ... }

Snowflakeのステージングテーブルにレコードが挿入されタスクの実行が完了すると、本テーブルのレコードが削除されていることが確認できます。

INSERT・UPDATE・DELETEがすべて正しく本テーブルに反映されていれば動作確認完了です。なおTaskはStreamにデータが存在する場合のみ実行されるため、Taskに設定したスケジュール間隔分の遅延が発生します。

CDC対象テーブルを追加する手順を説明します。新たにsample_table_newを追加する場合を例に説明します。

Incremental Snapshotについて

テーブルを追加する際にはIncremental Snapshotの実行が必要です。DebeziumのInitial SnapshotはConnectorの初回起動時に一度だけ実行される仕組みのため、後からtable.include.list

Incremental Snapshotのトリガーには構築手順で設定した以下の3点が関わっています。

GTID有効化: 通常のIncremental SnapshotはDBへの書き込み権限を必要とします。read.only: "true"

KafkaConnectorのシグナル関連設定: Incremental SnapshotはKafkaのシグナルトピックにメッセージを送ることでトリガーします。signal.enabled.channels: kafka

signal.kafka.topic: debezium-signal

シグナルトピック:KafkaTopicで作成したdebezium-signal

Snowflakeリソースの作成

構築手順のSnowflake設定と同様に、ステージングテーブル・Stream・Task・本テーブルをTerraformで追加します。

KafkaConnectorの更新

新テーブル用のKafkaTopicリソースを追加します。あわせてDebezium MySQL ConnectorのKafkaConnectorリソースのtable.include.list

table.include.list: sample_db.sample_table_1,sample_db.sample_table_2,sample_db.sample_table_new

Snowflake Sink ConnectorのKafkaConnectorリソースのsnowflake.topic2table.map

snowflake.topic2table.map: "mysql-server.sample_db.sample_table_1:SAMPLE_TABLE_1_STAGING,...,mysql-server.sample_db.sample_table_new:SAMPLE_TABLE_NEW_STAGING"

Incremental Snapshotの実行

以下のコマンドでシグナルトピックにメッセージを送信してIncremental Snapshotをトリガーします。なおメッセージのキーはtopic.prefix

kubectl exec -it debezium-kafka-kafka-0 -n debezium-kafka -- \ bin/kafka-console-producer.sh \ --bootstrap-server localhost:9092 \ --topic debezium-signal \ --property "parse.key=true" \ --property "key.separator=:" <<EOF mysql-server:{"type":"execute-snapshot","data":{"data-collections":["sample_db.sample_t

この記事をシェア

関連記事

InfoQ★42026年3月21日 19:01

QCon London 2026:Tansu.ioの紹介 - リーン運用のためのKafka再考

Peter MorganがQCon Londonで、オープンソースでKafka互換、ステートレス、リーダーレス、ゼロスケーリング可能なブローカー「Tansu」を発表した。Rust製で、20MBのRAM使用、10ミリ秒起動、プラガブルストレージ、ブローカー側スキーマ検証、Iceberg/Delta Lakeへの直接書き込みを特徴とする。

InfoQ★32026年4月17日 21:00

CNCF、KubernetesだけではLLMワークロードのセキュリティ確保に不十分と警告

CNCFが、Kubernetes上でのLLM展開には、AIシステムの動作を理解・制御できないという根本的なセキュリティギャップがあると指摘した。

NVIDIA Developer Blog★32026年4月10日 02:00

Kubernetes上でSlurmを使用した大規模GPUワークロードの実行

NVIDIAが、オープンソースのクラスタ管理システムSlurmをKubernetesと統合し、大規模GPUワークロードを効率的に管理・スケジューリングする方法を紹介している。SlurmはTOP500システムの65%以上で採用されている実績を持つ。

ニュース一覧に戻る元記事を読む