
Terraform Module: GCP Dataflow — Production-Ready Streaming & Batch Pipelines from Templates

Quick take: a reusable hashicorp/google Terraform module for google_dataflow_job. It deploys Dataflow batch and streaming pipelines from Classic Templates with a dedicated worker SA, private networking, and KMS encryption (Flex Templates use a different resource; see "When to use it"). New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.

Quickstart (copy-paste)

Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):

provider "google" {
  project = "my-project"
  region  = "us-central1"
}

module "dataflow" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "..."  # GCP project ID where the job runs.
  region            = "..."  # Region for the Dataflow job; should match Google-hosted…
  name              = "..."  # Job name (1–63 chars, lowercase, digits, hyphens; valid…
  template_gcs_path = "..."  # gs:// path to the Classic Template (validated to start …
  staging_bucket    = "..."  # Bucket name used to derive temp/staging GCS locations.
}

Then run terraform init && terraform apply. Every other input has a sensible default; see Inputs below to override behaviour.

What this module is

Google Cloud Dataflow is the managed, autoscaling runner for Apache Beam pipelines. It executes both streaming jobs (unbounded sources like Pub/Sub) and batch jobs (bounded sources like Cloud Storage or BigQuery), handling worker provisioning, autoscaling, shuffle, watermarking, and exactly-once semantics so you don’t run a Spark/Flink cluster yourself.

In Terraform, the google_dataflow_job resource launches a job from a Classic Template (a pre-staged JSON template file in GCS, e.g. Google’s gs://dataflow-templates-<region>/... library) by setting template_gcs_path. It is a quirky resource rather than a plain CRUD object: Terraform “creates” a job by submitting it to the Dataflow service, and a streaming job keeps running until it is drained or cancelled. For batch jobs, changing most arguments (zone, machine_type, network, max_workers, the parameters map) forces replacement; for streaming jobs the provider instead tries to apply the change by launching an updated job in place of the running one. On destroy the provider issues a drain or cancel rather than a delete.
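For orientation, here is roughly what the bare resource looks like without the module around it. This is a minimal sketch: the project, bucket, subscription, and table names are placeholders, and the template parameters are the ones used in the usage example later in this article.

```hcl
# Minimal sketch of the raw resource the module wraps.
# All names below (project, bucket, subscription, table) are hypothetical.
resource "google_dataflow_job" "example" {
  project = "my-project"
  region  = "europe-west1"
  name    = "psub-to-bq-example"

  # Classic Template staged in GCS, plus a temp location for the job.
  template_gcs_path = "gs://dataflow-templates-europe-west1/latest/PubSub_Subscription_to_BigQuery"
  temp_gcs_location = "gs://my-project-dataflow-staging/tmp"

  # Template-specific parameters.
  parameters = {
    inputSubscription = "projects/my-project/subscriptions/events-sub"
    outputTableSpec   = "my-project:analytics.raw_events"
  }

  # On terraform destroy, drain the streaming job instead of cancelling it.
  on_delete = "drain"
}
```

Everything else the module adds (service account, IAM, networking, CMEK) sits around this one resource.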

Wrapping it in a module pays off because the surrounding infrastructure is the hard part and is identical for every pipeline: a least-privilege worker service account, the IAM roles that SA needs (roles/dataflow.worker, plus data-plane roles), a private worker pool (no public IPs, Private Google Access), a CMEK key for at-rest encryption, and a sane on_delete policy. The module makes those defaults secure-by-default and turns “launch a templated pipeline” into a 15-line module block.
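The private worker pool assumes a subnet with Private Google Access and a firewall rule that lets workers talk to each other. The module does not create these; the sketch below shows one way the network side might look, assuming a hypothetical host project, network name, and CIDR range. Dataflow worker VMs carry the network tag dataflow and use TCP ports 12345 and 12346 for worker-to-worker traffic.

```hcl
# Sketch only: project, network name, and CIDR range are assumptions.
resource "google_compute_subnetwork" "dataflow" {
  project       = "my-net-host"
  name          = "dataflow-subnet"
  region        = "europe-west1"
  network       = "shared-vpc"
  ip_cidr_range = "10.10.0.0/24"

  # Lets workers without external IPs reach Google APIs (GCS, Pub/Sub, BigQuery).
  private_ip_google_access = true
}

resource "google_compute_firewall" "dataflow_workers" {
  project   = "my-net-host"
  name      = "allow-dataflow-worker-traffic"
  network   = "shared-vpc"
  direction = "INGRESS"

  # Worker-to-worker communication between VMs tagged "dataflow".
  allow {
    protocol = "tcp"
    ports    = ["12345-12346"]
  }

  source_tags = ["dataflow"]
  target_tags = ["dataflow"]
}
```

If workers also need to reach endpoints outside Google APIs, a Cloud NAT on the same network is the usual complement.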

When to use it

If you need a full custom pipeline packaged as a Flex Template container, use google_dataflow_flex_template_job instead — this module targets the Classic-template google_dataflow_job resource, which covers the large catalogue of Google templates and any classic template your team stages.
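For comparison, a Flex Template job might be declared roughly as follows. Treat this as a sketch rather than part of the module: the bucket path and parameter are placeholders, and the provider line reflects the assumption that your provider version ships this resource in google-beta, so check the documentation for the version you pin.

```hcl
# Sketch of the Flex Template alternative; not managed by this module.
resource "google_dataflow_flex_template_job" "example" {
  # Assumption: the resource is served by the google-beta provider
  # in the provider version you use.
  provider = google-beta

  project = "my-project"
  region  = "europe-west1"
  name    = "custom-pipeline"

  # Path to the Flex Template spec file (hypothetical bucket and file).
  container_spec_gcs_path = "gs://my-project-dataflow-templates/custom-pipeline.json"

  parameters = {
    inputSubscription = "projects/my-project/subscriptions/events-sub"
  }

  on_delete = "drain"
}
```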

Module structure

terraform-module-gcp-dataflow/
├── versions.tf
├── main.tf
├── variables.tf
└── outputs.tf

versions.tf

terraform {
  required_version = ">= 1.5.0"

  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

main.tf

locals {
  # Dataflow temp/staging must live under a GCS path; derive a sane default.
  temp_location    = var.temp_location != null ? var.temp_location : "gs://${var.staging_bucket}/tmp"
  staging_location = "gs://${var.staging_bucket}/staging"

  # Roles the worker SA needs to run jobs + emit logs/metrics. Data-plane
  # roles (bigquery.dataEditor, pubsub.subscriber, ...) are passed by callers.
  base_worker_roles = [
    "roles/dataflow.worker",
    "roles/storage.objectAdmin",
    "roles/monitoring.metricWriter",
    "roles/logging.logWriter",
  ]

  worker_roles = toset(concat(local.base_worker_roles, var.additional_worker_roles))

  # Job-level parameters always include temp/staging unless the caller overrode.
  job_parameters = merge(
    {
      tempLocation    = local.temp_location
      stagingLocation = local.staging_location
    },
    var.parameters,
  )
}

# Dedicated, least-privilege worker service account for this pipeline.
resource "google_service_account" "worker" {
  count = var.create_service_account ? 1 : 0

  project      = var.project_id
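  # Account IDs are capped at 30 chars and must not end in a hyphen,
  # so trim any hyphen left dangling by the truncation.
  account_id   = "df-${trim(substr(var.name, 0, 26), "-")}"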
  display_name = "Dataflow worker SA for ${var.name}"
}

locals {
  worker_sa_email = var.create_service_account ? google_service_account.worker[0].email : var.service_account_email
}

# Grant the worker SA the roles it needs at the project scope.
resource "google_project_iam_member" "worker" {
  for_each = var.create_service_account ? local.worker_roles : toset([])

  project = var.project_id
  role    = each.value
  member  = "serviceAccount:${google_service_account.worker[0].email}"
}

# The Dataflow job itself, launched from a Classic Template in GCS.
resource "google_dataflow_job" "this" {
  project = var.project_id
  region  = var.region
  zone    = var.zone

  name              = var.name
  template_gcs_path = var.template_gcs_path
  temp_gcs_location = local.temp_location
  parameters        = local.job_parameters

  # Worker sizing & autoscaling.
  machine_type            = var.machine_type
  max_workers             = var.max_workers
  enable_streaming_engine = var.enable_streaming_engine

  # Networking: private workers on a shared/standalone VPC subnet.
  network          = var.network
  subnetwork       = var.subnetwork
  ip_configuration = var.use_public_ips ? "WORKER_IP_PUBLIC" : "WORKER_IP_PRIVATE"

  # Identity & encryption.
  service_account_email = local.worker_sa_email
  kms_key_name          = var.kms_key_name

  # Lifecycle: drain streaming jobs, cancel batch jobs.
  on_delete                    = var.on_delete
  skip_wait_on_job_termination = var.skip_wait_on_job_termination

  additional_experiments = var.additional_experiments
  labels                 = var.labels

  # IAM bindings must exist before the service tries to assume the SA.
  depends_on = [google_project_iam_member.worker]
}
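One consequence of the listing above: when create_service_account is false, the module grants no IAM roles at all, so the caller owns both the service account and its bindings. A sketch of that pattern follows, assuming a hypothetical shared worker account; the role list simply mirrors base_worker_roles.

```hcl
# Caller-managed worker SA (names are hypothetical).
resource "google_service_account" "shared_worker" {
  project      = "my-project"
  account_id   = "df-shared-worker"
  display_name = "Shared Dataflow worker SA"
}

resource "google_project_iam_member" "shared_worker" {
  for_each = toset([
    "roles/dataflow.worker",
    "roles/storage.objectAdmin",
    "roles/monitoring.metricWriter",
    "roles/logging.logWriter",
  ])

  project = "my-project"
  role    = each.value
  member  = "serviceAccount:${google_service_account.shared_worker.email}"
}

module "dataflow" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "my-project"
  region            = "europe-west1"
  name              = "psub-to-bq-events"
  template_gcs_path = "gs://dataflow-templates-europe-west1/latest/PubSub_Subscription_to_BigQuery"
  staging_bucket    = "my-project-dataflow-staging"

  create_service_account = false
  service_account_email  = google_service_account.shared_worker.email

  # The module's internal depends_on only covers bindings it creates itself,
  # so order the job after the caller's bindings explicitly.
  depends_on = [google_project_iam_member.shared_worker]
}
```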

variables.tf

variable "project_id" {
  description = "GCP project ID where the Dataflow job runs."
  type        = string
}

variable "region" {
  description = "Region for the Dataflow job (e.g. europe-west1). Should match the template's region for Google-hosted templates."
  type        = string
}

variable "zone" {
  description = "Optional zone to pin workers to. Leave null to let Dataflow pick within the region. Changing this forces job replacement."
  type        = string
  default     = null
}

variable "name" {
  description = "Job name. Must be unique among active jobs in the region; lowercase letters, digits and hyphens only."
  type        = string

  validation {
    condition     = can(regex("^[a-z]([-a-z0-9]{0,61}[a-z0-9])?$", var.name))
    error_message = "name must be 1-63 chars, start with a lowercase letter, and contain only lowercase letters, digits and hyphens."
  }
}

variable "template_gcs_path" {
  description = "GCS path to the Classic Template, e.g. gs://dataflow-templates-europe-west1/latest/PubSub_Subscription_to_BigQuery."
  type        = string

  validation {
    condition     = can(regex("^gs://", var.template_gcs_path))
    error_message = "template_gcs_path must be a gs:// URI to a staged Classic Template."
  }
}

variable "staging_bucket" {
  description = "GCS bucket name (no gs:// prefix) used to derive temp and staging locations."
  type        = string
}

variable "temp_location" {
  description = "Override for the temp GCS location. Defaults to gs://<staging_bucket>/tmp."
  type        = string
  default     = null
}

variable "parameters" {
  description = "Template-specific parameters passed to the job (e.g. inputSubscription, outputTableSpec). Merged with derived temp/staging locations."
  type        = map(string)
  default     = {}
}

variable "machine_type" {
  description = "Compute Engine machine type for workers (e.g. n2-standard-4)."
  type        = string
  default     = "n1-standard-2"
}

variable "max_workers" {
  description = "Maximum number of workers for autoscaling. Caps cost for streaming jobs."
  type        = number
  default     = 5

  validation {
    condition     = var.max_workers >= 1 && var.max_workers <= 1000
    error_message = "max_workers must be between 1 and 1000."
  }
}

variable "enable_streaming_engine" {
  description = "Use Streaming Engine (offloads state/shuffle from workers). Recommended true for streaming pipelines."
  type        = bool
  default     = true
}

variable "network" {
  description = "VPC network name for workers. Use with subnetwork for shared-VPC deployments."
  type        = string
  default     = null
}

variable "subnetwork" {
  description = "Full subnetwork URL, e.g. regions/europe-west1/subnetworks/dataflow-subnet (or a full https self-link for shared VPC)."
  type        = string
  default     = null
}

variable "use_public_ips" {
  description = "If false (recommended), workers get no external IPs and rely on Private Google Access / NAT."
  type        = bool
  default     = false
}

variable "create_service_account" {
  description = "Create a dedicated least-privilege worker SA. If false, supply service_account_email."
  type        = bool
  default     = true
}

variable "service_account_email" {
  description = "Existing worker SA email to use when create_service_account is false."
  type        = string
  default     = null

  validation {
    condition     = var.service_account_email == null || can(regex("@", var.service_account_email))
    error_message = "service_account_email must be a valid service account email or null."
  }
}

variable "additional_worker_roles" {
  description = "Extra project IAM roles for the created worker SA (data-plane access), e.g. roles/bigquery.dataEditor, roles/pubsub.subscriber."
  type        = list(string)
  default     = []
}

variable "kms_key_name" {
  description = "Optional CMEK key (full resource ID) to encrypt job state and temp data at rest."
  type        = string
  default     = null
}

variable "on_delete" {
  description = "What to do on terraform destroy: 'drain' (graceful, for streaming) or 'cancel' (immediate, for batch)."
  type        = string
  default     = "cancel"

  validation {
    condition     = contains(["drain", "cancel"], var.on_delete)
    error_message = "on_delete must be either 'drain' or 'cancel'."
  }
}

variable "skip_wait_on_job_termination" {
  description = "If true, Terraform does not block waiting for the job to fully drain/cancel on destroy."
  type        = bool
  default     = false
}

variable "additional_experiments" {
  description = "List of Dataflow service experiments, e.g. [\"enable_prime\"] or [\"shuffle_mode=service\"]."
  type        = list(string)
  default     = []
}

variable "labels" {
  description = "Resource labels applied to the job for cost attribution and ownership."
  type        = map(string)
  default     = {}
}
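The variables above validate each input on its own, but nothing stops a caller from setting create_service_account = false and leaving service_account_email null. One possible guard, sketched here as an optional addition to main.tf rather than something the module already contains, is a precondition on a terraform_data resource (available from Terraform 1.4, so within the module's required_version):

```hcl
# Optional cross-variable check; the resource name is illustrative.
resource "terraform_data" "input_checks" {
  lifecycle {
    precondition {
      condition     = var.create_service_account || var.service_account_email != null
      error_message = "Set service_account_email when create_service_account is false."
    }
  }
}
```

A precondition is used instead of a variable validation block because validation rules that reference other variables are only supported in newer Terraform releases than the 1.5.0 floor declared in versions.tf.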

outputs.tf

output "job_id" {
  description = "The unique Dataflow job ID."
  value       = google_dataflow_job.this.id
}

output "job_name" {
  description = "The Dataflow job name."
  value       = google_dataflow_job.this.name
}

output "state" {
  description = "Current state of the job (e.g. JOB_STATE_RUNNING)."
  value       = google_dataflow_job.this.state
}

output "type" {
  description = "Job type: JOB_TYPE_STREAMING or JOB_TYPE_BATCH."
  value       = google_dataflow_job.this.type
}

output "worker_service_account_email" {
  description = "Email of the worker service account used by the job."
  value       = local.worker_sa_email
}

output "temp_location" {
  description = "Resolved GCS temp location used by the job."
  value       = local.temp_location
}
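These outputs can be re-exported or combined in the calling configuration. As a small illustration, the sketch below builds a console link from job_id; the URL pattern is an assumption based on how the Cloud Console addresses jobs, and the region and project are the literal values from the usage example later in this article.

```hcl
# In the root module that calls module "dataflow".
output "dataflow_job_console_url" {
  description = "Convenience link to the job in the Cloud Console (URL pattern is an assumption)."
  value       = "https://console.cloud.google.com/dataflow/jobs/europe-west1/${module.dataflow.job_id}?project=kv-data-prod"
}
```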

How to use it

A streaming Pub/Sub-subscription-to-BigQuery pipeline on private workers, with the module creating a worker SA that also gets BigQuery and Pub/Sub data-plane roles:

module "dataflow" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "kv-data-prod"
  region            = "europe-west1"
  name              = "psub-to-bq-events"
  template_gcs_path = "gs://dataflow-templates-europe-west1/latest/PubSub_Subscription_to_BigQuery"
  staging_bucket    = "kv-data-prod-dataflow-staging"

  parameters = {
    inputSubscription = "projects/kv-data-prod/subscriptions/events-sub"
    outputTableSpec   = "kv-data-prod:analytics.raw_events"
  }

  # Private, cost-capped streaming workers on the shared VPC.
  subnetwork              = "https://www.googleapis.com/compute/v1/projects/kv-net-host/regions/europe-west1/subnetworks/dataflow-subnet"
  use_public_ips          = false
  enable_streaming_engine = true
  machine_type            = "n2-standard-4"
  max_workers             = 8

  # Least-privilege data-plane roles for the auto-created worker SA.
  additional_worker_roles = [
    "roles/bigquery.dataEditor",
    "roles/bigquery.jobUser",
    "roles/pubsub.subscriber",
  ]

  kms_key_name = "projects/kv-data-prod/locations/europe-west1/keyRings/dataflow/cryptoKeys/jobs"
  on_delete    = "drain" # graceful teardown for streaming

  labels = {
    team        = "data-platform"
    environment = "prod"
    pipeline    = "events-ingest"
  }
}

# Downstream: alert if the job reports a failure (is_failed metric).
resource "google_monitoring_alert_policy" "dataflow_not_running" {
  project      = "kv-data-prod"
  display_name = "Dataflow ${module.dataflow.job_name} failed"
  combiner     = "OR"

  conditions {
    display_name = "Job is_failed > 0"
    condition_monitoring_query_language {
      duration = "300s"
      query    = <<-MQL
        fetch dataflow_job
        | metric 'dataflow.googleapis.com/job/is_failed'
        | filter (resource.job_name == '${module.dataflow.job_name}')
        | condition val() > 0
      MQL
    }
  }

  # var.oncall_channel_id is assumed to be declared in the calling configuration.
  notification_channels = [var.oncall_channel_id]
}
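The same module can launch a batch job by changing a handful of inputs. The sketch below uses Google's Word_Count template with its inputFile and output parameters; the project and bucket names are placeholders, and it assumes the default network's subnet in that region has Private Google Access enabled, since use_public_ips defaults to false.

```hcl
# Batch variant (hypothetical project and bucket names).
module "wordcount_batch" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "my-project"
  region            = "us-central1"
  name              = "wordcount-batch"
  template_gcs_path = "gs://dataflow-templates-us-central1/latest/Word_Count"
  staging_bucket    = "my-project-dataflow-staging"

  parameters = {
    inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt"
    output    = "gs://my-project-dataflow-staging/output/wordcount"
  }

  # Batch jobs do not need Streaming Engine, and cancel is the natural teardown.
  enable_streaming_engine = false
  on_delete               = "cancel"
  max_workers             = 3
}
```

Because a batch job ends on its own, a later terraform plan may propose re-creating it; whether that is desirable depends on how you schedule batch runs.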

With Terragrunt

Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.

1. Root config: live/terragrunt.hcl (inherited by every module):

remote_state {
  backend = "gcs"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...GCS state bucket + prefix per path...
  }
}

2. Module config: live/prod/dataflow/terragrunt.hcl:

include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"
}

inputs = {
  project_id        = "..."
  region            = "..."
  name              = "..."
  template_gcs_path = "..."
  staging_bucket    = "..."
}

3. Deploy one environment, or roll out all modules together:

(cd live/prod/dataflow && terragrunt apply)   # this module only
(cd live/prod && terragrunt run-all apply)    # every module under live/prod

Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; inputs is overridden per environment (dev / stage / prod) without forking the module; and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules — for a single stack, the plain Quickstart above is enough.
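The root config shown earlier only covers the backend. To keep the provider in one place as well, the root terragrunt.hcl could also carry a generate block along these lines; the project and region values are placeholders, and in practice they would usually come from per-environment locals.

```hcl
# Addition to live/terragrunt.hcl (values are hypothetical).
generate "provider" {
  path      = "provider.tf"
  if_exists = "overwrite_terragrunt"
  contents  = <<EOF
provider "google" {
  project = "my-project"
  region  = "us-central1"
}
EOF
}
```

Terragrunt writes provider.tf into each module's working directory before running Terraform, so the module itself stays free of provider configuration.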

Inputs

| Name | Type | Default | Required | Description |
| --- | --- | --- | --- | --- |
| project_id | string | | Yes | GCP project ID where the job runs. |
| region | string | | Yes | Region for the Dataflow job; should match Google-hosted template region. |
| zone | string | null | No | Optional zone to pin workers; changing it forces replacement. |
| name | string | | Yes | Job name (1–63 chars, lowercase, digits, hyphens; validated). |
| template_gcs_path | string | | Yes | gs:// path to the Classic Template (validated to start with gs://). |
| staging_bucket | string | | Yes | Bucket name used to derive temp/staging GCS locations. |
| temp_location | string | null | No | Override temp GCS location; defaults to gs://<staging_bucket>/tmp. |
| parameters | map(string) | {} | No | Template-specific parameters; merged with derived temp/staging. |
| machine_type | string | "n1-standard-2" | No | Worker machine type. |
| max_workers | number | 5 | No | Autoscaling cap (validated 1–1000). |
| enable_streaming_engine | bool | true | No | Enable Streaming Engine for streaming jobs. |
| network | string | null | No | VPC network name for workers. |
| subnetwork | string | null | No | Subnetwork URL/self-link (required pattern for shared VPC). |
| use_public_ips | bool | false | No | If false, workers run without external IPs (WORKER_IP_PRIVATE). |
| create_service_account | bool | true | No | Create a dedicated least-privilege worker SA. |
| service_account_email | string | null | No | Existing worker SA email when not creating one (validated). |
| additional_worker_roles | list(string) | [] | No | Extra project roles for the created worker SA (data-plane access). |
| kms_key_name | string | null | No | CMEK key resource ID for at-rest encryption. |
| on_delete | string | "cancel" | No | Destroy behaviour: drain or cancel (validated). |
| skip_wait_on_job_termination | bool | false | No | Do not block on job termination during destroy. |
| additional_experiments | list(string) | [] | No | Dataflow service experiments (e.g. enable_prime). |
| labels | map(string) | {} | No | Resource labels for cost attribution. |

Outputs

| Name | Description |
| --- | --- |
| job_id | The unique Dataflow job ID. |
| job_name | The Dataflow job name. |
| state | Current job state (e.g. JOB_STATE_RUNNING). |
| type | Job type: JOB_TYPE_STREAMING or JOB_TYPE_BATCH. |
| worker_service_account_email | Email of the worker service account used by the job. |
| temp_location | Resolved GCS temp location used by the job. |

Enterprise scenario

A retail analytics group ingests clickstream and order events through Pub/Sub and needs them in BigQuery within seconds for near-real-time dashboards. They instantiate this module once per environment (dev/staging/prod) from a PubSub_Subscription_to_BigQuery template, each on its own least-privilege worker SA, on private workers inside the shared-VPC dataflow-subnet, with on_delete = "drain" so a terraform destroy during a region migration flushes in-flight records into BigQuery instead of dropping them. CMEK on the temp location satisfies the company’s “no Google-managed keys for customer data” policy, and max_workers = 8 plus enable_streaming_engine = true keeps the steady-state spend predictable while still absorbing Black-Friday traffic spikes.
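For the CMEK part of this scenario to work, the Google-managed service agents that run the job generally need permission to use the key. The module takes kms_key_name but does not grant this, so a caller might add something like the sketch below. The project and key path reuse the values from the usage example; the service agent addresses follow Google's documented naming for the Dataflow and Compute Engine agents.

```hcl
# Look up the project number to build the service agent addresses.
data "google_project" "job" {
  project_id = "kv-data-prod"
}

locals {
  # Static keys keep for_each stable even if the project number is not yet known.
  cmek_service_agents = {
    dataflow = "service-${data.google_project.job.number}@dataflow-service-producer-prod.iam.gserviceaccount.com"
    compute  = "service-${data.google_project.job.number}@compute-system.iam.gserviceaccount.com"
  }
}

resource "google_kms_crypto_key_iam_member" "dataflow_cmek" {
  for_each = local.cmek_service_agents

  crypto_key_id = "projects/kv-data-prod/locations/europe-west1/keyRings/dataflow/cryptoKeys/jobs"
  role          = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
  member        = "serviceAccount:${each.value}"
}
```

Applying these grants before the module (for example with depends_on on the module block) avoids a first-run failure when the job tries to use the key.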

Best practices
