
Terraform Module: Azure Stream Analytics — codified streaming jobs with inputs, outputs, and SQL queries

Quick take: A reusable Terraform module, built on the hashicorp/azurerm provider, for Azure Stream Analytics jobs. It declares the streaming units, the transformation query, an Event Hub input, a Blob/ADLS output, a system-assigned managed identity, and diagnostics, all version-pinned and variable-driven. New here? Jump to the Quickstart below to deploy it in minutes; read on for how it works and when to reach for it.

Quickstart (copy-paste)

Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):

provider "azurerm" {
  features {}
}

module "stream_analytics" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"

  name                 = "..."  # Job name, 3-63 chars (letters, numbers, hyphens).
  resource_group_name  = "..."  # Resource group hosting the job.
  location             = "..."  # Azure region for the job.
  workload             = "..."  # Short workload identifier used in tags.
  transformation_query = "..."  # The Stream Analytics SQL transformation (non-empty).
}

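Then run terraform init && terraform apply. Every other input has a default; see Inputs below to override behaviour. Note that eventhub_input and blob_output default to null, so this minimal configuration creates the job and its query without any input or output attached.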
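
For anything longer than a one-line query, an inline string gets unwieldy. A minimal sketch of the same Quickstart with the query read from disk might look like the following; the job name, resource group, region, and the queries/passthrough.sql path are all hypothetical values chosen for illustration.

```hcl
module "stream_analytics" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"

  name                = "asa-quickstart-dev" # hypothetical job name
  resource_group_name = "rg-streaming-dev"   # hypothetical; assumed to exist already
  location            = "eastus"
  workload            = "quickstart"

  # Hypothetical file holding the Stream Analytics SQL, kept next to this .tf file.
  transformation_query = file("${path.module}/queries/passthrough.sql")
}
```

Keeping the query in its own file lets it be reviewed and diffed as SQL rather than as an escaped HCL string.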

What this module is

Azure Stream Analytics (ASA) is a fully managed, serverless real-time analytics engine. You point it at a streaming source (Event Hubs, IoT Hub, Blob), write a SQL-like query that filters, aggregates over tumbling/hopping/sliding windows, joins reference data, and detects anomalies, and it continuously writes results to a sink (Blob/ADLS, SQL, Event Hub, Cosmos DB, Power BI). You pay for the Streaming Units (SUs) you provision, not for individual events — so the job definition, its capacity, and its query are exactly the kind of thing you want pinned in code rather than clicked together in the portal.
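
To make the windowing idea concrete, here is a small sketch of a hopping-window query held in a Terraform local. It is illustrative only: the aliases [eh-input] and [blob-output] and the field names deviceId, temperature, and eventTime are assumptions, and the aliases would have to match an input and an output attached to the job.

```hcl
locals {
  # A 60-second window that advances every 10 seconds, so each event
  # contributes to several overlapping windows.
  hopping_avg_query = <<-SQL
    SELECT
        deviceId,
        System.Timestamp() AS windowEnd,
        AVG(temperature)   AS avgTemperature
    INTO
        [blob-output]
    FROM
        [eh-input] TIMESTAMP BY eventTime
    GROUP BY
        deviceId,
        HoppingWindow(second, 60, 10)
  SQL
}
```

Swapping HoppingWindow(second, 60, 10) for TumblingWindow(second, 60) would give non-overlapping windows instead.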

The problem with hand-built ASA jobs is that the moving parts drift apart. The job, its inputs, its outputs, the transformation query, the managed identity that lets it reach Key Vault or a SQL database, and the diagnostic settings all live in different blades. A reusable Terraform module wraps azurerm_stream_analytics_job together with the input/output resources and the transformation query so that one module block produces a complete, wired-up, observable streaming pipeline. The SU count, query, and event-ordering policy become reviewable variables; the job’s principal ID becomes an output you can hand to a Key Vault access policy or a SQL role assignment downstream.

When to use it

Reach for raw resources instead only when you have a single throwaway job, or when you need an input/output connector type this module deliberately does not expose yet (e.g. Cosmos DB output) and you would rather not extend it.
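
If the missing connector is the only gap, one option is to keep the module and attach the extra output next to it as a raw resource. The sketch below assumes a module "stream_analytics" block like the one in the Quickstart and hypothetical azurerm_cosmosdb_* resources named "alerts" defined elsewhere in the same configuration.

```hcl
# Attaches a Cosmos DB output to the job created by the module, using the
# module's "id" output. All "alerts" resources are hypothetical.
resource "azurerm_stream_analytics_output_cosmosdb" "alerts" {
  name                     = "cosmos-alerts"
  stream_analytics_job_id  = module.stream_analytics.id
  cosmosdb_account_key     = azurerm_cosmosdb_account.alerts.primary_key
  cosmosdb_sql_database_id = azurerm_cosmosdb_sql_database.alerts.id
  container_name           = azurerm_cosmosdb_sql_container.alerts.name
}
```

The transformation query would then write to it with INTO [cosmos-alerts], matching the output name above.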

Module structure

terraform-module-azure-stream-analytics/
├── versions.tf      # provider + Terraform version pinning
├── main.tf          # ASA job + input + output + transformation + diagnostics
├── variables.tf     # var-driven inputs with validation
└── outputs.tf       # id, name, principal_id, and key attributes

versions.tf

terraform {
  required_version = ">= 1.5.0"

  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 4.0"
    }
  }
}

main.tf

locals {
  # Stream Analytics job names are constrained to 3-63 chars: letters, numbers, hyphens.
  job_name = lower(var.name)

  tags = merge(
    {
      "managed-by" = "terraform"
      "module"     = "terraform-module-azure-stream-analytics"
      "workload"   = var.workload
    },
    var.tags
  )
}

resource "azurerm_stream_analytics_job" "this" {
  name                = local.job_name
  resource_group_name = var.resource_group_name
  location            = var.location

  # Capacity & SQL surface.
  streaming_units                          = var.streaming_units
  sku_name                                 = var.sku_name
  compatibility_level                      = var.compatibility_level
  data_locale                              = var.data_locale
  content_storage_policy                   = var.content_storage_policy

  # Late/early/out-of-order handling — critical for windowed aggregations.
  events_late_arrival_max_delay_in_seconds = var.events_late_arrival_max_delay_in_seconds
  events_out_of_order_max_delay_in_seconds = var.events_out_of_order_max_delay_in_seconds
  events_out_of_order_policy               = var.events_out_of_order_policy
  output_error_policy                      = var.output_error_policy

  # The SQL transformation. Keep this in a .sql file and pass via file()/templatefile().
  transformation_query = var.transformation_query

  # System-assigned managed identity lets the job reach Key Vault / SQL / ADLS
  # without connection-string secrets.
  identity {
    type = "SystemAssigned"
  }

  tags = local.tags
}

# ---------------------------------------------------------------------------
# Input: Event Hub stream (the most common ASA source in production).
# ---------------------------------------------------------------------------
resource "azurerm_stream_analytics_stream_input_eventhub" "this" {
  count = var.eventhub_input == null ? 0 : 1

  name                         = var.eventhub_input.name
  stream_analytics_job_name    = azurerm_stream_analytics_job.this.name
  resource_group_name          = var.resource_group_name
  eventhub_name                = var.eventhub_input.eventhub_name
  eventhub_consumer_group_name = var.eventhub_input.consumer_group_name
  servicebus_namespace         = var.eventhub_input.servicebus_namespace
  shared_access_policy_name    = var.eventhub_input.shared_access_policy_name
  shared_access_policy_key     = var.eventhub_input.shared_access_policy_key
  partition_key                = var.eventhub_input.partition_key

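  serialization {
    type     = var.eventhub_input.serialization_type
    encoding = var.eventhub_input.serialization_type == "Avro" ? null : "UTF8"
    # Csv serialization needs a field delimiter; a comma is assumed here.
    field_delimiter = var.eventhub_input.serialization_type == "Csv" ? "," : null
  }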
}

# ---------------------------------------------------------------------------
# Output: Blob / ADLS Gen2 sink (durable, cheap landing zone for results).
# ---------------------------------------------------------------------------
resource "azurerm_stream_analytics_output_blob" "this" {
  count = var.blob_output == null ? 0 : 1

  name                      = var.blob_output.name
  stream_analytics_job_name = azurerm_stream_analytics_job.this.name
  resource_group_name       = var.resource_group_name
  storage_account_name      = var.blob_output.storage_account_name
  storage_account_key       = var.blob_output.storage_account_key
  storage_container_name    = var.blob_output.container_name
  path_pattern              = var.blob_output.path_pattern
  date_format               = var.blob_output.date_format
  time_format               = var.blob_output.time_format
  batch_max_wait_time       = var.blob_output.batch_max_wait_time

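  serialization {
    type     = var.blob_output.serialization_type
    encoding = var.blob_output.serialization_type == "Json" || var.blob_output.serialization_type == "Csv" ? "UTF8" : null
    format   = var.blob_output.serialization_type == "Json" ? var.blob_output.json_format : null
    # Csv serialization needs a field delimiter; a comma is assumed here.
    field_delimiter = var.blob_output.serialization_type == "Csv" ? "," : null
  }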
}

# ---------------------------------------------------------------------------
# Diagnostics: Execution + Authoring logs and SU-utilization metrics.
# ---------------------------------------------------------------------------
resource "azurerm_monitor_diagnostic_setting" "this" {
  count = var.log_analytics_workspace_id == null ? 0 : 1

  name                       = "${local.job_name}-diag"
  target_resource_id         = azurerm_stream_analytics_job.this.id
  log_analytics_workspace_id = var.log_analytics_workspace_id

  enabled_log {
    category = "Execution"
  }

  enabled_log {
    category = "Authoring"
  }

  metric {
    category = "AllMetrics"
  }
}
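
The job above gets a system-assigned identity, but the input and output as written still authenticate with keys. As a sketch of what an identity-based variant of the input could look like (this is not part of the module as published), the Event Hub input resource accepts an authentication_mode argument. The Event Hub and namespace names below are hypothetical, and the job's identity would also need a suitable data-plane role on the Event Hub for this to work.

```hcl
# Hypothetical variant of the Event Hub input that uses the job's managed
# identity instead of a shared access policy key.
resource "azurerm_stream_analytics_stream_input_eventhub" "msi" {
  name                         = "eh-telemetry"
  stream_analytics_job_name    = azurerm_stream_analytics_job.this.name
  resource_group_name          = var.resource_group_name
  eventhub_name                = "telemetry"       # hypothetical
  eventhub_consumer_group_name = "$Default"
  servicebus_namespace         = "evhns-telemetry" # hypothetical

  # "Msi" replaces the default "ConnectionString" mode, so no
  # shared_access_policy_name / shared_access_policy_key is passed.
  authentication_mode = "Msi"

  serialization {
    type     = "Json"
    encoding = "UTF8"
  }
}
```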

variables.tf

variable "name" {
  type        = string
  description = "Name of the Stream Analytics job (3-63 chars: letters, numbers, hyphens)."

  validation {
    condition     = can(regex("^[a-zA-Z0-9-]{3,63}$", var.name))
    error_message = "name must be 3-63 characters: letters, numbers, and hyphens only."
  }
}

variable "resource_group_name" {
  type        = string
  description = "Resource group that hosts the Stream Analytics job."
}

variable "location" {
  type        = string
  description = "Azure region for the job (e.g. centralindia, eastus)."
}

variable "workload" {
  type        = string
  description = "Short workload identifier used in tags (e.g. clickstream, telemetry)."
}

variable "streaming_units" {
  type        = number
  description = "Provisioned Streaming Units. Must be 1, 3, or a multiple of 6 up to 396 for the Standard SKU."
  default     = 3

  validation {
    # 1, 3, or any multiple of 6 from 6 through 396, matching the description.
    condition = contains([1, 3], var.streaming_units) || (
      var.streaming_units >= 6 &&
      var.streaming_units <= 396 &&
      var.streaming_units % 6 == 0
    )
    error_message = "streaming_units must be 1, 3, or a multiple of 6 (6..396) per the Standard SKU sizing rules."
  }
}

variable "sku_name" {
  type        = string
  description = "Stream Analytics SKU."
  default     = "Standard"

  validation {
    condition     = contains(["Standard", "StandardV2"], var.sku_name)
    error_message = "sku_name must be Standard or StandardV2."
  }
}

variable "compatibility_level" {
  type        = string
  description = "Compatibility level of the query engine."
  default     = "1.2"

  validation {
    condition     = contains(["1.0", "1.1", "1.2"], var.compatibility_level)
    error_message = "compatibility_level must be one of 1.0, 1.1, or 1.2."
  }
}

variable "data_locale" {
  type        = string
  description = "Data locale used for ordering and culture-sensitive operations."
  default     = "en-US"
}

variable "content_storage_policy" {
  type        = string
  description = "Where private query content is stored: SystemAccount or JobStorageAccount."
  default     = "SystemAccount"

  validation {
    condition     = contains(["SystemAccount", "JobStorageAccount"], var.content_storage_policy)
    error_message = "content_storage_policy must be SystemAccount or JobStorageAccount."
  }
}

variable "events_late_arrival_max_delay_in_seconds" {
  type        = number
  description = "Max tolerated delay (seconds) for late-arriving events. -1 means accept all, never drop."
  default     = 5

  validation {
    condition     = var.events_late_arrival_max_delay_in_seconds >= -1 && var.events_late_arrival_max_delay_in_seconds <= 1814400
    error_message = "events_late_arrival_max_delay_in_seconds must be between -1 and 1814400 (21 days)."
  }
}

variable "events_out_of_order_max_delay_in_seconds" {
  type        = number
  description = "Max tolerated out-of-order window (seconds) before the policy is applied."
  default     = 0

  validation {
    condition     = var.events_out_of_order_max_delay_in_seconds >= 0 && var.events_out_of_order_max_delay_in_seconds <= 599
    error_message = "events_out_of_order_max_delay_in_seconds must be between 0 and 599."
  }
}

variable "events_out_of_order_policy" {
  type        = string
  description = "How to treat out-of-order events: Adjust (re-order) or Drop."
  default     = "Adjust"

  validation {
    condition     = contains(["Adjust", "Drop"], var.events_out_of_order_policy)
    error_message = "events_out_of_order_policy must be Adjust or Drop."
  }
}

variable "output_error_policy" {
  type        = string
  description = "What to do with output records that cannot be written: Stop or Drop."
  default     = "Stop"

  validation {
    condition     = contains(["Stop", "Drop"], var.output_error_policy)
    error_message = "output_error_policy must be Stop or Drop."
  }
}

variable "transformation_query" {
  type        = string
  description = "The Stream Analytics SQL transformation query. Pass via file() or templatefile()."

  validation {
    condition     = length(trimspace(var.transformation_query)) > 0
    error_message = "transformation_query must be a non-empty Stream Analytics SQL statement."
  }
}

variable "eventhub_input" {
  description = "Event Hub stream input. Set to null to skip creating an Event Hub input."
  default     = null

  type = object({
    name                      = string
    eventhub_name             = string
    servicebus_namespace      = string
    shared_access_policy_name = string
    shared_access_policy_key  = string
    consumer_group_name       = optional(string, "$Default")
    partition_key             = optional(string)
    serialization_type        = optional(string, "Json")
  })

  validation {
    condition = var.eventhub_input == null ? true : contains(
      ["Json", "Avro", "Csv"], var.eventhub_input.serialization_type
    )
    error_message = "eventhub_input.serialization_type must be Json, Avro, or Csv."
  }
}

variable "blob_output" {
  description = "Blob / ADLS Gen2 output sink. Set to null to skip creating a Blob output."
  default     = null

  type = object({
    name                 = string
    storage_account_name = string
    storage_account_key  = string
    container_name       = string
    path_pattern         = optional(string, "{date}/{time}")
    date_format          = optional(string, "yyyy/MM/dd")
    time_format          = optional(string, "HH")
    batch_max_wait_time  = optional(string)
    serialization_type   = optional(string, "Json")
    json_format          = optional(string, "LineSeparated")
  })

  validation {
    condition = var.blob_output == null ? true : contains(
      ["Json", "Avro", "Csv", "Parquet"], var.blob_output.serialization_type
    )
    error_message = "blob_output.serialization_type must be Json, Avro, Csv, or Parquet."
  }

  validation {
    condition = var.blob_output == null ? true : (
      var.blob_output.serialization_type != "Json" ? true : contains(["LineSeparated", "Array"], var.blob_output.json_format)
    )
    error_message = "blob_output.json_format must be LineSeparated or Array when serialization_type is Json."
  }
}

variable "log_analytics_workspace_id" {
  type        = string
  description = "Log Analytics workspace ID for diagnostics. Set to null to disable diagnostic settings."
  default     = null
}

variable "tags" {
  type        = map(string)
  description = "Additional tags merged onto every resource."
  default     = {}
}
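
Because eventhub_input and blob_output use optional() attributes, a caller only has to supply the required keys. The sketch below passes the minimum for the Event Hub input; the names and the eventhub_listen_key variable are hypothetical.

```hcl
variable "eventhub_listen_key" {
  type        = string
  sensitive   = true
  description = "Hypothetical variable holding the shared access policy key."
}

module "stream_analytics" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"

  name                 = "asa-clickstream-dev" # hypothetical
  resource_group_name  = "rg-streaming-dev"    # hypothetical
  location             = "eastus"
  workload             = "clickstream"
  transformation_query = "SELECT * INTO [blob-out] FROM [eh-clicks]"

  # Only the required attributes are set. Per variables.tf the module fills in
  # consumer_group_name = "$Default", serialization_type = "Json",
  # and partition_key = null.
  eventhub_input = {
    name                      = "eh-clicks"
    eventhub_name             = "clicks"
    servicebus_namespace      = "evhns-clickstream"
    shared_access_policy_name = "stream-analytics-listen"
    shared_access_policy_key  = var.eventhub_listen_key
  }
}
```

The query here also references a [blob-out] output, so a real deployment would set blob_output as well.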

outputs.tf

output "id" {
  description = "Resource ID of the Stream Analytics job."
  value       = azurerm_stream_analytics_job.this.id
}

output "name" {
  description = "Name of the Stream Analytics job."
  value       = azurerm_stream_analytics_job.this.name
}

output "job_id" {
  description = "GUID job identifier assigned by Stream Analytics (distinct from the ARM resource ID)."
  value       = azurerm_stream_analytics_job.this.job_id
}

output "principal_id" {
  description = "Object ID of the job's system-assigned managed identity. Grant this Key Vault / SQL / ADLS access downstream."
  value       = try(azurerm_stream_analytics_job.this.identity[0].principal_id, null)
}

output "tenant_id" {
  description = "Tenant ID of the job's system-assigned managed identity."
  value       = try(azurerm_stream_analytics_job.this.identity[0].tenant_id, null)
}

output "eventhub_input_name" {
  description = "Name of the Event Hub stream input, if created."
  value       = try(azurerm_stream_analytics_stream_input_eventhub.this[0].name, null)
}

output "blob_output_name" {
  description = "Name of the Blob output, if created."
  value       = try(azurerm_stream_analytics_output_blob.this[0].name, null)
}
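
These outputs are meant to be consumed by other resources. As one illustration, the id output can scope a metric alert on Streaming Unit utilization. This is a sketch under assumptions: the resource group and action group are hypothetical, and the metric name ResourceUtilization (the SU utilization metric as I understand it) should be checked against the metrics your job actually exposes.

```hcl
resource "azurerm_monitor_metric_alert" "asa_su_utilization" {
  name                = "asa-su-utilization-high"
  resource_group_name = azurerm_resource_group.streaming.name # hypothetical
  scopes              = [module.stream_analytics.id]
  description         = "Streaming Unit utilization above 80% for the ASA job."
  severity            = 2
  frequency           = "PT5M"
  window_size         = "PT15M"

  criteria {
    metric_namespace = "Microsoft.StreamAnalytics/streamingjobs"
    metric_name      = "ResourceUtilization" # assumed metric name
    aggregation      = "Maximum"
    operator         = "GreaterThan"
    threshold        = 80
  }

  action {
    action_group_id = azurerm_monitor_action_group.platform.id # hypothetical
  }
}
```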

How to use it

A telemetry pipeline that reads from an Event Hub, computes a 1-minute tumbling-window average per device, and lands results in ADLS — then grants the job’s managed identity read access to the Key Vault that holds downstream secrets.

locals {
  asa_query = <<-SQL
    SELECT
        deviceId,
        System.Timestamp() AS windowEnd,
        AVG(temperature)    AS avgTemperature,
        MAX(temperature)    AS maxTemperature,
        COUNT(*)            AS readings
    INTO
        [adls-telemetry]
    FROM
        [eh-telemetry] TIMESTAMP BY eventTime
    GROUP BY
        deviceId,
        TumblingWindow(minute, 1)
  SQL
}

module "stream_analytics" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"

  name                = "asa-telemetry-prod"
  resource_group_name = azurerm_resource_group.streaming.name
  location            = azurerm_resource_group.streaming.location
  workload            = "telemetry"

  streaming_units            = 6
  compatibility_level        = "1.2"
  events_out_of_order_policy = "Adjust"
  output_error_policy        = "Drop"

  transformation_query = local.asa_query

  eventhub_input = {
    name                      = "eh-telemetry"
    eventhub_name             = azurerm_eventhub.telemetry.name
    servicebus_namespace      = azurerm_eventhub_namespace.telemetry.name
    shared_access_policy_name = "stream-analytics-listen"
    shared_access_policy_key  = azurerm_eventhub_namespace_authorization_rule.asa.primary_key
    consumer_group_name       = azurerm_eventhub_consumer_group.asa.name
    serialization_type        = "Json"
  }

  blob_output = {
    name                 = "adls-telemetry"
    storage_account_name = azurerm_storage_account.lake.name
    storage_account_key  = azurerm_storage_account.lake.primary_access_key
    container_name       = "telemetry-curated"
    path_pattern         = "device/{date}/{time}"
    serialization_type   = "Parquet"
  }

  log_analytics_workspace_id = azurerm_log_analytics_workspace.platform.id

  tags = {
    environment = "prod"
    cost-center = "data-platform"
  }
}

# Downstream: let the job's managed identity read secrets from Key Vault.
resource "azurerm_key_vault_access_policy" "asa" {
  key_vault_id = azurerm_key_vault.platform.id
  tenant_id    = module.stream_analytics.tenant_id
  object_id    = module.stream_analytics.principal_id

  secret_permissions = ["Get", "List"]
}

output "telemetry_job_id" {
  value = module.stream_analytics.id
}
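
When the same query shape is reused with different window sizes or aliases, templatefile() is one way to parameterize it. The sketch below assumes a hypothetical template at queries/device_avg.sql.tftpl; the comment shows what that file might contain.

```hcl
locals {
  # Hypothetical template file contents (queries/device_avg.sql.tftpl):
  #
  #   SELECT deviceId, System.Timestamp() AS windowEnd, AVG(temperature) AS avgTemperature
  #   INTO [${output_alias}]
  #   FROM [${input_alias}] TIMESTAMP BY eventTime
  #   GROUP BY deviceId, TumblingWindow(minute, ${window_minutes})
  #
  asa_query_templated = templatefile("${path.module}/queries/device_avg.sql.tftpl", {
    input_alias    = "eh-telemetry"
    output_alias   = "adls-telemetry"
    window_minutes = 1
  })
}
```

Passing local.asa_query_templated as transformation_query keeps the aliases in the query and the eventhub_input / blob_output names defined in one place.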

With Terragrunt

Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.

1. Root config, live/terragrunt.hcl (inherited by every module):

remote_state {
  backend = "azurerm"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...azurerm state storage account/container + key per path...
  }
}

2. Module config, live/prod/stream_analytics/terragrunt.hcl:

include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"
}

inputs = {
  name                 = "..."
  resource_group_name  = "..."
  location             = "..."
  workload             = "..."
  transformation_query = "..."
}

3. Deploy one environment, or roll out all modules together:

(cd live/prod/stream_analytics && terragrunt apply)   # this module only
(cd live/prod && terragrunt run-all apply)            # every module under live/prod

Why Terragrunt here: the backend and provider live in one place instead of being copy-pasted into every module; inputs is overridden per environment (dev / stage / prod) without forking the module; and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules — for a single stack, the plain Quickstart above is enough.
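
As a sketch of what "only the inputs that differ" can look like, here is a possible dev counterpart to the prod config above. The path live/dev/stream_analytics/terragrunt.hcl, the names, and the query.sql file are hypothetical.

```hcl
# live/dev/stream_analytics/terragrunt.hcl (hypothetical path)
include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"
}

inputs = {
  name                = "asa-telemetry-dev"
  resource_group_name = "rg-streaming-dev"
  location            = "centralindia"
  workload            = "telemetry"

  # Smaller and more forgiving than prod.
  streaming_units     = 3
  output_error_policy = "Drop"

  # Read the query from a file that sits next to this terragrunt.hcl.
  transformation_query = file("${get_terragrunt_dir()}/query.sql")
}
```

Using get_terragrunt_dir() anchors the file path to the directory of this terragrunt.hcl rather than to wherever Terragrunt runs Terraform from.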

Inputs

| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| name | string | - | yes | Job name, 3-63 chars (letters, numbers, hyphens). |
| resource_group_name | string | - | yes | Resource group hosting the job. |
| location | string | - | yes | Azure region for the job. |
| workload | string | - | yes | Short workload identifier used in tags. |
| streaming_units | number | 3 | no | Provisioned SUs; must be 1, 3, or a multiple of 6 up to 396. |
| sku_name | string | "Standard" | no | SKU: Standard or StandardV2. |
| compatibility_level | string | "1.2" | no | Query engine compatibility level (1.0/1.1/1.2). |
| data_locale | string | "en-US" | no | Locale for ordering and culture-sensitive operations. |
| content_storage_policy | string | "SystemAccount" | no | Private content storage: SystemAccount or JobStorageAccount. |
| events_late_arrival_max_delay_in_seconds | number | 5 | no | Max late-arrival delay; -1 accepts all. Range -1 to 1814400. |
| events_out_of_order_max_delay_in_seconds | number | 0 | no | Out-of-order tolerance window. Range 0 to 599. |
| events_out_of_order_policy | string | "Adjust" | no | Out-of-order handling: Adjust or Drop. |
| output_error_policy | string | "Stop" | no | Output write-failure handling: Stop or Drop. |
| transformation_query | string | - | yes | The Stream Analytics SQL transformation (non-empty). |
| eventhub_input | object | null | no | Event Hub stream input config; null to skip. |
| blob_output | object | null | no | Blob/ADLS output sink config; null to skip. |
| log_analytics_workspace_id | string | null | no | Workspace for diagnostics; null disables diagnostic settings. |
| tags | map(string) | {} | no | Additional tags merged onto every resource. |

Outputs

| Name | Description |
|---|---|
| id | Resource ID of the Stream Analytics job. |
| name | Name of the Stream Analytics job. |
| job_id | GUID job identifier assigned by Stream Analytics (distinct from the ARM resource ID). |
| principal_id | Object ID of the job's system-assigned managed identity for downstream RBAC. |
| tenant_id | Tenant ID of the job's system-assigned managed identity. |
| eventhub_input_name | Name of the Event Hub stream input, if created. |
| blob_output_name | Name of the Blob output, if created. |

Enterprise scenario

A logistics company ingests GPS pings from 40,000 delivery vehicles into Event Hubs. The platform team instantiates this module once per region (asa-tracking-centralindia, asa-tracking-westeurope), each with a 5-second tumbling window that computes per-vehicle speed and geofence-breach flags, writing Parquet to ADLS Gen2 for the lakehouse and a filtered alert stream onward. Because the SU count and events_out_of_order_policy are tfvars, they run 6 SUs in dev and 24 SUs in production from identical code, and the job’s managed identity — surfaced via the principal_id output — is granted ADLS Storage Blob Data Contributor without anyone pasting a key into the portal.
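
A rough sketch of that per-region pattern follows. It assumes hypothetical azurerm_resource_group.tracking and azurerm_storage_account.lake resources that are themselves created with for_each over the same region keys, plus a hypothetical queries/tracking.sql file. The role assignment only matters for outputs that authenticate with the job's identity; the module's blob_output as shown earlier takes a storage account key.

```hcl
variable "streaming_units" {
  type        = number
  description = "SUs per regional job, set per environment in tfvars (e.g. 6 in dev, 24 in prod)."
  default     = 6
}

locals {
  tracking_regions = toset(["centralindia", "westeurope"])
}

module "tracking" {
  source   = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"
  for_each = local.tracking_regions

  name                 = "asa-tracking-${each.key}"
  resource_group_name  = azurerm_resource_group.tracking[each.key].name # hypothetical
  location             = each.key
  workload             = "tracking"
  streaming_units      = var.streaming_units
  transformation_query = file("${path.module}/queries/tracking.sql") # hypothetical
}

# One role assignment per regional job, using the principal_id output.
resource "azurerm_role_assignment" "tracking_lake" {
  for_each = module.tracking

  scope                = azurerm_storage_account.lake[each.key].id # hypothetical
  role_definition_name = "Storage Blob Data Contributor"
  principal_id         = each.value.principal_id
}
```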

Best practices
