Quick take: a reusable hashicorp/google Terraform module for google_dataflow_job that deploys Dataflow batch and streaming pipelines from Classic Templates with a dedicated worker SA, private networking, and KMS encryption. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.
Quickstart (copy-paste)
Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):
```hcl
provider "google" {
  project = "my-project"
  region  = "us-central1"
}

module "dataflow" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "..." # GCP project ID where the job runs.
  region            = "..." # Region for the Dataflow job; should match Google-hosted…
  name              = "..." # Job name (1–63 chars, lowercase, digits, hyphens; valid…
  template_gcs_path = "..." # gs:// path to the Classic Template (validated to start …
  staging_bucket    = "..." # Bucket name used to derive the default temp GCS location.
}
```
Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.
What this module is
Google Cloud Dataflow is the managed, autoscaling runner for Apache Beam pipelines. It executes both streaming jobs (unbounded sources like Pub/Sub) and batch jobs (bounded sources like Cloud Storage or BigQuery), handling worker provisioning, autoscaling, shuffle, watermarking, and exactly-once semantics so you don’t run a Spark/Flink cluster yourself.
In Terraform, the google_dataflow_job resource launches a job from a Classic Template by setting template_gcs_path. A Classic Template is a template file pre-staged in GCS, for example from Google’s gs://dataflow-templates-<region>/... library. It is a quirky resource rather than a plain CRUD object. Terraform “creates” a job by submitting it to the Dataflow service, and a streaming job keeps running until it is drained or cancelled. Many parameters (zone, machine_type, network, max_workers, the parameters map) force replacement when changed. On destroy, the provider issues a drain or cancel rather than a delete. The provider also treats a job that has reached a terminal state (for example a completed or failed batch job) as no longer existing, so the next terraform apply launches it again.
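For orientation, here is a minimal sketch of the bare resource this module wraps. It assumes an in-house Classic Template is already staged in GCS. The bucket, the template path and the inputTopic parameter are hypothetical placeholders.

```hcl
# A bare google_dataflow_job, without the module. Bucket, template path and
# parameter names are hypothetical placeholders for an in-house template.
resource "google_dataflow_job" "raw_example" {
  project           = "my-project"
  region            = "us-central1"
  name              = "raw-template-job"
  template_gcs_path = "gs://my-templates-bucket/templates/my_pipeline"
  temp_gcs_location = "gs://my-templates-bucket/tmp"

  # Runtime parameters; keys must match those the template declares.
  parameters = {
    inputTopic = "projects/my-project/topics/my-topic" # hypothetical
  }

  # On terraform destroy, drain so in-flight data finishes processing.
  on_delete = "drain"
}
```

The module adds the worker SA, IAM, private networking and CMEK around this one resource.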
Wrapping it in a module pays off because the surrounding infrastructure is the hard part and is identical for every pipeline: a least-privilege worker service account, the IAM roles that SA needs (roles/dataflow.worker, plus data-plane roles), a private worker pool (no public IPs, Private Google Access), a CMEK key for at-rest encryption, and a sane on_delete policy. The module makes those defaults secure-by-default and turns “launch a templated pipeline” into a 15-line module block.
When to use it
- You run Google-provided or in-house Classic Templates (Pub/Sub-to-BigQuery, GCS-Text-to-BigQuery, JDBC-to-BigQuery, Bulk Compress, etc.) and want them codified and reviewable rather than launched from gcloud by hand.
- You need streaming pipelines (on_delete = "drain") that must be torn down gracefully without losing in-flight data, or repeatable batch jobs (on_delete = "cancel").
- You want enforced guardrails: no public IPs on workers, a shared-VPC subnet, Streaming Engine on, autoscaling caps to control spend, and CMEK.
- You’re standardising many similar pipelines across teams/environments and want one audited module instead of copy-pasted gcloud dataflow jobs run scripts.
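The batch case looks much like the streaming example under How to use it, with on_delete = "cancel" instead of "drain". This is a sketch using Google's public Word_Count sample template (inputs inputFile and output). The staging bucket my-project-dataflow-staging is hypothetical.

```hcl
module "wordcount_batch" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "my-project"
  region            = "us-central1"
  name              = "wordcount-batch"
  template_gcs_path = "gs://dataflow-templates-us-central1/latest/Word_Count"
  staging_bucket    = "my-project-dataflow-staging" # hypothetical bucket

  # Word_Count's runtime parameters: input file pattern and output prefix.
  parameters = {
    inputFile = "gs://dataflow-samples/shakespeare/kinglear.txt"
    output    = "gs://my-project-dataflow-staging/wordcount/output"
  }

  # Batch job: cancel on destroy rather than drain.
  on_delete = "cancel"

  # Streaming Engine only applies to streaming pipelines.
  enable_streaming_engine = false
}
```

The output prefix sits in the staging bucket. The module's base roles grant the worker SA roles/storage.objectAdmin at project scope, which covers those writes.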
If you need a full custom pipeline packaged as a Flex Template container, use google_dataflow_flex_template_job (provided by the google-beta provider) instead. This module targets the Classic-template google_dataflow_job resource. That covers the large catalogue of Google templates and any classic template your team stages.
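For contrast, a Flex Template launch uses a different resource from the google-beta provider and points at a container spec file rather than a Classic Template. This is a minimal sketch. The spec path gs://my-templates-bucket/flex/my_pipeline.json and the inputSubscription parameter are hypothetical.

```hcl
terraform {
  required_providers {
    google-beta = {
      source  = "hashicorp/google-beta"
      version = "~> 5.0"
    }
  }
}

# Flex Template job: launched from a container spec file in GCS.
resource "google_dataflow_flex_template_job" "custom" {
  provider                = google-beta
  project                 = "my-project"
  region                  = "us-central1"
  name                    = "custom-flex-pipeline"
  container_spec_gcs_path = "gs://my-templates-bucket/flex/my_pipeline.json" # hypothetical

  # Hypothetical parameter; use the names your Flex Template defines.
  parameters = {
    inputSubscription = "projects/my-project/subscriptions/my-sub"
  }

  on_delete = "drain"
}
```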
Module structure
```
terraform-module-gcp-dataflow/
├── versions.tf
├── main.tf
├── variables.tf
└── outputs.tf
```
versions.tf
```hcl
terraform {
  required_version = ">= 1.5.0"

  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}
```
main.tf
```hcl
locals {
  # Dataflow needs a GCS temp location; derive a sane default from the bucket.
  temp_location = var.temp_location != null ? var.temp_location : "gs://${var.staging_bucket}/tmp"

  # Roles the worker SA needs to run jobs and emit logs/metrics. Data-plane
  # roles (bigquery.dataEditor, pubsub.subscriber, ...) are passed by callers.
  # storage.objectAdmin is project-wide here for simplicity; scope it to the
  # staging bucket if you need tighter least privilege.
  base_worker_roles = [
    "roles/dataflow.worker",
    "roles/storage.objectAdmin",
    "roles/monitoring.metricWriter",
    "roles/logging.logWriter",
  ]
  worker_roles = toset(concat(local.base_worker_roles, var.additional_worker_roles))

  # Template runtime parameters are passed through unchanged. The temp location
  # is set via temp_gcs_location on the job (a launch environment setting), not
  # as a template parameter: Classic Templates validate parameters against their
  # metadata and may reject undeclared keys such as tempLocation.
  job_parameters = var.parameters
}

# Dedicated, least-privilege worker service account for this pipeline.
resource "google_service_account" "worker" {
  count   = var.create_service_account ? 1 : 0
  project = var.project_id
  # Account IDs must be 6-30 chars and end in a letter or digit: truncate the
  # job name and strip any trailing hyphens the cut leaves behind. Names that
  # share their first 26 chars map to the same ID, and names shorter than
  # 3 chars still produce an ID that is too short.
  account_id   = "df-${replace(substr(var.name, 0, 26), "/-+$/", "")}"
  display_name = "Dataflow worker SA for ${var.name}"
}

locals {
  worker_sa_email = var.create_service_account ? google_service_account.worker[0].email : var.service_account_email
}

# Grant the worker SA the roles it needs at the project scope.
resource "google_project_iam_member" "worker" {
  for_each = var.create_service_account ? local.worker_roles : toset([])
  project  = var.project_id
  role     = each.value
  member   = "serviceAccount:${google_service_account.worker[0].email}"
}

# The Dataflow job itself, launched from a Classic Template in GCS.
resource "google_dataflow_job" "this" {
  project           = var.project_id
  region            = var.region
  zone              = var.zone
  name              = var.name
  template_gcs_path = var.template_gcs_path
  temp_gcs_location = local.temp_location
  parameters        = local.job_parameters

  # Worker sizing & autoscaling.
  machine_type            = var.machine_type
  max_workers             = var.max_workers
  enable_streaming_engine = var.enable_streaming_engine

  # Networking: private workers on a shared/standalone VPC subnet.
  network          = var.network
  subnetwork       = var.subnetwork
  ip_configuration = var.use_public_ips ? "WORKER_IP_PUBLIC" : "WORKER_IP_PRIVATE"

  # Identity & encryption.
  service_account_email = local.worker_sa_email
  kms_key_name          = var.kms_key_name

  # Lifecycle: drain streaming jobs, cancel batch jobs.
  on_delete                    = var.on_delete
  skip_wait_on_job_termination = var.skip_wait_on_job_termination

  additional_experiments = var.additional_experiments
  labels                 = var.labels

  lifecycle {
    # Fail at plan time instead of silently falling back to the default
    # Compute Engine SA when no worker SA is created or supplied.
    precondition {
      condition     = var.create_service_account || var.service_account_email != null
      error_message = "Set create_service_account = true or supply service_account_email."
    }
  }

  # IAM bindings must exist before the service tries to assume the SA.
  depends_on = [google_project_iam_member.worker]
}
```
variables.tf
```hcl
variable "project_id" {
  description = "GCP project ID where the Dataflow job runs."
  type        = string
}

variable "region" {
  description = "Region for the Dataflow job (e.g. europe-west1). Should match the template's region for Google-hosted templates."
  type        = string
}

variable "zone" {
  description = "Optional zone to pin workers to. Leave null to let Dataflow pick within the region. Changing this forces job replacement."
  type        = string
  default     = null
}

variable "name" {
  description = "Job name. Must be unique among active jobs in the region; lowercase letters, digits and hyphens only."
  type        = string

  validation {
    condition     = can(regex("^[a-z]([-a-z0-9]{0,61}[a-z0-9])?$", var.name))
    error_message = "name must be 1-63 chars, start with a lowercase letter, end with a letter or digit, and contain only lowercase letters, digits and hyphens."
  }
}

variable "template_gcs_path" {
  description = "GCS path to the Classic Template, e.g. gs://dataflow-templates-europe-west1/latest/PubSub_Subscription_to_BigQuery."
  type        = string

  validation {
    condition     = can(regex("^gs://", var.template_gcs_path))
    error_message = "template_gcs_path must be a gs:// URI to a staged Classic Template."
  }
}

variable "staging_bucket" {
  description = "GCS bucket name (no gs:// prefix) used to derive the default temp location, gs://<staging_bucket>/tmp."
  type        = string
}

variable "temp_location" {
  description = "Override for the temp GCS location. Defaults to gs://<staging_bucket>/tmp."
  type        = string
  default     = null
}

variable "parameters" {
  description = "Template-specific runtime parameters passed to the job (e.g. inputSubscription, outputTableSpec)."
  type        = map(string)
  default     = {}
}

variable "machine_type" {
  description = "Compute Engine machine type for workers (e.g. n2-standard-4)."
  type        = string
  default     = "n1-standard-2"
}

variable "max_workers" {
  description = "Maximum number of workers for autoscaling. Caps cost for streaming jobs."
  type        = number
  default     = 5

  validation {
    condition     = var.max_workers >= 1 && var.max_workers <= 1000
    error_message = "max_workers must be between 1 and 1000."
  }
}

variable "enable_streaming_engine" {
  description = "Use Streaming Engine (offloads state/shuffle from workers). Recommended true for streaming pipelines."
  type        = bool
  default     = true
}

variable "network" {
  description = "VPC network name for workers. Use with subnetwork for shared-VPC deployments."
  type        = string
  default     = null
}

variable "subnetwork" {
  description = "Subnetwork as regions/REGION/subnetworks/SUBNET (e.g. regions/europe-west1/subnetworks/dataflow-subnet); for shared VPC, use the full https self-link."
  type        = string
  default     = null
}

variable "use_public_ips" {
  description = "If false (recommended), workers get no external IPs and rely on Private Google Access / NAT."
  type        = bool
  default     = false
}

variable "create_service_account" {
  description = "Create a dedicated least-privilege worker SA. If false, supply service_account_email."
  type        = bool
  default     = true
}

variable "service_account_email" {
  description = "Existing worker SA email to use when create_service_account is false."
  type        = string
  default     = null

  validation {
    condition     = var.service_account_email == null || can(regex("@", var.service_account_email))
    error_message = "service_account_email must be a valid service account email or null."
  }
}

variable "additional_worker_roles" {
  description = "Extra project IAM roles for the created worker SA (data-plane access), e.g. roles/bigquery.dataEditor, roles/pubsub.subscriber."
  type        = list(string)
  default     = []
}

variable "kms_key_name" {
  description = "Optional CMEK key (full resource ID) to encrypt job state and temp data at rest."
  type        = string
  default     = null
}

variable "on_delete" {
  description = "What to do on terraform destroy: 'drain' (graceful, for streaming) or 'cancel' (immediate, for batch)."
  type        = string
  default     = "cancel"

  validation {
    condition     = contains(["drain", "cancel"], var.on_delete)
    error_message = "on_delete must be either 'drain' or 'cancel'."
  }
}

variable "skip_wait_on_job_termination" {
  description = "If true, Terraform does not block waiting for the job to fully drain/cancel on destroy."
  type        = bool
  default     = false
}

variable "additional_experiments" {
  description = "List of Dataflow service experiments, e.g. [\"enable_prime\"] or [\"shuffle_mode=service\"]."
  type        = list(string)
  default     = []
}

variable "labels" {
  description = "Resource labels applied to the job for cost attribution and ownership."
  type        = map(string)
  default     = {}
}
```
outputs.tf
```hcl
output "job_id" {
  description = "The unique Dataflow job ID."
  value       = google_dataflow_job.this.job_id
}

output "job_name" {
  description = "The Dataflow job name."
  value       = google_dataflow_job.this.name
}

output "state" {
  description = "Current state of the job (e.g. JOB_STATE_RUNNING)."
  value       = google_dataflow_job.this.state
}

output "type" {
  description = "Job type: JOB_TYPE_STREAMING or JOB_TYPE_BATCH."
  value       = google_dataflow_job.this.type
}

output "worker_service_account_email" {
  description = "Email of the worker service account used by the job."
  value       = local.worker_sa_email
}

output "temp_location" {
  description = "Resolved GCS temp location used by the job."
  value       = local.temp_location
}
```
How to use it
A streaming Pub/Sub-subscription-to-BigQuery pipeline on private workers, with the module creating a worker SA that also gets BigQuery and Pub/Sub data-plane roles:
```hcl
module "dataflow" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "kv-data-prod"
  region            = "europe-west1"
  name              = "psub-to-bq-events"
  template_gcs_path = "gs://dataflow-templates-europe-west1/latest/PubSub_Subscription_to_BigQuery"
  staging_bucket    = "kv-data-prod-dataflow-staging"

  parameters = {
    inputSubscription = "projects/kv-data-prod/subscriptions/events-sub"
    outputTableSpec   = "kv-data-prod:analytics.raw_events"
  }

  # Private, cost-capped streaming workers on the shared VPC.
  subnetwork              = "https://www.googleapis.com/compute/v1/projects/kv-net-host/regions/europe-west1/subnetworks/dataflow-subnet"
  use_public_ips          = false
  enable_streaming_engine = true
  machine_type            = "n2-standard-4"
  max_workers             = 8

  # Least-privilege data-plane roles for the auto-created worker SA.
  additional_worker_roles = [
    "roles/bigquery.dataEditor",
    "roles/bigquery.jobUser",
    "roles/pubsub.subscriber",
  ]

  kms_key_name = "projects/kv-data-prod/locations/europe-west1/keyRings/dataflow/cryptoKeys/jobs"
  on_delete    = "drain" # graceful teardown for streaming

  labels = {
    team        = "data-platform"
    environment = "prod"
    pipeline    = "events-ingest"
  }
}

# Downstream: alert if the Dataflow service reports the job as failed.
resource "google_monitoring_alert_policy" "dataflow_failed" {
  project      = "kv-data-prod"
  display_name = "Dataflow ${module.dataflow.job_name} failed"
  combiner     = "OR"

  conditions {
    display_name = "Job is_failed > 0"
    condition_monitoring_query_language {
      duration = "300s"
      query    = <<-MQL
        fetch dataflow_job
        | metric 'dataflow.googleapis.com/job/is_failed'
        | filter (resource.job_name == '${module.dataflow.job_name}')
        | condition val() > 0
      MQL
    }
  }

  # oncall_channel_id must be declared by the calling configuration.
  notification_channels = [var.oncall_channel_id]
}
```
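The example sets kms_key_name, but the module grants nothing on the key itself. Google's Dataflow CMEK guidance asks for two grants on the key. The project's Dataflow service agent and its Compute Engine service agent each need roles/cloudkms.cryptoKeyEncrypterDecrypter. This is a sketch reusing the key path from the example above.

```hcl
# Look up the project number used in Google-managed service agent addresses.
data "google_project" "this" {
  project_id = "kv-data-prod"
}

# Let both service agents use the job's CMEK key.
resource "google_kms_crypto_key_iam_member" "dataflow_cmek" {
  for_each = {
    dataflow = "dataflow-service-producer-prod" # Dataflow service agent
    compute  = "compute-system"                 # Compute Engine service agent
  }

  crypto_key_id = "projects/kv-data-prod/locations/europe-west1/keyRings/dataflow/cryptoKeys/jobs"
  role          = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
  member        = "serviceAccount:service-${data.google_project.this.number}@${each.value}.iam.gserviceaccount.com"
}
```

Add depends_on = [google_kms_crypto_key_iam_member.dataflow_cmek] to the module block so the grants exist before the job launches.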
With Terragrunt
Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.
1. Root config — live/terragrunt.hcl (inherited by every module):
```hcl
remote_state {
  backend  = "gcs"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...GCS state bucket + prefix per path...
  }
}
```
2. Module config — live/prod/dataflow/terragrunt.hcl:
```hcl
include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"
}

inputs = {
  project_id        = "..."
  region            = "..."
  name              = "..."
  template_gcs_path = "..."
  staging_bucket    = "..."
}
```
3. Deploy one environment, or roll out all modules together:
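```shell
# Run from the repository root; the parentheses keep each cd local.
(cd live/prod/dataflow && terragrunt apply)   # this module only
(cd live/prod && terragrunt run-all apply)    # every module under live/prod
```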
Why Terragrunt here:
- The backend and provider live in one place instead of being copy-pasted into every module.
- Inputs can be overridden per environment (dev / stage / prod) without forking the module.
- run-all orchestrates dependencies across modules.

Reach for it once you have more than one environment or more than a handful of modules. For a single stack, the plain Quickstart above is enough.
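The root config in step 1 only generates the backend. Below is a sketch of a fuller live/terragrunt.hcl that also generates the provider once. The state bucket kv-tf-state is hypothetical. The generated provider stays empty because the module already passes project and region to every resource explicitly.

```hcl
# live/terragrunt.hcl: inherited by every module via include "root".
remote_state {
  backend  = "gcs"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    bucket = "kv-tf-state"               # hypothetical state bucket
    prefix = path_relative_to_include() # one state prefix per module path
  }
}

# Write a provider.tf into each module directory.
generate "provider" {
  path      = "provider.tf"
  if_exists = "overwrite_terragrunt"
  contents  = <<EOF
# Project and region come from module inputs, so the provider stays generic.
provider "google" {}
EOF
}
```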
Inputs
| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| project_id | string | — | Yes | GCP project ID where the job runs. |
| region | string | — | Yes | Region for the Dataflow job; should match Google-hosted template region. |
| zone | string | null | No | Optional zone to pin workers; changing it forces replacement. |
| name | string | — | Yes | Job name (1–63 chars, lowercase, digits, hyphens; validated). |
| template_gcs_path | string | — | Yes | gs:// path to the Classic Template (validated to start with gs://). |
| staging_bucket | string | — | Yes | Bucket name used to derive the default temp location (gs://<staging_bucket>/tmp). |
| temp_location | string | null | No | Override temp GCS location; defaults to gs://<staging_bucket>/tmp. |
| parameters | map(string) | {} | No | Template-specific runtime parameters (e.g. inputSubscription, outputTableSpec). |
| machine_type | string | "n1-standard-2" | No | Worker machine type. |
| max_workers | number | 5 | No | Autoscaling cap (validated 1–1000). |
| enable_streaming_engine | bool | true | No | Enable Streaming Engine for streaming jobs. |
| network | string | null | No | VPC network name for workers. |
| subnetwork | string | null | No | Subnetwork as regions/REGION/subnetworks/SUBNET, or the full https self-link for shared VPC. |
| use_public_ips | bool | false | No | If false, workers run without external IPs (WORKER_IP_PRIVATE). |
| create_service_account | bool | true | No | Create a dedicated least-privilege worker SA. |
| service_account_email | string | null | No | Existing worker SA email when not creating one (validated). |
| additional_worker_roles | list(string) | [] | No | Extra project roles for the created worker SA (data-plane access). |
| kms_key_name | string | null | No | CMEK key resource ID for at-rest encryption. |
| on_delete | string | "cancel" | No | Destroy behaviour: drain or cancel (validated). |
| skip_wait_on_job_termination | bool | false | No | Do not block on job termination during destroy. |
| additional_experiments | list(string) | [] | No | Dataflow service experiments (e.g. enable_prime). |
| labels | map(string) | {} | No | Resource labels for cost attribution. |
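With create_service_account = false, the module skips its own IAM grants because its for_each runs over an empty set. Granting roles to the supplied SA is then the caller's job. This is a sketch in which the account ID df-shared-worker, the module label dataflow_byo_sa, the template path and the bucket name are all hypothetical.

```hcl
# Caller-managed worker SA (hypothetical ID), e.g. owned by a central IAM stack.
resource "google_service_account" "byo_worker" {
  project      = "my-project"
  account_id   = "df-shared-worker"
  display_name = "Caller-managed Dataflow worker SA"
}

# The module grants nothing to a supplied SA, so mirror its base roles here
# and add the data-plane roles the template needs.
resource "google_project_iam_member" "byo_worker" {
  for_each = toset([
    "roles/dataflow.worker",
    "roles/storage.objectAdmin",
    "roles/monitoring.metricWriter",
    "roles/logging.logWriter",
    "roles/pubsub.subscriber",
  ])
  project = "my-project"
  role    = each.value
  member  = "serviceAccount:${google_service_account.byo_worker.email}"
}

module "dataflow_byo_sa" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-dataflow?ref=v1.0.0"

  project_id        = "my-project"
  region            = "us-central1"
  name              = "byo-sa-pipeline"
  template_gcs_path = "gs://my-templates-bucket/templates/my_pipeline" # hypothetical
  staging_bucket    = "my-project-dataflow-staging"                   # hypothetical

  create_service_account = false
  service_account_email  = google_service_account.byo_worker.email

  # Make sure the grants exist before the job launches.
  depends_on = [google_project_iam_member.byo_worker]
}
```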
Outputs
| Name | Description |
|---|---|
| job_id | The unique Dataflow job ID. |
| job_name | The Dataflow job name. |
| state | Current job state (e.g. JOB_STATE_RUNNING). |
| type | Job type: JOB_TYPE_STREAMING or JOB_TYPE_BATCH. |
| worker_service_account_email | Email of the worker service account used by the job. |
| temp_location | Resolved GCS temp location used by the job. |
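Outputs are the hand-off point to the rest of a stack. This sketch grants the worker SA write access on a single results bucket in another project and surfaces the job ID. It assumes a module instance named dataflow; the bucket name kv-analytics-exports is hypothetical.

```hcl
# Bucket-scoped write access for the pipeline's worker SA (hypothetical bucket).
resource "google_storage_bucket_iam_member" "exports_writer" {
  bucket = "kv-analytics-exports"
  role   = "roles/storage.objectCreator"
  member = "serviceAccount:${module.dataflow.worker_service_account_email}"
}

# Surface the job ID, e.g. for: gcloud dataflow jobs describe JOB_ID --region=REGION
output "dataflow_job_id" {
  description = "ID of the Dataflow job launched by the module."
  value       = module.dataflow.job_id
}

output "dataflow_job_type" {
  description = "JOB_TYPE_STREAMING or JOB_TYPE_BATCH."
  value       = module.dataflow.type
}
```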
Enterprise scenario
A retail analytics group ingests clickstream and order events through Pub/Sub and needs them in BigQuery within seconds for near-real-time dashboards. They instantiate this module once per environment (dev/staging/prod) from a PubSub_Subscription_to_BigQuery template, each on its own least-privilege worker SA, on private workers inside the shared-VPC dataflow-subnet, with on_delete = "drain" so a terraform destroy during a region migration flushes in-flight records into BigQuery instead of dropping them. CMEK on the temp location satisfies the company’s “no Google-managed keys for customer data” policy, and max_workers = 8 plus enable_streaming_engine = true keeps the steady-state spend predictable while still absorbing Black-Friday traffic spikes.
Best practices
- Always run private workers. Keep use_public_ips = false and provide a subnetwork with Private Google Access (and Cloud NAT for any external egress). Public worker IPs are a common, avoidable attack surface and add per-IP cost.
- One least-privilege SA per pipeline. Let the module create a dedicated worker SA and grant only the data-plane roles the template needs via additional_worker_roles (e.g. roles/pubsub.subscriber, roles/bigquery.dataEditor). Never reuse the default Compute Engine SA.
- Match on_delete to job type. Use drain for streaming jobs so buffered data is committed before shutdown, and cancel for batch jobs where an immediate stop is fine. A wrong choice here silently loses in-flight data or wastes worker time.
- Cap autoscaling and right-size workers. Set max_workers deliberately and turn on enable_streaming_engine. Streaming Engine moves state and shuffle off the worker VMs, which shrinks worker disks and lets you use smaller machine_type values. Worker vCPU-hours are typically a large share of Dataflow spend.
- Encrypt with CMEK and label everything. Pass kms_key_name to encrypt job state and temp data at rest. Apply consistent labels (team, environment, pipeline) so Dataflow spend is attributable in billing exports.
- Treat parameter/zone/network changes as replacements. google_dataflow_job forces a new job when these change. Version your template path (in prod, replace latest with a dated template release) and plan changes during maintenance windows to avoid surprise streaming-job restarts.
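One way to act on the last point is to pin the template release in a variable, so a template upgrade becomes an explicit, reviewable change. This is a sketch, and the variable name template_release is hypothetical. Take the dated release folder names from the template bucket itself, for example by listing gs://dataflow-templates-europe-west1/ with gcloud storage ls, rather than guessing them.

```hcl
variable "template_release" {
  description = "Dated release folder of the Google template bucket to pin in prod (hypothetical variable)."
  type        = string

  validation {
    condition     = var.template_release != "latest"
    error_message = "Pin a dated template release instead of 'latest'."
  }
}

locals {
  # Pass this to the module's template_gcs_path input.
  pinned_template_path = "gs://dataflow-templates-europe-west1/${var.template_release}/PubSub_Subscription_to_BigQuery"
}
```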