
Terraform Module: AWS EMR — Reproducible Spark/Hadoop Clusters with Spot Task Fleets

Quick take: a reusable hashicorp/aws ~> 5.0 Terraform module for aws_emr_cluster with On-Demand core nodes, a Spot task instance fleet, Managed Scaling, S3 log archival, and least-privilege IAM. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.

Quickstart (copy-paste)

Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):

provider "aws" {
  region = "us-east-1"
}

module "emr" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-emr?ref=v1.0.0"

  name                     = "..."  # Cluster name; prefix for IAM roles and the S3 log path.
  environment              = "..."  # Deployment environment tag (dev/stage/prod).
  subnet_id                = "..."  # Single-AZ subnet the nodes launch into.
  master_security_group_id = "..."  # EMR-managed SG for the master node.
  slave_security_group_id  = "..."  # EMR-managed SG for core and task nodes.
  logs_bucket              = "..."  # S3 bucket (no prefix) for EMR log archival.
}
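To see what the Quickstart produced without opening the console, you can re-export a couple of the module's outputs from your root configuration. This is a minimal sketch; the output names emr_cluster_id and emr_master_dns are illustrative and are not part of the module:

```hcl
# Root-module outputs that surface the EMR module's results after apply.
output "emr_cluster_id" {
  description = "Cluster ID (j-...) to hand to step submitters."
  value       = module.emr.cluster_id
}

output "emr_master_dns" {
  description = "Master node DNS for the Spark UI or an SSM/SSH session."
  value       = module.emr.master_public_dns
}
```

After apply, terraform output -raw emr_cluster_id prints the bare cluster ID, which is convenient in scripts.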

Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.

What this module is

Amazon EMR (Elastic MapReduce) is AWS’s managed big-data platform for running Apache Spark, Hadoop, Hive, Presto/Trino, HBase, and Flink on transient or long-running clusters. A cluster is a set of EC2 instances in three roles: a master node (running the YARN ResourceManager and HDFS NameNode), core nodes (HDFS DataNodes that store blocks, plus YARN NodeManagers), and optional task nodes (compute-only NodeManagers with no HDFS, which makes them ideal for Spot capacity). On top of those instances, EMR applies a release_label (e.g. emr-7.1.0), a list of applications, bootstrap actions, and JSON configuration classifications for Spark, YARN, and EMRFS tuning.
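This module takes those JSON configurations as its configurations_json string input. The sketch below shows the classification shape as it would appear inside the module "emr" block. It assumes you want to tune one Spark property and set one environment variable; the memory value and the Python path are illustrative:

```hcl
# Argument inside module "emr" { ... }.
# Top-level Properties become key/value pairs in the config file (spark-defaults.conf);
# a nested "export" classification under spark-env sets environment variables instead.
configurations_json = jsonencode([
  {
    Classification = "spark-defaults"
    Properties = {
      "spark.executor.memory" = "4g"
    }
  },
  {
    Classification = "spark-env"
    Properties     = {}
    Configurations = [
      {
        Classification = "export"
        Properties = {
          PYSPARK_PYTHON = "/usr/bin/python3"
        }
      }
    ]
  },
])
```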

The raw aws_emr_cluster resource is deceptively large. You must wire two distinct IAM roles (the EMR service role and the EC2 instance-profile role, each with its own policies), an EC2 key pair or SSM access, subnet placement, security groups, S3 log paths, and per-application configuration classifications. A single mistake in the instance-profile trust policy fails the cluster at launch rather than at plan time. Wrapping all of this in a module fixes the safe defaults once: S3 log archival, gp3 EBS volumes, On-Demand core nodes, bounded Managed Scaling, and an optional security configuration for encryption. The module then exposes only the knobs that change between a dev sandbox and a production ETL cluster: release label, instance types, capacity, and Spark configs. Every team provisions an identical, reviewed cluster from a tagged ref instead of copy-pasting 200 lines of HCL.
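To make the dev-versus-prod point concrete, here is a sketch of a second call to the same tagged module, tuned for a throwaway sandbox. The module label emr_dev, the cluster name, and the sizing values are hypothetical, and the "..." placeholders are the same required inputs as in the Quickstart:

```hcl
# Hypothetical dev sandbox: same module and ref as prod, smaller and cheaper knobs.
module "emr_dev" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-emr?ref=v1.0.0"

  name                     = "etl-sandbox"
  environment              = "dev"
  subnet_id                = "..."
  master_security_group_id = "..."
  slave_security_group_id  = "..."
  logs_bucket              = "..."

  core_instance_count    = 1     # the minimum the module's validation allows
  task_target_capacity   = 0     # no Spot task fleet in dev
  enable_managed_scaling = false # fixed size; nothing to scale
  idle_timeout_seconds   = 1800  # terminate after 30 idle minutes
}
```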

When to use it

Module structure

terraform-module-aws-emr/
├── versions.tf      # provider + Terraform version pins
├── main.tf          # IAM roles/policies, EMR cluster, managed scaling
├── variables.tf     # var-driven inputs with validation
└── outputs.tf       # cluster id/name + master DNS, ARNs, log URI

versions.tf

terraform {
  required_version = ">= 1.5.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

main.tf

locals {
  log_uri = "s3://${var.logs_bucket}/${var.name}/"

  tags = merge(
    {
      Name        = var.name
      Environment = var.environment
      ManagedBy   = "terraform"
      Module      = "terraform-module-aws-emr"
    },
    var.tags,
  )
}

# ---------------------------------------------------------------------------
# EMR service role — lets the EMR control plane provision EC2, ENIs, etc.
# ---------------------------------------------------------------------------
data "aws_iam_policy_document" "service_assume" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["elasticmapreduce.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "service" {
  name                 = "${var.name}-emr-service"
  assume_role_policy   = data.aws_iam_policy_document.service_assume.json
  permissions_boundary = var.permissions_boundary_arn
  tags                 = local.tags
}

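resource "aws_iam_role_policy_attachment" "service" {
  role       = aws_iam_role.service.name
  policy_arn = "arn:${data.aws_partition.current.partition}:iam::aws:policy/service-role/AmazonEMRServicePolicy_v2"
}

# AmazonEMRServicePolicy_v2 only allows iam:PassRole on the default EMR_EC2_DefaultRole,
# so the service role also needs PassRole on this module's custom EC2 role. (The v2
# policy also expects the subnet and security groups to be tagged
# for-use-with-amazon-emr-managed-policies = true.)
data "aws_iam_policy_document" "service_pass_role" {
  statement {
    sid       = "PassEmrEc2Role"
    effect    = "Allow"
    actions   = ["iam:PassRole"]
    resources = [aws_iam_role.ec2.arn]

    condition {
      test     = "StringLike"
      variable = "iam:PassedToService"
      values   = ["ec2.amazonaws.com*"]
    }
  }
}

resource "aws_iam_role_policy" "service_pass_role" {
  name   = "${var.name}-emr-pass-role"
  role   = aws_iam_role.service.id
  policy = data.aws_iam_policy_document.service_pass_role.json
}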

# ---------------------------------------------------------------------------
# EC2 instance profile — the role the cluster nodes themselves assume.
# Scoped to the data buckets the jobs actually read/write, plus logs.
# ---------------------------------------------------------------------------
data "aws_iam_policy_document" "ec2_assume" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["ec2.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "ec2" {
  name                 = "${var.name}-emr-ec2"
  assume_role_policy   = data.aws_iam_policy_document.ec2_assume.json
  permissions_boundary = var.permissions_boundary_arn
  tags                 = local.tags
}

data "aws_iam_policy_document" "ec2_data_access" {
  statement {
    sid    = "ListDataBuckets"
    effect = "Allow"
    actions = [
      "s3:GetBucketLocation",
      "s3:ListBucket",
    ]
    resources = [
      for b in concat(var.data_buckets, [var.logs_bucket]) :
      "arn:${data.aws_partition.current.partition}:s3:::${b}"
    ]
  }

  statement {
    sid    = "ReadWriteDataObjects"
    effect = "Allow"
    actions = [
      "s3:GetObject",
      "s3:PutObject",
      "s3:DeleteObject",
    ]
    resources = [
      for b in concat(var.data_buckets, [var.logs_bucket]) :
      "arn:${data.aws_partition.current.partition}:s3:::${b}/*"
    ]
  }
}

resource "aws_iam_role_policy" "ec2_data_access" {
  name   = "${var.name}-emr-data-access"
  role   = aws_iam_role.ec2.id
  policy = data.aws_iam_policy_document.ec2_data_access.json
}

resource "aws_iam_instance_profile" "ec2" {
  name = "${var.name}-emr-ec2"
  role = aws_iam_role.ec2.name
  tags = local.tags
}

data "aws_partition" "current" {}

# ---------------------------------------------------------------------------
# EMR cluster
# ---------------------------------------------------------------------------
resource "aws_emr_cluster" "this" {
  name          = var.name
  release_label = var.release_label
  applications  = var.applications
  service_role  = aws_iam_role.service.arn
  log_uri       = local.log_uri

  # Keep the cluster up after the last step for interactive/long-running use,
  # or auto-terminate transient ETL clusters when steps complete.
  keep_job_flow_alive_when_no_steps = var.keep_alive
  termination_protection            = var.termination_protection
  ebs_root_volume_size              = var.ebs_root_volume_size

  ec2_attributes {
    subnet_id                         = var.subnet_id
    instance_profile                  = aws_iam_instance_profile.ec2.arn
    key_name                          = var.ec2_key_name
    emr_managed_master_security_group = var.master_security_group_id
    emr_managed_slave_security_group  = var.slave_security_group_id
    service_access_security_group     = var.service_access_security_group_id
  }

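  # Every node type uses instance fleets. EMR fixes the choice between instance
  # groups and instance fleets at cluster creation, and the Spot task capacity
  # below is an aws_emr_instance_fleet, so master and core must be fleets too.

  # Master: one On-Demand node running the YARN ResourceManager + HDFS NameNode.
  master_instance_fleet {
    name                      = "master"
    target_on_demand_capacity = 1

    instance_type_configs {
      instance_type = var.master_instance_type

      ebs_config {
        size                 = var.master_ebs_size
        type                 = "gp3"
        volumes_per_instance = 1
      }
    }
  }

  # Core: On-Demand DataNodes that hold HDFS blocks. Never put these on Spot.
  core_instance_fleet {
    name                      = "core"
    target_on_demand_capacity = var.core_instance_count
    target_spot_capacity      = 0 # always On-Demand

    instance_type_configs {
      instance_type     = var.core_instance_type
      weighted_capacity = 1 # one unit per node, so capacity equals node count

      ebs_config {
        size                 = var.core_ebs_size
        type                 = "gp3"
        volumes_per_instance = var.core_ebs_volumes_per_instance
      }
    }
  }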

  # Spark/YARN/EMRFS tuning passed straight through as classifications.
  configurations_json = var.configurations_json

  # Auto-terminate the cluster after idle_timeout_seconds without activity (0 = off).
  dynamic "auto_termination_policy" {
    for_each = var.idle_timeout_seconds > 0 ? [1] : []
    content {
      idle_timeout = var.idle_timeout_seconds
    }
  }

  # At-rest + in-transit encryption via a pre-created aws_emr_security_configuration.
  security_configuration = var.security_configuration_name

  lifecycle {
    # Steps are usually submitted outside Terraform (CLI, Airflow, Step Functions);
    # ignore the step list so those out-of-band steps don't show up as drift.
    ignore_changes = [step]
  }

  tags = local.tags
}

# ---------------------------------------------------------------------------
# Task instance fleet — compute-only, Spot-heavy, elastic capacity.
# ---------------------------------------------------------------------------
resource "aws_emr_instance_fleet" "task" {
  count      = var.task_target_capacity > 0 ? 1 : 0
  cluster_id = aws_emr_cluster.this.id
  name       = "task-spot"

  target_on_demand_capacity = var.task_on_demand_capacity
  target_spot_capacity      = var.task_target_capacity

  dynamic "instance_type_configs" {
    for_each = var.task_instance_types
    content {
      instance_type     = instance_type_configs.value
      weighted_capacity = 1
    }
  }

  launch_specifications {
    spot_specification {
      allocation_strategy      = "capacity-optimized"
      timeout_action           = "SWITCH_TO_ON_DEMAND"
      timeout_duration_minutes = var.spot_timeout_minutes
    }
  }
}
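Because every candidate type above uses weighted_capacity = 1, task_target_capacity counts instances regardless of size, so 12 units could be twelve 2xlarge nodes or twelve 4xlarge nodes. The hypothetical variant below weights each type by its vCPU count instead. The task_instance_weights variable is an assumption and is not part of the module; the dynamic block would replace the one inside aws_emr_instance_fleet.task:

```hcl
# Hypothetical input: instance type => weighted capacity (here, vCPUs).
variable "task_instance_weights" {
  description = "Map of task instance type to weighted capacity units (e.g. vCPU count)."
  type        = map(number)
  default = {
    "m6g.xlarge"  = 4
    "m6g.2xlarge" = 8
    "m5.xlarge"   = 4
  }
}

# Inside resource "aws_emr_instance_fleet" "task": iterate the map instead of the list,
# so task_target_capacity is expressed in vCPUs rather than instances.
dynamic "instance_type_configs" {
  for_each = var.task_instance_weights
  content {
    instance_type     = instance_type_configs.key
    weighted_capacity = instance_type_configs.value
  }
}
```

If you adopt this, weight the core fleet the same way and re-express the scaling_* bounds in the same units, because Managed Scaling counts InstanceFleetUnits across fleets.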

# ---------------------------------------------------------------------------
# Managed Scaling — EMR sizes YARN capacity between min/max units for you.
# ---------------------------------------------------------------------------
resource "aws_emr_managed_scaling_policy" "this" {
  count      = var.enable_managed_scaling ? 1 : 0
  cluster_id = aws_emr_cluster.this.id

  compute_limits {
    unit_type                       = "InstanceFleetUnits"
    minimum_capacity_units          = var.scaling_min_units
    maximum_capacity_units          = var.scaling_max_units
    maximum_core_capacity_units     = var.scaling_max_core_units
    maximum_ondemand_capacity_units = var.scaling_max_ondemand_units
  }
}
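Managed Scaling expects minimum_capacity_units not to exceed maximum_capacity_units, and expects the core and On-Demand caps to fit inside the overall maximum. The sketch below adds Terraform lifecycle preconditions to the same resource so that inconsistent inputs are rejected at plan time. Preconditions have been available since Terraform 1.2, so they fit within this module's >= 1.5.0 pin. The block is meant as a drop-in replacement for the resource above, not an addition:

```hcl
# Same resource as above, with plan-time checks on the scaling bounds.
resource "aws_emr_managed_scaling_policy" "this" {
  count      = var.enable_managed_scaling ? 1 : 0
  cluster_id = aws_emr_cluster.this.id

  compute_limits {
    unit_type                       = "InstanceFleetUnits"
    minimum_capacity_units          = var.scaling_min_units
    maximum_capacity_units          = var.scaling_max_units
    maximum_core_capacity_units     = var.scaling_max_core_units
    maximum_ondemand_capacity_units = var.scaling_max_ondemand_units
  }

  lifecycle {
    precondition {
      condition     = var.scaling_min_units <= var.scaling_max_units
      error_message = "scaling_min_units must not exceed scaling_max_units."
    }
    precondition {
      condition     = var.scaling_max_core_units <= var.scaling_max_units && var.scaling_max_ondemand_units <= var.scaling_max_units
      error_message = "scaling_max_core_units and scaling_max_ondemand_units must not exceed scaling_max_units."
    }
  }
}
```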

variables.tf

variable "name" {
  description = "Cluster name; also used as a prefix for IAM roles and the S3 log path."
  type        = string

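  validation {
    # IAM role names are capped at 64 chars; the longest derived name is "<name>-emr-service" (+12).
    condition     = can(regex("^[a-z0-9][a-z0-9-]{1,51}$", var.name))
    error_message = "name must be 2-52 chars, lowercase alphanumeric and hyphens, starting alphanumeric (IAM role names derived from it are limited to 64 chars)."
  }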
}

variable "environment" {
  description = "Deployment environment tag (e.g. dev, stage, prod)."
  type        = string
}

variable "release_label" {
  description = "EMR release, e.g. emr-7.1.0. Changing this replaces the cluster."
  type        = string
  default     = "emr-7.1.0"

  validation {
    condition     = can(regex("^emr-[0-9]+\\.[0-9]+\\.[0-9]+$", var.release_label))
    error_message = "release_label must look like emr-X.Y.Z (e.g. emr-7.1.0)."
  }
}

variable "applications" {
  description = "EMR applications to install on the cluster."
  type        = list(string)
  default     = ["Spark", "Hadoop", "Hive"]
}

variable "subnet_id" {
  description = "Subnet (single AZ) the cluster nodes launch into. Use a private subnet in production."
  type        = string
}

variable "ec2_key_name" {
  description = "EC2 key pair for SSH to the master. Leave null and use SSM Session Manager instead."
  type        = string
  default     = null
}

variable "master_security_group_id" {
  description = "EMR-managed security group for the master node."
  type        = string
}

variable "slave_security_group_id" {
  description = "EMR-managed security group for core and task nodes."
  type        = string
}

variable "service_access_security_group_id" {
  description = "Security group used by EMR to reach nodes in a private subnet. Null for public subnets."
  type        = string
  default     = null
}

variable "logs_bucket" {
  description = "S3 bucket name (no s3:// prefix) where EMR archives cluster and step logs."
  type        = string
}

variable "data_buckets" {
  description = "S3 buckets the cluster's jobs are allowed to read/write via EMRFS."
  type        = list(string)
  default     = []
}

variable "permissions_boundary_arn" {
  description = "Optional IAM permissions boundary applied to both EMR roles."
  type        = string
  default     = null
}

variable "master_instance_type" {
  description = "Instance type for the master node."
  type        = string
  default     = "m6g.xlarge"
}

variable "master_ebs_size" {
  description = "EBS gp3 volume size (GiB) for the master node."
  type        = number
  default     = 64
}

variable "core_instance_type" {
  description = "Instance type for On-Demand core (HDFS) nodes."
  type        = string
  default     = "m6g.xlarge"
}

variable "core_instance_count" {
  description = "Number of On-Demand core nodes."
  type        = number
  default     = 2

  validation {
    condition     = var.core_instance_count >= 1
    error_message = "core_instance_count must be at least 1 (HDFS requires a DataNode)."
  }
}

variable "core_ebs_size" {
  description = "EBS gp3 volume size (GiB) per core node volume."
  type        = number
  default     = 128
}

variable "core_ebs_volumes_per_instance" {
  description = "Number of EBS volumes attached to each core node."
  type        = number
  default     = 1
}

variable "ebs_root_volume_size" {
  description = "Root EBS volume size (GiB) for all cluster instances."
  type        = number
  default     = 20
}

variable "task_instance_types" {
  description = "Candidate instance types for the Spot task fleet (diversify for capacity)."
  type        = list(string)
  default     = ["m6g.xlarge", "m6g.2xlarge", "m5.xlarge"]
}

variable "task_target_capacity" {
  description = "Target Spot capacity units for the task fleet. 0 disables the task fleet."
  type        = number
  default     = 0
}

variable "task_on_demand_capacity" {
  description = "Target On-Demand capacity units for the task fleet (baseline that never gets reclaimed)."
  type        = number
  default     = 0
}

variable "spot_timeout_minutes" {
  description = "Minutes to wait for Spot capacity before the timeout_action fires."
  type        = number
  default     = 20

  validation {
    condition     = var.spot_timeout_minutes >= 5 && var.spot_timeout_minutes <= 1440
    error_message = "spot_timeout_minutes must be between 5 and 1440."
  }
}

variable "configurations_json" {
  description = "EMR configuration classifications (Spark/YARN/EMRFS) as a JSON string."
  type        = string
  default     = "[]"

  validation {
    condition     = can(jsondecode(var.configurations_json))
    error_message = "configurations_json must be valid JSON (an array of classification objects)."
  }
}

variable "keep_alive" {
  description = "Keep the cluster running after the last step completes (true = long-running)."
  type        = bool
  default     = true
}

variable "termination_protection" {
  description = "Enable termination protection on the cluster."
  type        = bool
  default     = false
}

variable "idle_timeout_seconds" {
  description = "Auto-terminate the cluster after this many idle seconds (60-604800). 0 disables it."
  type        = number
  default     = 0

  validation {
    condition     = var.idle_timeout_seconds == 0 || (var.idle_timeout_seconds >= 60 && var.idle_timeout_seconds <= 604800)
    error_message = "idle_timeout_seconds must be 0 or between 60 and 604800."
  }
}

variable "security_configuration_name" {
  description = "Name of an aws_emr_security_configuration providing at-rest/in-transit encryption."
  type        = string
  default     = null
}

variable "enable_managed_scaling" {
  description = "Attach an EMR Managed Scaling policy to the cluster."
  type        = bool
  default     = true
}

variable "scaling_min_units" {
  description = "Minimum capacity units for Managed Scaling."
  type        = number
  default     = 2
}

variable "scaling_max_units" {
  description = "Maximum capacity units for Managed Scaling."
  type        = number
  default     = 20
}

variable "scaling_max_core_units" {
  description = "Maximum capacity units allocated to core nodes under Managed Scaling."
  type        = number
  default     = 4
}

variable "scaling_max_ondemand_units" {
  description = "Maximum On-Demand capacity units under Managed Scaling (rest comes from Spot)."
  type        = number
  default     = 6
}

variable "tags" {
  description = "Additional tags merged onto every resource."
  type        = map(string)
  default     = {}
}

outputs.tf

output "cluster_id" {
  description = "EMR cluster ID (j-XXXXXXXX), used by step submitters and downstream resources."
  value       = aws_emr_cluster.this.id
}

output "cluster_name" {
  description = "EMR cluster name."
  value       = aws_emr_cluster.this.name
}

output "cluster_arn" {
  description = "ARN of the EMR cluster."
  value       = aws_emr_cluster.this.arn
}

output "master_public_dns" {
  description = "Public/private DNS of the master node (Spark UI, SSH target)."
  value       = aws_emr_cluster.this.master_public_dns
}

output "log_uri" {
  description = "S3 URI where cluster and step logs are archived."
  value       = aws_emr_cluster.this.log_uri
}

output "ec2_instance_profile_role_arn" {
  description = "ARN of the EC2 instance-profile role the nodes assume (attach extra data-access policies here)."
  value       = aws_iam_role.ec2.arn
}

output "service_role_arn" {
  description = "ARN of the EMR service role."
  value       = aws_iam_role.service.arn
}

output "task_fleet_id" {
  description = "ID of the Spot task instance fleet, or null when no task fleet is provisioned."
  value       = try(aws_emr_instance_fleet.task[0].id, null)
}
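The ec2_key_name description recommends SSM Session Manager over SSH, but the module's EC2 role only grants S3 access. The sketch below attaches the AWS-managed AmazonSSMManagedInstanceCore policy from the calling configuration. It assumes the commercial aws partition, an SSM agent running on the EMR AMI (verify this for your release), and a network path from the private subnet to the SSM endpoints, through either NAT or VPC endpoints:

```hcl
# Let cluster nodes register with SSM so ec2_key_name can stay null.
# basename() keeps the text after the last "/" of the role ARN, i.e. the role name;
# this works because the module creates the role without an IAM path.
resource "aws_iam_role_policy_attachment" "emr_ssm" {
  role       = basename(module.emr.ec2_instance_profile_role_arn)
  policy_arn = "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore" # commercial partition assumed
}
```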

How to use it

module "emr" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-emr?ref=v1.0.0"

  name        = "etl-nightly"
  environment = "prod"

  release_label = "emr-7.1.0"
  applications  = ["Spark", "Hadoop", "Hive"]

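  subnet_id                        = module.network.private_subnet_ids[0]
  master_security_group_id         = module.network.emr_master_sg_id
  slave_security_group_id          = module.network.emr_slave_sg_id
  service_access_security_group_id = module.network.emr_service_access_sg_id # needed in private subnets with custom SGs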

  logs_bucket  = "kloudvin-emr-logs"
  data_buckets = ["kloudvin-raw-events", "kloudvin-curated-parquet"]

  master_instance_type = "m6g.xlarge"
  core_instance_type   = "m6g.2xlarge"
  core_instance_count  = 3

  # Elastic Spot task fleet for the heavy Spark stages.
  task_target_capacity = 12
  task_instance_types  = ["m6g.2xlarge", "m6g.4xlarge", "m5.2xlarge"]

  # Encrypt at rest + in transit via a pre-created security configuration.
  security_configuration_name = aws_emr_security_configuration.encrypted.name

  # Bound the autoscaler and reclaim cost when idle.
  enable_managed_scaling = true
  scaling_max_units      = 24
  idle_timeout_seconds   = 3600

  configurations_json = jsonencode([
    {
      Classification = "spark-defaults"
      Properties = {
        "spark.dynamicAllocation.enabled" = "true"
        "spark.sql.shuffle.partitions"    = "400"
      }
    },
    {
      Classification = "emrfs-site"
      Properties = {
        "fs.s3.maxConnections" = "200"
      }
    },
  ])

  tags = {
    CostCenter = "data-platform"
    Team       = "analytics"
  }
}
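The call above references aws_emr_security_configuration.encrypted without defining it. The sketch below shows what that resource could look like. The configuration name, the aws_kms_key.emr key, and the S3 path of the PEM certificate bundle are all assumptions:

```hcl
# Hypothetical security configuration referenced by the module call above.
resource "aws_emr_security_configuration" "encrypted" {
  name = "etl-nightly-encryption"

  configuration = jsonencode({
    EncryptionConfiguration = {
      EnableAtRestEncryption    = true
      EnableInTransitEncryption = true

      AtRestEncryptionConfiguration = {
        # EMRFS objects written to S3 use S3-managed keys.
        S3EncryptionConfiguration = {
          EncryptionMode = "SSE-S3"
        }
        # Local disks (HDFS, shuffle spill) use a customer-managed KMS key (assumed to exist).
        LocalDiskEncryptionConfiguration = {
          EncryptionKeyProviderType = "AwsKms"
          AwsKmsKey                 = aws_kms_key.emr.arn
        }
      }

      # TLS between nodes, from a zip of PEM files uploaded to S3 beforehand.
      InTransitEncryptionConfiguration = {
        TLSCertificateConfiguration = {
          CertificateProviderType = "PEM"
          S3Object                = "s3://kloudvin-emr-config/tls/emr-certs.zip"
        }
      }
    }
  })
}
```

The nodes' instance-profile role also needs permission to use that KMS key and to read the certificate object. The module's default S3 policy only covers the latter if that bucket is listed in data_buckets.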

# Downstream: publish the cluster ID so an orchestrator (Airflow, Step Functions, or
# aws emr add-steps --cluster-id ...) can submit Spark steps to it. The AWS provider has
# no standalone step resource, and a cluster accepts only one task instance fleet, which
# the module already creates. The parameter path below is illustrative.
resource "aws_ssm_parameter" "emr_cluster_id" {
  name  = "/data-platform/etl-nightly/emr-cluster-id"
  type  = "String"
  value = module.emr.cluster_id
}

With Terragrunt

Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.

1. Root config: live/terragrunt.hcl (inherited by every module):

remote_state {
  backend = "s3"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...s3 state bucket/container + key per path...
  }
}
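The root config above only shows the backend. The sketch below generates the AWS provider from the same root file using Terragrunt's generate block, so child modules like this one need no provider.tf of their own. The region is a placeholder:

```hcl
# live/terragrunt.hcl (continued): write provider.tf into every child module.
generate "provider" {
  path      = "provider.tf"
  if_exists = "overwrite_terragrunt"
  contents  = <<EOF
provider "aws" {
  region = "us-east-1"
}
EOF
}
```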

2. Module config: live/prod/emr/terragrunt.hcl:

include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-emr?ref=v1.0.0"
}

inputs = {
  name                     = "..."
  environment              = "..."
  subnet_id                = "..."
  master_security_group_id = "..."
  slave_security_group_id  = "..."
  logs_bucket              = "..."
}

3. Deploy one environment, or roll out all modules together:

cd live/prod/emr && terragrunt apply      # this module only
cd live/prod && terragrunt run-all apply  # every module under live/prod

Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; the inputs block is overridden per environment (dev / stage / prod) without forking the module; and run-all applies modules in the order their dependency blocks imply. Reach for it once you have more than one environment or more than a handful of modules. For a single stack, the plain Quickstart above is enough.
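For run-all to order modules, each child config declares what it depends on. The sketch below feeds this module's network inputs from a sibling network module. The ../network path and its output names (the same ones used in How to use it) are assumptions about your layout:

```hcl
# live/prod/emr/terragrunt.hcl (excerpt): read IDs from the sibling network module.
dependency "network" {
  config_path = "../network"
}

inputs = {
  subnet_id                = dependency.network.outputs.private_subnet_ids[0]
  master_security_group_id = dependency.network.outputs.emr_master_sg_id
  slave_security_group_id  = dependency.network.outputs.emr_slave_sg_id
}
```

With this in place, terragrunt run-all apply from live/prod applies the network module before the EMR module.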

Inputs

| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| name | string | | yes | Cluster name; prefix for IAM roles and the S3 log path. |
| environment | string | | yes | Deployment environment tag (dev/stage/prod). |
| release_label | string | "emr-7.1.0" | no | EMR release; changing it replaces the cluster. |
| applications | list(string) | ["Spark","Hadoop","Hive"] | no | EMR applications to install. |
| subnet_id | string | | yes | Single-AZ subnet the nodes launch into. |
| ec2_key_name | string | null | no | EC2 key pair for SSH; null to use SSM instead. |
| master_security_group_id | string | | yes | EMR-managed SG for the master node. |
| slave_security_group_id | string | | yes | EMR-managed SG for core and task nodes. |
| service_access_security_group_id | string | null | no | SG EMR uses to reach private-subnet nodes. |
| logs_bucket | string | | yes | S3 bucket (no prefix) for EMR log archival. |
| data_buckets | list(string) | [] | no | S3 buckets jobs may read/write via EMRFS. |
| permissions_boundary_arn | string | null | no | IAM permissions boundary for both roles. |
| master_instance_type | string | "m6g.xlarge" | no | Instance type for the master node. |
| master_ebs_size | number | 64 | no | gp3 volume size (GiB) for the master. |
| core_instance_type | string | "m6g.xlarge" | no | Instance type for On-Demand core nodes. |
| core_instance_count | number | 2 | no | Number of On-Demand core (HDFS) nodes. |
| core_ebs_size | number | 128 | no | gp3 volume size (GiB) per core volume. |
| core_ebs_volumes_per_instance | number | 1 | no | EBS volumes attached to each core node. |
| ebs_root_volume_size | number | 20 | no | Root EBS volume size (GiB) for all nodes. |
| task_instance_types | list(string) | ["m6g.xlarge","m6g.2xlarge","m5.xlarge"] | no | Candidate types for the Spot task fleet. |
| task_target_capacity | number | 0 | no | Target Spot capacity units; 0 disables the fleet. |
| task_on_demand_capacity | number | 0 | no | Baseline On-Demand units in the task fleet. |
| spot_timeout_minutes | number | 20 | no | Minutes to wait for Spot before timeout_action. |
| configurations_json | string | "[]" | no | Spark/YARN/EMRFS classifications as JSON. |
| keep_alive | bool | true | no | Keep cluster alive after the last step. |
| termination_protection | bool | false | no | Enable termination protection. |
| idle_timeout_seconds | number | 0 | no | Auto-terminate after N idle seconds (0 = off). |
| security_configuration_name | string | null | no | Name of an encryption security configuration. |
| enable_managed_scaling | bool | true | no | Attach an EMR Managed Scaling policy. |
| scaling_min_units | number | 2 | no | Minimum Managed Scaling capacity units. |
| scaling_max_units | number | 20 | no | Maximum Managed Scaling capacity units. |
| scaling_max_core_units | number | 4 | no | Max capacity units allocated to core nodes. |
| scaling_max_ondemand_units | number | 6 | no | Max On-Demand units under Managed Scaling. |
| tags | map(string) | {} | no | Additional tags merged onto every resource. |

Outputs

| Name | Description |
|---|---|
| cluster_id | EMR cluster ID (j-XXXXXXXX) for submitting steps and downstream references. |
| cluster_name | EMR cluster name. |
| cluster_arn | ARN of the EMR cluster. |
| master_public_dns | DNS of the master node (Spark UI / SSH target). |
| log_uri | S3 URI where cluster and step logs are archived. |
| ec2_instance_profile_role_arn | ARN of the EC2 instance-profile role nodes assume. |
| service_role_arn | ARN of the EMR service role. |
| task_fleet_id | ID of the Spot task instance fleet, or null when none is provisioned. |

Enterprise scenario

A retail analytics team runs a nightly pipeline that converts ~4 TB of raw clickstream JSON in kloudvin-raw-events into partitioned Parquet in kloudvin-curated-parquet for Athena. They deploy this module with three On-Demand m6g.2xlarge core nodes for stable HDFS capacity and a 12-unit Spot task fleet diversified across three instance types in two families (m6g and m5). As a result, the expensive Spark stages run at roughly 70% off On-Demand pricing; actual Spot discounts vary by instance type, Availability Zone, and time. Because task nodes hold no HDFS blocks, a Spot interruption only forces Spark to re-run the affected tasks (and recompute any shuffle output lost with the node) instead of failing the job. Managed Scaling caps the cluster at 24 units, and an idle_timeout_seconds of 3600 terminates the cluster after an hour without activity. The data platform therefore pays for the ~90-minute nightly run plus at most an hour of idle time, rather than for a 24/7 cluster.

Best practices


Vinod is a Senior Cloud Architect (22+ yrs) — available for Azure / AWS / GCP architecture, landing zones, and migrations.
