Terraform Module: AWS MSK (Kafka) — production-grade Kafka clusters without the YAML sprawl

Quick take: Provision Amazon MSK (managed Apache Kafka) with Terraform using a reusable aws_msk_cluster module with encryption, IAM/TLS auth, autoscaling-friendly broker storage, and CloudWatch/S3 logging baked in. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.

Quickstart (copy-paste)

Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):

provider "aws" {
  region = "us-east-1"
}

module "msk" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-msk?ref=v1.0.0"

  cluster_name       = "..."           # Cluster name; also derives configuration and log group …
  client_subnet_ids  = ["...", "..."]  # 2 or 3 private subnet IDs, one per AZ, for broker ENIs.
  security_group_ids = ["...", "..."]  # Security groups on broker ENIs (allow 9092/9094/9098).
}

Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.

What this module is

Amazon MSK (Managed Streaming for Apache Kafka) runs the open-source Kafka broker software for you: AWS owns the control plane, ZooKeeper/KRaft metadata, patching, and broker provisioning, while you keep the wire-compatible Kafka API your producers and consumers already speak. A “cluster” in MSK is a set of broker nodes spread across subnets/AZs, fronted by a bootstrap-broker endpoint that clients connect to.

The problem is that a correct aws_msk_cluster is deceptively wide. You have to wire up a broker_node_group_info block (instance type, subnets, security groups, EBS storage), encryption_info (at-rest KMS key + in-transit TLS mode), a client_authentication block (IAM SASL, mutual TLS, or SCRAM), a logging_info fan-out to CloudWatch/S3/Firehose, an open_monitoring block for Prometheus JMX/node exporters, and very often a separate aws_msk_configuration carrying the server.properties overrides. Hand-rolling that per environment is exactly how you end up with one cluster encrypting in-transit traffic and another silently allowing PLAINTEXT.

This module wraps aws_msk_cluster (plus a managed aws_msk_configuration) behind a small, opinionated variable surface. Broker-to-broker encryption is always on and client-broker traffic defaults to TLS, the cluster ignores volume-size drift so storage autoscaling can grow brokers without Terraform reverting it, and authentication mode is a single toggle. The result is that every cluster your platform team stamps out is consistent, observable, and secure by default.

When to use it

Reach for MSK Serverless (a different resource, aws_msk_serverless_cluster) instead when traffic is spiky/low and you do not want to size brokers — this module is for provisioned MSK where you control instance type and storage. Use Kinesis Data Streams if you do not actually need the Kafka API and want pure pay-per-shard.

Module structure

terraform-module-aws-msk/
├── versions.tf      # provider + Terraform version pins
├── main.tf          # aws_msk_configuration + aws_msk_cluster + log group
├── variables.tf     # var-driven inputs with validations
└── outputs.tf       # cluster arn, bootstrap brokers, zookeeper string

versions.tf

terraform {
  required_version = ">= 1.5.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

main.tf

locals {
  # Name of the managed configuration. The cluster tracks latest_revision, so
  # edits to server_properties roll out without a manual revision bump.
  configuration_name = "${var.cluster_name}-config"

  default_server_properties = <<-PROPERTIES
    auto.create.topics.enable=false
    default.replication.factor=${min(3, length(var.client_subnet_ids))}
    min.insync.replicas=${var.minimum_in_sync_replicas}
    num.partitions=3
    log.retention.hours=${var.log_retention_hours}
    unclean.leader.election.enable=false
  PROPERTIES

  # CloudWatch log group is only created (and wired) when CW logging is on.
  create_log_group = var.broker_logs_cloudwatch_enabled
}

# Managed cluster-level Kafka configuration (server.properties); always created.
resource "aws_msk_configuration" "this" {
  name           = local.configuration_name
  kafka_versions = [var.kafka_version]

  server_properties = coalesce(var.server_properties, local.default_server_properties)

  lifecycle {
    create_before_destroy = true
  }
}

resource "aws_cloudwatch_log_group" "broker" {
  count = local.create_log_group ? 1 : 0

  name              = "/aws/msk/${var.cluster_name}"
  retention_in_days = var.cloudwatch_log_retention_days
  kms_key_id        = var.cloudwatch_log_kms_key_arn
  tags              = var.tags
}

resource "aws_msk_cluster" "this" {
  cluster_name           = var.cluster_name
  kafka_version          = var.kafka_version
  number_of_broker_nodes = var.number_of_broker_nodes

  broker_node_group_info {
    instance_type   = var.broker_instance_type
    client_subnets  = var.client_subnet_ids
    security_groups = var.security_group_ids

    storage_info {
      ebs_storage_info {
        volume_size = var.broker_ebs_volume_size

        # Optional provisioned EBS throughput; the block is omitted when the variable is null.
        dynamic "provisioned_throughput" {
          for_each = var.broker_ebs_provisioned_throughput_mibps != null ? [1] : []
          content {
            enabled           = true
            volume_throughput = var.broker_ebs_provisioned_throughput_mibps
          }
        }
      }
    }
  }

  configuration_info {
    arn      = aws_msk_configuration.this.arn
    revision = aws_msk_configuration.this.latest_revision
  }

  encryption_info {
    # null => AWS managed key for MSK; pass your own KMS key ARN for full control.
    encryption_at_rest_kms_key_arn = var.encryption_at_rest_kms_key_arn

    encryption_in_transit {
      # Client-broker mode comes from the variable (default TLS); broker-to-broker
      # traffic is always encrypted.
      client_broker = var.encryption_in_transit_client_broker
      in_cluster    = true
    }
  }

  dynamic "client_authentication" {
    for_each = var.iam_auth_enabled || var.tls_certificate_authority_arns != null ? [1] : []
    content {
      dynamic "sasl" {
        for_each = var.iam_auth_enabled ? [1] : []
        content {
          iam = true
        }
      }
      dynamic "tls" {
        for_each = var.tls_certificate_authority_arns != null ? [1] : []
        content {
          certificate_authority_arns = var.tls_certificate_authority_arns
        }
      }
      # When IAM auth is on, fall back to no unauthenticated access.
      unauthenticated = false
    }
  }

  logging_info {
    broker_logs {
      cloudwatch_logs {
        enabled   = var.broker_logs_cloudwatch_enabled
        log_group = local.create_log_group ? aws_cloudwatch_log_group.broker[0].name : null
      }

      dynamic "s3" {
        for_each = var.broker_logs_s3_bucket != null ? [1] : []
        content {
          enabled = true
          bucket  = var.broker_logs_s3_bucket
          prefix  = var.broker_logs_s3_prefix
        }
      }
    }
  }

  dynamic "open_monitoring" {
    for_each = var.enhanced_monitoring_prometheus ? [1] : []
    content {
      prometheus {
        jmx_exporter {
          enabled_in_broker = true
        }
        node_exporter {
          enabled_in_broker = true
        }
      }
    }
  }

  enhanced_monitoring = var.enhanced_monitoring_level

  tags = var.tags

  # Storage autoscaling grows volume_size outside Terraform; ignore that drift
  # so a later apply does not try to revert it. This also means changes to
  # var.broker_ebs_volume_size are ignored once the cluster exists.
  lifecycle {
    ignore_changes = [broker_node_group_info[0].storage_info[0].ebs_storage_info[0].volume_size]
  }
}
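
The lifecycle rule above only stops Terraform from reverting volume growth; main.tf as listed does not itself register a scaling target. A minimal sketch of the Application Auto Scaling resources that could sit alongside the cluster follows. The two variables (broker_ebs_max_volume_size and broker_storage_utilization_target) are hypothetical additions, not inputs the module documents, and their defaults are purely illustrative.

```hcl
# Hypothetical inputs, not part of the module's documented variable surface.
variable "broker_ebs_max_volume_size" {
  description = "Upper bound in GiB that storage autoscaling may grow each broker volume to."
  type        = number
  default     = 1000
}

variable "broker_storage_utilization_target" {
  description = "Target broker storage utilization (percent) that triggers expansion."
  type        = number
  default     = 60
}

# Register the cluster's broker storage as a scalable target.
resource "aws_appautoscaling_target" "broker_storage" {
  service_namespace  = "kafka"
  scalable_dimension = "kafka:broker-storage:VolumeSize"
  resource_id        = aws_msk_cluster.this.arn
  min_capacity       = 1
  max_capacity       = var.broker_ebs_max_volume_size
}

# Target-tracking policy on the predefined MSK storage utilization metric.
resource "aws_appautoscaling_policy" "broker_storage" {
  name               = "${var.cluster_name}-broker-storage"
  policy_type        = "TargetTrackingScaling"
  service_namespace  = aws_appautoscaling_target.broker_storage.service_namespace
  scalable_dimension = aws_appautoscaling_target.broker_storage.scalable_dimension
  resource_id        = aws_appautoscaling_target.broker_storage.resource_id

  target_tracking_scaling_policy_configuration {
    # Broker volumes can only grow, so scale-in is disabled.
    disable_scale_in = true
    target_value     = var.broker_storage_utilization_target

    predefined_metric_specification {
      predefined_metric_type = "KafkaBrokerStorageUtilization"
    }
  }
}
```

With a policy like this attached, the ignore_changes entry for volume_size is what keeps a later apply from trying to shrink a volume that autoscaling has already expanded.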

variables.tf

variable "cluster_name" {
  description = "Name of the MSK cluster (also used to derive the configuration and log group names)."
  type        = string

  validation {
    condition     = can(regex("^[A-Za-z][A-Za-z0-9-]{0,63}$", var.cluster_name))
    error_message = "cluster_name must start with a letter and contain only letters, numbers, and hyphens (max 64 chars)."
  }
}

variable "kafka_version" {
  description = "Apache Kafka version for the cluster (e.g. 3.6.0, 3.7.x)."
  type        = string
  default     = "3.6.0"
}

variable "number_of_broker_nodes" {
  description = "Total number of broker nodes. Must be a multiple of the number of client subnets (AZs)."
  type        = number
  default     = 3

  validation {
    condition     = var.number_of_broker_nodes >= 2
    error_message = "number_of_broker_nodes must be at least 2 for a usable cluster."
  }
}

variable "client_subnet_ids" {
  description = "Private subnet IDs (one per AZ, 2 or 3) where broker ENIs are placed."
  type        = list(string)

  validation {
    condition     = length(var.client_subnet_ids) >= 2 && length(var.client_subnet_ids) <= 3
    error_message = "MSK requires 2 or 3 client subnets, one per Availability Zone."
  }
}

variable "security_group_ids" {
  description = "Security group IDs attached to broker ENIs (must allow Kafka ports 9092/9094/9098 from clients)."
  type        = list(string)
}

variable "broker_instance_type" {
  description = "EC2 instance type for brokers (e.g. kafka.m7g.large, kafka.m5.large)."
  type        = string
  default     = "kafka.m7g.large"

  validation {
    condition     = startswith(var.broker_instance_type, "kafka.")
    error_message = "broker_instance_type must be an MSK broker type and start with 'kafka.'."
  }
}

variable "broker_ebs_volume_size" {
  description = "Per-broker EBS volume size in GiB for log storage."
  type        = number
  default     = 100

  validation {
    condition     = var.broker_ebs_volume_size >= 1 && var.broker_ebs_volume_size <= 16384
    error_message = "broker_ebs_volume_size must be between 1 and 16384 GiB."
  }
}

variable "broker_ebs_provisioned_throughput_mibps" {
  description = "Optional provisioned EBS throughput per broker in MiB/s (requires a 4xlarge or larger broker type, e.g. kafka.m5.4xlarge). Null disables it."
  type        = number
  default     = null
}

variable "encryption_at_rest_kms_key_arn" {
  description = "Customer-managed KMS key ARN for at-rest encryption. Null uses the AWS-managed MSK key."
  type        = string
  default     = null
}

variable "encryption_in_transit_client_broker" {
  description = "In-transit mode between clients and brokers: TLS, TLS_PLAINTEXT, or PLAINTEXT."
  type        = string
  default     = "TLS"

  validation {
    condition     = contains(["TLS", "TLS_PLAINTEXT", "PLAINTEXT"], var.encryption_in_transit_client_broker)
    error_message = "encryption_in_transit_client_broker must be one of TLS, TLS_PLAINTEXT, or PLAINTEXT."
  }
}

variable "iam_auth_enabled" {
  description = "Enable IAM (SASL/IAM) client authentication and access control."
  type        = bool
  default     = true
}

variable "tls_certificate_authority_arns" {
  description = "List of ACM Private CA ARNs for mutual TLS client auth. Null disables mTLS."
  type        = list(string)
  default     = null
}

variable "minimum_in_sync_replicas" {
  description = "min.insync.replicas applied via the managed configuration (durability vs. availability tradeoff)."
  type        = number
  default     = 2
}

variable "log_retention_hours" {
  description = "Default log.retention.hours applied to topics via the managed configuration."
  type        = number
  default     = 168
}

variable "server_properties" {
  description = "Full server.properties override. When null, a sane default block is generated from other vars."
  type        = string
  default     = null
}

variable "enhanced_monitoring_level" {
  description = "CloudWatch metrics granularity: DEFAULT, PER_BROKER, PER_TOPIC_PER_BROKER, or PER_TOPIC_PER_PARTITION."
  type        = string
  default     = "PER_BROKER"

  validation {
    condition = contains(
      ["DEFAULT", "PER_BROKER", "PER_TOPIC_PER_BROKER", "PER_TOPIC_PER_PARTITION"],
      var.enhanced_monitoring_level
    )
    error_message = "enhanced_monitoring_level must be a valid MSK enhanced monitoring value."
  }
}

variable "enhanced_monitoring_prometheus" {
  description = "Enable open monitoring with Prometheus JMX and node exporters on brokers."
  type        = bool
  default     = true
}

variable "broker_logs_cloudwatch_enabled" {
  description = "Ship broker logs to a CloudWatch Logs group (created by this module)."
  type        = bool
  default     = true
}

variable "cloudwatch_log_retention_days" {
  description = "Retention in days for the broker CloudWatch Logs group."
  type        = number
  default     = 30
}

variable "cloudwatch_log_kms_key_arn" {
  description = "Optional KMS key ARN to encrypt the broker CloudWatch Logs group."
  type        = string
  default     = null
}

variable "broker_logs_s3_bucket" {
  description = "Optional S3 bucket name for broker log delivery. Null disables S3 logging."
  type        = string
  default     = null
}

variable "broker_logs_s3_prefix" {
  description = "Key prefix for broker logs delivered to S3."
  type        = string
  default     = "msk-broker-logs/"
}

variable "tags" {
  description = "Tags applied to the cluster and the broker CloudWatch Logs group."
  type        = map(string)
  default     = {}
}
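
The description of number_of_broker_nodes says the count must be a multiple of the subnet count, but its validation block only checks the lower bound, and before Terraform 1.9 a variable's validation could not reference another variable. One way to enforce the rule under the module's >= 1.5.0 pin is a precondition. The sketch below hangs it on a terraform_data resource (the name broker_count_guard is made up for illustration); the same precondition block could equally live in the lifecycle block of aws_msk_cluster.this.

```hcl
# Sketch: fail the plan early when brokers cannot be spread evenly across AZs.
# terraform_data is a built-in resource (Terraform >= 1.4) that manages nothing
# by itself; it is used here only as a place to attach the precondition.
resource "terraform_data" "broker_count_guard" {
  lifecycle {
    precondition {
      condition     = var.number_of_broker_nodes % length(var.client_subnet_ids) == 0
      error_message = "number_of_broker_nodes must be a multiple of the number of client subnets."
    }
  }
}
```

For example, 3 subnets with number_of_broker_nodes = 4 would fail at plan time rather than partway through cluster creation.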

outputs.tf

output "cluster_arn" {
  description = "ARN of the MSK cluster."
  value       = aws_msk_cluster.this.arn
}

output "cluster_name" {
  description = "Name of the MSK cluster."
  value       = aws_msk_cluster.this.cluster_name
}

output "bootstrap_brokers_tls" {
  description = "Comma-separated TLS bootstrap broker connection string (port 9094)."
  value       = aws_msk_cluster.this.bootstrap_brokers_tls
}

output "bootstrap_brokers_sasl_iam" {
  description = "Comma-separated SASL/IAM bootstrap broker connection string (port 9098). Empty unless IAM auth is enabled."
  value       = aws_msk_cluster.this.bootstrap_brokers_sasl_iam
}

output "zookeeper_connect_string" {
  description = "ZooKeeper connection string (empty on KRaft-mode clusters)."
  value       = aws_msk_cluster.this.zookeeper_connect_string
}

output "configuration_arn" {
  description = "ARN of the managed MSK configuration applied to the cluster."
  value       = aws_msk_configuration.this.arn
}

output "configuration_latest_revision" {
  description = "Latest revision number of the managed MSK configuration."
  value       = aws_msk_configuration.this.latest_revision
}

How to use it

module "msk_kafka_orders" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-msk?ref=v1.0.0"

  cluster_name           = "orders-prod"
  kafka_version          = "3.6.0"
  number_of_broker_nodes = 3

  client_subnet_ids  = module.network.private_subnet_ids   # exactly 3, one per AZ
  security_group_ids = [aws_security_group.msk_clients.id]

  broker_instance_type   = "kafka.m7g.large"
  broker_ebs_volume_size = 500

  # Customer-managed key + IAM auth + everything encrypted in transit.
  encryption_at_rest_kms_key_arn      = aws_kms_key.msk.arn
  encryption_in_transit_client_broker = "TLS"
  iam_auth_enabled                    = true

  minimum_in_sync_replicas = 2
  log_retention_hours      = 336

  enhanced_monitoring_level      = "PER_TOPIC_PER_BROKER"
  enhanced_monitoring_prometheus = true
  broker_logs_cloudwatch_enabled = true
  cloudwatch_log_retention_days  = 90

  tags = {
    Environment = "prod"
    Team        = "commerce-platform"
    CostCenter  = "streaming"
  }
}

# Downstream: hand the IAM bootstrap endpoint to a consumer service as config.
resource "aws_ssm_parameter" "kafka_bootstrap" {
  name  = "/orders/prod/kafka/bootstrap-brokers"
  type  = "SecureString"
  value = module.msk_kafka_orders.bootstrap_brokers_sasl_iam
}

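# ...and let an ECS task / Lambda role scope its Kafka permissions to this cluster.
data "aws_iam_policy_document" "kafka_produce" {
  statement {
    actions   = ["kafka-cluster:Connect"]
    resources = [module.msk_kafka_orders.cluster_arn]
  }

  statement {
    # Topic ARNs reuse the cluster's name/UUID suffix under ":topic/".
    actions   = ["kafka-cluster:WriteData", "kafka-cluster:DescribeTopic"]
    resources = ["${replace(module.msk_kafka_orders.cluster_arn, ":cluster/", ":topic/")}/*"]
  }
}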
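
The example above references aws_security_group.msk_clients without defining it. A minimal sketch of what that group might look like for an IAM-auth cluster follows; module.network.vpc_id and aws_security_group.orders_service are assumed names that stand in for your own VPC output and client security group, and only the SASL/IAM port is opened because that is the listener this example uses.

```hcl
# Security group attached to the broker ENIs (passed in as security_group_ids).
resource "aws_security_group" "msk_clients" {
  name_prefix = "orders-prod-msk-"
  description = "Broker ENIs for the orders-prod MSK cluster"
  vpc_id      = module.network.vpc_id # assumed output name
}

# Allow the application's security group to reach the SASL/IAM listener.
resource "aws_vpc_security_group_ingress_rule" "kafka_sasl_iam" {
  security_group_id            = aws_security_group.msk_clients.id
  referenced_security_group_id = aws_security_group.orders_service.id # assumed client SG
  ip_protocol                  = "tcp"
  from_port                    = 9098
  to_port                      = 9098
  description                  = "Kafka SASL/IAM from the orders service"
}
```

Clients using plain TLS or mutual TLS would need an equivalent rule for port 9094, and 9092 only matters if PLAINTEXT is allowed.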

With Terragrunt

Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.

1. Root config (live/terragrunt.hcl), inherited by every module:

remote_state {
  backend = "s3"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...s3 state bucket/container + key per path...
  }
}

2. Module config (live/prod/msk/terragrunt.hcl):

include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-msk?ref=v1.0.0"
}

inputs = {
  cluster_name = "..."
  client_subnet_ids = ["...", "..."]
  security_group_ids = ["...", "..."]
}

3. Deploy one environment, or roll out all modules together:

(cd live/prod/msk && terragrunt apply)        # this module only
(cd live/prod && terragrunt run-all apply)    # every module under live/prod

Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; inputs is overridden per environment (dev / stage / prod) without forking the module; and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules — for a single stack, the plain Quickstart above is enough.
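
To make the dependency point concrete, here is a sketch of the module config wired to a sibling network unit instead of hard-coded IDs. The ../network path and the output names private_subnet_ids and msk_security_group_id are assumptions about your own layout, not something this module provides.

```hcl
# live/prod/msk/terragrunt.hcl (sketch; the network unit and its outputs are assumed)
include "root" {
  path = find_in_parent_folders()
}

# Terragrunt applies ../network first under run-all and exposes its outputs here.
dependency "network" {
  config_path = "../network"
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-msk?ref=v1.0.0"
}

inputs = {
  cluster_name       = "..."
  client_subnet_ids  = dependency.network.outputs.private_subnet_ids
  security_group_ids = [dependency.network.outputs.msk_security_group_id]
}
```

With the dependency block in place, run-all can order the network unit ahead of the MSK unit without any manual sequencing.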

Inputs

| Name | Type | Default | Required | Description |
| --- | --- | --- | --- | --- |
| cluster_name | string | - | Yes | Cluster name; also derives configuration and log group names. |
| kafka_version | string | "3.6.0" | No | Apache Kafka version for the cluster. |
| number_of_broker_nodes | number | 3 | No | Total broker nodes; must be a multiple of the AZ/subnet count. |
| client_subnet_ids | list(string) | - | Yes | 2 or 3 private subnet IDs, one per AZ, for broker ENIs. |
| security_group_ids | list(string) | - | Yes | Security groups on broker ENIs (allow 9092/9094/9098). |
| broker_instance_type | string | "kafka.m7g.large" | No | Broker EC2 instance type (must start with kafka.). |
| broker_ebs_volume_size | number | 100 | No | Per-broker EBS log volume size in GiB (1–16384). |
| broker_ebs_provisioned_throughput_mibps | number | null | No | Optional provisioned EBS throughput per broker (MiB/s). |
| encryption_at_rest_kms_key_arn | string | null | No | Customer-managed KMS key ARN; null uses the AWS-managed MSK key. |
| encryption_in_transit_client_broker | string | "TLS" | No | Client↔broker mode: TLS, TLS_PLAINTEXT, or PLAINTEXT. |
| iam_auth_enabled | bool | true | No | Enable SASL/IAM client authentication and access control. |
| tls_certificate_authority_arns | list(string) | null | No | ACM Private CA ARNs for mutual TLS auth; null disables mTLS. |
| minimum_in_sync_replicas | number | 2 | No | min.insync.replicas written to the managed configuration. |
| log_retention_hours | number | 168 | No | Default topic log.retention.hours in the managed configuration. |
| server_properties | string | null | No | Full server.properties override; null generates a default block. |
| enhanced_monitoring_level | string | "PER_BROKER" | No | CloudWatch metrics granularity for the cluster. |
| enhanced_monitoring_prometheus | bool | true | No | Enable Prometheus JMX + node exporters on brokers. |
| broker_logs_cloudwatch_enabled | bool | true | No | Ship broker logs to a module-created CloudWatch Logs group. |
| cloudwatch_log_retention_days | number | 30 | No | Retention (days) for the broker CloudWatch Logs group. |
| cloudwatch_log_kms_key_arn | string | null | No | Optional KMS key to encrypt the broker log group. |
| broker_logs_s3_bucket | string | null | No | Optional S3 bucket for broker log delivery; null disables it. |
| broker_logs_s3_prefix | string | "msk-broker-logs/" | No | Key prefix for broker logs delivered to S3. |
| tags | map(string) | {} | No | Tags applied to the cluster and the broker log group. |

Outputs

| Name | Description |
| --- | --- |
| cluster_arn | ARN of the MSK cluster. |
| cluster_name | Name of the MSK cluster. |
| bootstrap_brokers_tls | Comma-separated TLS bootstrap broker endpoint (port 9094). |
| bootstrap_brokers_sasl_iam | Comma-separated SASL/IAM bootstrap endpoint (port 9098); empty unless IAM auth is on. |
| zookeeper_connect_string | ZooKeeper connection string (empty on KRaft-mode clusters). |
| configuration_arn | ARN of the managed MSK configuration applied to the cluster. |
| configuration_latest_revision | Latest revision number of the managed configuration. |

Enterprise scenario

A retail commerce platform runs Debezium connectors that stream change-data-capture events off their Aurora order database into an orders-prod MSK cluster, where a dozen downstream microservices (inventory, fulfilment, fraud, analytics) consume independently. They standardise every team onto SASL/IAM auth so producer/consumer permissions live in IAM policies scoped to the cluster ARN — no Kafka ACL sprawl, no static credentials — and pin min.insync.replicas=2 with replication factor 3 across three AZs so a single-AZ failure never loses an acknowledged order event. Enhanced PER_TOPIC_PER_BROKER metrics and the Prometheus exporters feed their existing Grafana dashboards, and 500 GiB broker volumes with autoscaling absorb the 14-day retention window required for fraud replay.

Best practices
