Quick take — Build a reusable Terraform module for AWS AppFlow using aws_appflow_flow: wire up source/destination connectors, scheduled or event triggers, field mappings and KMS encryption as version-pinned IaC. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.
Quickstart (copy-paste)
Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):
provider "aws" {
region = "us-east-1"
}
module "appflow" {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"
name = "..." # Flow name, unique per account/region; alphanumeric plus…
source_connector_type = "..." # Source connector: `Salesforce`, `Servicenow`, or `Zende…
source_connector_profile_name = "..." # Name of the pre-created connector profile holding sourc…
source_object = "..." # Source object to extract (e.g. `Opportunity`, `tickets`…
mapped_fields = ["...", "..."] # Ordered list of source fields to extract; must be non-e…
destination_bucket_name = "..." # Target S3 bucket name.
}
Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.
What this module is
Amazon AppFlow is a fully managed integration service that moves data between SaaS applications (Salesforce, Zendesk, Slack, Marketo, ServiceNow, SAP, and dozens more) and AWS services such as Amazon S3, Redshift, and EventBridge — without you writing or running any connector code. A single “flow” in AppFlow binds a source connector, a destination connector, a trigger (run-on-demand, on a schedule, or on a SaaS change event), a set of field mappings, and optional filters, validations, and transformations.
The trouble with clicking these together in the console is that an AppFlow flow has more interdependent parts than it appears to: the source connector profile, the mapping tasks (each field is its own task block), the trigger's schedule expression, and the destination's S3 prefix and aggregation settings all have to agree, or the flow silently produces empty or malformed output. Wrapping aws_appflow_flow in a Terraform module turns that fragile pile of console settings into a single, reviewable, version-pinned artifact. You declare the connector profile name, the field list, and the schedule once; the module assembles the repetitive task and source_flow_config/destination_flow_config blocks correctly every time, and a terraform plan shows you exactly what would change before anyone touches production data.
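To make the repetition concrete, here is a hand-written task set for a hypothetical two-field Salesforce extract (`Id` and `Name`). The blocks belong inside an `aws_appflow_flow` resource. The operator values (`PROJECTION` on the Filter task, `NO_OP` on each Map task) follow the pattern in AWS's own flow examples, so treat this as a sketch rather than a reference:

```hcl
# Hypothetical two-field Salesforce extract (Id, Name), written out by hand.
# A projection (Filter) task tells AppFlow which fields to pull from the source...
task {
  source_fields = ["Id", "Name"]
  task_type     = "Filter"
  connector_operator {
    salesforce = "PROJECTION"
  }
}

# ...and each field then needs its own Map task into the destination.
task {
  source_fields     = ["Id"]
  destination_field = "Id"
  task_type         = "Map"
  connector_operator {
    salesforce = "NO_OP"
  }
}

task {
  source_fields     = ["Name"]
  destination_field = "Name"
  task_type         = "Map"
  connector_operator {
    salesforce = "NO_OP"
  }
}
```

Each extra field adds another Map block and another entry in the projection list. That bookkeeping is what a dynamic `task` block driven by a field list takes over.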
When to use it
- You need to ingest SaaS data (Salesforce opportunities, Zendesk tickets, ServiceNow incidents) into an S3 data lake on a schedule and want the pipeline defined as code alongside the bucket, Glue catalog, and IAM.
- You run the same flow shape across many environments or tenants — e.g., one Salesforce-to-S3 flow per business unit — and want to stamp them out from one module with different connector profiles and prefixes.
- You want change review and rollback on integration logic: field mappings and filters are business rules, and a PR diff is a far safer place to change them than the AppFlow console.
- You are standardising encryption and naming for compliance — every flow must land in a KMS-encrypted bucket under a predictable prefix, with consistent tags for cost allocation.
If you only need a single one-off, console-driven extract that you will never touch again, the module is overkill. The moment a flow becomes “production data plumbing,” codify it.
Module structure
terraform-module-aws-appflow/
├── versions.tf # provider + Terraform version pins
├── main.tf # aws_appflow_flow with source/destination/tasks/trigger
├── variables.tf # var-driven inputs with validation
└── outputs.tf # flow id/arn/name + trigger metadata
versions.tf
terraform {
required_version = ">= 1.5.0"
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}
main.tf
locals {
# One entry per mapped field. Each entry becomes its own AppFlow "Map" task
# below; a separate projection (Filter) task lists every field.
mapping_tasks = [
for field in var.mapped_fields : {
source_fields = [field]
destination_field = field
}
]
}
resource "aws_appflow_flow" "this" {
name = var.name
description = var.description
# Encrypt flow metadata and any intermediate data with a customer-managed key.
kms_arn = var.kms_arn
# ---------------------------------------------------------------------------
# SOURCE: a SaaS connector profile created out-of-band (Salesforce, Zendesk…)
# ---------------------------------------------------------------------------
source_flow_config {
connector_type = var.source_connector_type
connector_profile_name = var.source_connector_profile_name
api_version = var.source_api_version
source_connector_properties {
dynamic "salesforce" {
for_each = var.source_connector_type == "Salesforce" ? [1] : []
content {
object = var.source_object
enable_dynamic_field_update = var.salesforce_enable_dynamic_field_update
include_deleted_records = var.salesforce_include_deleted_records
}
}
dynamic "service_now" {
for_each = var.source_connector_type == "Servicenow" ? [1] : []
content {
object = var.source_object
}
}
dynamic "zendesk" {
for_each = var.source_connector_type == "Zendesk" ? [1] : []
content {
object = var.source_object
}
}
}
}
# ---------------------------------------------------------------------------
# DESTINATION: an S3 bucket + prefix in the data lake
# ---------------------------------------------------------------------------
destination_flow_config {
connector_type = "S3"
destination_connector_properties {
s3 {
bucket_name = var.destination_bucket_name
bucket_prefix = var.destination_bucket_prefix
s3_output_format_config {
file_type = var.s3_file_type
aggregation_config {
aggregation_type = var.s3_aggregation_type
}
prefix_config {
prefix_type = var.s3_prefix_type
prefix_format = var.s3_prefix_format
}
}
}
}
}
# ---------------------------------------------------------------------------
# TASKS: one Map task per field, plus a projection that selects the field set
# ---------------------------------------------------------------------------
dynamic "task" {
for_each = local.mapping_tasks
content {
source_fields = task.value.source_fields
destination_field = task.value.destination_field
task_type = "Map"
connector_operator {
# Map tasks pass values through unchanged; PROJECTION belongs on the Filter task.
salesforce = var.source_connector_type == "Salesforce" ? "NO_OP" : null
service_now = var.source_connector_type == "Servicenow" ? "NO_OP" : null
zendesk = var.source_connector_type == "Zendesk" ? "NO_OP" : null
}
}
}
# Projection task: tells AppFlow the exact set of fields to retrieve.
task {
source_fields = var.mapped_fields
destination_field = ""
task_type = "Filter"
connector_operator {
salesforce = var.source_connector_type == "Salesforce" ? "PROJECTION" : null
service_now = var.source_connector_type == "Servicenow" ? "PROJECTION" : null
zendesk = var.source_connector_type == "Zendesk" ? "PROJECTION" : null
}
}
# ---------------------------------------------------------------------------
# TRIGGER: on-demand or scheduled (incremental pulls via a watermark field)
# ---------------------------------------------------------------------------
trigger_config {
trigger_type = var.trigger_type
dynamic "trigger_properties" {
for_each = var.trigger_type == "Scheduled" ? [1] : []
content {
scheduled {
schedule_expression = var.schedule_expression
data_pull_mode = var.data_pull_mode
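# The provider expects an RFC 3339 timestamp string, so convert the epoch-seconds input.
schedule_start_time = var.schedule_start_time == null ? null : timeadd("1970-01-01T00:00:00Z", "${var.schedule_start_time}s")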
timezone = var.schedule_timezone
schedule_offset = var.schedule_offset
}
}
}
}
tags = var.tags
}
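The `schedule_expression` description says it is required for Scheduled flows, but nothing enforces that: it defaults to `null`. The `required_version = ">= 1.5.0"` pin also rules out variable validations that reference another variable, a feature that arrived in Terraform 1.9. One option is a lifecycle precondition, sketched here. It would sit inside `resource "aws_appflow_flow" "this"` and fail the plan with a readable message instead of a provider error:

```hcl
# Place inside resource "aws_appflow_flow" "this" (for example, after tags).
lifecycle {
  precondition {
    # Only Scheduled flows need an expression; OnDemand and Event flows pass.
    condition     = var.trigger_type != "Scheduled" || var.schedule_expression != null
    error_message = "schedule_expression is required when trigger_type is Scheduled."
  }
}
```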
variables.tf
variable "name" {
description = "Name of the AppFlow flow (must be unique within the account/region)."
type = string
validation {
condition = can(regex("^[a-zA-Z0-9][a-zA-Z0-9_-]{0,255}$", var.name))
error_message = "name must start with a letter or digit, contain only letters, digits, _ and -, and be at most 256 characters."
}
}
variable "description" {
description = "Human-readable description of the flow's purpose."
type = string
default = "Managed by Terraform"
}
variable "kms_arn" {
description = "ARN of a customer-managed KMS key used to encrypt flow data. Leave null to use the AWS-managed AppFlow key."
type = string
default = null
validation {
condition = var.kms_arn == null || can(regex("^arn:aws[a-zA-Z-]*:kms:", var.kms_arn))
error_message = "kms_arn must be a valid KMS key ARN or null."
}
}
# ----- Source -----------------------------------------------------------------
variable "source_connector_type" {
description = "SaaS source connector type."
type = string
validation {
condition = contains(["Salesforce", "Servicenow", "Zendesk"], var.source_connector_type)
error_message = "source_connector_type must be one of: Salesforce, Servicenow, Zendesk."
}
}
variable "source_connector_profile_name" {
description = "Name of the pre-created AppFlow connector profile for the source (holds OAuth/credentials)."
type = string
}
variable "source_api_version" {
description = "API version of the source connector (e.g. Salesforce '61.0'). Null lets AppFlow choose."
type = string
default = null
}
variable "source_object" {
description = "Source object to extract (e.g. Salesforce 'Opportunity', Zendesk 'tickets')."
type = string
}
variable "salesforce_enable_dynamic_field_update" {
description = "Salesforce: import newly added fields automatically on each run."
type = bool
default = false
}
variable "salesforce_include_deleted_records" {
description = "Salesforce: include soft-deleted (recycle bin) records in the extract."
type = bool
default = false
}
# ----- Field mapping ----------------------------------------------------------
variable "mapped_fields" {
description = "Ordered list of source field names to extract and pass through to the destination."
type = list(string)
validation {
condition = length(var.mapped_fields) > 0
error_message = "mapped_fields must contain at least one field."
}
}
# ----- Destination (S3) -------------------------------------------------------
variable "destination_bucket_name" {
description = "Name of the S3 bucket that receives flow output."
type = string
}
variable "destination_bucket_prefix" {
description = "Key prefix under which records are written (no leading/trailing slash)."
type = string
default = null
}
variable "s3_file_type" {
description = "Output file format written to S3."
type = string
default = "PARQUET"
validation {
condition = contains(["CSV", "JSON", "PARQUET"], var.s3_file_type)
error_message = "s3_file_type must be one of: CSV, JSON, PARQUET."
}
}
variable "s3_aggregation_type" {
description = "How records are aggregated into output files."
type = string
default = "None"
validation {
condition = contains(["None", "SingleFile"], var.s3_aggregation_type)
error_message = "s3_aggregation_type must be None or SingleFile."
}
}
variable "s3_prefix_type" {
description = "Partition prefix granularity AppFlow appends to the output path."
type = string
default = "PATH"
validation {
condition = contains(["FILENAME", "PATH", "PATH_AND_FILENAME"], var.s3_prefix_type)
error_message = "s3_prefix_type must be FILENAME, PATH, or PATH_AND_FILENAME."
}
}
variable "s3_prefix_format" {
description = "Time granularity of the date-based prefix."
type = string
default = "DAY"
validation {
condition = contains(["YEAR", "MONTH", "DAY", "HOUR", "MINUTE"], var.s3_prefix_format)
error_message = "s3_prefix_format must be YEAR, MONTH, DAY, HOUR, or MINUTE."
}
}
# ----- Trigger ----------------------------------------------------------------
variable "trigger_type" {
description = "How the flow is triggered."
type = string
default = "OnDemand"
validation {
condition = contains(["OnDemand", "Scheduled", "Event"], var.trigger_type)
error_message = "trigger_type must be OnDemand, Scheduled, or Event."
}
}
variable "schedule_expression" {
description = "Schedule for Scheduled flows, e.g. 'rate(1hours)' or 'cron(0 6 * * ? *)'. Required when trigger_type = Scheduled."
type = string
default = null
validation {
condition = var.schedule_expression == null || can(regex("^(rate|cron)\\(", var.schedule_expression))
error_message = "schedule_expression must start with rate( or cron(."
}
}
variable "data_pull_mode" {
description = "Scheduled flows: Incremental (only changed records via a watermark) or Complete."
type = string
default = "Incremental"
validation {
condition = contains(["Incremental", "Complete"], var.data_pull_mode)
error_message = "data_pull_mode must be Incremental or Complete."
}
}
variable "schedule_start_time" {
description = "Unix epoch (seconds) at which the schedule first becomes active. Null = immediately."
type = number
default = null
}
variable "schedule_timezone" {
description = "IANA timezone for cron-based schedule expressions, e.g. 'Asia/Kolkata'."
type = string
default = null
}
variable "schedule_offset" {
description = "Seconds (0-36000) to offset the scheduled run, to spread load across many flows."
type = number
default = 0
validation {
condition = var.schedule_offset >= 0 && var.schedule_offset <= 36000
error_message = "schedule_offset must be between 0 and 36000 seconds."
}
}
variable "tags" {
description = "Tags applied to the flow."
type = map(string)
default = {}
}
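The trigger comments and the `data_pull_mode` description both mention a watermark field, but the module as written has no input for it, so the watermark is not configurable through the module. Below is a minimal sketch of exposing it. It assumes a new, hypothetical input named `incremental_datetime_field`. The `incremental_pull_config` block and its `datetime_type_field_name` argument belong to `source_flow_config` in `aws_appflow_flow`:

```hcl
# variables.tf: hypothetical new input naming the watermark field.
variable "incremental_datetime_field" {
  description = "Timestamp field used to select changed records on Incremental pulls (e.g. Salesforce 'LastModifiedDate'). Null omits the block."
  type        = string
  default     = null
}

# main.tf: place inside the existing source_flow_config block,
# next to source_connector_properties.
dynamic "incremental_pull_config" {
  # Emit the block only when a watermark field is supplied.
  for_each = var.incremental_datetime_field == null ? [] : [1]
  content {
    datetime_type_field_name = var.incremental_datetime_field
  }
}
```

With that in place, the Salesforce example would add `incremental_datetime_field = "LastModifiedDate"` to pin the watermark explicitly.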
outputs.tf
output "flow_id" {
description = "The AppFlow flow ID."
value = aws_appflow_flow.this.id
}
output "flow_arn" {
description = "The ARN of the AppFlow flow (use to grant EventBridge/IAM access)."
value = aws_appflow_flow.this.arn
}
output "flow_name" {
description = "The name of the AppFlow flow."
value = aws_appflow_flow.this.name
}
output "flow_status" {
description = "Current activation status of the flow (Active, Draft, etc.)."
value = aws_appflow_flow.this.flow_status
}
output "trigger_type" {
description = "The trigger type the flow was created with."
value = var.trigger_type
}
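AppFlow writes to the destination bucket as the `appflow.amazonaws.com` service principal, so the bucket policy has to allow it, and the module does not manage that. The sketch below is modeled on the policy in the AWS provider's `aws_appflow_flow` example. It assumes the bucket is the `aws_s3_bucket.curated` resource used in the usage example:

```hcl
# Grants the AppFlow service principal the write actions it needs on the bucket.
data "aws_iam_policy_document" "curated_appflow" {
  statement {
    sid    = "AllowAppFlowDestinationActions"
    effect = "Allow"

    principals {
      type        = "Service"
      identifiers = ["appflow.amazonaws.com"]
    }

    actions = [
      "s3:PutObject",
      "s3:AbortMultipartUpload",
      "s3:ListMultipartUploadParts",
      "s3:ListBucketMultipartUploads",
      "s3:GetBucketAcl",
      "s3:PutObjectAcl",
    ]

    # Bucket-level actions need the bucket ARN; object-level actions need /*.
    resources = [
      aws_s3_bucket.curated.arn,
      "${aws_s3_bucket.curated.arn}/*",
    ]
  }
}

resource "aws_s3_bucket_policy" "curated_appflow" {
  bucket = aws_s3_bucket.curated.id
  policy = data.aws_iam_policy_document.curated_appflow.json
}
```

A bucket has only one policy. If the curated bucket already has an `aws_s3_bucket_policy`, merge this statement into it (for example via `source_policy_documents`) rather than adding a second policy resource.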
How to use it
module "salesforce_opportunities" {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"
name = "sfdc-opportunities-to-lake"
description = "Hourly incremental pull of Salesforce Opportunities into the curated lake"
kms_arn = aws_kms_key.data_lake.arn
# Source: a Salesforce connector profile you created via the console/API once.
source_connector_type = "Salesforce"
source_connector_profile_name = "sfdc-prod-oauth"
source_api_version = "61.0"
source_object = "Opportunity"
mapped_fields = [
"Id",
"Name",
"StageName",
"Amount",
"CloseDate",
"AccountId",
"LastModifiedDate",
]
# Destination: the curated zone of the data lake.
destination_bucket_name = aws_s3_bucket.curated.id
destination_bucket_prefix = "salesforce/opportunity"
s3_file_type = "PARQUET"
s3_prefix_format = "DAY"
# Trigger: hourly incremental pull, offset by 300 s (5 min) to avoid a thundering herd.
trigger_type = "Scheduled"
schedule_expression = "rate(1hours)"
data_pull_mode = "Incremental"
schedule_timezone = "Asia/Kolkata"
schedule_offset = 300
tags = {
Environment = "prod"
Team = "data-platform"
CostCenter = "analytics"
}
}
# Downstream: match the end-of-run event for this flow by name in an EventBridge
# rule, so a target (such as a Lambda function) can react whenever a run completes.
resource "aws_cloudwatch_event_rule" "opportunity_flow_complete" {
name = "appflow-${module.salesforce_opportunities.flow_name}-complete"
description = "Fires when the Salesforce Opportunity flow finishes a run"
event_pattern = jsonencode({
source = ["aws.appflow"]
detail-type = ["AppFlow End Flow Run Report"]
detail = {
"flow-name" = [module.salesforce_opportunities.flow_name]
}
})
}
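The rule above only matches events. To actually invoke a Lambda function, it also needs a target and a resource-based permission that lets EventBridge call the function. This sketch assumes a function defined elsewhere as the hypothetical `aws_lambda_function.on_flow_complete`:

```hcl
# Route matching AppFlow end-of-run events to the function.
resource "aws_cloudwatch_event_target" "opportunity_flow_complete" {
  rule = aws_cloudwatch_event_rule.opportunity_flow_complete.name
  arn  = aws_lambda_function.on_flow_complete.arn # hypothetical function
}

# Allow EventBridge to invoke the function, scoped to this rule only.
resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.on_flow_complete.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.opportunity_flow_complete.arn
}
```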
With Terragrunt
Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.
1. Root config — live/terragrunt.hcl (inherited by every module):
remote_state {
backend = "s3"
generate = { path = "backend.tf", if_exists = "overwrite" }
config = {
# ...s3 state bucket/container + key per path...
}
}
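The snippet above leaves the backend settings elided and does not show the provider half of the root config. Below is one way to fill both in. It assumes an S3 state bucket and a DynamoDB lock table whose names are hypothetical placeholders, plus Terragrunt's `generate` block for the shared provider:

```hcl
# live/terragrunt.hcl (sketch): bucket, table, and region values are placeholders.
remote_state {
  backend  = "s3"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    bucket         = "example-terraform-state" # hypothetical state bucket
    key            = "${path_relative_to_include()}/terraform.tfstate"
    region         = "us-east-1"
    encrypt        = true
    dynamodb_table = "example-terraform-locks" # hypothetical lock table
  }
}

# Write the same provider block into every module directory at run time.
generate "provider" {
  path      = "provider.tf"
  if_exists = "overwrite_terragrunt"
  contents  = <<EOF
provider "aws" {
  region = "us-east-1"
}
EOF
}
```

Keying state on `path_relative_to_include()` gives each module directory (for example `prod/appflow`) its own state file without per-module backend config.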
2. Module config — live/prod/appflow/terragrunt.hcl:
include "root" {
path = find_in_parent_folders()
}
terraform {
source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"
}
inputs = {
name = "..."
source_connector_type = "..."
source_connector_profile_name = "..."
source_object = "..."
mapped_fields = ["...", "..."]
destination_bucket_name = "..."
}
3. Deploy one environment, or roll out all modules together:
cd live/prod/appflow && terragrunt apply # this module
cd live/prod && terragrunt run-all apply # every module under live/prod
Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; inputs are overridden per environment (dev / stage / prod) without forking the module; and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules; for a single stack, the plain Quickstart above is enough.
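Dependency orchestration in `run-all` comes from `dependency` blocks. For example, the AppFlow config can read its destination bucket from a sibling module instead of hard-coding the name. In this sketch the `../s3-data-lake` path and its `curated_bucket_name` output are hypothetical:

```hcl
# live/prod/appflow/terragrunt.hcl (sketch): depend on a sibling bucket module.
dependency "data_lake" {
  config_path = "../s3-data-lake" # hypothetical module directory

  # Placeholder outputs so validate/plan work before the bucket module is applied.
  mock_outputs = {
    curated_bucket_name = "mock-curated-bucket"
  }
  mock_outputs_allowed_terraform_commands = ["validate", "plan"]
}

inputs = {
  destination_bucket_name = dependency.data_lake.outputs.curated_bucket_name
  # ...the remaining module inputs from step 2...
}
```

The `inputs` map here replaces the one from step 2 rather than sitting alongside it. With this in place, `run-all apply` applies `s3-data-lake` before `appflow`.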
Inputs
| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| name | string | — | Yes | Flow name, unique per account/region; alphanumeric plus _/-. |
| description | string | "Managed by Terraform" | No | Human-readable purpose of the flow. |
| kms_arn | string | null | No | Customer-managed KMS key ARN for flow encryption; null uses the AWS-managed key. |
| source_connector_type | string | — | Yes | Source connector: Salesforce, Servicenow, or Zendesk. |
| source_connector_profile_name | string | — | Yes | Name of the pre-created connector profile holding source credentials. |
| source_api_version | string | null | No | Source connector API version (e.g. Salesforce 61.0). |
| source_object | string | — | Yes | Source object to extract (e.g. Opportunity, tickets). |
| salesforce_enable_dynamic_field_update | bool | false | No | Salesforce: auto-import newly added fields each run. |
| salesforce_include_deleted_records | bool | false | No | Salesforce: include soft-deleted records. |
| mapped_fields | list(string) | — | Yes | Ordered list of source fields to extract; must be non-empty. |
| destination_bucket_name | string | — | Yes | Target S3 bucket name. |
| destination_bucket_prefix | string | null | No | Key prefix for output (no leading/trailing slash). |
| s3_file_type | string | "PARQUET" | No | Output format: CSV, JSON, or PARQUET. |
| s3_aggregation_type | string | "None" | No | None or SingleFile aggregation of records. |
| s3_prefix_type | string | "PATH" | No | FILENAME, PATH, or PATH_AND_FILENAME. |
| s3_prefix_format | string | "DAY" | No | Date partition granularity: YEAR, MONTH, DAY, HOUR, or MINUTE. |
| trigger_type | string | "OnDemand" | No | OnDemand, Scheduled, or Event. |
| schedule_expression | string | null | No | rate(...)/cron(...); required when trigger_type = Scheduled. |
| data_pull_mode | string | "Incremental" | No | Scheduled flows: Incremental or Complete. |
| schedule_start_time | number | null | No | Unix epoch (seconds) when the schedule activates. |
| schedule_timezone | string | null | No | IANA timezone for cron schedules (e.g. Asia/Kolkata). |
| schedule_offset | number | 0 | No | 0–36000 seconds offset to spread scheduled load. |
| tags | map(string) | {} | No | Tags applied to the flow. |
Outputs
| Name | Description |
|---|---|
| flow_id | The AppFlow flow ID. |
| flow_arn | The flow ARN, for IAM/EventBridge grants. |
| flow_name | The flow name. |
| flow_status | Current activation status (Active, Draft, etc.). |
| trigger_type | The trigger type the flow was created with. |
Enterprise scenario
A retail analytics team needs near-real-time visibility into the sales pipeline across 14 regional Salesforce orgs. They instantiate this module once per org in a for_each over a tenants map, each with its own sfdc-<region>-oauth connector profile, a distinct salesforce/<region>/opportunity S3 prefix, and a 5-minute schedule_offset increment so the 14 hourly incremental pulls do not all fire on the same minute and trip Salesforce API limits. Every flow lands Parquet in the KMS-encrypted curated bucket, where a Glue crawler and Athena view stitch the regions into one pipeline dashboard — and because the field mappings live in version control, adding a new ForecastCategory column is a one-line PR reviewed by the data governance team rather than 14 manual console edits.
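The sketch below shows that per-org fan-out. It assumes a `tenants` map keyed by region with a hypothetical `slot` index that drives the offset. The region keys are illustrative, and `aws_kms_key.data_lake` and `aws_s3_bucket.curated` are assumed to exist elsewhere in the configuration:

```hcl
locals {
  # Hypothetical tenant map: region key -> explicit offset slot.
  tenants = {
    apac  = { slot = 0 }
    emea  = { slot = 1 }
    latam = { slot = 2 }
    # ...one entry per regional Salesforce org...
  }
}

module "sfdc_opportunities" {
  source   = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"
  for_each = local.tenants

  name                          = "sfdc-${each.key}-opportunities-to-lake"
  kms_arn                       = aws_kms_key.data_lake.arn
  source_connector_type         = "Salesforce"
  source_connector_profile_name = "sfdc-${each.key}-oauth"
  source_object                 = "Opportunity"
  mapped_fields                 = ["Id", "Name", "StageName", "Amount", "CloseDate", "ForecastCategory", "LastModifiedDate"]

  destination_bucket_name   = aws_s3_bucket.curated.id
  destination_bucket_prefix = "salesforce/${each.key}/opportunity"
  s3_file_type              = "PARQUET"

  trigger_type        = "Scheduled"
  schedule_expression = "rate(1hours)"
  data_pull_mode      = "Incremental"
  # 300-second (5-minute) increment per org, as in the scenario.
  schedule_offset = each.value.slot * 300
}
```

Storing the slot explicitly, rather than deriving it from map ordering, means adding an org never shifts the offsets of flows that already exist.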
Best practices
- Always set `kms_arn` to a customer-managed key in production. The AWS-managed AppFlow key cannot be scoped with key policies; a CMK lets you restrict who can decrypt landed SaaS data and gives you an audit trail in CloudTrail.
- Prefer `Incremental` pull mode with a deliberate watermark field (e.g. `LastModifiedDate`) for scheduled flows. Full (`Complete`) pulls re-extract every record on every run, inflate AppFlow data-processing and SaaS API costs, and can exhaust Salesforce/Zendesk rate limits.
- Stagger `schedule_offset` across many flows. Fifty flows all on `rate(1hours)` will hit the SaaS API in the same minute; spreading them by 60–300 second offsets smooths load and avoids throttling-induced failures.
- Write `PARQUET` to S3, not CSV/JSON, when the destination is a data lake. Columnar Parquet is far cheaper to scan in Athena/Redshift Spectrum and preserves types that CSV would flatten to strings.
- Keep connector profiles out of the flow module. OAuth/refresh-token connector profiles hold secrets and have a different lifecycle than flow definitions; create them separately (or via a dedicated secrets pipeline) and pass only `source_connector_profile_name` in, so rotating credentials never forces a flow replacement.
- Name flows by `<source>-<object>-<destination-zone>` (e.g. `sfdc-opportunities-to-lake`) and tag with `Environment`/`Team`/`CostCenter`. AppFlow billing is per flow run plus data processed, and consistent tags are what let you attribute that spend back to the team that owns the integration.
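The first practice assumes the CMK and the bucket's default encryption exist somewhere. The minimal sketch below uses the `aws_kms_key.data_lake` and `aws_s3_bucket.curated` names from the usage example. The key keeps its default key policy, which may need extending for your AppFlow and reader roles:

```hcl
# Customer-managed key for flow data and landed objects.
resource "aws_kms_key" "data_lake" {
  description         = "CMK for AppFlow output in the curated data lake"
  enable_key_rotation = true
}

# Default SSE-KMS for everything written to the curated bucket.
resource "aws_s3_bucket_server_side_encryption_configuration" "curated" {
  bucket = aws_s3_bucket.curated.id

  rule {
    apply_server_side_encryption_by_default {
      sse_algorithm     = "aws:kms"
      kms_master_key_id = aws_kms_key.data_lake.arn
    }
    # An S3 Bucket Key cuts the number of KMS requests S3 makes per object.
    bucket_key_enabled = true
  }
}
```

Passing `aws_kms_key.data_lake.arn` as the module's `kms_arn` covers the flow itself. The bucket configuration covers objects at rest, including anything written outside AppFlow.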