Quick take — Provision Amazon MSK (managed Apache Kafka) with Terraform: a reusable aws_msk_cluster module with encryption, IAM/TLS auth, broker autoscaling storage, and CloudWatch/S3 logging baked in. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.
Quickstart (copy-paste)
Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):
provider "aws" {
region = "us-east-1"
}
module "msk" {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-msk?ref=v1.0.0"
cluster_name = "..." # Cluster name; also derives configuration and log group …
client_subnet_ids = ["...", "..."] # 2 or 3 private subnet IDs, one per AZ, for broker ENIs.
security_group_ids = ["...", "..."] # Security groups on broker ENIs (allow 9092/9094/9098).
}
Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.
What this module is
Amazon MSK (Managed Streaming for Apache Kafka) runs the open-source Kafka broker software for you: AWS owns the control plane, ZooKeeper/KRaft metadata, patching, and broker provisioning, while you keep the wire-compatible Kafka API your producers and consumers already speak. A “cluster” in MSK is a set of broker nodes spread across subnets/AZs, fronted by a bootstrap-broker endpoint that clients connect to.
The problem is that a correct aws_msk_cluster is deceptively wide. You have to wire up a broker_node_group_info block (instance type, subnets, security groups, EBS storage), encryption_info (at-rest KMS key + in-transit TLS mode), a client_authentication block (IAM SASL, mutual TLS, or SCRAM), a logging_info fan-out to CloudWatch/S3/Firehose, an open_monitoring block for Prometheus JMX/node exporters, and very often a separate aws_msk_configuration carrying the server.properties overrides. Hand-rolling that per environment is exactly how you end up with one cluster encrypting in-transit traffic and another silently allowing PLAINTEXT.
This module wraps aws_msk_cluster (plus a managed aws_msk_configuration) behind a small, opinionated variable surface. Encryption in transit is forced on, storage autoscaling is available out of the box, and authentication mode is a single toggle — so every cluster your platform team stamps out is consistent, observable, and secure by default.
When to use it
- You are standing up persistent, durable event streams (order events, CDC from Debezium, clickstream, IoT telemetry) and want the Kafka protocol without operating brokers, ZooKeeper, or KRaft yourself.
- You need multi-AZ durability with replication factor 3 and in-place EBS storage that grows as topics retain more data.
- Multiple teams or microservices consume the same topics and you want uniform auth (IAM access control or mTLS) and encryption everywhere enforced by code, not by a wiki page.
- You want JMX/Prometheus metrics scraped into your existing monitoring stack and broker logs landing in CloudWatch or S3 for audit.
Reach for MSK Serverless (a different resource, aws_msk_serverless_cluster) instead when traffic is spiky/low and you do not want to size brokers — this module is for provisioned MSK where you control instance type and storage. Use Kinesis Data Streams if you do not actually need the Kafka API and want pure pay-per-shard.
Module structure
terraform-module-aws-msk/
├── versions.tf # provider + Terraform version pins
├── main.tf # aws_msk_configuration + aws_msk_cluster + log group
├── variables.tf # var-driven inputs with validations
└── outputs.tf # cluster arn, bootstrap brokers, zookeeper string
versions.tf
terraform {
required_version = ">= 1.5.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
main.tf
locals {
# Bump this revision when you change server_properties so MSK applies it.
configuration_name = "${var.cluster_name}-config"
default_server_properties = <<-PROPERTIES
auto.create.topics.enable=false
default.replication.factor=${min(3, length(var.client_subnet_ids))}
min.insync.replicas=${var.minimum_in_sync_replicas}
num.partitions=3
log.retention.hours=${var.log_retention_hours}
unclean.leader.election.enable=false
PROPERTIES
# CloudWatch log group is only created (and wired) when CW logging is on.
create_log_group = var.broker_logs_cloudwatch_enabled
}
# Optional managed cluster-level Kafka configuration (server.properties).
resource "aws_msk_configuration" "this" {
name = local.configuration_name
kafka_versions = [var.kafka_version]
server_properties = coalesce(var.server_properties, local.default_server_properties)
lifecycle {
create_before_destroy = true
}
}
resource "aws_cloudwatch_log_group" "broker" {
count = local.create_log_group ? 1 : 0
name = "/aws/msk/${var.cluster_name}"
retention_in_days = var.cloudwatch_log_retention_days
kms_key_id = var.cloudwatch_log_kms_key_arn
tags = var.tags
}
resource "aws_msk_cluster" "this" {
cluster_name = var.cluster_name
kafka_version = var.kafka_version
number_of_broker_nodes = var.number_of_broker_nodes
broker_node_group_info {
instance_type = var.broker_instance_type
client_subnets = var.client_subnet_ids
security_groups = var.security_group_ids
storage_info {
ebs_storage_info {
volume_size = var.broker_ebs_volume_size
# In-place storage autoscaling so retention growth never pages you.
dynamic "provisioned_throughput" {
for_each = var.broker_ebs_provisioned_throughput_mibps != null ? [1] : []
content {
enabled = true
volume_throughput = var.broker_ebs_provisioned_throughput_mibps
}
}
}
}
}
configuration_info {
arn = aws_msk_configuration.this.arn
revision = aws_msk_configuration.this.latest_revision
}
encryption_info {
# null => AWS-managed CMK; pass your own KMS key ARN for full control.
encryption_at_rest_kms_key_arn = var.encryption_at_rest_kms_key_arn
encryption_in_transit {
# Force TLS on the wire; in-cluster broker traffic also encrypted.
client_broker = var.encryption_in_transit_client_broker
in_cluster = true
}
}
dynamic "client_authentication" {
for_each = var.iam_auth_enabled || var.tls_certificate_authority_arns != null ? [1] : []
content {
dynamic "sasl" {
for_each = var.iam_auth_enabled ? [1] : []
content {
iam = true
}
}
dynamic "tls" {
for_each = var.tls_certificate_authority_arns != null ? [1] : []
content {
certificate_authority_arns = var.tls_certificate_authority_arns
}
}
# When IAM auth is on, fall back to no unauthenticated access.
unauthenticated = false
}
}
logging_info {
broker_logs {
cloudwatch_logs {
enabled = var.broker_logs_cloudwatch_enabled
log_group = local.create_log_group ? aws_cloudwatch_log_group.broker[0].name : null
}
dynamic "s3" {
for_each = var.broker_logs_s3_bucket != null ? [1] : []
content {
enabled = true
bucket = var.broker_logs_s3_bucket
prefix = var.broker_logs_s3_prefix
}
}
}
}
dynamic "open_monitoring" {
for_each = var.enhanced_monitoring_prometheus ? [1] : []
content {
prometheus {
jmx_exporter {
enabled_in_broker = true
}
node_exporter {
enabled_in_broker = true
}
}
}
}
enhanced_monitoring = var.enhanced_monitoring_level
tags = var.tags
# Storage autoscaling and config revisions can be applied in place; do not
# let an unrelated plan tear the cluster down.
lifecycle {
ignore_changes = [broker_node_group_info[0].storage_info[0].ebs_storage_info[0].volume_size]
}
}
variables.tf
variable "cluster_name" {
description = "Name of the MSK cluster (also used to derive the configuration and log group names)."
type = string
validation {
condition = can(regex("^[A-Za-z][A-Za-z0-9-]{0,63}$", var.cluster_name))
error_message = "cluster_name must start with a letter and contain only letters, numbers, and hyphens (max 64 chars)."
}
}
variable "kafka_version" {
description = "Apache Kafka version for the cluster (e.g. 3.6.0, 3.7.x)."
type = string
default = "3.6.0"
}
variable "number_of_broker_nodes" {
description = "Total number of broker nodes. Must be a multiple of the number of client subnets (AZs)."
type = number
default = 3
validation {
condition = var.number_of_broker_nodes >= 2
error_message = "number_of_broker_nodes must be at least 2 for a usable cluster."
}
}
variable "client_subnet_ids" {
description = "Private subnet IDs (one per AZ, 2 or 3) where broker ENIs are placed."
type = list(string)
validation {
condition = length(var.client_subnet_ids) >= 2 && length(var.client_subnet_ids) <= 3
error_message = "MSK requires 2 or 3 client subnets, one per Availability Zone."
}
}
variable "security_group_ids" {
description = "Security group IDs attached to broker ENIs (must allow Kafka ports 9092/9094/9098 from clients)."
type = list(string)
}
variable "broker_instance_type" {
description = "EC2 instance type for brokers (e.g. kafka.m7g.large, kafka.m5.large)."
type = string
default = "kafka.m7g.large"
validation {
condition = startswith(var.broker_instance_type, "kafka.")
error_message = "broker_instance_type must be an MSK broker type and start with 'kafka.'."
}
}
variable "broker_ebs_volume_size" {
description = "Per-broker EBS volume size in GiB for log storage."
type = number
default = 100
validation {
condition = var.broker_ebs_volume_size >= 1 && var.broker_ebs_volume_size <= 16384
error_message = "broker_ebs_volume_size must be between 1 and 16384 GiB."
}
}
variable "broker_ebs_provisioned_throughput_mibps" {
description = "Optional provisioned EBS throughput per broker in MiB/s (requires m5.4xlarge+ class). Null disables it."
type = number
default = null
}
variable "encryption_at_rest_kms_key_arn" {
description = "Customer-managed KMS key ARN for at-rest encryption. Null uses the AWS-managed MSK key."
type = string
default = null
}
variable "encryption_in_transit_client_broker" {
description = "In-transit mode between clients and brokers: TLS, TLS_PLAINTEXT, or PLAINTEXT."
type = string
default = "TLS"
validation {
condition = contains(["TLS", "TLS_PLAINTEXT", "PLAINTEXT"], var.encryption_in_transit_client_broker)
error_message = "encryption_in_transit_client_broker must be one of TLS, TLS_PLAINTEXT, or PLAINTEXT."
}
}
variable "iam_auth_enabled" {
description = "Enable IAM (SASL/IAM) client authentication and access control."
type = bool
default = true
}
variable "tls_certificate_authority_arns" {
description = "List of ACM Private CA ARNs for mutual TLS client auth. Null disables mTLS."
type = list(string)
default = null
}
variable "minimum_in_sync_replicas" {
description = "min.insync.replicas applied via the managed configuration (durability vs. availability tradeoff)."
type = number
default = 2
}
variable "log_retention_hours" {
description = "Default log.retention.hours applied to topics via the managed configuration."
type = number
default = 168
}
variable "server_properties" {
description = "Full server.properties override. When null, a sane default block is generated from other vars."
type = string
default = null
}
variable "enhanced_monitoring_level" {
description = "CloudWatch metrics granularity: DEFAULT, PER_BROKER, PER_TOPIC_PER_BROKER, or PER_TOPIC_PER_PARTITION."
type = string
default = "PER_BROKER"
validation {
condition = contains(
["DEFAULT", "PER_BROKER", "PER_TOPIC_PER_BROKER", "PER_TOPIC_PER_PARTITION"],
var.enhanced_monitoring_level
)
error_message = "enhanced_monitoring_level must be a valid MSK enhanced monitoring value."
}
}
variable "enhanced_monitoring_prometheus" {
description = "Enable open monitoring with Prometheus JMX and node exporters on brokers."
type = bool
default = true
}
variable "broker_logs_cloudwatch_enabled" {
description = "Ship broker logs to a CloudWatch Logs group (created by this module)."
type = bool
default = true
}
variable "cloudwatch_log_retention_days" {
description = "Retention in days for the broker CloudWatch Logs group."
type = number
default = 30
}
variable "cloudwatch_log_kms_key_arn" {
description = "Optional KMS key ARN to encrypt the broker CloudWatch Logs group."
type = string
default = null
}
variable "broker_logs_s3_bucket" {
description = "Optional S3 bucket name for broker log delivery. Null disables S3 logging."
type = string
default = null
}
variable "broker_logs_s3_prefix" {
description = "Key prefix for broker logs delivered to S3."
type = string
default = "msk-broker-logs/"
}
variable "tags" {
description = "Tags applied to the cluster, configuration log group, and related resources."
type = map(string)
default = {}
}
outputs.tf
output "cluster_arn" {
description = "ARN of the MSK cluster."
value = aws_msk_cluster.this.arn
}
output "cluster_name" {
description = "Name of the MSK cluster."
value = aws_msk_cluster.this.cluster_name
}
output "bootstrap_brokers_tls" {
description = "Comma-separated TLS bootstrap broker connection string (port 9094)."
value = aws_msk_cluster.this.bootstrap_brokers_tls
}
output "bootstrap_brokers_sasl_iam" {
description = "Comma-separated SASL/IAM bootstrap broker connection string (port 9098). Empty unless IAM auth is enabled."
value = aws_msk_cluster.this.bootstrap_brokers_sasl_iam
}
output "zookeeper_connect_string" {
description = "ZooKeeper connection string (empty on KRaft-mode clusters)."
value = aws_msk_cluster.this.zookeeper_connect_string
}
output "configuration_arn" {
description = "ARN of the managed MSK configuration applied to the cluster."
value = aws_msk_configuration.this.arn
}
output "configuration_latest_revision" {
description = "Latest revision number of the managed MSK configuration."
value = aws_msk_configuration.this.latest_revision
}
How to use it
module "msk_kafka_orders" {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-msk?ref=v1.0.0"
cluster_name = "orders-prod"
kafka_version = "3.6.0"
number_of_broker_nodes = 3
client_subnet_ids = module.network.private_subnet_ids # exactly 3, one per AZ
security_group_ids = [aws_security_group.msk_clients.id]
broker_instance_type = "kafka.m7g.large"
broker_ebs_volume_size = 500
# Customer-managed key + IAM auth + everything encrypted in transit.
encryption_at_rest_kms_key_arn = aws_kms_key.msk.arn
encryption_in_transit_client_broker = "TLS"
iam_auth_enabled = true
minimum_in_sync_replicas = 2
log_retention_hours = 336
enhanced_monitoring_level = "PER_TOPIC_PER_BROKER"
enhanced_monitoring_prometheus = true
broker_logs_cloudwatch_enabled = true
cloudwatch_log_retention_days = 90
tags = {
Environment = "prod"
Team = "commerce-platform"
CostCenter = "streaming"
}
}
# Downstream: hand the IAM bootstrap endpoint to a consumer service as config.
resource "aws_ssm_parameter" "kafka_bootstrap" {
name = "/orders/prod/kafka/bootstrap-brokers"
type = "SecureString"
value = module.msk_kafka_orders.bootstrap_brokers_sasl_iam
}
# ...and let an ECS task / Lambda reference the ARN for an IAM policy condition.
data "aws_iam_policy_document" "kafka_produce" {
statement {
actions = ["kafka-cluster:Connect", "kafka-cluster:WriteData", "kafka-cluster:DescribeTopic"]
resources = ["${module.msk_kafka_orders.cluster_arn}/*"]
}
}
With Terragrunt
Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.
1. Root config — live/terragrunt.hcl (inherited by every module):
remote_state {
backend = "s3"
generate = { path = "backend.tf", if_exists = "overwrite" }
config = {
# ...s3 state bucket/container + key per path...
}
}
2. Module config — live/prod/msk/terragrunt.hcl:
include "root" {
path = find_in_parent_folders()
}
terraform {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-msk?ref=v1.0.0"
}
inputs = {
cluster_name = "..."
client_subnet_ids = ["...", "..."]
security_group_ids = ["...", "..."]
}
3. Deploy one environment, or roll out all modules together:
cd live/prod/msk && terragrunt apply # this module
terragrunt run-all apply # every module under live/prod
Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; inputs is overridden per environment (dev / stage / prod) without forking the module; and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules — for a single stack, the plain Quickstart above is enough.
Inputs
| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| cluster_name | string | — | Yes | Cluster name; also derives configuration and log group names. |
| kafka_version | string | “3.6.0” | No | Apache Kafka version for the cluster. |
| number_of_broker_nodes | number | 3 | No | Total broker nodes; must be a multiple of the AZ/subnet count. |
| client_subnet_ids | list(string) | — | Yes | 2 or 3 private subnet IDs, one per AZ, for broker ENIs. |
| security_group_ids | list(string) | — | Yes | Security groups on broker ENIs (allow 9092/9094/9098). |
| broker_instance_type | string | “kafka.m7g.large” | No | Broker EC2 instance type (must start with kafka.). |
| broker_ebs_volume_size | number | 100 | No | Per-broker EBS log volume size in GiB (1–16384). |
| broker_ebs_provisioned_throughput_mibps | number | null | No | Optional provisioned EBS throughput per broker (MiB/s). |
| encryption_at_rest_kms_key_arn | string | null | No | Customer-managed KMS key ARN; null uses the AWS-managed MSK key. |
| encryption_in_transit_client_broker | string | “TLS” | No | Client↔broker mode: TLS, TLS_PLAINTEXT, or PLAINTEXT. |
| iam_auth_enabled | bool | true | No | Enable SASL/IAM client authentication and access control. |
| tls_certificate_authority_arns | list(string) | null | No | ACM Private CA ARNs for mutual TLS auth; null disables mTLS. |
| minimum_in_sync_replicas | number | 2 | No | min.insync.replicas written to the managed configuration. |
| log_retention_hours | number | 168 | No | Default topic log.retention.hours in the managed configuration. |
| server_properties | string | null | No | Full server.properties override; null generates a default block. |
| enhanced_monitoring_level | string | “PER_BROKER” | No | CloudWatch metrics granularity for the cluster. |
| enhanced_monitoring_prometheus | bool | true | No | Enable Prometheus JMX + node exporters on brokers. |
| broker_logs_cloudwatch_enabled | bool | true | No | Ship broker logs to a module-created CloudWatch Logs group. |
| cloudwatch_log_retention_days | number | 30 | No | Retention (days) for the broker CloudWatch Logs group. |
| cloudwatch_log_kms_key_arn | string | null | No | Optional KMS key to encrypt the broker log group. |
| broker_logs_s3_bucket | string | null | No | Optional S3 bucket for broker log delivery; null disables it. |
| broker_logs_s3_prefix | string | “msk-broker-logs/” | No | Key prefix for broker logs delivered to S3. |
| tags | map(string) | {} | No | Tags applied to the cluster, configuration, and log group. |
Outputs
| Name | Description |
|---|---|
| cluster_arn | ARN of the MSK cluster. |
| cluster_name | Name of the MSK cluster. |
| bootstrap_brokers_tls | Comma-separated TLS bootstrap broker endpoint (port 9094). |
| bootstrap_brokers_sasl_iam | Comma-separated SASL/IAM bootstrap endpoint (port 9098); empty unless IAM auth is on. |
| zookeeper_connect_string | ZooKeeper connection string (empty on KRaft-mode clusters). |
| configuration_arn | ARN of the managed MSK configuration applied to the cluster. |
| configuration_latest_revision | Latest revision number of the managed configuration. |
Enterprise scenario
A retail commerce platform runs Debezium connectors that stream change-data-capture events off their Aurora order database into an orders-prod MSK cluster, where a dozen downstream microservices (inventory, fulfilment, fraud, analytics) consume independently. They standardise every team onto SASL/IAM auth so producer/consumer permissions live in IAM policies scoped to the cluster ARN — no Kafka ACL sprawl, no static credentials — and pin min.insync.replicas=2 with replication factor 3 across three AZs so a single-AZ failure never loses an acknowledged order event. Enhanced PER_TOPIC_PER_BROKER metrics and the Prometheus exporters feed their existing Grafana dashboards, and 500 GiB broker volumes with autoscaling absorb the 14-day retention window required for fraud replay.
Best practices
- Force TLS and never ship
PLAINTEXTto prod. Keepencryption_in_transit_client_broker = "TLS"andin_cluster = true; pair it with a customer-managed KMS key so at-rest encryption is in your control plane and key rotation/audit is yours. - Prefer SASL/IAM over Kafka ACLs. IAM auth lets you express produce/consume permissions as policies on
cluster_arn/*withkafka-cluster:*actions, which is far easier to audit than per-topic ACLs and avoids long-lived SCRAM secrets. - Set
default.replication.factor=3andmin.insync.replicas=2across three AZs, and keepunclean.leader.election.enable=false— that combination is the durability floor for acknowledged writes and is encoded in this module’s defaultserver.properties. - Right-size brokers on Graviton and lean on storage autoscaling.
kafka.m7g.*instances give the best price/performance; size EBS for your retention window and let in-place storage autoscaling grow volumes so retention spikes never page on-call. Theignore_changesonvolume_sizekeeps autoscaled growth from being reverted by Terraform. - Name and tag for cost attribution. Derive the configuration and log group from
cluster_name, and tag every cluster withEnvironment,Team, andCostCenter— MSK broker-hours and EBS are a real line item and per-team tags make the streaming bill explainable. - Apply config changes via new revisions, not cluster replacement. Because
aws_msk_configurationusescreate_before_destroyand the cluster referenceslatest_revision, tuningserver.propertiesrolls forward as an in-place update rather than destroying brokers.