
Terraform Module: GCP Datastream — serverless CDC pipelines in one block

Quick take: a reusable hashicorp/google Terraform module for GCP Datastream, covering MySQL/PostgreSQL CDC streams into BigQuery or GCS, include-list object scoping, backfill control, write modes, CMEK, and managed desired_state. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.

Quickstart (copy-paste)

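Minimal configuration: drop this in a .tf file and fill in the "..." placeholders (each required input is commented). The empty destination = {} is a placeholder too: it needs at least type and connection_profile_id, plus dataset_id or dataset_template when type is "bigquery".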

provider "google" {
  project = "my-project"
  region  = "us-central1"
}

module "datastream" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"

  project_id                   = "..."  # GCP project hosting the stream.
  app                          = "..."  # Workload short name used in `stream_id` (validated lowe…
  environment                  = "..."  # One of `dev`, `staging`, `prod`, `sandbox`.
  location                     = "..."  # Region for the stream; must match the connection profil…
  location_short               = "..."  # Cosmetic region token for naming (e.g. `euw1`).
  source_type                  = "..."  # `mysql` or `postgresql`.
  source_connection_profile_id = "..."  # Fully qualified source connection profile ID.
  destination                  = {}     # BigQuery or GCS destination (see block; `type` discrimi…
}

Then terraform init && terraform apply. Every other input has a default (PostgreSQL sources also need postgresql_replication_slot and postgresql_publication); see Inputs below to override behaviour.
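As a rough illustration of what the destination placeholder could look like once filled in, here is a sketch of two alternative values built from the attributes declared in variables.tf. The connection profile IDs, dataset prefix, and path are hypothetical; only one of the two would be passed to the module.

```hcl
locals {
  # Alternative 1: BigQuery, one auto-created dataset per source schema.
  # write_mode defaults to "merge" and data_freshness to "900s".
  bigquery_destination = {
    type                  = "bigquery"
    connection_profile_id = "projects/my-project/locations/us-central1/connectionProfiles/bq-dst" # hypothetical ID
    dataset_template = {
      location          = "us-central1"
      dataset_id_prefix = "cdc_" # hypothetical prefix
    }
  }

  # Alternative 2: Cloud Storage. file_format defaults to "avro",
  # rotation to 50 MB / "60s".
  gcs_destination = {
    type                  = "gcs"
    connection_profile_id = "projects/my-project/locations/us-central1/connectionProfiles/gcs-dst" # hypothetical ID
    path                  = "/cdc/orders"
  }
}

# In the module block, pick one:
#   destination = local.bigquery_destination
```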

What this module is

Datastream is GCP’s serverless change data capture (CDC) and replication service. It reads the transaction log of an operational database (MySQL binlog, PostgreSQL logical replication slot, or Oracle redo logs) and streams every insert, update, and delete, plus an initial historical backfill, into an analytics destination such as BigQuery or Cloud Storage. There are no workers to size and no Debezium/Kafka cluster to babysit: you point a stream at a source connection profile and a destination connection profile, scope which schemas and tables to replicate, and Datastream keeps the destination continuously in sync at a configurable freshness (this module defaults BigQuery to data_freshness = "900s", i.e. 15 minutes).

The raw resource is deceptively deep. A google_datastream_stream is not a flat object. It nests a source_config (which carries one of mysql_source_config / postgresql_source_config / oracle_source_config, each with its own include_objects / exclude_objects tree of databases → tables → columns), a destination_config (which carries either a bigquery_destination_config with single-dataset or per-source-schema dataset templating, a write mode, and data_freshness, or a gcs_destination_config with file rotation and Avro/JSON formatting), and mutually exclusive backfill_all {} / backfill_none {} blocks. Hand-write that per stream and every team’s blocks drift; forget desired_state and the provider’s default of NOT_STARTED leaves the stream created but never started (or worse, a nudged nested attribute recreates a running stream).

This module wraps google_datastream_stream behind validated, var-driven inputs. You pick a source_type and pass the schema/table scope as plain lists, choose a BigQuery or GCS destination by object, set the write mode and freshness, and the module assembles the correct nested blocks, enforces backfill semantics, and pins desired_state so a stream stays RUNNING (or intentionally PAUSED) on every apply — with consistent app-env-region naming and labels.

When to use it

Reach for Dataflow or a Pub/Sub pipeline instead when you need arbitrary in-flight transformation, enrichment, or fan-out. Datastream’s job is faithful, low-latency CDC from a database into an analytics sink — it shines when the requirement is “keep BigQuery a few seconds behind production with zero servers.”

Module structure

terraform-module-gcp-datastream/
├── versions.tf      # provider + Terraform version pins
├── main.tf          # google_datastream_stream wired for MySQL/PostgreSQL → BigQuery/GCS
├── variables.tf     # var-driven inputs with validation
└── outputs.tf       # stream id/name, state, source/destination profile refs

versions.tf

terraform {
  required_version = ">= 1.5.0"

  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
  }
}

main.tf

locals {
  # Consistent app-env-region naming, e.g. "orders-prod-euw1".
  stream_id = "${var.app}-${var.environment}-${var.location_short}"

  # Datastream wants backfill_all {} XOR backfill_none {}. We expose a single
  # string (var.backfill_mode) and translate it into exactly one of the two
  # blocks below.
  backfill_all  = var.backfill_mode == "all"
  backfill_none = var.backfill_mode == "none"

  # A stream targets BigQuery OR GCS — never both. The validated var.destination
  # object carries a `type` discriminator that gates the dynamic blocks.
  is_bigquery = var.destination.type == "bigquery"
  is_gcs      = var.destination.type == "gcs"

  # MySQL scopes by database; PostgreSQL scopes by schema. Both reduce to the
  # same { name, tables } shape so callers pass one flat structure.
  is_mysql      = var.source_type == "mysql"
  is_postgresql = var.source_type == "postgresql"
}

resource "google_datastream_stream" "this" {
  project       = var.project_id
  location      = var.location
  stream_id     = local.stream_id
  display_name  = coalesce(var.display_name, local.stream_id)
  desired_state = var.desired_state
  labels        = var.labels

  # Optional CMEK. Must live in the same region as the stream.
  customer_managed_encryption_key = var.kms_key_name

  # -- Source -------------------------------------------------------------
  source_config {
    source_connection_profile = var.source_connection_profile_id

    dynamic "mysql_source_config" {
      for_each = local.is_mysql ? [1] : []
      content {
        max_concurrent_cdc_tasks      = var.max_concurrent_cdc_tasks
        max_concurrent_backfill_tasks = var.max_concurrent_backfill_tasks

        dynamic "include_objects" {
          for_each = length(var.included_objects) > 0 ? [1] : []
          content {
            dynamic "mysql_databases" {
              for_each = var.included_objects
              content {
                database = mysql_databases.value.name

                dynamic "mysql_tables" {
                  for_each = mysql_databases.value.tables
                  content {
                    table = mysql_tables.value
                  }
                }
              }
            }
          }
        }
      }
    }

    dynamic "postgresql_source_config" {
      for_each = local.is_postgresql ? [1] : []
      content {
        # Logical replication slot + publication must already exist in Postgres.
        replication_slot              = var.postgresql_replication_slot
        publication                   = var.postgresql_publication
        max_concurrent_backfill_tasks = var.max_concurrent_backfill_tasks

        dynamic "include_objects" {
          for_each = length(var.included_objects) > 0 ? [1] : []
          content {
            dynamic "postgresql_schemas" {
              for_each = var.included_objects
              content {
                schema = postgresql_schemas.value.name

                dynamic "postgresql_tables" {
                  for_each = postgresql_schemas.value.tables
                  content {
                    table = postgresql_tables.value
                  }
                }
              }
            }
          }
        }
      }
    }
  }

  # -- Destination --------------------------------------------------------
  destination_config {
    destination_connection_profile = var.destination.connection_profile_id

    dynamic "bigquery_destination_config" {
      for_each = local.is_bigquery ? [1] : []
      content {
        # Freshness as a duration string, e.g. "900s" (15 min). Lower = more
        # frequent (and more expensive) merges into BigQuery.
        data_freshness = var.destination.data_freshness

        # MERGE keeps a current-state mirror of each table (upserts/deletes
        # applied); APPEND_ONLY lands every change as an immutable row.
        dynamic "merge" {
          for_each = var.destination.write_mode == "merge" ? [1] : []
          content {}
        }

        dynamic "append_only" {
          for_each = var.destination.write_mode == "append_only" ? [1] : []
          content {}
        }

        # single_target_dataset: every source table lands in one dataset.
        dynamic "single_target_dataset" {
          for_each = var.destination.dataset_id != null ? [1] : []
          content {
            dataset_id = var.destination.dataset_id
          }
        }

        # source_hierarchy_datasets: one dataset per source schema, auto-created
        # from a template (prefix + region + optional CMEK).
        dynamic "source_hierarchy_datasets" {
          for_each = var.destination.dataset_template != null ? [1] : []
          content {
            dataset_template {
              location          = var.destination.dataset_template.location
              dataset_id_prefix = var.destination.dataset_template.dataset_id_prefix
              kms_key_name      = var.destination.dataset_template.kms_key_name
            }
          }
        }
      }
    }

    dynamic "gcs_destination_config" {
      for_each = local.is_gcs ? [1] : []
      content {
        path                   = var.destination.path
        file_rotation_mb       = var.destination.file_rotation_mb
        file_rotation_interval = var.destination.file_rotation_interval

        dynamic "avro_file_format" {
          for_each = var.destination.file_format == "avro" ? [1] : []
          content {}
        }

        dynamic "json_file_format" {
          for_each = var.destination.file_format == "json" ? [1] : []
          content {
            schema_file_format = "NO_SCHEMA_FILE"
            compression        = var.destination.gcs_compression
          }
        }
      }
    }
  }

  # Exactly one backfill block. backfill_all replays history before CDC;
  # backfill_none starts from "now" (CDC-only — use when the sink is pre-seeded).
  dynamic "backfill_all" {
    for_each = local.backfill_all ? [1] : []
    content {}
  }

  dynamic "backfill_none" {
    for_each = local.backfill_none ? [1] : []
    content {}
  }

  lifecycle {
    # stream_id is immutable: changing it forces a brand-new stream and a full
    # re-backfill. ignore_changes makes Terraform skip renames, so replacing the
    # stream stays an explicit, reviewed action (e.g. apply -replace), not an accident.
    ignore_changes = [stream_id]
  }
}
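The variable descriptions mark the PostgreSQL replication slot and publication as required when source_type = "postgresql", but nothing in the listing above enforces that. One possible way to close the gap, sketched here as a replacement for the lifecycle block inside google_datastream_stream.this, is a precondition (available since Terraform 1.2, so within the module's >= 1.5.0 pin). This is a suggestion, not part of the published module.

```hcl
  lifecycle {
    ignore_changes = [stream_id]

    # Sketch only: fail at plan time instead of letting the API reject a
    # PostgreSQL stream that has no slot or publication.
    precondition {
      condition = var.source_type != "postgresql" || (
        var.postgresql_replication_slot != null &&
        var.postgresql_publication != null
      )
      error_message = "postgresql_replication_slot and postgresql_publication are required when source_type = \"postgresql\"."
    }
  }
```

A precondition is used rather than a variable validation block because validation conditions that reference other variables need a newer Terraform release than the module's minimum.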

variables.tf

variable "project_id" {
  description = "GCP project ID that will host the Datastream stream."
  type        = string
}

variable "app" {
  description = "Application/workload short name, used in the stream_id (e.g. \"orders\")."
  type        = string

  validation {
    condition     = can(regex("^[a-z][a-z0-9-]{1,24}$", var.app))
    error_message = "app must be lowercase alphanumeric/hyphen, 2-25 chars, starting with a letter."
  }
}

variable "environment" {
  description = "Deployment environment (dev, staging, prod, sandbox)."
  type        = string

  validation {
    condition     = contains(["dev", "staging", "prod", "sandbox"], var.environment)
    error_message = "environment must be one of: dev, staging, prod, sandbox."
  }
}

variable "location" {
  description = "GCP region for the stream. Must match the source/destination connection profiles and any CMEK key (e.g. \"europe-west1\")."
  type        = string
}

variable "location_short" {
  description = "Short region token for naming, e.g. \"euw1\", \"use4\". Cosmetic only."
  type        = string
}

variable "display_name" {
  description = "Human-friendly stream display name. Defaults to the generated stream_id."
  type        = string
  default     = null
}

variable "desired_state" {
  description = "Lifecycle state Terraform keeps the stream in: RUNNING or PAUSED."
  type        = string
  default     = "RUNNING"

  validation {
    condition     = contains(["RUNNING", "PAUSED"], var.desired_state)
    error_message = "desired_state must be RUNNING or PAUSED."
  }
}

variable "source_type" {
  description = "Source database engine: mysql or postgresql."
  type        = string

  validation {
    condition     = contains(["mysql", "postgresql"], var.source_type)
    error_message = "source_type must be mysql or postgresql."
  }
}

variable "source_connection_profile_id" {
  description = "Fully qualified ID of the source connection profile (projects/<p>/locations/<l>/connectionProfiles/<id>)."
  type        = string
}

variable "postgresql_replication_slot" {
  description = "PostgreSQL logical replication slot name. Required when source_type = postgresql; ignored otherwise."
  type        = string
  default     = null
}

variable "postgresql_publication" {
  description = "PostgreSQL publication name backing the replication slot. Required when source_type = postgresql; ignored otherwise."
  type        = string
  default     = null
}

variable "included_objects" {
  description = <<-EOT
    Schemas/databases to replicate, each with its tables. For MySQL `name` is a
    database; for PostgreSQL `name` is a schema. An empty `tables` list means
    "all tables in this database/schema". An empty top-level list means
    "everything the source connection profile can see".
  EOT
  type = list(object({
    name   = string
    tables = optional(list(string), [])
  }))
  default = []
}

variable "backfill_mode" {
  description = "Historical backfill behaviour: \"all\" (replay history then CDC) or \"none\" (CDC-only from now)."
  type        = string
  default     = "all"

  validation {
    condition     = contains(["all", "none"], var.backfill_mode)
    error_message = "backfill_mode must be \"all\" or \"none\"."
  }
}

variable "max_concurrent_cdc_tasks" {
  description = "Max concurrent CDC tasks (MySQL only). Higher = faster CDC, more source load."
  type        = number
  default     = 5

  validation {
    condition     = var.max_concurrent_cdc_tasks >= 0 && var.max_concurrent_cdc_tasks <= 50
    error_message = "max_concurrent_cdc_tasks must be between 0 and 50."
  }
}

variable "max_concurrent_backfill_tasks" {
  description = "Max concurrent backfill tasks. Higher = faster historical load, more source load."
  type        = number
  default     = 12

  validation {
    condition     = var.max_concurrent_backfill_tasks >= 0 && var.max_concurrent_backfill_tasks <= 50
    error_message = "max_concurrent_backfill_tasks must be between 0 and 50."
  }
}

variable "destination" {
  description = <<-EOT
    Destination definition. Set type = "bigquery" OR "gcs".

    BigQuery:
      - connection_profile_id (required)
      - write_mode: "merge" (current-state mirror) or "append_only"
      - data_freshness: duration string, e.g. "900s"
      - EITHER dataset_id (single target dataset)
        OR dataset_template { location, dataset_id_prefix, kms_key_name }
          (one auto-created dataset per source schema)

    GCS:
      - connection_profile_id (required)
      - path: object prefix within the bucket, e.g. "/cdc/orders"
      - file_format: "avro" or "json"
      - file_rotation_mb / file_rotation_interval
      - gcs_compression (json only): "NO_COMPRESSION" or "GZIP"
  EOT
  type = object({
    type                  = string
    connection_profile_id = string

    # BigQuery
    write_mode     = optional(string, "merge")
    data_freshness = optional(string, "900s")
    dataset_id     = optional(string)
    dataset_template = optional(object({
      location          = string
      dataset_id_prefix = optional(string, "")
      kms_key_name      = optional(string)
    }))

    # GCS
    path                   = optional(string, "/")
    file_format            = optional(string, "avro")
    file_rotation_mb       = optional(number, 50)
    file_rotation_interval = optional(string, "60s")
    gcs_compression        = optional(string, "GZIP")
  })

  validation {
    condition     = contains(["bigquery", "gcs"], var.destination.type)
    error_message = "destination.type must be bigquery or gcs."
  }

  validation {
    condition     = var.destination.type != "bigquery" || contains(["merge", "append_only"], var.destination.write_mode)
    error_message = "BigQuery destination.write_mode must be merge or append_only."
  }

  # BigQuery requires exactly one of dataset_id / dataset_template.
  validation {
    condition = var.destination.type != "bigquery" || (
      (var.destination.dataset_id != null) != (var.destination.dataset_template != null)
    )
    error_message = "BigQuery destination requires exactly one of dataset_id OR dataset_template."
  }

  validation {
    condition     = var.destination.type != "gcs" || contains(["avro", "json"], var.destination.file_format)
    error_message = "GCS destination.file_format must be avro or json."
  }

  validation {
    condition     = var.destination.type != "gcs" || contains(["NO_COMPRESSION", "GZIP"], var.destination.gcs_compression)
    error_message = "GCS gcs_compression must be NO_COMPRESSION or GZIP."
  }
}

variable "kms_key_name" {
  description = "Optional CMEK key (projects/<p>/locations/<l>/keyRings/<kr>/cryptoKeys/<k>) for the stream. Must be in var.location."
  type        = string
  default     = null
}

variable "labels" {
  description = "Labels applied to the stream."
  type        = map(string)
  default     = {}
}

outputs.tf

output "stream_id" {
  description = "Fully qualified stream resource ID (projects/<p>/locations/<l>/streams/<id>)."
  value       = google_datastream_stream.this.id
}

output "stream_name" {
  description = "Short stream_id used in the console, gcloud, and API calls."
  value       = google_datastream_stream.this.stream_id
}

output "state" {
  description = "Current state of the stream as reported by the API (RUNNING, PAUSED, ...)."
  value       = google_datastream_stream.this.state
}

output "desired_state" {
  description = "Desired lifecycle state Terraform is enforcing."
  value       = google_datastream_stream.this.desired_state
}

output "source_connection_profile_id" {
  description = "Source connection profile the stream reads from."
  value       = google_datastream_stream.this.source_config[0].source_connection_profile
}

output "destination_connection_profile_id" {
  description = "Destination connection profile the stream writes to."
  value       = google_datastream_stream.this.destination_config[0].destination_connection_profile
}

How to use it

The example replicates two MySQL databases from a Cloud SQL source into a single BigQuery dataset in MERGE mode with 15-minute freshness (data_freshness = "900s"), doing a full historical backfill first. A downstream BigQuery scheduled query is ordered after the stream with depends_on, and the stream state is re-exported as an output that monitoring can key off.

module "datastream" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"

  project_id     = "kv-data-prod"
  app            = "orders"
  environment    = "prod"
  location       = "europe-west1"
  location_short = "euw1"

  source_type                  = "mysql"
  source_connection_profile_id = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/cloudsql-orders-src"

  # Replicate only the tables analytics needs; leave audit/temp tables out.
  included_objects = [
    {
      name   = "orders"
      tables = ["order_header", "order_line", "payment"]
    },
    {
      name   = "inventory"
      tables = [] # all tables in this database
    },
  ]

  backfill_mode                 = "all"
  max_concurrent_cdc_tasks      = 8
  max_concurrent_backfill_tasks = 16

  destination = {
    type                  = "bigquery"
    connection_profile_id = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/bq-analytics-dst"
    write_mode            = "merge"
    data_freshness        = "900s"
    dataset_id            = "kv-data-prod:raw_orders_cdc" # project-qualified ({project}:{dataset_id})
  }

  kms_key_name = "projects/kv-data-prod/locations/europe-west1/keyRings/data-kr/cryptoKeys/datastream"

  labels = {
    team        = "data-platform"
    cost-center = "kv-1042"
    workload    = "orders-cdc"
  }
}

# Downstream: a BigQuery scheduled query that transforms the CDC-mirrored
# tables. depends_on orders it after the stream resource is created; it does
# not wait for the backfill to finish.
resource "google_bigquery_data_transfer_config" "orders_mart" {
  project                = "kv-data-prod"
  location               = "europe-west1"
  display_name           = "orders-mart-refresh"
  data_source_id         = "scheduled_query"
  schedule               = "every 1 hours"
  destination_dataset_id = "marts"

  params = {
    query = "SELECT * FROM `kv-data-prod.raw_orders_cdc.order_header` WHERE status = 'PAID'"
  }

  # Create the scheduled query only after the stream feeding raw_orders_cdc exists.
  depends_on = [module.datastream]
}

output "orders_stream_state" {
  value = module.datastream.state
}
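For the monitoring side, a minimal sketch of an alert policy keyed off the stream name might look like the following. It assumes Datastream exports a datastream.googleapis.com/stream/freshness metric (in seconds) on the datastream.googleapis.com/Stream monitored resource; verify both names in Metrics Explorer before relying on them. The threshold, durations, and the notification_channel_ids variable are hypothetical.

```hcl
variable "notification_channel_ids" {
  description = "Hypothetical: Cloud Monitoring notification channel IDs to page."
  type        = list(string)
  default     = []
}

resource "google_monitoring_alert_policy" "orders_stream_stale" {
  project      = "kv-data-prod"
  display_name = "datastream-${module.datastream.stream_name}-stale"
  combiner     = "OR"

  conditions {
    display_name = "Stream freshness above 30 minutes"

    condition_threshold {
      # Assumed metric and resource type names; see the note above.
      filter = join(" AND ", [
        "resource.type = \"datastream.googleapis.com/Stream\"",
        "resource.labels.stream_id = \"${module.datastream.stream_name}\"",
        "metric.type = \"datastream.googleapis.com/stream/freshness\"",
      ])
      comparison      = "COMPARISON_GT"
      threshold_value = 1800   # seconds; twice the 900s freshness target
      duration        = "600s" # must stay above threshold for 10 minutes

      aggregations {
        alignment_period   = "300s"
        per_series_aligner = "ALIGN_MAX"
      }
    }
  }

  notification_channels = var.notification_channel_ids
}
```

Alerting on freshness rather than on the state output catches a stream that is nominally RUNNING but has stopped delivering data.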

With Terragrunt

Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.

1. Root config: live/terragrunt.hcl (inherited by every module):

remote_state {
  backend = "gcs"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...GCS state bucket + prefix per path...
  }
}

2. Module config: live/prod/datastream/terragrunt.hcl:

include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"
}

inputs = {
  project_id = "..."
  app = "..."
  environment = "..."
  location = "..."
  location_short = "..."
  source_type = "..."
  source_connection_profile_id = "..."
  destination = {}
}

3. Deploy one environment, or roll out all modules together:

(cd live/prod/datastream && terragrunt apply)   # this module
(cd live/prod && terragrunt run-all apply)      # every module under live/prod

Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; inputs is overridden per environment (dev / staging / prod) without forking the module; and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules. For a single stack, the plain Quickstart above is enough.
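To make the "provider in one place" and "run-all orchestrates dependencies" points concrete, here is a hedged sketch of the two Terragrunt pieces involved. The project name, the sibling ../connection-profiles module, and its source_profile_id output are all hypothetical; they stand in for whatever creates your connection profiles.

```hcl
# live/terragrunt.hcl (root): generate the provider once for every module.
generate "provider" {
  path      = "provider.tf"
  if_exists = "overwrite_terragrunt"
  contents  = <<-EOF
    provider "google" {
      project = "example-project" # hypothetical
      region  = "europe-west1"
    }
  EOF
}

# live/prod/datastream/terragrunt.hcl: read the source profile ID from a
# sibling module so run-all applies that module first.
dependency "profiles" {
  config_path = "../connection-profiles" # hypothetical sibling module
}

inputs = {
  source_connection_profile_id = dependency.profiles.outputs.source_profile_id # hypothetical output name
  # ...remaining inputs as in step 2...
}
```

With the dependency block in place, Terragrunt can derive the apply order itself instead of relying on someone remembering to create the profiles first.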

Inputs

| Name | Type | Default | Required | Description |
| --- | --- | --- | --- | --- |
| project_id | string | - | Yes | GCP project hosting the stream. |
| app | string | - | Yes | Workload short name used in stream_id (validated lowercase). |
| environment | string | - | Yes | One of dev, staging, prod, sandbox. |
| location | string | - | Yes | Region for the stream; must match the connection profiles and CMEK key. |
| location_short | string | - | Yes | Cosmetic region token for naming (e.g. euw1). |
| display_name | string | null | No | Display name; defaults to the generated stream_id. |
| desired_state | string | "RUNNING" | No | RUNNING or PAUSED: the state Terraform enforces. |
| source_type | string | - | Yes | mysql or postgresql. |
| source_connection_profile_id | string | - | Yes | Fully qualified source connection profile ID. |
| postgresql_replication_slot | string | null | No | Logical replication slot (required when source_type = postgresql). |
| postgresql_publication | string | null | No | Publication backing the slot (required when source_type = postgresql). |
| included_objects | list(object) | [] | No | Databases/schemas (name) and their tables to replicate; empty = all. |
| backfill_mode | string | "all" | No | all (history then CDC) or none (CDC-only). |
| max_concurrent_cdc_tasks | number | 5 | No | Concurrent CDC tasks (MySQL); 0–50. |
| max_concurrent_backfill_tasks | number | 12 | No | Concurrent backfill tasks; 0–50. |
| destination | object | - | Yes | BigQuery or GCS destination (see block; type discriminates). |
| kms_key_name | string | null | No | CMEK key for the stream; must be in location. |
| labels | map(string) | {} | No | Labels applied to the stream. |

Outputs

| Name | Description |
| --- | --- |
| stream_id | Fully qualified stream resource ID (projects/<p>/locations/<l>/streams/<id>). |
| stream_name | Short stream_id used in console, gcloud, and API calls. |
| state | Current API-reported state (RUNNING, PAUSED, …). |
| desired_state | Desired lifecycle state Terraform is enforcing. |
| source_connection_profile_id | Source connection profile the stream reads from. |
| destination_connection_profile_id | Destination connection profile the stream writes to. |

Enterprise scenario

A retail data-platform team replicates their Cloud SQL for PostgreSQL orders and inventory schemas into BigQuery so the analytics warehouse stays about 15 minutes behind production. They deploy this module once per environment with source_type = "postgresql", a pre-created replication slot and publication, included_objects scoped to the eight tables the marts actually need, and a BigQuery destination in merge mode with data_freshness = "900s" and a dataset_template so each schema lands in its own CMEK-encrypted dataset. The historical load runs once via backfill_mode = "all", after which the stream is CDC-only. desired_state = "RUNNING" keeps the stream running on every apply, and the state output feeds a Cloud Monitoring alert so on-call is paged if a stream stalls. Onboarding a new source database is a copy-paste of one module block plus its connection-profile ID.
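A module call matching this scenario could look roughly like the sketch below. The project ID, connection profile IDs, slot and publication names, table names, dataset prefix, and KMS key path are all hypothetical placeholders chosen for illustration; only the input names come from variables.tf.

```hcl
module "datastream_retail" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"

  project_id     = "retail-data-prod" # hypothetical
  app            = "retail"
  environment    = "prod"
  location       = "europe-west1"
  location_short = "euw1"

  source_type                  = "postgresql"
  source_connection_profile_id = "projects/retail-data-prod/locations/europe-west1/connectionProfiles/pg-retail-src"
  postgresql_replication_slot  = "datastream_slot" # must already exist in Postgres
  postgresql_publication       = "datastream_pub"  # must already exist in Postgres

  # Eight tables across two schemas (table names are made up).
  included_objects = [
    { name = "orders", tables = ["order_header", "order_line", "payment", "refund", "shipment"] },
    { name = "inventory", tables = ["stock_level", "warehouse", "sku"] },
  ]

  backfill_mode = "all"
  desired_state = "RUNNING"

  destination = {
    type                  = "bigquery"
    connection_profile_id = "projects/retail-data-prod/locations/europe-west1/connectionProfiles/bq-dst"
    write_mode            = "merge"
    data_freshness        = "900s"
    dataset_template = {
      location          = "europe-west1"
      dataset_id_prefix = "cdc_"
      kms_key_name      = "projects/retail-data-prod/locations/europe-west1/keyRings/data-kr/cryptoKeys/bq-cdc"
    }
  }
}
```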

Best practices
