
Terraform Module: AWS AppFlow — codified SaaS-to-S3 data flows that never drift

Quick take — Build a reusable Terraform module for AWS AppFlow using aws_appflow_flow: wire up source/destination connectors, scheduled or event triggers, field mappings and KMS encryption as version-pinned IaC. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.

Quickstart (copy-paste)

Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):

provider "aws" {
  region = "us-east-1"
}

module "appflow" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"

  name                          = "..."           # Flow name, unique per account/region; alphanumeric plus…
  source_connector_type         = "..."           # Source connector: `Salesforce`, `Servicenow`, or `Zende…
  source_connector_profile_name = "..."           # Name of the pre-created connector profile holding sourc…
  source_object                 = "..."           # Source object to extract (e.g. `Opportunity`, `tickets`…
  mapped_fields                 = ["...", "..."]  # Ordered list of source fields to extract; must be non-e…
  destination_bucket_name       = "..."           # Target S3 bucket name.
}

Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.
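One prerequisite the Quickstart does not spell out: for an S3 destination, the bucket generally has to grant the AppFlow service permission to write to it. The sketch below follows the bucket-policy pattern shown in the AWS provider's own aws_appflow_flow example. The resource name aws_s3_bucket.curated is an assumption; substitute whatever bucket you pass as destination_bucket_name.

```hcl
# Assumption: aws_s3_bucket.curated is the bucket passed to the module as
# destination_bucket_name. Rename to match your own configuration.
data "aws_iam_policy_document" "appflow_write" {
  statement {
    sid    = "AllowAppFlowDestinationActions"
    effect = "Allow"

    # Grant access to the AppFlow service principal, not to an IAM role.
    principals {
      type        = "Service"
      identifiers = ["appflow.amazonaws.com"]
    }

    actions = [
      "s3:PutObject",
      "s3:AbortMultipartUpload",
      "s3:ListMultipartUploadParts",
      "s3:ListBucketMultipartUploads",
      "s3:GetBucketAcl",
      "s3:PutObjectAcl",
    ]

    # Both the bucket itself and every object under it.
    resources = [
      aws_s3_bucket.curated.arn,
      "${aws_s3_bucket.curated.arn}/*",
    ]
  }
}

resource "aws_s3_bucket_policy" "appflow_write" {
  bucket = aws_s3_bucket.curated.id
  policy = data.aws_iam_policy_document.appflow_write.json
}
```

If the bucket already has a policy, merge this statement into it instead of attaching a second aws_s3_bucket_policy, since a bucket holds only one policy document.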

What this module is

Amazon AppFlow is a fully managed integration service that moves data between SaaS applications (Salesforce, Zendesk, Slack, Marketo, ServiceNow, SAP, and dozens more) and AWS services such as Amazon S3, Redshift, and EventBridge — without you writing or running any connector code. A single “flow” in AppFlow binds a source connector, a destination connector, a trigger (run-on-demand, on a schedule, or on a SaaS change event), a set of field mappings, and optional filters, validations, and transformations.

The trouble with clicking these together in the console is that an AppFlow flow is deceptively stateful: the source connector profile, the mapping tasks (each field is its own task block), the trigger cron expression, and the destination’s S3 prefix and aggregation settings all have to agree, or the flow silently produces empty or malformed output. Wrapping aws_appflow_flow in a Terraform module turns that fragile pile of console settings into a single, reviewable, version-pinned artifact. You declare the connector profiles, the field list, and the schedule once; the module assembles the repetitive task and source_flow_config/destination_flow_config blocks correctly every time, and a terraform plan shows you exactly what would change before anyone touches production data.
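To make the repetition concrete, here is a rough sketch of the task blocks you would otherwise write by hand for just two fields. It assumes a Salesforce source and the fields Id and Name, both chosen purely for illustration. It is a fragment of an aws_appflow_flow resource, not a complete configuration, and it follows the convention in AWS's published examples: Map tasks use the NO_OP operator, and the field-selecting Filter task uses PROJECTION.

```hcl
# Fragment only: these blocks live inside resource "aws_appflow_flow" "...".

# One Map task per field, passing the value through unchanged.
task {
  source_fields     = ["Id"]
  destination_field = "Id"
  task_type         = "Map"

  connector_operator {
    salesforce = "NO_OP"
  }
}

task {
  source_fields     = ["Name"]
  destination_field = "Name"
  task_type         = "Map"

  connector_operator {
    salesforce = "NO_OP"
  }
}

# One projection task listing every field the flow should retrieve.
task {
  source_fields = ["Id", "Name"]
  task_type     = "Filter"

  connector_operator {
    salesforce = "PROJECTION"
  }
}
```

Every added field means another Map block plus an edit to the projection list, which is exactly the bookkeeping a single mapped_fields list removes.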

When to use it

If you only need a single one-off, console-driven extract that you will never touch again, the module is overkill. The moment a flow becomes “production data plumbing,” codify it.

Module structure

terraform-module-aws-appflow/
├── versions.tf      # provider + Terraform version pins
├── main.tf          # aws_appflow_flow with source/destination/tasks/trigger
├── variables.tf     # var-driven inputs with validation
└── outputs.tf       # flow id/arn/name + trigger metadata

versions.tf

terraform {
  required_version = ">= 1.5.0"

  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }
}

main.tf

locals {
  # Build one "Map" task per mapped field: each field is passed through to a
  # destination field of the same name. A separate projection task lists every field.
  mapping_tasks = [
    for field in var.mapped_fields : {
      source_fields     = [field]
      destination_field = field
    }
  ]
}

resource "aws_appflow_flow" "this" {
  name        = var.name
  description = var.description

  # Encrypt flow metadata and any intermediate data with a customer-managed key.
  kms_arn = var.kms_arn

  # ---------------------------------------------------------------------------
  # SOURCE: a SaaS connector profile created out-of-band (Salesforce, Zendesk…)
  # ---------------------------------------------------------------------------
  source_flow_config {
    connector_type         = var.source_connector_type
    connector_profile_name = var.source_connector_profile_name
    api_version            = var.source_api_version

    source_connector_properties {
      dynamic "salesforce" {
        for_each = var.source_connector_type == "Salesforce" ? [1] : []
        content {
          object                      = var.source_object
          enable_dynamic_field_update = var.salesforce_enable_dynamic_field_update
          include_deleted_records     = var.salesforce_include_deleted_records
        }
      }

      dynamic "service_now" {
        for_each = var.source_connector_type == "Servicenow" ? [1] : []
        content {
          object = var.source_object
        }
      }

      dynamic "zendesk" {
        for_each = var.source_connector_type == "Zendesk" ? [1] : []
        content {
          object = var.source_object
        }
      }
    }
  }

  # ---------------------------------------------------------------------------
  # DESTINATION: an S3 bucket + prefix in the data lake
  # ---------------------------------------------------------------------------
  destination_flow_config {
    connector_type = "S3"

    destination_connector_properties {
      s3 {
        bucket_name   = var.destination_bucket_name
        bucket_prefix = var.destination_bucket_prefix

        s3_output_format_config {
          file_type = var.s3_file_type

          aggregation_config {
            aggregation_type = var.s3_aggregation_type
          }

          prefix_config {
            prefix_type   = var.s3_prefix_type
            prefix_format = var.s3_prefix_format
          }
        }
      }
    }
  }

  # ---------------------------------------------------------------------------
  # TASKS: one Map task per field, plus a projection that selects the field set
  # ---------------------------------------------------------------------------
  dynamic "task" {
    for_each = local.mapping_tasks
    content {
      source_fields     = task.value.source_fields
      destination_field = task.value.destination_field
      task_type         = "Map"

      connector_operator {
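        # Map tasks pass the field through unchanged, so the operator is NO_OP;
        # PROJECTION belongs only on the Filter task that selects the field set.
        salesforce  = var.source_connector_type == "Salesforce" ? "NO_OP" : null
        service_now = var.source_connector_type == "Servicenow" ? "NO_OP" : null
        zendesk     = var.source_connector_type == "Zendesk" ? "NO_OP" : null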
      }
    }
  }

  # Projection task: tells AppFlow the exact set of fields to retrieve.
  task {
    source_fields     = var.mapped_fields
    destination_field = ""
    task_type         = "Filter"

    connector_operator {
      salesforce  = var.source_connector_type == "Salesforce" ? "PROJECTION" : null
      service_now = var.source_connector_type == "Servicenow" ? "PROJECTION" : null
      zendesk     = var.source_connector_type == "Zendesk" ? "PROJECTION" : null
    }
  }

  # ---------------------------------------------------------------------------
  # TRIGGER: on-demand, event, or scheduled (incremental pulls via a watermark field)
  # ---------------------------------------------------------------------------
  trigger_config {
    trigger_type = var.trigger_type

    dynamic "trigger_properties" {
      for_each = var.trigger_type == "Scheduled" ? [1] : []
      content {
        scheduled {
          schedule_expression = var.schedule_expression
          data_pull_mode      = var.data_pull_mode
          schedule_start_time = var.schedule_start_time
          timezone            = var.schedule_timezone
          schedule_offset     = var.schedule_offset
        }
      }
    }
  }

  tags = var.tags
}

variables.tf

variable "name" {
  description = "Name of the AppFlow flow (must be unique within the account/region)."
  type        = string

  validation {
    condition     = can(regex("^[a-zA-Z0-9][a-zA-Z0-9_-]{0,255}$", var.name))
    error_message = "name must be alphanumeric and may contain _ and -, up to 256 chars."
  }
}

variable "description" {
  description = "Human-readable description of the flow's purpose."
  type        = string
  default     = "Managed by Terraform"
}

variable "kms_arn" {
  description = "ARN of a customer-managed KMS key used to encrypt flow data. Leave null to use the AWS-managed AppFlow key."
  type        = string
  default     = null

  validation {
    condition     = var.kms_arn == null || can(regex("^arn:aws[a-zA-Z-]*:kms:", var.kms_arn))
    error_message = "kms_arn must be a valid KMS key ARN or null."
  }
}

# ----- Source -----------------------------------------------------------------

variable "source_connector_type" {
  description = "SaaS source connector type."
  type        = string

  validation {
    condition     = contains(["Salesforce", "Servicenow", "Zendesk"], var.source_connector_type)
    error_message = "source_connector_type must be one of: Salesforce, Servicenow, Zendesk."
  }
}

variable "source_connector_profile_name" {
  description = "Name of the pre-created AppFlow connector profile for the source (holds OAuth/credentials)."
  type        = string
}

variable "source_api_version" {
  description = "API version of the source connector (e.g. Salesforce '61.0'). Null lets AppFlow choose."
  type        = string
  default     = null
}

variable "source_object" {
  description = "Source object to extract (e.g. Salesforce 'Opportunity', Zendesk 'tickets')."
  type        = string
}

variable "salesforce_enable_dynamic_field_update" {
  description = "Salesforce: import newly added fields automatically on each run."
  type        = bool
  default     = false
}

variable "salesforce_include_deleted_records" {
  description = "Salesforce: include soft-deleted (recycle bin) records in the extract."
  type        = bool
  default     = false
}

# ----- Field mapping ----------------------------------------------------------

variable "mapped_fields" {
  description = "Ordered list of source field names to extract and pass through to the destination."
  type        = list(string)

  validation {
    condition     = length(var.mapped_fields) > 0
    error_message = "mapped_fields must contain at least one field."
  }
}

# ----- Destination (S3) -------------------------------------------------------

variable "destination_bucket_name" {
  description = "Name of the S3 bucket that receives flow output."
  type        = string
}

variable "destination_bucket_prefix" {
  description = "Key prefix under which records are written (no leading/trailing slash)."
  type        = string
  default     = null
}

variable "s3_file_type" {
  description = "Output file format written to S3."
  type        = string
  default     = "PARQUET"

  validation {
    condition     = contains(["CSV", "JSON", "PARQUET"], var.s3_file_type)
    error_message = "s3_file_type must be one of: CSV, JSON, PARQUET."
  }
}

variable "s3_aggregation_type" {
  description = "How records are aggregated into output files."
  type        = string
  default     = "None"

  validation {
    condition     = contains(["None", "SingleFile"], var.s3_aggregation_type)
    error_message = "s3_aggregation_type must be None or SingleFile."
  }
}

variable "s3_prefix_type" {
  description = "Partition prefix granularity AppFlow appends to the output path."
  type        = string
  default     = "PATH"

  validation {
    condition     = contains(["FILENAME", "PATH", "PATH_AND_FILENAME"], var.s3_prefix_type)
    error_message = "s3_prefix_type must be FILENAME, PATH, or PATH_AND_FILENAME."
  }
}

variable "s3_prefix_format" {
  description = "Time granularity of the date-based prefix."
  type        = string
  default     = "DAY"

  validation {
    condition     = contains(["YEAR", "MONTH", "DAY", "HOUR", "MINUTE"], var.s3_prefix_format)
    error_message = "s3_prefix_format must be YEAR, MONTH, DAY, HOUR, or MINUTE."
  }
}

# ----- Trigger ----------------------------------------------------------------

variable "trigger_type" {
  description = "How the flow is triggered."
  type        = string
  default     = "OnDemand"

  validation {
    condition     = contains(["OnDemand", "Scheduled", "Event"], var.trigger_type)
    error_message = "trigger_type must be OnDemand, Scheduled, or Event."
  }
}

variable "schedule_expression" {
  description = "Schedule for Scheduled flows, e.g. 'rate(1hours)' or 'cron(0 6 * * ? *)'. Required when trigger_type = Scheduled."
  type        = string
  default     = null

  validation {
    condition     = var.schedule_expression == null || can(regex("^(rate|cron)\\(", var.schedule_expression))
    error_message = "schedule_expression must start with rate( or cron(."
  }
}

variable "data_pull_mode" {
  description = "Scheduled flows: Incremental (only changed records via a watermark) or Complete."
  type        = string
  default     = "Incremental"

  validation {
    condition     = contains(["Incremental", "Complete"], var.data_pull_mode)
    error_message = "data_pull_mode must be Incremental or Complete."
  }
}

variable "schedule_start_time" {
  description = "RFC 3339 timestamp (e.g. '2025-01-01T06:00:00Z') at which the schedule first becomes active. Null = immediately."
  type        = string
  default     = null
}

variable "schedule_timezone" {
  description = "IANA timezone for cron-based schedule expressions, e.g. 'Asia/Kolkata'."
  type        = string
  default     = null
}

variable "schedule_offset" {
  description = "Seconds (0-36000) to offset the scheduled run, to spread load across many flows."
  type        = number
  default     = 0

  validation {
    condition     = var.schedule_offset >= 0 && var.schedule_offset <= 36000
    error_message = "schedule_offset must be between 0 and 36000 seconds."
  }
}

variable "tags" {
  description = "Tags applied to the flow."
  type        = map(string)
  default     = {}
}
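The schedule_expression description says the value is required when trigger_type = Scheduled, but its validation block only checks the format, so a Scheduled flow with a null expression gets past variable validation. Cross-variable references inside validation blocks need a newer Terraform than the 1.5.0 floor pinned in versions.tf, so one possible way to enforce the rule is a lifecycle precondition on the resource. This is a sketch of an addition to main.tf, not something the module is confirmed to ship with:

```hcl
resource "aws_appflow_flow" "this" {
  # ... all arguments exactly as in main.tf ...

  lifecycle {
    # Fail at plan time instead of letting the AppFlow API reject the flow.
    precondition {
      condition     = var.trigger_type != "Scheduled" || var.schedule_expression != null
      error_message = "schedule_expression is required when trigger_type is \"Scheduled\"."
    }
  }
}
```

With this in place, terraform plan stops with the error message above before any API call is made.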

outputs.tf

output "flow_id" {
  description = "The AppFlow flow ID."
  value       = aws_appflow_flow.this.id
}

output "flow_arn" {
  description = "The ARN of the AppFlow flow (use to grant EventBridge/IAM access)."
  value       = aws_appflow_flow.this.arn
}

output "flow_name" {
  description = "The name of the AppFlow flow."
  value       = aws_appflow_flow.this.name
}

output "flow_status" {
  description = "Current activation status of the flow (Active, Draft, etc.)."
  value       = aws_appflow_flow.this.flow_status
}

output "trigger_type" {
  description = "The trigger type the flow was created with."
  value       = var.trigger_type
}

How to use it

module "salesforce_opportunities" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"

  name        = "sfdc-opportunities-to-lake"
  description = "Hourly incremental pull of Salesforce Opportunities into the curated lake"

  kms_arn = aws_kms_key.data_lake.arn

  # Source: a Salesforce connector profile you created via the console/API once.
  source_connector_type         = "Salesforce"
  source_connector_profile_name = "sfdc-prod-oauth"
  source_api_version            = "61.0"
  source_object                 = "Opportunity"

  mapped_fields = [
    "Id",
    "Name",
    "StageName",
    "Amount",
    "CloseDate",
    "AccountId",
    "LastModifiedDate",
  ]

  # Destination: the curated zone of the data lake.
  destination_bucket_name   = aws_s3_bucket.curated.id
  destination_bucket_prefix = "salesforce/opportunity"
  s3_file_type              = "PARQUET"
  s3_prefix_format          = "DAY"

  # Trigger: every hour, incremental on LastModifiedDate, offset to avoid thundering herd.
  trigger_type        = "Scheduled"
  schedule_expression = "rate(1hours)"
  data_pull_mode      = "Incremental"
  schedule_timezone   = "Asia/Kolkata"
  schedule_offset     = 300

  tags = {
    Environment = "prod"
    Team        = "data-platform"
    CostCenter  = "analytics"
  }
}

# Downstream: notify a Lambda whenever this flow completes a run, by matching
# the flow ARN/name in an EventBridge rule.
resource "aws_cloudwatch_event_rule" "opportunity_flow_complete" {
  name        = "appflow-${module.salesforce_opportunities.flow_name}-complete"
  description = "Fires when the Salesforce Opportunity flow finishes a run"

  event_pattern = jsonencode({
    source      = ["aws.appflow"]
    detail-type = ["AppFlow End Flow Run Report"]
    detail = {
      "flow-name" = [module.salesforce_opportunities.flow_name]
    }
  })
}
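The rule above only matches the event; it does not deliver it anywhere. A minimal sketch of the missing wiring follows, assuming a Lambda function declared elsewhere as aws_lambda_function.on_flow_complete (a hypothetical name, not part of the module):

```hcl
# Assumption: aws_lambda_function.on_flow_complete exists elsewhere in this
# configuration. Both resources below are standard AWS provider resources.
resource "aws_cloudwatch_event_target" "opportunity_flow_complete" {
  rule = aws_cloudwatch_event_rule.opportunity_flow_complete.name
  arn  = aws_lambda_function.on_flow_complete.arn
}

# EventBridge needs explicit permission to invoke the function.
resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowEventBridgeInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.on_flow_complete.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.opportunity_flow_complete.arn
}
```

Scoping source_arn to the one rule keeps other EventBridge rules in the account from invoking the function.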

With Terragrunt

Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.

1. Root config (live/terragrunt.hcl, inherited by every module):

remote_state {
  backend = "s3"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...s3 state bucket/container + key per path...
  }
}

2. Module config (live/prod/appflow/terragrunt.hcl):

include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"
}

inputs = {
  name                          = "..."
  source_connector_type         = "..."
  source_connector_profile_name = "..."
  source_object                 = "..."
  mapped_fields                 = ["...", "..."]
  destination_bucket_name       = "..."
}

3. Deploy one environment, or roll out all modules together:

(cd live/prod/appflow && terragrunt apply)    # this module only
(cd live/prod && terragrunt run-all apply)    # every module under live/prod

Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; inputs is overridden per environment (dev / stage / prod) without forking the module; and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules — for a single stack, the plain Quickstart above is enough.
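As an illustration of the per-environment override, a dev counterpart might differ from prod only in a few inputs. The path live/dev/appflow/terragrunt.hcl and every value below are hypothetical, picked to show the shape, not taken from a real deployment:

```hcl
# live/dev/appflow/terragrunt.hcl (hypothetical dev environment)
include "root" {
  path = find_in_parent_folders()
}

terraform {
  # Same module, same pinned version as prod.
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"
}

inputs = {
  name                          = "sfdc-opportunities-to-lake-dev" # illustrative
  source_connector_type         = "Salesforce"
  source_connector_profile_name = "sfdc-sandbox-oauth"             # illustrative
  source_object                 = "Opportunity"
  mapped_fields                 = ["Id", "Name", "StageName"]
  destination_bucket_name       = "example-dev-curated-bucket"     # illustrative

  # Dev keeps the module default trigger (OnDemand), so runs start manually.
}
```

Only the inputs block changes between environments; the module source and version stay identical, which is what keeps dev and prod from drifting apart.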

Inputs

| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| name | string | - | Yes | Flow name, unique per account/region; alphanumeric plus _/-. |
| description | string | "Managed by Terraform" | No | Human-readable purpose of the flow. |
| kms_arn | string | null | No | Customer-managed KMS key ARN for flow encryption; null uses the AWS-managed key. |
| source_connector_type | string | - | Yes | Source connector: Salesforce, Servicenow, or Zendesk. |
| source_connector_profile_name | string | - | Yes | Name of the pre-created connector profile holding source credentials. |
| source_api_version | string | null | No | Source connector API version (e.g. Salesforce 61.0). |
| source_object | string | - | Yes | Source object to extract (e.g. Opportunity, tickets). |
| salesforce_enable_dynamic_field_update | bool | false | No | Salesforce: auto-import newly added fields each run. |
| salesforce_include_deleted_records | bool | false | No | Salesforce: include soft-deleted records. |
| mapped_fields | list(string) | - | Yes | Ordered list of source fields to extract; must be non-empty. |
| destination_bucket_name | string | - | Yes | Target S3 bucket name. |
| destination_bucket_prefix | string | null | No | Key prefix for output (no leading/trailing slash). |
| s3_file_type | string | "PARQUET" | No | Output format: CSV, JSON, or PARQUET. |
| s3_aggregation_type | string | "None" | No | None or SingleFile aggregation of records. |
| s3_prefix_type | string | "PATH" | No | FILENAME, PATH, or PATH_AND_FILENAME. |
| s3_prefix_format | string | "DAY" | No | Date partition granularity: YEAR, MONTH, DAY, HOUR, or MINUTE. |
| trigger_type | string | "OnDemand" | No | OnDemand, Scheduled, or Event. |
| schedule_expression | string | null | No | rate(...)/cron(...); required when trigger_type = Scheduled. |
| data_pull_mode | string | "Incremental" | No | Scheduled flows: Incremental or Complete. |
| schedule_start_time | string | null | No | RFC 3339 timestamp when the schedule activates. |
| schedule_timezone | string | null | No | IANA timezone for cron schedules (e.g. Asia/Kolkata). |
| schedule_offset | number | 0 | No | 0–36000 seconds offset to spread scheduled load. |
| tags | map(string) | {} | No | Tags applied to the flow. |

Outputs

| Name | Description |
|---|---|
| flow_id | The AppFlow flow ID. |
| flow_arn | The flow ARN, for IAM/EventBridge grants. |
| flow_name | The flow name. |
| flow_status | Current activation status (Active, Draft, etc.). |
| trigger_type | The trigger type the flow was created with. |

Enterprise scenario

A retail analytics team needs near-real-time visibility into the sales pipeline across 14 regional Salesforce orgs. They instantiate this module once per org in a for_each over a tenants map, each with its own sfdc-<region>-oauth connector profile, a distinct salesforce/<region>/opportunity S3 prefix, and a 5-minute schedule_offset increment so the 14 hourly incremental pulls do not all fire on the same minute and trip Salesforce API limits. Every flow lands Parquet in the KMS-encrypted curated bucket, where a Glue crawler and Athena view stitch the regions into one pipeline dashboard — and because the field mappings live in version control, adding a new ForecastCategory column is a one-line PR reviewed by the data governance team rather than 14 manual console edits.
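The fan-out in this scenario could look roughly like the sketch below. The tenants variable, its shape, and the module label are assumptions made for illustration; aws_s3_bucket.curated and aws_kms_key.data_lake are borrowed from the usage example earlier in this post.

```hcl
# Hypothetical input: one entry per regional Salesforce org, keyed by region.
variable "tenants" {
  type = map(object({
    connector_profile_name = string # e.g. "sfdc-emea-oauth"
  }))
}

locals {
  # A sorted key list gives each tenant a stable position (0, 1, 2, ...).
  tenant_order = sort(keys(var.tenants))
}

module "sfdc_opportunities" {
  source   = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-aws-appflow?ref=v1.0.0"
  for_each = var.tenants

  name    = "sfdc-${each.key}-opportunities"
  kms_arn = aws_kms_key.data_lake.arn

  source_connector_type         = "Salesforce"
  source_connector_profile_name = each.value.connector_profile_name
  source_object                 = "Opportunity"
  mapped_fields                 = ["Id", "Name", "StageName", "Amount", "CloseDate", "LastModifiedDate"]

  destination_bucket_name   = aws_s3_bucket.curated.id
  destination_bucket_prefix = "salesforce/${each.key}/opportunity"

  trigger_type        = "Scheduled"
  schedule_expression = "rate(1hours)"

  # Stagger tenants by 5 minutes each: 0 s, 300 s, 600 s, and so on.
  schedule_offset = index(local.tenant_order, each.key) * 300
}
```

With 14 tenants the offsets run from 0 to 3900 seconds, well inside the module's 0 to 36000 limit. One trade-off of position-based offsets: adding a region that sorts earlier in the alphabet shifts the offset of every tenant after it, so an explicit per-tenant offset field may be preferable if schedules must stay fixed.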

Best practices
