Quick take: A reusable hashicorp/google Terraform module for GCP Datastream: MySQL/PostgreSQL CDC streams into BigQuery or GCS, include-list object scoping, backfill control, write modes, CMEK, and managed desired_state. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.
Quickstart (copy-paste)
Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):
provider "google" {
project = "my-project"
region = "us-central1"
}
module "datastream" {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"
project_id = "..." # GCP project hosting the stream.
app = "..." # Workload short name used in `stream_id` (validated lowe…
environment = "..." # One of `dev`, `staging`, `prod`, `sandbox`.
location = "..." # Region for the stream; must match the connection profil…
location_short = "..." # Cosmetic region token for naming (e.g. `euw1`).
source_type = "..." # `mysql` or `postgresql`.
source_connection_profile_id = "..." # Fully qualified source connection profile ID.
destination = { type = "...", connection_profile_id = "..." } # BigQuery or GCS destination (see block; `type` discrimi…
}
Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.
What this module is
Datastream is GCP’s serverless change data capture (CDC) and replication service. It reads the transaction log of an operational database — MySQL binlog, PostgreSQL logical replication slot, or Oracle redo logs — and streams every insert, update, and delete, plus an initial historical backfill, into an analytics destination such as BigQuery or Cloud Storage. There are no workers to size and no Debezium/Kafka cluster to babysit: you point a stream at a source connection profile and a destination connection profile, scope which schemas and tables to replicate, and Datastream keeps the destination continuously in sync at a configurable freshness (down to ~15 seconds for BigQuery).
The raw resource is deceptively deep. A google_datastream_stream is not a flat object. It nests three things. First, a source_config, which carries one of mysql_source_config / postgresql_source_config / oracle_source_config, each with its own include_objects / exclude_objects tree of databases (or schemas) → tables → columns. Second, a destination_config, which carries either a bigquery_destination_config (single-dataset or per-source-schema dataset templating, a write mode, and data_freshness) or a gcs_destination_config (file rotation and Avro/JSON formatting). Third, mutually exclusive backfill_all {} / backfill_none {} blocks. Hand-write all of that per stream and every team's blocks drift. Forget desired_state and the provider creates the stream in its default NOT_STARTED state, so nothing replicates. Worse, nudging a nested attribute can make Terraform recreate a running stream.
This module wraps google_datastream_stream behind validated, var-driven inputs. You pick a source_type and pass the schema/table scope as plain lists, choose a BigQuery or GCS destination by object, set the write mode and freshness, and the module assembles the correct nested blocks, enforces backfill semantics, and pins desired_state so a stream stays RUNNING (or intentionally PAUSED) on every apply — with consistent app-env-region naming and labels.
When to use it
- You need near-real-time replication of an operational database into BigQuery for analytics, dashboards, or ML feature pipelines without standing up Debezium, Kafka Connect, or a Dataflow CDC template.
- You are running a database migration or hybrid sync (on-prem or Cloud SQL MySQL/PostgreSQL → GCP) and want a managed, log-based stream with an automatic historical backfill plus ongoing CDC.
- You want to land raw change events in Cloud Storage as Avro/JSON for a data-lake / medallion ingestion layer that other tools (Dataflow, BigQuery external tables, Spark) consume.
- You need fine-grained scoping — replicate only specific schemas/tables (and exclude noisy audit or temp tables) — expressed declaratively rather than clicked into the console.
- You are standardizing a platform team's data-movement layer and want every stream to carry the same labels, CMEK, freshness, and desired_state guarantees.
Reach for Dataflow or a Pub/Sub pipeline instead when you need arbitrary in-flight transformation, enrichment, or fan-out. Datastream’s job is faithful, low-latency CDC from a database into an analytics sink — it shines when the requirement is “keep BigQuery a few seconds behind production with zero servers.”
Module structure
terraform-module-gcp-datastream/
├── versions.tf # provider + Terraform version pins
├── main.tf # google_datastream_stream wired for MySQL/PostgreSQL → BigQuery/GCS
├── variables.tf # var-driven inputs with validation
└── outputs.tf # stream id/name, state, source/destination profile refs
versions.tf
terraform {
required_version = ">= 1.5.0"
required_providers {
google = {
source = "hashicorp/google"
version = "~> 5.0"
}
}
}
main.tf
locals {
# Consistent app-env-region naming, e.g. "orders-prod-euw1".
stream_id = "${var.app}-${var.environment}-${var.location_short}"
# Datastream wants backfill_all {} XOR backfill_none {}. We expose a single
# string input (var.backfill_mode = "all" | "none") and translate it into
# exactly one of the two blocks below.
backfill_all = var.backfill_mode == "all"
backfill_none = var.backfill_mode == "none"
# A stream targets BigQuery OR GCS — never both. The validated var.destination
# object carries a `type` discriminator that gates the dynamic blocks.
is_bigquery = var.destination.type == "bigquery"
is_gcs = var.destination.type == "gcs"
# MySQL scopes by database; PostgreSQL scopes by schema. Both reduce to the
# same { name, tables } shape so callers pass one flat structure.
is_mysql = var.source_type == "mysql"
is_postgresql = var.source_type == "postgresql"
}
resource "google_datastream_stream" "this" {
project = var.project_id
location = var.location
stream_id = local.stream_id
display_name = coalesce(var.display_name, local.stream_id)
desired_state = var.desired_state
labels = var.labels
# Optional CMEK. Must live in the same region as the stream.
customer_managed_encryption_key = var.kms_key_name
# -- Source -------------------------------------------------------------
source_config {
source_connection_profile = var.source_connection_profile_id
dynamic "mysql_source_config" {
for_each = local.is_mysql ? [1] : []
content {
max_concurrent_cdc_tasks = var.max_concurrent_cdc_tasks
max_concurrent_backfill_tasks = var.max_concurrent_backfill_tasks
dynamic "include_objects" {
for_each = length(var.included_objects) > 0 ? [1] : []
content {
dynamic "mysql_databases" {
for_each = var.included_objects
content {
database = mysql_databases.value.name
dynamic "mysql_tables" {
for_each = mysql_databases.value.tables
content {
table = mysql_tables.value
}
}
}
}
}
}
}
}
dynamic "postgresql_source_config" {
for_each = local.is_postgresql ? [1] : []
content {
# Logical replication slot + publication must already exist in Postgres.
replication_slot = var.postgresql_replication_slot
publication = var.postgresql_publication
max_concurrent_backfill_tasks = var.max_concurrent_backfill_tasks
dynamic "include_objects" {
for_each = length(var.included_objects) > 0 ? [1] : []
content {
dynamic "postgresql_schemas" {
for_each = var.included_objects
content {
schema = postgresql_schemas.value.name
dynamic "postgresql_tables" {
for_each = postgresql_schemas.value.tables
content {
table = postgresql_tables.value
}
}
}
}
}
}
}
}
}
# -- Destination --------------------------------------------------------
destination_config {
destination_connection_profile = var.destination.connection_profile_id
dynamic "bigquery_destination_config" {
for_each = local.is_bigquery ? [1] : []
content {
# Freshness as a duration string, e.g. "900s" (15 min). Lower = more
# frequent (and more expensive) merges into BigQuery.
data_freshness = var.destination.data_freshness
# MERGE keeps a current-state mirror of each table (upserts/deletes
# applied); APPEND_ONLY lands every change as an immutable row.
dynamic "merge" {
for_each = var.destination.write_mode == "merge" ? [1] : []
content {}
}
dynamic "append_only" {
for_each = var.destination.write_mode == "append_only" ? [1] : []
content {}
}
# single_target_dataset: every source table lands in one dataset.
dynamic "single_target_dataset" {
for_each = var.destination.dataset_id != null ? [1] : []
content {
dataset_id = var.destination.dataset_id
}
}
# source_hierarchy_datasets: one dataset per source schema, auto-created
# from a template (prefix + region + optional CMEK).
dynamic "source_hierarchy_datasets" {
for_each = var.destination.dataset_template != null ? [1] : []
content {
dataset_template {
location = var.destination.dataset_template.location
dataset_id_prefix = var.destination.dataset_template.dataset_id_prefix
kms_key_name = var.destination.dataset_template.kms_key_name
}
}
}
}
}
dynamic "gcs_destination_config" {
for_each = local.is_gcs ? [1] : []
content {
path = var.destination.path
file_rotation_mb = var.destination.file_rotation_mb
file_rotation_interval = var.destination.file_rotation_interval
dynamic "avro_file_format" {
for_each = var.destination.file_format == "avro" ? [1] : []
content {}
}
dynamic "json_file_format" {
for_each = var.destination.file_format == "json" ? [1] : []
content {
schema_file_format = "NO_SCHEMA_FILE"
compression = var.destination.gcs_compression
}
}
}
}
}
# Exactly one backfill block. backfill_all replays history before CDC;
# backfill_none starts from "now" (CDC-only — use when the sink is pre-seeded).
dynamic "backfill_all" {
for_each = local.backfill_all ? [1] : []
content {}
}
dynamic "backfill_none" {
for_each = local.backfill_none ? [1] : []
content {}
}
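lifecycle {
# stream_id is immutable: changing it would replace this stream and trigger a
# full re-backfill. ignore_changes makes Terraform skip such a change instead
# of replacing the stream, so a rename never happens by accident. To rename
# deliberately, remove this guard or force replacement with `terraform apply -replace`.
ignore_changes = [stream_id]
}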
}
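The PostgreSQL slot and publication inputs are documented as required, but nothing enforces that. The CMEK region rule is also only checked by the API at apply time. The sketch below assumes you are willing to extend the module's lifecycle block. It fails the plan early with resource preconditions (Terraform 1.2+), and strcontains needs 1.5+, which versions.tf already requires. It is not part of the published module.

```hcl
# Sketch: a possible replacement for the lifecycle block inside
# resource "google_datastream_stream" "this" (hypothetical extension).
lifecycle {
  ignore_changes = [stream_id]

  # PostgreSQL sources cannot start without a replication slot and a publication.
  precondition {
    condition = var.source_type != "postgresql" || (
      var.postgresql_replication_slot != null && var.postgresql_publication != null
    )
    error_message = "postgresql_replication_slot and postgresql_publication are required when source_type = \"postgresql\"."
  }

  # The stream-level CMEK key must live in the stream's region. A conditional
  # is used instead of || so strcontains is never evaluated against null.
  precondition {
    condition     = var.kms_key_name == null ? true : strcontains(var.kms_key_name, "/locations/${var.location}/")
    error_message = "kms_key_name must reference a key in var.location."
  }
}
```

Preconditions run during plan whenever their inputs are known, so a missing slot surfaces before any API call.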
variables.tf
variable "project_id" {
description = "GCP project ID that will host the Datastream stream."
type = string
}
variable "app" {
description = "Application/workload short name, used in the stream_id (e.g. \"orders\")."
type = string
validation {
condition = can(regex("^[a-z][a-z0-9-]{1,24}$", var.app))
error_message = "app must be lowercase alphanumeric/hyphen, 2-25 chars, starting with a letter."
}
}
variable "environment" {
description = "Deployment environment (dev, staging, prod, sandbox)."
type = string
validation {
condition = contains(["dev", "staging", "prod", "sandbox"], var.environment)
error_message = "environment must be one of: dev, staging, prod, sandbox."
}
}
variable "location" {
description = "GCP region for the stream. Must match the source/destination connection profiles and any CMEK key (e.g. \"europe-west1\")."
type = string
}
variable "location_short" {
description = "Short region token for naming, e.g. \"euw1\", \"use4\". Cosmetic only."
type = string
}
variable "display_name" {
description = "Human-friendly stream display name. Defaults to the generated stream_id."
type = string
default = null
}
variable "desired_state" {
description = "Lifecycle state Terraform keeps the stream in: RUNNING or PAUSED."
type = string
default = "RUNNING"
validation {
condition = contains(["RUNNING", "PAUSED"], var.desired_state)
error_message = "desired_state must be RUNNING or PAUSED."
}
}
variable "source_type" {
description = "Source database engine: mysql or postgresql."
type = string
validation {
condition = contains(["mysql", "postgresql"], var.source_type)
error_message = "source_type must be mysql or postgresql."
}
}
variable "source_connection_profile_id" {
description = "Fully qualified ID of the source connection profile (projects/<p>/locations/<l>/connectionProfiles/<id>)."
type = string
}
variable "postgresql_replication_slot" {
description = "PostgreSQL logical replication slot name. Required when source_type = postgresql; ignored otherwise."
type = string
default = null
}
variable "postgresql_publication" {
description = "PostgreSQL publication name backing the replication slot. Required when source_type = postgresql; ignored otherwise."
type = string
default = null
}
variable "included_objects" {
description = <<-EOT
Schemas/databases to replicate, each with its tables. For MySQL `name` is a
database; for PostgreSQL `name` is a schema. An empty `tables` list means
"all tables in this database/schema". An empty top-level list means
"everything the source connection profile can see".
EOT
type = list(object({
name = string
tables = optional(list(string), [])
}))
default = []
}
variable "backfill_mode" {
description = "Historical backfill behaviour: \"all\" (replay history then CDC) or \"none\" (CDC-only from now)."
type = string
default = "all"
validation {
condition = contains(["all", "none"], var.backfill_mode)
error_message = "backfill_mode must be \"all\" or \"none\"."
}
}
variable "max_concurrent_cdc_tasks" {
description = "Max concurrent CDC tasks (MySQL only). Higher = faster CDC, more source load."
type = number
default = 5
validation {
condition = var.max_concurrent_cdc_tasks >= 0 && var.max_concurrent_cdc_tasks <= 50
error_message = "max_concurrent_cdc_tasks must be between 0 and 50."
}
}
variable "max_concurrent_backfill_tasks" {
description = "Max concurrent backfill tasks. Higher = faster historical load, more source load."
type = number
default = 12
validation {
condition = var.max_concurrent_backfill_tasks >= 0 && var.max_concurrent_backfill_tasks <= 50
error_message = "max_concurrent_backfill_tasks must be between 0 and 50."
}
}
variable "destination" {
description = <<-EOT
Destination definition. Set type = "bigquery" OR "gcs".
BigQuery:
- connection_profile_id (required)
- write_mode: "merge" (current-state mirror) or "append_only"
- data_freshness: duration string, e.g. "900s"
- EITHER dataset_id (single target dataset)
OR dataset_template { location, dataset_id_prefix, kms_key_name }
(one auto-created dataset per source schema)
GCS:
- connection_profile_id (required)
- path: object prefix within the bucket, e.g. "/cdc/orders"
- file_format: "avro" or "json"
- file_rotation_mb / file_rotation_interval
- gcs_compression (json only): "NO_COMPRESSION" or "GZIP"
EOT
type = object({
type = string
connection_profile_id = string
# BigQuery
write_mode = optional(string, "merge")
data_freshness = optional(string, "900s")
dataset_id = optional(string)
dataset_template = optional(object({
location = string
dataset_id_prefix = optional(string, "")
kms_key_name = optional(string)
}))
# GCS
path = optional(string, "/")
file_format = optional(string, "avro")
file_rotation_mb = optional(number, 50)
file_rotation_interval = optional(string, "60s")
gcs_compression = optional(string, "GZIP")
})
validation {
condition = contains(["bigquery", "gcs"], var.destination.type)
error_message = "destination.type must be bigquery or gcs."
}
validation {
condition = var.destination.type != "bigquery" || contains(["merge", "append_only"], var.destination.write_mode)
error_message = "BigQuery destination.write_mode must be merge or append_only."
}
# BigQuery requires exactly one of dataset_id / dataset_template.
validation {
condition = var.destination.type != "bigquery" || (
(var.destination.dataset_id != null) != (var.destination.dataset_template != null)
)
error_message = "BigQuery destination requires exactly one of dataset_id OR dataset_template."
}
validation {
condition = var.destination.type != "gcs" || contains(["avro", "json"], var.destination.file_format)
error_message = "GCS destination.file_format must be avro or json."
}
validation {
condition = var.destination.type != "gcs" || contains(["NO_COMPRESSION", "GZIP"], var.destination.gcs_compression)
error_message = "GCS gcs_compression must be NO_COMPRESSION or GZIP."
}
}
variable "kms_key_name" {
description = "Optional CMEK key (projects/<p>/locations/<l>/keyRings/<kr>/cryptoKeys/<k>) for the stream. Must be in var.location."
type = string
default = null
}
variable "labels" {
description = "Labels applied to the stream."
type = map(string)
default = {}
}
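The usage example below targets BigQuery, but the same destination variable also accepts a GCS target. The sketch below lands raw change events in Cloud Storage. It assumes a hypothetical destination connection profile gcs-lake-dst that already points at your bucket. It uses a different app name so its generated stream_id does not collide with the BigQuery stream in the same project and region.

```hcl
# Sketch: land raw change events in Cloud Storage as gzip-compressed JSON.
# "gcs-lake-dst", "orders-lake", and the "/cdc/orders" prefix are hypothetical.
module "datastream_lake" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"

  project_id                   = "kv-data-prod"
  app                          = "orders-lake" # stream_id becomes orders-lake-prod-euw1
  environment                  = "prod"
  location                     = "europe-west1"
  location_short               = "euw1"
  source_type                  = "mysql"
  source_connection_profile_id = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/cloudsql-orders-src"

  destination = {
    type                   = "gcs"
    connection_profile_id  = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/gcs-lake-dst"
    path                   = "/cdc/orders"
    file_format            = "json" # the module defaults to "avro"
    gcs_compression        = "GZIP" # only used for json
    file_rotation_mb       = 100
    file_rotation_interval = "300s"
  }
}
```

BigQuery-only fields (write_mode, data_freshness, dataset_id) keep their defaults and are ignored because type = "gcs" gates them out.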
outputs.tf
output "stream_id" {
description = "Fully qualified stream resource ID (projects/<p>/locations/<l>/streams/<id>)."
value = google_datastream_stream.this.id
}
output "stream_name" {
description = "Short stream_id used in the console, gcloud, and API calls."
value = google_datastream_stream.this.stream_id
}
output "state" {
description = "Current state of the stream as reported by the API (RUNNING, PAUSED, ...)."
value = google_datastream_stream.this.state
}
output "desired_state" {
description = "Desired lifecycle state Terraform is enforcing."
value = google_datastream_stream.this.desired_state
}
output "source_connection_profile_id" {
description = "Source connection profile the stream reads from."
value = google_datastream_stream.this.source_config[0].source_connection_profile
}
output "destination_connection_profile_id" {
description = "Destination connection profile the stream writes to."
value = google_datastream_stream.this.destination_config[0].destination_connection_profile
}
How to use it
The example replicates two MySQL databases from a Cloud SQL source into a single BigQuery dataset in MERGE mode with 15-minute freshness (data_freshness = "900s"), doing a full historical backfill first. A downstream BigQuery scheduled query reads the replicated tables and depends on the module, so it is created after the stream. The stream's API-reported state is exported as a root output.
module "datastream" {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"
project_id = "kv-data-prod"
app = "orders"
environment = "prod"
location = "europe-west1"
location_short = "euw1"
source_type = "mysql"
source_connection_profile_id = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/cloudsql-orders-src"
# Replicate only the tables analytics needs; leave audit/temp tables out.
included_objects = [
{
name = "orders"
tables = ["order_header", "order_line", "payment"]
},
{
name = "inventory"
tables = [] # all tables in this database
},
]
backfill_mode = "all"
max_concurrent_cdc_tasks = 8
max_concurrent_backfill_tasks = 16
destination = {
type = "bigquery"
connection_profile_id = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/bq-analytics-dst"
write_mode = "merge"
data_freshness = "900s"
dataset_id = "raw_orders_cdc"
}
kms_key_name = "projects/kv-data-prod/locations/europe-west1/keyRings/data-kr/cryptoKeys/datastream"
labels = {
team = "data-platform"
cost-center = "kv-1042"
workload = "orders-cdc"
}
}
# Downstream: a BigQuery scheduled query that transforms the CDC-mirrored
# tables. depends_on only orders creation after the stream; it does not wait
# for the backfill to finish, so early runs may see partial data.
resource "google_bigquery_data_transfer_config" "orders_mart" {
project = "kv-data-prod"
location = "europe-west1"
display_name = "orders-mart-refresh"
data_source_id = "scheduled_query"
schedule = "every 1 hours"
destination_dataset_id = "marts"
params = {
# A SELECT with a destination dataset also needs a target table and a write
# disposition; "paid_orders" is an illustrative table name.
destination_table_name_template = "paid_orders"
write_disposition = "WRITE_TRUNCATE"
query = "SELECT * FROM `kv-data-prod.raw_orders_cdc.order_header` WHERE status = 'PAID'"
}
# Create the mart only after the stream that feeds raw_orders_cdc exists.
depends_on = [module.datastream]
}
output "orders_stream_state" {
value = module.datastream.state
}
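The state output only refreshes when Terraform runs, so it cannot page anyone by itself. The sketch below scopes a Cloud Monitoring alert policy to the stream via module.datastream.stream_name. Several values are assumptions to check in Metrics Explorer before use:

- the metric type datastream.googleapis.com/stream/freshness
- the resource type datastream.googleapis.com/Stream
- its stream_id label
- the metric unit (seconds)
- the 30-minute threshold

```hcl
# Sketch: alert when the stream falls well behind its 900s freshness target.
# Metric type, resource type, label name, and unit are assumptions to verify.
resource "google_monitoring_alert_policy" "orders_stream_freshness" {
  project      = "kv-data-prod"
  display_name = "Datastream freshness: ${module.datastream.stream_name}"
  combiner     = "OR"

  conditions {
    display_name = "Freshness above 30 minutes for 10 minutes"

    condition_threshold {
      filter = join(" AND ", [
        "resource.type = \"datastream.googleapis.com/Stream\"",
        "metric.type = \"datastream.googleapis.com/stream/freshness\"",
        "resource.labels.stream_id = \"${module.datastream.stream_name}\"",
      ])
      comparison      = "COMPARISON_GT"
      threshold_value = 1800 # assumed to be seconds
      duration        = "600s"

      aggregations {
        alignment_period   = "300s"
        per_series_aligner = "ALIGN_MAX"
      }
    }
  }

  # Fill in your on-call notification channel IDs.
  notification_channels = []
}
```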
With Terragrunt
Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.
1. Root config — live/terragrunt.hcl (inherited by every module):
remote_state {
backend = "gcs"
generate = { path = "backend.tf", if_exists = "overwrite" }
config = {
# ...GCS state bucket + prefix per path...
}
}
2. Module config — live/prod/datastream/terragrunt.hcl:
include "root" {
path = find_in_parent_folders()
}
terraform {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"
}
inputs = {
project_id = "..."
app = "..."
environment = "..."
location = "..."
location_short = "..."
source_type = "..."
source_connection_profile_id = "..."
destination = { type = "...", connection_profile_id = "..." }
}
3. Deploy one environment, or roll out all modules together:
cd live/prod/datastream && terragrunt apply # this module
cd live/prod && terragrunt run-all apply # every module under live/prod
Why Terragrunt here:

- The backend and provider live in one place instead of being copy-pasted into every module.
- inputs are overridden per environment (dev / staging / prod) without forking the module.
- run-all orchestrates dependencies across modules.

Reach for it once you have more than one environment or more than a handful of modules. For a single stack, the plain Quickstart above is enough.
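The root config above elides the state bucket and never shows the provider. The fuller live/terragrunt.hcl sketch below assumes a hypothetical state bucket kv-tfstate and a provider region fixed at the root; adjust both. The GCS backend stores state as <prefix>/<workspace>.tfstate, so each module directory gets its own state object.

```hcl
# live/terragrunt.hcl (sketch): one GCS state prefix per module directory.
remote_state {
  backend  = "gcs"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    bucket = "kv-tfstate"                # hypothetical bucket
    prefix = path_relative_to_include()  # e.g. prod/datastream -> prod/datastream/default.tfstate
  }
}

# Generate the google provider once so child modules don't repeat it.
generate "provider" {
  path      = "provider.tf"
  if_exists = "overwrite_terragrunt"
  contents  = <<-EOF
    provider "google" {
      region = "europe-west1"
    }
  EOF
}
```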
Inputs
| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| project_id | string | — | Yes | GCP project hosting the stream. |
| app | string | — | Yes | Workload short name used in stream_id (validated lowercase). |
| environment | string | — | Yes | One of dev, staging, prod, sandbox. |
| location | string | — | Yes | Region for the stream; must match the connection profiles and CMEK key. |
| location_short | string | — | Yes | Cosmetic region token for naming (e.g. euw1). |
| display_name | string | null | No | Display name; defaults to the generated stream_id. |
| desired_state | string | "RUNNING" | No | RUNNING or PAUSED; the state Terraform enforces. |
| source_type | string | — | Yes | mysql or postgresql. |
| source_connection_profile_id | string | — | Yes | Fully qualified source connection profile ID. |
| postgresql_replication_slot | string | null | No | Logical replication slot (required when source_type = postgresql). |
| postgresql_publication | string | null | No | Publication backing the slot (required when source_type = postgresql). |
| included_objects | list(object) | [] | No | Databases/schemas (name) and their tables to replicate; empty = all. |
| backfill_mode | string | "all" | No | all (history then CDC) or none (CDC-only). |
| max_concurrent_cdc_tasks | number | 5 | No | Concurrent CDC tasks (MySQL); 0–50. |
| max_concurrent_backfill_tasks | number | 12 | No | Concurrent backfill tasks; 0–50. |
| destination | object | — | Yes | BigQuery or GCS destination (see block; type discriminates). |
| kms_key_name | string | null | No | CMEK key for the stream; must be in location. |
| labels | map(string) | {} | No | Labels applied to the stream. |
Outputs
| Name | Description |
|---|---|
| stream_id | Fully qualified stream resource ID (projects/<p>/locations/<l>/streams/<id>). |
| stream_name | Short stream_id used in console, gcloud, and API calls. |
| state | Current API-reported state (RUNNING, PAUSED, …). |
| desired_state | Desired lifecycle state Terraform is enforcing. |
| source_connection_profile_id | Source connection profile the stream reads from. |
| destination_connection_profile_id | Destination connection profile the stream writes to. |
Enterprise scenario
A retail data-platform team replicates the orders and inventory schemas of its Cloud SQL for PostgreSQL database into BigQuery, so the analytics warehouse stays within ~15 minutes of production. They deploy this module once per environment with:

- source_type = "postgresql" and a pre-created postgresql_replication_slot / postgresql_publication;
- included_objects scoped to the eight tables the marts actually need;
- a BigQuery destination in merge mode with data_freshness = "900s" and a dataset_template, so each schema lands in its own CMEK-encrypted dataset.

The historical load runs once via backfill_mode = "all", after which the stream is CDC-only. desired_state = "RUNNING" keeps the stream started on every apply, and the stream_name output scopes a Cloud Monitoring alert so on-call is paged if a stream stalls. Onboarding a new source database is a copy-paste of one module block plus its connection-profile ID.
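The module block for this scenario might look like the sketch below. It assumes hypothetical connection profiles (pg-orders-src, bq-analytics-dst), a slot datastream_slot and a publication datastream_pub already created in Postgres, and hypothetical key names. The table names are illustrative stand-ins for the eight tables the marts need.

```hcl
# Sketch of the scenario: PostgreSQL source, one CMEK-encrypted dataset per schema.
# Profile, slot, publication, table, and key names are hypothetical.
module "datastream_pg" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-gcp-datastream?ref=v1.0.0"

  project_id                   = "kv-data-prod"
  app                          = "retail"
  environment                  = "prod"
  location                     = "europe-west1"
  location_short               = "euw1"
  source_type                  = "postgresql"
  source_connection_profile_id = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/pg-orders-src"
  postgresql_replication_slot  = "datastream_slot"
  postgresql_publication       = "datastream_pub"

  # For PostgreSQL, name is a schema.
  included_objects = [
    { name = "orders", tables = ["order_header", "order_line", "payment", "refund"] },
    { name = "inventory", tables = ["sku", "stock_level", "warehouse", "transfer"] },
  ]

  backfill_mode = "all" # one-time historical load, then CDC
  desired_state = "RUNNING"

  destination = {
    type                  = "bigquery"
    connection_profile_id = "projects/kv-data-prod/locations/europe-west1/connectionProfiles/bq-analytics-dst"
    write_mode            = "merge"
    data_freshness        = "900s"
    # dataset_id is left unset, so the exactly-one-of validation picks the template.
    dataset_template = {
      location          = "europe-west1"
      dataset_id_prefix = "cdc_"
      kms_key_name      = "projects/kv-data-prod/locations/europe-west1/keyRings/data-kr/cryptoKeys/bq-cdc"
    }
  }

  kms_key_name = "projects/kv-data-prod/locations/europe-west1/keyRings/data-kr/cryptoKeys/datastream"
}
```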
Best practices
- Scope included_objects tightly and pick the right write mode. Replicating whole databases pulls in audit, temp, and high-churn tables you don't need, inflating BigQuery merge cost and source load. List only the tables analytics consumes. Use merge for a queryable current-state mirror, or append_only when you need an immutable change history for audit/SCD pipelines.
- Tune data_freshness to the requirement, not the minimum. BigQuery merges are billed work: 900s (15 min) is plenty for most dashboards, while 0s (near-real-time) multiplies write cost. Reserve sub-minute freshness for genuinely operational analytics.
- Make backfill_mode a deliberate decision. backfill_all replays full table history, which puts a heavy initial load on the source; schedule it off-peak and cap max_concurrent_backfill_tasks. backfill_none starts CDC from "now" and is correct only when the destination is already seeded by another load.
- Encrypt with CMEK and keep regions aligned. Pass kms_key_name for compliance. Ensure the stream location, both connection profiles, the CMEK key, and any dataset_template.kms_key_name all live in the same region; cross-region mismatches fail at apply.
- Pin desired_state and treat stream_id as immutable. Always set desired_state explicitly so a stream isn't left NOT_STARTED or PAUSED after an apply (the module defaults it to RUNNING). Changing stream_id would replace the stream and force a full re-backfill. That is why the module sets ignore_changes = [stream_id]: a rename is skipped rather than applied. Pause intentionally for source maintenance via desired_state = "PAUSED".
- Standardize naming and labels for cost attribution. The app-env-region stream_id plus team/cost-center labels make CDC spend (Datastream GB processed + BigQuery storage/merge) traceable, and keep streams greppable once you run more than a couple across projects.
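A Terraform check block (Terraform 1.5+, which versions.tf already requires) can surface drift between the enforced and observed state on every plan. The sketch below assumes it sits in the root module next to the usage example's module "datastream". Check failures are reported as warnings and do not block the apply. A stream that is still starting may briefly report a different state.

```hcl
# Sketch: warn when the API-reported state differs from the state Terraform enforces.
check "datastream_state_matches" {
  assert {
    condition     = module.datastream.state == module.datastream.desired_state
    error_message = "Stream ${module.datastream.stream_name} is ${module.datastream.state}, expected ${module.datastream.desired_state}."
  }
}
```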