Stream CDC data with Amazon Redshift streaming, Amazon MSK and Debezium Connector

Episode 2: Configuring Debezium Connector for Reliable CDC

 

Introduction

In the previous episode, I covered the overall architecture and infrastructure setup for our CDC streaming pipeline. Now I’ll dive deep into Debezium — the open-source platform that captures row-level database changes in real time and streams them to Kafka topics through MSK Connect.

This episode focuses on the practical implementation, including configuring Debezium connectors, resolving real-world data type challenges, and deploying a production-ready CDC solution.

In This Episode, I’ll Cover

  • MSK Prerequisites: Essential cluster configuration for topic auto-creation
  • Debezium Configuration: Complete connector setup with key parameters
  • Debezium Message Structure: Understanding Debezium’s JSON event format
  • Debezium Data Type Challenges: Solving decimal and datetime field issues
  • Deployment Architecture: MSK Connect setup with custom plugins and scaling
  • Configuration Management: Handling deployment challenges and solutions

Prerequisites: MSK Cluster Configuration

Before configuring Debezium, you must first set up MSK to allow automatic topic creation. By default, MSK clusters have broker-side topic auto-creation disabled, and Debezium relies on it to create the per-table change topics as well as its internal topics (schema history, offsets, etc.).

resource "aws_msk_configuration" "msk_config_auto_create_topics" {
  name              = "auto-create-topics-config"
  kafka_versions    = [var.msk_kafka_version]
  server_properties = <<PROPERTIES
auto.create.topics.enable = true
delete.topic.enable = true
PROPERTIES
}

resource "aws_msk_cluster" "this" {
  cluster_name           = "streaming-pipeline-msk-cluster"
  kafka_version          = var.msk_kafka_version
  number_of_broker_nodes = var.msk_broker_count
  
  broker_node_group_info {
    instance_type   = var.msk_broker_instance_type
    client_subnets  = module.vpc.private_subnets
    security_groups = [aws_security_group.msk_sg.id]
  }
  
  encryption_info {
    encryption_in_transit {
      client_broker = "TLS"
      in_cluster    = true
    }
  }

  client_authentication {
    sasl {
      iam = true
    }
  }
  configuration_info {
    arn      = aws_msk_configuration.msk_config_auto_create_topics.arn
    revision = aws_msk_configuration.msk_config_auto_create_topics.latest_revision
  }
}

Key Configuration Properties:

  • auto.create.topics.enable = true: Allows Debezium to automatically create required topics for schema history and offsets
  • delete.topic.enable = true: Enables topic deletion for cleanup operations during development

Without this configuration, the Debezium connector deployment won’t be able to create topics.

Debezium Connector Configuration

Debezium provides a comprehensive set of configuration options for MySQL connectors. Let me walk through the key configurations I implemented and explain their purpose.

Core Connector Configuration

I deployed Debezium using MSK Connect, which provides a managed environment for running Kafka Connect connectors. Here’s the complete configuration:

resource "aws_mskconnect_connector" "debezium_mysql" {
  name                 = "debezium-mysql-connector"
  kafkaconnect_version = "3.7.x"
  
  connector_configuration = {
    # Core connector settings
    "connector.class"        = "io.debezium.connector.mysql.MySqlConnector"
    "tasks.max"              = "1"
    "include.schema.changes" = "true"
    "topic.prefix"           = "YOUR-TOPIC-PREFIX"
    
    # Data format configuration
    "value.converter"        = "org.apache.kafka.connect.json.JsonConverter"
    "key.converter"          = "org.apache.kafka.connect.storage.StringConverter"
    "decimal.handling.mode"  = "string"
    
    # Database connection settings
    "database.user"          = "YOUR-DB-USER"
    "database.password"      = "YOUR-DB-PASSWORD"
    "database.server.id"     = "12345"
    "database.server.name"   = "YOUR-SERVER-NAME"
    "database.hostname"      = "YOUR-AURORA-ENDPOINT"
    "database.port"          = "3306"
    
    # Schema and converter settings
    "key.converter.schemas.enable"   = "false"
    "value.converter.schemas.enable" = "false"
    
    # Table and database filtering
    "table.include.list"    = "YOUR-DATABASE.table1,YOUR-DATABASE.table2"
    "database.include.list" = "YOUR-DATABASE"
    
    # Snapshot configuration
    "snapshot.mode" = "initial"
    
    # Schema history configuration
    "schema.history.internal.kafka.topic"             = "internal.dbhistory.YOUR-TOPIC-PREFIX"
    "schema.history.internal.kafka.bootstrap.servers" = "YOUR-MSK-BOOTSTRAP-SERVERS"
    
    # IAM authentication for schema history topic
    "schema.history.internal.producer.sasl.mechanism" = "AWS_MSK_IAM"
    "schema.history.internal.consumer.sasl.mechanism" = "AWS_MSK_IAM"
    "schema.history.internal.producer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
    "schema.history.internal.consumer.sasl.jaas.config" = "software.amazon.msk.auth.iam.IAMLoginModule required;"
    "schema.history.internal.producer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    "schema.history.internal.consumer.sasl.client.callback.handler.class" = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    "schema.history.internal.consumer.security.protocol" = "SASL_SSL"
    "schema.history.internal.producer.security.protocol" = "SASL_SSL"
  }
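
  # Note: a complete aws_mskconnect_connector also needs kafka_cluster,
  # kafka_cluster_client_authentication, kafka_cluster_encryption_in_transit,
  # plugin, service_execution_role_arn and capacity blocks. The plugin,
  # capacity and IAM pieces are covered later in this article; the remaining
  # blocks are omitted here for brevity.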
}

Key Configuration Parameters

Snapshot Mode Configuration

The snapshot.mode parameter is crucial for determining how Debezium handles initial data capture. Here are the different options:

"snapshot.mode" = "initial" – Performs a full snapshot of all specified tables before starting to stream changes (recommended for new deployments).

"snapshot.mode" = "no_data" – Captures only the schema without historical data and starts streaming from the current binlog position immediately.

"snapshot.mode" = "when_needed" – Performs snapshot only if no previous offset exists, good for connector restarts.

"snapshot.mode" = "never" – Never performs snapshots, only streams changes (fastest startup but requires existing offset information).

Table and Database Filtering

You can control which databases and tables are included in CDC using these filtering configurations:

"database.include.list" = "YOUR-DATABASE" – Specifies which databases to monitor for changes (comma-separated list for multiple databases).

"table.include.list" = "YOUR-DATABASE.table1,YOUR-DATABASE.table2" – Defines specific tables to capture changes from, using the format database.table (reduces processing overhead by excluding unnecessary tables).

These filters are essential for:

  • Performance optimization: Only capture changes from tables you actually need
  • Security: Exclude sensitive tables from CDC streams
  • Cost reduction: Minimize MSK topic storage and processing costs
  • Compliance: Control which data is replicated to downstream systems
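If it is easier to name what you want to leave out, Debezium also offers exclude-list counterparts. A minimal sketch (the table names here are made up, and an include list and an exclude list for the same scope are mutually exclusive):

    # Hypothetical variant: capture every table in the database except audit/staging tables
    "database.include.list" = "YOUR-DATABASE"
    "table.exclude.list"    = "YOUR-DATABASE.audit_log,YOUR-DATABASE.staging_orders"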

Data Type Handling

"decimal.handling.mode" = "string" This configuration was critical for solving decimal field issues. By default, Debezium encodes high-precision decimals as binary to preserve exact precision, but this causes problems in Redshift. Setting it to “string” makes Debezium send readable decimal strings that can be properly cast in Redshift.

"include.schema.changes" = "true" Captures DDL changes (CREATE, ALTER, DROP statements) in addition to DML changes, providing complete database evolution tracking.

Topic and Naming Configuration

"topic.prefix" = "YOUR-TOPIC-PREFIX" This prefix is used for all Kafka topics created by Debezium. Topics follow the pattern: {topic.prefix}.{database}.{table}. Choose a meaningful prefix that identifies your data source.

"database.server.name" = "YOUR-SERVER-NAME" A logical identifier for the MySQL server. This becomes part of the topic naming and should be unique across your Debezium deployments.

MSK Connect Worker Configuration

I also configured the MSK Connect worker to optimize performance and data handling:

resource "aws_mskconnect_worker_configuration" "debezium_worker" {
  name                    = "debezium-worker-config"
  description             = "Worker configuration for Debezium MySQL source connector"
  properties_file_content = <<-EOT
    key.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    offset.storage.topic=offsets_my_debezium_source_connector
  EOT
}

This configuration ensures consistent data formatting across all connectors and disables schema information in the messages to reduce payload size.

Understanding Debezium Message Structure

Before diving into the challenges, it’s important to understand the structure of CDC events that Debezium produces. Each change event follows a standardized JSON format that provides comprehensive information about the database change. For detailed documentation on the Debezium JSON format, refer to the official Debezium JSON format documentation.

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {
    "version": "2.4.0.Final",
    "connector": "mysql",
    "name": "YOUR-SERVER-NAME",
    "ts_ms": 1589362330904,
    "snapshot": "false",
    "db": "inventory",
    "sequence": null,
    "table": "products",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 484,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}

Message Structure Breakdown

"before" – Contains the row data before the change occurred (null for INSERT operations)

"after" – Contains the row data after the change occurred (null for DELETE operations)

"source" – Provides metadata about the change event, including:

  • Database and table information
  • Binlog position for tracking
  • Timestamp when the change occurred
  • Connector version and configuration details

"op" – Operation type indicator:

  • "c" = CREATE (INSERT)
  • "u" = UPDATE
  • "d" = DELETE
  • "r" = READ (initial snapshot)

"ts_ms" – Timestamp in milliseconds when Debezium processed the event

"transaction" – Transaction metadata (if available)

This structure allows downstream consumers like Redshift to understand exactly what changed, when it changed, and how to process the data accordingly. The before and after fields are particularly useful for implementing upsert logic and tracking data evolution over time.

Testing and Validation: During development, I examined the structure and format of the JSON CDC events by creating a test consumer and checking the ingestion of CDC events from the bastion host. This let me validate the data format and identify the data type issues described in the challenges section below. I’ll cover the test consumer setup and validation techniques in more detail in a future episode.

Challenges and Solutions

While Debezium’s configuration appears straightforward, I encountered several critical challenges during implementation that required specific solutions. These issues are common in CDC pipelines, and understanding them will save you significant troubleshooting time.

Data Type Handling Challenges

During implementation, I discovered two major data type handling issues that affected data quality in Redshift. These challenges highlight the importance of understanding how Debezium processes different MySQL data types.

Challenge 1: Decimal Fields Encoded as Base64

Problem: Debezium converts high-precision decimal fields to Base64-encoded binary strings, causing NaN values in Redshift.

Example:

  • MySQL: order_price: 40.123
  • CDC event: "order_price": "Plo3FQ9H6dUAAA=="
  • Redshift result: NaN

Solution: Configure Debezium to send decimals as strings:

"decimal.handling.mode" = "string"

Result:

  • CDC event: "order_price": "40.123456"
  • Redshift casting: payload.after.order_price::DECIMAL(26,22) ✓

Challenge 2: DateTime Fields as Epoch Milliseconds

Problem: Debezium converts MySQL DATETIME fields to epoch milliseconds, which Redshift treats as literal numbers instead of timestamps.

Example:

  • MySQL: purchase_date: "2017-09-09 16:20:41"
  • CDC event: "purchase_date": 1753279941000
  • Redshift casting: ::TIMESTAMP → incorrect result

Solution: Convert epoch milliseconds to proper timestamps in Redshift stored procedures:

-- Before (Incorrect)
payload.after.purchase_date::TIMESTAMP
-- After (Correct)  
DATEADD(second, payload.after.purchase_date::BIGINT / 1000, '1970-01-01 00:00:00'::TIMESTAMP)

Result: 2017-09-09 16:20:41

Note: More details on the Redshift setup are coming in the next episode.

Configuration Management Challenges

Beyond data type issues, I also encountered operational challenges during development that are important to understand for production deployments.

Challenge 3: Configuration Changes Not Taking Effect

Problem: During development, I encountered situations where configuration changes to the Debezium connector were not taking effect, even after applying them through Terraform or the AWS Console.

Root Cause: MSK Connect connectors sometimes cache configuration or fail to reload certain parameters without a complete restart.

Solution: For critical configuration changes during development, I had to destroy and recreate the connector – for example with terraform apply -replace="aws_mskconnect_connector.debezium_mysql", which tells Terraform to replace the resource in a single step.

Production Deployment Architecture

Now that we’ve covered the configuration and challenges, let’s look at how to deploy Debezium in a production-ready environment using MSK Connect. This section covers the complete deployment architecture from plugin management to scaling and security.

Custom Plugin Configuration

Debezium requires a custom plugin to be deployed to MSK Connect. I implemented this through a three-step process using Terraform:

Step 1: Provision S3 Bucket for Plugin Storage

module "debezium_plugin_s3_bucket" {
  source = "terraform-aws-modules/s3-bucket/aws"
  
  bucket = "YOUR-DEBEZIUM-PLUGIN-BUCKET"
  
  # Enable versioning for plugin management
  versioning = {
    enabled = true
  }
  
  # Block public access for security
  block_public_acls       = true
  block_public_policy     = true
  ignore_public_acls      = true
  restrict_public_buckets = true
}

Step 2: Upload Debezium Plugin to S3

After provisioning the S3 bucket, I manually uploaded the Debezium MySQL connector plugin ZIP file. You can download the official plugin from the Debezium releases page (in my case debezium-connector-mysql-3.1.2.Final-plugin.zip, the same file referenced by the custom plugin resource in Step 3).
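If you would rather keep the upload in Terraform too, an aws_s3_object resource can push the archive for you (a sketch, assuming the ZIP sits in a local plugins/ directory next to your Terraform code):

resource "aws_s3_object" "debezium_plugin" {
  bucket = module.debezium_plugin_s3_bucket.s3_bucket_id
  key    = "debezium/debezium-connector-mysql-3.1.2.Final-plugin.zip"
  source = "${path.module}/plugins/debezium-connector-mysql-3.1.2.Final-plugin.zip"

  # Re-upload whenever the local plugin ZIP changes
  etag = filemd5("${path.module}/plugins/debezium-connector-mysql-3.1.2.Final-plugin.zip")
}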

Step 3: Create MSK Connect Custom Plugin

resource "aws_mskconnect_custom_plugin" "debezium" {
  name         = "debezium-mysql-plugin"
  content_type = "ZIP"
  location {
    s3 {
      bucket_arn = module.debezium_plugin_s3_bucket.s3_bucket_arn
      file_key   = "debezium/debezium-connector-mysql-3.1.2.Final-plugin.zip"
    }
  }

  depends_on = [module.debezium_plugin_s3_bucket]
}

The plugin ZIP file contains all necessary Debezium MySQL connector JAR files and dependencies. The MSK Connect custom plugin references the uploaded file in S3, making it available for connector deployments.

Capacity and Scaling Configuration

I configured MSK Connect with autoscaling to handle varying workloads:

capacity {
  autoscaling {
    max_worker_count = 2
    mcu_count        = 1
    min_worker_count = 1
    scale_in_policy {
      cpu_utilization_percentage = 20
    }
    scale_out_policy {
      cpu_utilization_percentage = 80
    }
  }
}

This configuration:

  • Starts with 1 worker (minimum cost)
  • Scales up to 2 workers when CPU exceeds 80%
  • Scales down when CPU drops below 20%
  • Each worker has 1 MCU (MSK Connect Unit) providing 1 vCPU and 4GB RAM

Security and IAM Configuration

The MSK Connect service requires comprehensive IAM permissions to interact with MSK, Secrets Manager, and CloudWatch:

resource "aws_iam_policy" "msk_connect_debezium" {
  name        = "msk-connect-debezium-policy"
  description = "Policy for MSK Connect Debezium connector"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Sid    = "KafkaAccess"
        Effect = "Allow"
        Action = [
          "kafka-cluster:Connect",
          "kafka-cluster:AlterCluster",
          "kafka-cluster:DescribeCluster"
        ]
        Resource = "arn:aws:kafka:REGION:ACCOUNT:cluster/YOUR-MSK-CLUSTER/*"
      },
      {
        Sid    = "KafkaTopicAccess"
        Effect = "Allow"
        Action = [
          "kafka-cluster:DescribeTopic",
          "kafka-cluster:WriteData",
          "kafka-cluster:CreateTopic",
          "kafka-cluster:ReadData"
        ]
        Resource = "arn:aws:kafka:REGION:ACCOUNT:topic/YOUR-MSK-CLUSTER/*"
      },
      {
        Sid    = "SecretsAccess"
        Effect = "Allow"
        Action = [
          "secretsmanager:GetSecretValue",
          "secretsmanager:DescribeSecret"
        ]
        Resource = "arn:aws:secretsmanager:REGION:ACCOUNT:secret:YOUR-DB-CREDENTIALS/*"
      }
    ]
  })
}
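The policy on its own is not enough: MSK Connect runs the connector under a service execution role, so the policy has to be attached to a role that trusts kafkaconnect.amazonaws.com, and that role’s ARN is what the connector’s service_execution_role_arn argument points to. A sketch of the remaining wiring (the resource names are mine):

resource "aws_iam_role" "msk_connect_debezium" {
  name = "msk-connect-debezium-role"

  # Allow the MSK Connect service to assume this role on behalf of the connector
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Principal = { Service = "kafkaconnect.amazonaws.com" }
        Action    = "sts:AssumeRole"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "msk_connect_debezium" {
  role       = aws_iam_role.msk_connect_debezium.name
  policy_arn = aws_iam_policy.msk_connect_debezium.arn
}

The connector’s service_execution_role_arn then points at aws_iam_role.msk_connect_debezium.arn.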

Conclusion

Successfully implementing Debezium for reliable CDC requires understanding both the configuration details and the real-world challenges you’ll encounter. The Debezium connector now reliably captures changes from Aurora MySQL and streams them to MSK topics in a format that Redshift can properly consume. We’ve solved the critical data type challenges and established a robust deployment architecture that can scale with your data volume.

Next Steps

In Episode 3, I’ll cover Redshift Serverless streaming ingestion: configuring ingestion from the MSK topics, optimizing performance, and implementing real-time analytics on the CDC data. Along the way, I’ll also show how to use a bastion host to inspect CDC events, validate data formats, and troubleshoot your Debezium implementation during development.

About This Series

This article is part of a comprehensive three-part series on implementing CDC streaming with Amazon Redshift, MSK, and Debezium:

Episode 1: Designing the End-to-End CDC Architecture — Learn about the overall architecture design, infrastructure setup, and foundational components, including Aurora MySQL, Amazon MSK, Redshift Serverless, and network security configurations.

Episode 2: Configuring Debezium Connector for Reliable CDC (Current Episode) – Deep dive into Debezium configuration, data type challenges, deployment architecture, and production best practices.

Episode 3: Redshift Serverless Streaming Ingestion (Coming Soon) – Comprehensive coverage of Redshift Serverless streaming ingestion configuration, performance optimization, and real-time analytics implementation.

Author

Noor Sabahi | Senior AI & Cloud Engineer | AWS Ambassador

#Debezium #CDC #MSKConnect #DebeziumConfiguration #AuroraMySQL #MSK #AWS #DataStreaming #Kafka