Quick take: a reusable hashicorp/azurerm module for Azure Stream Analytics jobs. It declares streaming units, the transformation query, an Event Hub input and a Blob/ADLS output, the managed identity, and diagnostics, all version-pinned and var-driven. New here? Jump to the Quickstart below to deploy it in minutes, or read on for how it works and when to reach for it.
Quickstart (copy-paste)
Minimal, runnable configuration — drop this in a .tf file and fill in the "..." placeholders (each required input is commented):
provider "azurerm" {
  features {}
  subscription_id = "..." # Required by azurerm 4.x unless ARM_SUBSCRIPTION_ID is set.
}
module "stream_analytics" {
  source               = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"
  name                 = "..." # Job name, 3-63 chars (letters, numbers, hyphens).
  resource_group_name  = "..." # Resource group hosting the job.
  location             = "..." # Azure region for the job.
  workload             = "..." # Short workload identifier used in tags.
  transformation_query = "..." # The Stream Analytics SQL transformation (non-empty).
}
Then terraform init && terraform apply. Every other input has a sensible default — see Inputs below to override behaviour.
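Beyond a smoke test, the query is easier to review when it lives in its own file, which is also what the module's comments recommend. The sketch below replaces the inline transformation_query string from the Quickstart. It assumes a hypothetical queries/telemetry.sql file next to your root configuration and a hypothetical window_minutes template variable.

```hcl
module "stream_analytics" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"

  name                = "..."
  resource_group_name = "..."
  location            = "..."
  workload            = "..."

  # Hypothetical file and template variable: inside telemetry.sql, write
  # TumblingWindow(minute, ${window_minutes}) and templatefile() substitutes 1.
  transformation_query = templatefile("${path.module}/queries/telemetry.sql", {
    window_minutes = 1
  })
}
```

Use file() instead when the query has no placeholders. templatefile() treats every ${ sequence in the SQL as an interpolation, so a literal one has to be escaped as $${.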
What this module is
Azure Stream Analytics (ASA) is a fully managed, serverless real-time analytics engine. You point it at a streaming source (Event Hubs, IoT Hub, Blob), write a SQL-like query that filters, aggregates over tumbling/hopping/sliding windows, joins reference data, and detects anomalies, and it continuously writes results to a sink (Blob/ADLS, SQL, Event Hub, Cosmos DB, Power BI). You pay for the Streaming Units (SUs) you provision, not for individual events — so the job definition, its capacity, and its query are exactly the kind of thing you want pinned in code rather than clicked together in the portal.
The problem with hand-built ASA jobs is that the moving parts drift apart. The job, its inputs, its outputs, the transformation query, the managed identity that lets it reach Key Vault or a SQL database, and the diagnostic settings all live in different blades. A reusable Terraform module wraps azurerm_stream_analytics_job together with the input/output resources and the transformation query so that one module block produces a complete, wired-up, observable streaming pipeline. The SU count, query, and event-ordering policy become reviewable variables; the job’s principal ID becomes an output you can hand to a Key Vault access policy or a SQL role assignment downstream.
When to use it
- You run more than one streaming pipeline (clickstream, telemetry, fraud signals) and want each to be a 15-line module call with a different query and SU count, not a hand-assembled set of blades.
- You need the transformation query in version control, so that a windowed aggregation or watermark change goes through pull-request review and terraform plan shows the diff before it hits production.
- You want the job's system-assigned managed identity wired to downstream Key Vault / SQL / ADLS access without copy-pasting principal IDs between portal blades.
- You require consistent diagnostics (Execution and Authoring logs plus SU-utilization metrics shipped to Log Analytics) on every job, switched on by passing a single log_analytics_workspace_id.
- You operate across dev/test/prod and need the same job topology with different streaming_units and events_out_of_order_policy values per environment via tfvars.
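The per-environment pattern in the last bullet can be sketched as root-level variables that are forwarded into the module, with one tfvars file per environment. The file name env/prod.tfvars and the values shown are illustrative assumptions. The tfvars lines appear as comments because they belong in a separate file, not in a .tf file.

```hcl
# variables.tf (root configuration): per-environment knobs.
variable "streaming_units" {
  type    = number
  default = 3
}

variable "events_out_of_order_policy" {
  type    = string
  default = "Adjust"
}

# main.tf (root configuration): forward them into the module.
module "stream_analytics" {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"

  name                 = "..."
  resource_group_name  = "..."
  location             = "..."
  workload             = "..."
  transformation_query = "..."

  streaming_units            = var.streaming_units
  events_out_of_order_policy = var.events_out_of_order_policy
}

# env/prod.tfvars (separate file, hypothetical values):
#   streaming_units            = 24
#   events_out_of_order_policy = "Adjust"
```

Apply an environment with terraform apply -var-file=env/prod.tfvars. A dev file would set a smaller streaming_units and reuse the same code.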
Reach for raw resources instead only when you have a single throwaway job, or when you need an input/output connector type this module deliberately does not expose yet (e.g. Cosmos DB output) and you would rather not extend it.
Module structure
terraform-module-azure-stream-analytics/
├── versions.tf # provider + Terraform version pinning
├── main.tf # ASA job + input + output + transformation + diagnostics
├── variables.tf # var-driven inputs with validation
└── outputs.tf # id, name, principal_id, and key attributes
versions.tf
terraform {
  required_version = ">= 1.5.0"
  required_providers {
    azurerm = {
      source  = "hashicorp/azurerm"
      version = "~> 4.0"
    }
  }
}
main.tf
locals {
  # Stream Analytics job names are constrained to 3-63 chars: letters, numbers, hyphens.
  job_name = lower(var.name)
  tags = merge(
    {
      "managed-by" = "terraform"
      "module"     = "terraform-module-azure-stream-analytics"
      "workload"   = var.workload
    },
    var.tags
  )
}
resource "azurerm_stream_analytics_job" "this" {
  name                = local.job_name
  resource_group_name = var.resource_group_name
  location            = var.location
  # Capacity & SQL surface.
  streaming_units        = var.streaming_units
  sku_name               = var.sku_name
  compatibility_level    = var.compatibility_level
  data_locale            = var.data_locale
  content_storage_policy = var.content_storage_policy
  # Late/early/out-of-order handling: critical for windowed aggregations.
  events_late_arrival_max_delay_in_seconds = var.events_late_arrival_max_delay_in_seconds
  events_out_of_order_max_delay_in_seconds = var.events_out_of_order_max_delay_in_seconds
  events_out_of_order_policy               = var.events_out_of_order_policy
  output_error_policy                      = var.output_error_policy
  # The SQL transformation. Keep this in a .sql file and pass via file()/templatefile().
  transformation_query = var.transformation_query
  # System-assigned managed identity lets the job reach Key Vault / SQL / ADLS
  # without connection-string secrets.
  identity {
    type = "SystemAssigned"
  }
  tags = local.tags
}
# ---------------------------------------------------------------------------
# Input: Event Hub stream (the most common ASA source in production).
# ---------------------------------------------------------------------------
resource "azurerm_stream_analytics_stream_input_eventhub" "this" {
  count                        = var.eventhub_input == null ? 0 : 1
  name                         = var.eventhub_input.name
  stream_analytics_job_name    = azurerm_stream_analytics_job.this.name
  resource_group_name          = var.resource_group_name
  eventhub_name                = var.eventhub_input.eventhub_name
  eventhub_consumer_group_name = var.eventhub_input.consumer_group_name
  servicebus_namespace         = var.eventhub_input.servicebus_namespace
  shared_access_policy_name    = var.eventhub_input.shared_access_policy_name
  shared_access_policy_key     = var.eventhub_input.shared_access_policy_key
  partition_key                = var.eventhub_input.partition_key
  serialization {
    type     = var.eventhub_input.serialization_type
    encoding = var.eventhub_input.serialization_type == "Avro" ? null : "UTF8"
    # The provider requires a delimiter for Csv; this module assumes comma-delimited input.
    field_delimiter = var.eventhub_input.serialization_type == "Csv" ? "," : null
  }
}
# ---------------------------------------------------------------------------
# Output: Blob / ADLS Gen2 sink (durable, cheap landing zone for results).
# ---------------------------------------------------------------------------
resource "azurerm_stream_analytics_output_blob" "this" {
  count                     = var.blob_output == null ? 0 : 1
  name                      = var.blob_output.name
  stream_analytics_job_name = azurerm_stream_analytics_job.this.name
  resource_group_name       = var.resource_group_name
  storage_account_name      = var.blob_output.storage_account_name
  storage_account_key       = var.blob_output.storage_account_key
  storage_container_name    = var.blob_output.container_name
  path_pattern              = var.blob_output.path_pattern
  date_format               = var.blob_output.date_format
  time_format               = var.blob_output.time_format
  batch_max_wait_time       = var.blob_output.batch_max_wait_time
  serialization {
    type     = var.blob_output.serialization_type
    encoding = contains(["Json", "Csv"], var.blob_output.serialization_type) ? "UTF8" : null
    format   = var.blob_output.serialization_type == "Json" ? var.blob_output.json_format : null
    # The provider requires a delimiter for Csv; this module assumes comma-delimited output.
    field_delimiter = var.blob_output.serialization_type == "Csv" ? "," : null
  }
}
# ---------------------------------------------------------------------------
# Diagnostics: Execution + Authoring logs and SU-utilization metrics.
# ---------------------------------------------------------------------------
resource "azurerm_monitor_diagnostic_setting" "this" {
  count                      = var.log_analytics_workspace_id == null ? 0 : 1
  name                       = "${local.job_name}-diag"
  target_resource_id         = azurerm_stream_analytics_job.this.id
  log_analytics_workspace_id = var.log_analytics_workspace_id
  enabled_log {
    category = "Execution"
  }
  enabled_log {
    category = "Authoring"
  }
  metric {
    category = "AllMetrics"
  }
}
variables.tf
variable "name" {
  type        = string
  description = "Name of the Stream Analytics job (3-63 chars: letters, numbers, hyphens)."
  validation {
    condition     = can(regex("^[a-zA-Z0-9-]{3,63}$", var.name))
    error_message = "name must be 3-63 characters: letters, numbers, and hyphens only."
  }
}
variable "resource_group_name" {
  type        = string
  description = "Resource group that hosts the Stream Analytics job."
}
variable "location" {
  type        = string
  description = "Azure region for the job (e.g. centralindia, eastus)."
}
variable "workload" {
  type        = string
  description = "Short workload identifier used in tags (e.g. clickstream, telemetry)."
}
variable "streaming_units" {
  type        = number
  description = "Provisioned Streaming Units. Must be 1, 3, or a multiple of 6 up to 396."
  default     = 3
  validation {
    # Accept 1 and 3 explicitly; otherwise any multiple of 6 from 6 through 396.
    condition = (
      contains([1, 3], var.streaming_units) ||
      (var.streaming_units >= 6 && var.streaming_units <= 396 && var.streaming_units % 6 == 0)
    )
    error_message = "streaming_units must be 1, 3, or a multiple of 6 (6..396)."
  }
}
variable "sku_name" {
  type        = string
  description = "Stream Analytics SKU."
  default     = "Standard"
  validation {
    condition     = contains(["Standard", "StandardV2"], var.sku_name)
    error_message = "sku_name must be Standard or StandardV2."
  }
}
variable "compatibility_level" {
  type        = string
  description = "Compatibility level of the query engine."
  default     = "1.2"
  validation {
    condition     = contains(["1.0", "1.1", "1.2"], var.compatibility_level)
    error_message = "compatibility_level must be one of 1.0, 1.1, or 1.2."
  }
}
variable "data_locale" {
  type        = string
  description = "Data locale used for ordering and culture-sensitive operations."
  default     = "en-US"
}
variable "content_storage_policy" {
  type        = string
  description = "Where private query content is stored: SystemAccount or JobStorageAccount. JobStorageAccount also requires a job_storage_account block, which this module does not set."
  default     = "SystemAccount"
  validation {
    condition     = contains(["SystemAccount", "JobStorageAccount"], var.content_storage_policy)
    error_message = "content_storage_policy must be SystemAccount or JobStorageAccount."
  }
}
variable "events_late_arrival_max_delay_in_seconds" {
  type        = number
  description = "Max tolerated delay (seconds) for late-arriving events. -1 means accept all, never drop."
  default     = 5
  validation {
    condition     = var.events_late_arrival_max_delay_in_seconds >= -1 && var.events_late_arrival_max_delay_in_seconds <= 1814400
    error_message = "events_late_arrival_max_delay_in_seconds must be between -1 and 1814400 (21 days)."
  }
}
variable "events_out_of_order_max_delay_in_seconds" {
  type        = number
  description = "Max tolerated out-of-order window (seconds) before the policy is applied."
  default     = 0
  validation {
    condition     = var.events_out_of_order_max_delay_in_seconds >= 0 && var.events_out_of_order_max_delay_in_seconds <= 599
    error_message = "events_out_of_order_max_delay_in_seconds must be between 0 and 599."
  }
}
variable "events_out_of_order_policy" {
  type        = string
  description = "How to treat out-of-order events: Adjust (re-order) or Drop."
  default     = "Adjust"
  validation {
    condition     = contains(["Adjust", "Drop"], var.events_out_of_order_policy)
    error_message = "events_out_of_order_policy must be Adjust or Drop."
  }
}
variable "output_error_policy" {
  type        = string
  description = "What to do with output records that cannot be written: Stop or Drop."
  default     = "Stop"
  validation {
    condition     = contains(["Stop", "Drop"], var.output_error_policy)
    error_message = "output_error_policy must be Stop or Drop."
  }
}
variable "transformation_query" {
  type        = string
  description = "The Stream Analytics SQL transformation query. Pass via file() or templatefile()."
  validation {
    condition     = length(trimspace(var.transformation_query)) > 0
    error_message = "transformation_query must be a non-empty Stream Analytics SQL statement."
  }
}
variable "eventhub_input" {
  description = "Event Hub stream input. Set to null to skip creating an Event Hub input."
  default     = null
  type = object({
    name                      = string
    eventhub_name             = string
    servicebus_namespace      = string
    shared_access_policy_name = string
    shared_access_policy_key  = string
    consumer_group_name       = optional(string, "$Default")
    partition_key             = optional(string)
    serialization_type        = optional(string, "Json")
  })
  validation {
    condition = var.eventhub_input == null ? true : contains(
      ["Json", "Avro", "Csv"], var.eventhub_input.serialization_type
    )
    error_message = "eventhub_input.serialization_type must be Json, Avro, or Csv."
  }
}
variable "blob_output" {
  description = "Blob / ADLS Gen2 output sink. Set to null to skip creating a Blob output."
  default     = null
  type = object({
    name                 = string
    storage_account_name = string
    storage_account_key  = string
    container_name       = string
    path_pattern         = optional(string, "{date}/{time}")
    date_format          = optional(string, "yyyy/MM/dd")
    time_format          = optional(string, "HH")
    batch_max_wait_time  = optional(string)
    serialization_type   = optional(string, "Json")
    json_format          = optional(string, "LineSeparated")
  })
  validation {
    condition = var.blob_output == null ? true : contains(
      ["Json", "Avro", "Csv", "Parquet"], var.blob_output.serialization_type
    )
    error_message = "blob_output.serialization_type must be Json, Avro, Csv, or Parquet."
  }
  validation {
    condition = var.blob_output == null ? true : (
      var.blob_output.serialization_type != "Json" ? true : contains(["LineSeparated", "Array"], var.blob_output.json_format)
    )
    error_message = "blob_output.json_format must be LineSeparated or Array when serialization_type is Json."
  }
}
variable "log_analytics_workspace_id" {
  type        = string
  description = "Log Analytics workspace ID for diagnostics. Set to null to disable diagnostic settings."
  default     = null
}
variable "tags" {
  type        = map(string)
  description = "Additional tags merged onto every resource."
  default     = {}
}
outputs.tf
output "id" {
  description = "Resource ID of the Stream Analytics job."
  value       = azurerm_stream_analytics_job.this.id
}
output "name" {
  description = "Name of the Stream Analytics job."
  value       = azurerm_stream_analytics_job.this.name
}
output "job_id" {
  description = "GUID job identifier assigned by Stream Analytics (distinct from the ARM resource ID)."
  value       = azurerm_stream_analytics_job.this.job_id
}
output "principal_id" {
  description = "Object ID of the job's system-assigned managed identity. Grant this Key Vault / SQL / ADLS access downstream."
  value       = try(azurerm_stream_analytics_job.this.identity[0].principal_id, null)
}
output "tenant_id" {
  description = "Tenant ID of the job's system-assigned managed identity."
  value       = try(azurerm_stream_analytics_job.this.identity[0].tenant_id, null)
}
output "eventhub_input_name" {
  description = "Name of the Event Hub stream input, if created."
  value       = try(azurerm_stream_analytics_stream_input_eventhub.this[0].name, null)
}
output "blob_output_name" {
  description = "Name of the Blob output, if created."
  value       = try(azurerm_stream_analytics_output_blob.this[0].name, null)
}
How to use it
A telemetry pipeline that reads from an Event Hub, computes a 1-minute tumbling-window average per device, and lands results in ADLS — then grants the job’s managed identity read access to the Key Vault that holds downstream secrets.
locals {
  asa_query = <<-SQL
    SELECT
      deviceId,
      System.Timestamp() AS windowEnd,
      AVG(temperature) AS avgTemperature,
      MAX(temperature) AS maxTemperature,
      COUNT(*) AS readings
    INTO
      [adls-telemetry]
    FROM
      [eh-telemetry] TIMESTAMP BY eventTime
    GROUP BY
      deviceId,
      TumblingWindow(minute, 1)
  SQL
}
module "stream_analytics" {
  source                     = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"
  name                       = "asa-telemetry-prod"
  resource_group_name        = azurerm_resource_group.streaming.name
  location                   = azurerm_resource_group.streaming.location
  workload                   = "telemetry"
  streaming_units            = 6
  compatibility_level        = "1.2"
  events_out_of_order_policy = "Adjust"
  output_error_policy        = "Drop"
  transformation_query       = local.asa_query
  eventhub_input = {
    name                 = "eh-telemetry"
    eventhub_name        = azurerm_eventhub.telemetry.name
    servicebus_namespace = azurerm_eventhub_namespace.telemetry.name
    # Take the rule name from the same resource as the key so the two cannot drift.
    shared_access_policy_name = azurerm_eventhub_namespace_authorization_rule.asa.name
    shared_access_policy_key  = azurerm_eventhub_namespace_authorization_rule.asa.primary_key
    consumer_group_name       = azurerm_eventhub_consumer_group.asa.name
    serialization_type        = "Json"
  }
  blob_output = {
    name                 = "adls-telemetry"
    storage_account_name = azurerm_storage_account.lake.name
    storage_account_key  = azurerm_storage_account.lake.primary_access_key
    container_name       = "telemetry-curated"
    path_pattern         = "device/{date}/{time}"
    serialization_type   = "Parquet"
  }
  log_analytics_workspace_id = azurerm_log_analytics_workspace.platform.id
  tags = {
    environment = "prod"
    cost-center = "data-platform"
  }
}
# Downstream: let the job's managed identity read secrets from Key Vault.
resource "azurerm_key_vault_access_policy" "asa" {
  key_vault_id       = azurerm_key_vault.platform.id
  tenant_id          = module.stream_analytics.tenant_id
  object_id          = module.stream_analytics.principal_id
  secret_permissions = ["Get", "List"]
}
output "telemetry_job_id" {
  value = module.stream_analytics.id
}
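The example expects an Event Hub consumer group and a namespace authorization rule to exist already. Below is a minimal sketch of both. It reuses the resource addresses the example references (azurerm_eventhub_consumer_group.asa and azurerm_eventhub_namespace_authorization_rule.asa) and assumes the namespace, event hub, and resource group are defined elsewhere. The consumer group name is a placeholder.

```hcl
# Dedicated consumer group so the job keeps its own read position on each partition.
resource "azurerm_eventhub_consumer_group" "asa" {
  name                = "asa-telemetry" # placeholder name
  namespace_name      = azurerm_eventhub_namespace.telemetry.name
  eventhub_name       = azurerm_eventhub.telemetry.name
  resource_group_name = azurerm_resource_group.streaming.name
}

# Listen-only rule: the job can read from the namespace but cannot publish or manage it.
resource "azurerm_eventhub_namespace_authorization_rule" "asa" {
  name                = "stream-analytics-listen"
  namespace_name      = azurerm_eventhub_namespace.telemetry.name
  resource_group_name = azurerm_resource_group.streaming.name

  listen = true
  send   = false
  manage = false
}
```

With a dedicated consumer group, the job reads independently of other readers instead of sharing $Default with them. The listen-only rule means a leaked key cannot be used to write events back into the namespace.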
With Terragrunt
Terragrunt keeps this module DRY across environments — define the backend and provider once in a root config, then a thin terragrunt.hcl per environment supplies only the inputs that differ.
1. Root config — live/terragrunt.hcl (inherited by every module):
remote_state {
  backend  = "azurerm"
  generate = { path = "backend.tf", if_exists = "overwrite" }
  config = {
    # ...azurerm state storage account / container + key per path...
  }
}
2. Module config — live/prod/stream_analytics/terragrunt.hcl:
include "root" {
  path = find_in_parent_folders()
}
terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"
}
inputs = {
  name                 = "..."
  resource_group_name  = "..."
  location             = "..."
  workload             = "..."
  transformation_query = "..."
}
3. Deploy one environment, or roll out all modules together:
# Run either line from the repository root.
cd live/prod/stream_analytics && terragrunt apply   # this module only
cd live/prod && terragrunt run-all apply            # every module under live/prod
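The root config in step 1 generates only the backend. To define the provider once as well, a Terragrunt generate block can write it into every module's working directory. The sketch below assumes the block is appended to live/terragrunt.hcl and that the subscription ID comes from the ARM_SUBSCRIPTION_ID environment variable instead of being hard-coded.

```hcl
# Appended to live/terragrunt.hcl: write provider.tf into each module's working copy.
generate "provider" {
  path      = "provider.tf"
  if_exists = "overwrite_terragrunt"
  contents  = <<EOF
provider "azurerm" {
  features {}
  # subscription_id is read from ARM_SUBSCRIPTION_ID when omitted here.
}
EOF
}
```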
Why Terragrunt here: the backend configuration (and a generated provider block, if the root config defines one) lives in one place instead of being copy-pasted into every module. Each environment (dev / stage / prod) supplies its own inputs without forking the module, and run-all orchestrates dependencies across modules. Reach for it once you have more than one environment or more than a handful of modules; for a single stack, the plain Quickstart above is enough.
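A second environment changes only its inputs. The sketch below shows a hypothetical live/dev/stream_analytics/terragrunt.hcl. The job name, resource group, region, and query.sql file name are placeholders, and get_terragrunt_dir() resolves the query file relative to this terragrunt.hcl.

```hcl
# live/dev/stream_analytics/terragrunt.hcl (hypothetical values)
include "root" {
  path = find_in_parent_folders()
}

terraform {
  source = "git::https://dev.azure.com/teknohut/kloudvin/_git/terraform-modules//terraform-module-azure-stream-analytics?ref=v1.0.0"
}

inputs = {
  name                = "asa-telemetry-dev"
  resource_group_name = "rg-streaming-dev"
  location            = "centralindia"
  workload            = "telemetry"
  streaming_units     = 3

  # Load the query from a file next to this terragrunt.hcl.
  transformation_query = file("${get_terragrunt_dir()}/query.sql")
}
```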
Inputs
| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| name | string | n/a | yes | Job name, 3-63 chars (letters, numbers, hyphens). |
| resource_group_name | string | n/a | yes | Resource group hosting the job. |
| location | string | n/a | yes | Azure region for the job. |
| workload | string | n/a | yes | Short workload identifier used in tags. |
| streaming_units | number | 3 | no | Provisioned SUs; must be 1, 3, or a multiple of 6 up to 396. |
| sku_name | string | "Standard" | no | SKU: Standard or StandardV2. |
| compatibility_level | string | "1.2" | no | Query engine compatibility level (1.0/1.1/1.2). |
| data_locale | string | "en-US" | no | Locale for ordering and culture-sensitive operations. |
| content_storage_policy | string | "SystemAccount" | no | Private content storage: SystemAccount or JobStorageAccount. |
| events_late_arrival_max_delay_in_seconds | number | 5 | no | Max late-arrival delay; -1 accepts all. Range -1…1814400. |
| events_out_of_order_max_delay_in_seconds | number | 0 | no | Out-of-order tolerance window. Range 0…599. |
| events_out_of_order_policy | string | "Adjust" | no | Out-of-order handling: Adjust or Drop. |
| output_error_policy | string | "Stop" | no | Output write-failure handling: Stop or Drop. |
| transformation_query | string | n/a | yes | The Stream Analytics SQL transformation (non-empty). |
| eventhub_input | object | null | no | Event Hub stream input config; null to skip. |
| blob_output | object | null | no | Blob/ADLS output sink config; null to skip. |
| log_analytics_workspace_id | string | null | no | Workspace for diagnostics; null disables diagnostic settings. |
| tags | map(string) | {} | no | Additional tags merged onto every resource. |
Outputs
| Name | Description |
|---|---|
| id | Resource ID of the Stream Analytics job. |
| name | Name of the Stream Analytics job. |
| job_id | GUID job identifier assigned by Stream Analytics (distinct from the ARM resource ID). |
| principal_id | Object ID of the job’s system-assigned managed identity for downstream RBAC. |
| tenant_id | Tenant ID of the job’s system-assigned managed identity. |
| eventhub_input_name | Name of the Event Hub stream input, if created. |
| blob_output_name | Name of the Blob output, if created. |
Enterprise scenario
A logistics company ingests GPS pings from 40,000 delivery vehicles into Event Hubs. The platform team instantiates this module once per region (asa-tracking-centralindia, asa-tracking-westeurope). Each job uses a 5-second tumbling window to compute per-vehicle speed and geofence-breach flags, writes Parquet to ADLS Gen2 for the lakehouse, and forwards a filtered alert stream downstream. Because streaming_units and events_out_of_order_policy are tfvars, the team runs 6 SUs in dev and 24 SUs in production from identical code. The job’s managed identity, surfaced via the principal_id output, is granted Storage Blob Data Contributor on the ADLS account without anyone copying principal IDs between portal blades.
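The RBAC grant in this scenario can be sketched with azurerm_role_assignment, fed by the module's principal_id output. The storage account address (azurerm_storage_account.lake) is borrowed from the usage example and is an assumption here. In v1.0.0, the module's Blob output still authenticates with storage_account_key. The output therefore keeps using the key until it is switched to managed-identity authentication, which the module does not expose yet.

```hcl
# Grant the job's system-assigned identity data-plane access to the lake account.
resource "azurerm_role_assignment" "asa_lake" {
  scope                = azurerm_storage_account.lake.id
  role_definition_name = "Storage Blob Data Contributor"
  principal_id         = module.stream_analytics.principal_id
  # Declaring the principal type helps when the identity was created moments earlier.
  principal_type = "ServicePrincipal"
}
```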
Best practices
- Keep the query in a .sql file and load it with templatefile(), not inline strings. The transformation is the riskiest part of an ASA job. A versioned file gives clean PR diffs and lets you inject environment-specific table or window parameters without forking the query.
- Size SUs to the SU-utilization metric, then leave headroom. Sustained utilization above ~80% causes watermark delay and backlog. Once log_analytics_workspace_id is set, the module ships AllMetrics to Log Analytics, so you can alert on SU % Utilization and scale streaming_units (1 → 3 → multiples of 6) deliberately rather than reactively.
- Prefer managed identity over connection-string keys wherever a connector supports it. Use the principal_id output to grant the job RBAC on Key Vault, SQL, or ADLS. The module's Event Hub input and Blob output currently authenticate with shared_access_policy_key / storage_account_key, so source those secrets from Key Vault and never hard-code them in tfvars.
- Choose events_out_of_order_policy and the late-arrival delay per workload instead of accepting the defaults. Financial or ordering-sensitive streams should accept late events (-1) and use Adjust; high-throughput telemetry can afford Drop to protect latency. The wrong setting silently corrupts windowed aggregates.
- Set output_error_policy = "Drop" for resilient pipelines, but alert on dropped events. Stop halts the whole job on a single malformed output record. Drop keeps the stream flowing, but it must be paired with an alert on the Data Conversion Errors metric so that silent data loss is caught.
- Name and tag jobs by workload and region (asa-<workload>-<region>), and give each job a dedicated Listen-only authorization rule on the input Event Hub namespace so a compromised job key cannot publish back into the namespace.
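The two alerts recommended above can be sketched with azurerm_monitor_metric_alert. It reads the job's platform metrics directly, so it works even when log_analytics_workspace_id is null. Several details are assumptions to verify in your environment: the metric names ResourceUtilization (SU % Utilization) and ConversionErrors (Data Conversion Errors), the aggregations, the thresholds, and the addresses azurerm_resource_group.streaming and azurerm_monitor_action_group.platform.

```hcl
# Alert when SU % Utilization stays high enough to risk watermark delay.
resource "azurerm_monitor_metric_alert" "asa_su_utilization" {
  name                = "asa-telemetry-prod-su-utilization"
  resource_group_name = azurerm_resource_group.streaming.name
  scopes              = [module.stream_analytics.id]
  description         = "SU % Utilization above 80 percent: consider raising streaming_units."
  severity            = 2
  frequency           = "PT5M"
  window_size         = "PT15M"

  criteria {
    metric_namespace = "Microsoft.StreamAnalytics/streamingjobs"
    metric_name      = "ResourceUtilization"
    aggregation      = "Maximum"
    operator         = "GreaterThan"
    threshold        = 80
  }

  action {
    action_group_id = azurerm_monitor_action_group.platform.id
  }
}

# Alert on any conversion error, which output_error_policy = "Drop" would otherwise hide.
resource "azurerm_monitor_metric_alert" "asa_conversion_errors" {
  name                = "asa-telemetry-prod-conversion-errors"
  resource_group_name = azurerm_resource_group.streaming.name
  scopes              = [module.stream_analytics.id]
  description         = "Output events could not be converted to the output schema."
  severity            = 1
  frequency           = "PT5M"
  window_size         = "PT5M"

  criteria {
    metric_namespace = "Microsoft.StreamAnalytics/streamingjobs"
    metric_name      = "ConversionErrors"
    aggregation      = "Total"
    operator         = "GreaterThan"
    threshold        = 0
  }

  action {
    action_group_id = azurerm_monitor_action_group.platform.id
  }
}
```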