A logistics company runs 40,000 GPS-and-telemetry-emitting vehicles, and the operations desk wants a live map plus exception alerts (“this refrigerated trailer breached 8 °C twelve minutes ago”) instead of the hourly batch they have today. Telemetry arrives as a Kafka topic at roughly 18,000 events/second at peak. The data team’s mandate is to land every event in Snowflake within a couple of seconds, transform it into clean per-vehicle and per-alert tables continuously, and do it without standing up a Spark cluster or paying for an always-on warehouse that idles between bursts. This guide builds exactly that: Snowpipe Streaming for sub-second row-level ingestion into a raw table, then Streams and Tasks to incrementally transform that raw table into curated, queryable models — the serverless, no-files, near-real-time pattern Snowflake is built for.
The shape that makes this work: Snowpipe Streaming writes rows directly through a channel (no staged files, no per-file overhead, billed by throughput); a Stream is a change-tracking cursor over the raw table that exposes only the rows added since it was last consumed; and a Task runs on a schedule or trigger to drain that Stream into the next layer. Chained together you get a continuous medallion pipeline (raw → enriched → curated) with offset-based exactly-once semantics, latency of seconds on the ingestion side, and about a minute on the transform side.
Prerequisites
- A Snowflake account and a role with ACCOUNTADMIN available for the one-time grants. Enterprise edition is assumed here, mainly for data retention beyond one day; Streams, Tasks and serverless Tasks themselves are available in all editions.
- The Snowflake Ingest SDK (Java 2.x, snowflake-ingest-sdk) or a Kafka Connector ≥ 2.1 with snowflake.ingestion.method=SNOWPIPE_STREAMING. This guide uses the Kafka Connector path since the source is Kafka.
- A running Kafka (or Confluent) cluster with the vehicle.telemetry topic, and a Kafka Connect worker you can deploy a connector to.
- snowsql CLI configured, plus Terraform ≥ 1.6 with the Snowflake-Labs/snowflake provider for the account objects.
- An RSA key pair for key-pair authentication (Snowpipe Streaming clients authenticate with a JWT, not a password).
- Workforce SSO via Okta federated to Microsoft Entra ID already governing human access to Snowsight; HashiCorp Vault available to hold the connector’s private key and the Snowflake credentials.
Target topology
Two planes share the same Snowflake account but run on different clocks. The ingestion plane is push-driven and serverless: Kafka Connect opens a Snowpipe Streaming channel per topic-partition and streams rows straight into RAW.TELEMETRY with no compute warehouse involved, and Snowflake bills the streaming throughput directly. The transformation plane is change-driven: a Stream on RAW.TELEMETRY tracks unconsumed rows, and a chain of Tasks (running on a small dedicated warehouse, or serverless) drains the Stream every minute into ENRICHED.VEHICLE_STATE and CURATED.COLD_CHAIN_ALERTS.
Around the data path sit the enterprise controls the platform team actually operates: Okta → Entra ID federates human SSO into Snowsight with SCIM-provisioned roles; HashiCorp Vault issues the connector’s private key and Snowflake login so no secret lands in a Kafka Connect config file; Terraform provisions every Snowflake object (database, schemas, roles, warehouse, tasks) declaratively; Datadog scrapes the Kafka Connector JMX metrics and Snowflake’s SNOWPIPE_STREAMING_* views for lag and cost dashboards; Wiz runs posture checks against the Snowflake account and the connector’s host; CrowdStrike Falcon protects the Kafka Connect VM at runtime; GitHub Actions applies the Terraform and dbt-style SQL through an OIDC-authenticated pipeline; and ServiceNow is the change gate before a new Task or schema goes live in production.
1. Provision the Snowflake objects with Terraform
Create the database, schemas, a dedicated transform warehouse, and a least-privilege role. Keep this in version control; the GitHub Actions pipeline applies it.
# main.tf (Snowflake-Labs/snowflake provider)
terraform {
  required_providers {
    snowflake = { source = "Snowflake-Labs/snowflake", version = "~> 0.95" }
  }
}

resource "snowflake_database" "telemetry" {
  name = "TELEMETRY"
}

resource "snowflake_schema" "raw" {
  database = snowflake_database.telemetry.name
  name     = "RAW"
}

resource "snowflake_schema" "enriched" {
  database = snowflake_database.telemetry.name
  name     = "ENRICHED"
}

resource "snowflake_schema" "curated" {
  database = snowflake_database.telemetry.name
  name     = "CURATED"
}

# A small, dedicated warehouse for the transform Tasks (separate from BI/ad-hoc)
resource "snowflake_warehouse" "transform" {
  name                = "WH_TELEMETRY_TRANSFORM"
  warehouse_size      = "XSMALL"
  auto_suspend        = 60 # seconds; suspend fast between task runs
  auto_resume         = true
  initially_suspended = true
}

# Role the Kafka connector and the tasks run as
resource "snowflake_role" "ingest" {
  name = "ROLE_TELEMETRY_INGEST"
}

resource "snowflake_grant_privileges_to_account_role" "db_usage" {
  account_role_name = snowflake_role.ingest.name
  privileges        = ["USAGE"]
  on_account_object {
    object_type = "DATABASE"
    object_name = snowflake_database.telemetry.name
  }
}

resource "snowflake_grant_privileges_to_account_role" "wh_usage" {
  account_role_name = snowflake_role.ingest.name
  privileges        = ["USAGE", "OPERATE"]
  on_account_object {
    object_type = "WAREHOUSE"
    object_name = snowflake_warehouse.transform.name
  }
}
Apply it:
terraform init
terraform plan -out=telemetry.plan
terraform apply telemetry.plan
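The auto_suspend = 60 matters: the transform warehouse resumes only when a Task fires and suspends after 60 idle seconds, so between bursts you pay for the Task's run time plus the idle timeout rather than for a permanently running warehouse. Each resume is billed for at least 60 seconds, and if the Stream has data on every one-minute tick the warehouse will in practice stay up.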
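To make the billing argument concrete, here is a minimal sketch of warehouse-seconds under auto-suspend. It is a simplified model, not Snowflake's metering logic: it assumes the warehouse stays up for the run plus the idle timeout, that each resume bills at least 60 seconds, and that an XSMALL warehouse consumes 1 credit per hour. The function names and the 10-second run duration are hypothetical.

```python
def billed_seconds(run_starts, run_s, auto_suspend_s, min_bill_s=60):
    """Billed warehouse seconds for task runs starting at run_starts (sorted, in seconds).

    Simplified model: the warehouse resumes at a run, stays up until it has been
    idle for auto_suspend_s, and every resume bills at least min_bill_s.
    """
    total = 0
    session_start = None
    session_end = None
    for start in run_starts:
        if session_end is not None and start <= session_end:
            # Warehouse is still up: this run extends the current session.
            session_end = max(session_end, start + run_s + auto_suspend_s)
        else:
            # Warehouse had suspended: close the previous session, open a new one.
            if session_start is not None:
                total += max(session_end - session_start, min_bill_s)
            session_start = start
            session_end = start + run_s + auto_suspend_s
    if session_start is not None:
        total += max(session_end - session_start, min_bill_s)
    return total


def credits(billed_s, credits_per_hour=1.0):
    """Convert billed seconds to credits (1.0 credit/hour assumed for XSMALL)."""
    return billed_s / 3600 * credits_per_hour


if __name__ == "__main__":
    every_minute = list(range(0, 3600, 60))  # data on every tick for an hour
    two_bursts = [0, 1800]                   # data on only two ticks in the hour
    print("continuous:", billed_seconds(every_minute, run_s=10, auto_suspend_s=60))
    print("bursty:    ", billed_seconds(two_bursts, run_s=10, auto_suspend_s=60))
```

Under these assumptions a Task that fires every minute never lets the warehouse suspend, while two isolated runs bill only a couple of minutes. The saving therefore depends on how bursty the telemetry really is.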
2. Create the raw landing table and the ingest user
Snowpipe Streaming writes into an ordinary table — you do not pre-create a “stream object” to write to. Define the raw table with the columns your events carry plus an ingest timestamp, then create the service user the connector authenticates as.
USE ROLE SYSADMIN;
USE DATABASE TELEMETRY;
USE SCHEMA RAW;
CREATE OR REPLACE TABLE RAW.TELEMETRY (
RECORD_METADATA VARIANT, -- Kafka connector writes offset/partition/topic here
VEHICLE_ID STRING,
EVENT_TS TIMESTAMP_NTZ,
LAT FLOAT,
LON FLOAT,
SPEED_KMH NUMBER(6,2),
TEMP_C NUMBER(5,2),
RAW_PAYLOAD VARIANT,
LOADED_AT TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
Create the user with key-pair auth. Generate the key first (the private key goes into HashiCorp Vault, never to disk on the connector host long-term):
# Generate an unencrypted (-nocrypt) PKCS#8 RSA private key and its public key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Store the private key in Vault; the connector pulls it at startup
vault kv put secret/snowflake/telemetry-ingest \
private_key=@rsa_key.p8 \
account="kv-org.eu-west-1" user="SVC_TELEMETRY_INGEST"
Register the public key on the Snowflake user:
USE ROLE ACCOUNTADMIN;
CREATE USER SVC_TELEMETRY_INGEST
DEFAULT_ROLE = ROLE_TELEMETRY_INGEST
RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...<contents of rsa_key.pub, no header/footer>...'
COMMENT = 'Snowpipe Streaming service account (key-pair auth, Vault-held key)';
GRANT ROLE ROLE_TELEMETRY_INGEST TO USER SVC_TELEMETRY_INGEST;
-- The ingest role needs INSERT on the landing table
GRANT USAGE ON SCHEMA TELEMETRY.RAW TO ROLE ROLE_TELEMETRY_INGEST;
GRANT INSERT ON TABLE TELEMETRY.RAW.TELEMETRY TO ROLE ROLE_TELEMETRY_INGEST;
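The RSA_PUBLIC_KEY value is the PEM body with the header, footer and line breaks removed. The following is a minimal sketch of that step, plus a fingerprint in the SHA256:&lt;base64&gt; form that Snowflake shows as RSA_PUBLIC_KEY_FP when you describe the user, so the two can be compared. The helper names are hypothetical, and the sketch assumes rsa_key.pub is a standard PEM public key.

```python
import base64
import hashlib


def public_key_body(pem_text: str) -> str:
    """Return the base64 body of a PEM public key, without header, footer or newlines."""
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    return "".join(line for line in lines if line and not line.startswith("-----"))


def public_key_fingerprint(pem_text: str) -> str:
    """SHA-256 over the DER bytes of the key, base64-encoded and prefixed with 'SHA256:'."""
    der_bytes = base64.b64decode(public_key_body(pem_text))
    digest = hashlib.sha256(der_bytes).digest()
    return "SHA256:" + base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # Assumes rsa_key.pub from the openssl step is in the working directory.
    with open("rsa_key.pub", encoding="ascii") as handle:
        pem = handle.read()
    print("RSA_PUBLIC_KEY value:", public_key_body(pem))
    print("Expected fingerprint:", public_key_fingerprint(pem))
```

Comparing the printed fingerprint with the one Snowflake reports is a quick way to confirm the right key was registered before the connector is deployed.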
Human access is a separate path: analysts reach Snowsight through Okta federated to Entra ID (SAML SSO with SCIM provisioning), so no person ever shares the SVC_TELEMETRY_INGEST credential.
3. Deploy the Kafka Connector in Snowpipe Streaming mode
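The Snowflake Kafka Connector, told to use the streaming ingestion method, opens one Snowpipe Streaming channel per topic-partition and pushes rows continuously. The connector config references its private key through a Vault placeholder that the Connect worker resolves when the connector starts, rather than hardcoding it; this assumes a Vault config provider is registered on the worker under the name vault.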
{
"name": "telemetry-snowpipe-streaming",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": "8",
"topics": "vehicle.telemetry",
"snowflake.url.name": "kv-org.eu-west-1.snowflakecomputing.com:443",
"snowflake.user.name": "SVC_TELEMETRY_INGEST",
"snowflake.role.name": "ROLE_TELEMETRY_INGEST",
"snowflake.private.key": "${vault:secret/snowflake/telemetry-ingest:private_key}",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.streaming.enable.single.buffer": "true",
"snowflake.enable.schematization": "true",
"snowflake.database.name": "TELEMETRY",
"snowflake.schema.name": "RAW",
"snowflake.topic2table.map": "vehicle.telemetry:TELEMETRY",
"buffer.flush.time": "1",
"buffer.count.records": "10000",
"buffer.size.bytes": "20000000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
}
Deploy it to the Connect worker:
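# PUT .../config expects only the inner "config" object, so extract it with jq first
jq .config telemetry-snowpipe-streaming.json | \
  curl -s -X PUT -H "Content-Type: application/json" \
  --data @- \
  http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming/config | jq .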
# Confirm it is RUNNING and tasks are healthy
curl -s http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming/status | jq .
The three knobs that control latency versus cost: buffer.flush.time = 1 second is the dominant lever (lower latency, more frequent commits) and guarantees that a quiet partition still flushes on the timer, while buffer.count.records and buffer.size.bytes cap the buffer so a busy partition flushes early instead of growing without bound. With tasks.max=8 over an 8-partition topic, you get eight parallel channels. The CrowdStrike Falcon sensor on the Connect VM and Wiz posture scanning of the host run independently of this config.
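The interaction of the three buffer limits can be sketched as a simple threshold check. This is a simplified model of threshold-based flushing, not the connector's actual implementation; the class and function names are hypothetical, and the 500 bytes per event is an assumed figure used only for the arithmetic.

```python
from dataclasses import dataclass


@dataclass
class BufferLimits:
    flush_time_s: float = 1.0      # buffer.flush.time
    count_records: int = 10_000    # buffer.count.records
    size_bytes: int = 20_000_000   # buffer.size.bytes


def flush_reason(limits, age_s, records, size_bytes):
    """Return which limit triggers a flush, or None if the buffer keeps filling."""
    if records >= limits.count_records:
        return "count"
    if size_bytes >= limits.size_bytes:
        return "size"
    if records > 0 and age_s >= limits.flush_time_s:
        return "time"
    return None


def seconds_to_fill(limits, events_per_s, bytes_per_event):
    """Seconds until the count or size cap is hit at a steady per-partition rate."""
    by_count = limits.count_records / events_per_s
    by_size = limits.size_bytes / (events_per_s * bytes_per_event)
    return min(by_count, by_size)


if __name__ == "__main__":
    limits = BufferLimits()
    per_partition = 18_000 / 8  # peak rate from the guide, spread over 8 partitions
    # 500 bytes per event is an assumption, not a figure from the source.
    print("seconds until a cap is hit:", seconds_to_fill(limits, per_partition, 500))
    print("trigger after 1 s:", flush_reason(limits, 1.0, int(per_partition), int(per_partition) * 500))
```

Under these assumptions the count and size caps take several seconds to reach at peak, which is why the one-second timer is the limit that actually fires.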
4. Create the Stream on the raw table
A Stream is a change-tracking object: it records an offset into the table and, when queried, returns only the rows inserted (or changed) since that offset was last advanced. For an append-only landing table, an append-only stream is cheaper and simpler than a standard one because it ignores deletes/updates and never needs to compute them.
USE ROLE SYSADMIN;
USE SCHEMA TELEMETRY.RAW;
CREATE OR REPLACE STREAM RAW.TELEMETRY_STREAM
ON TABLE RAW.TELEMETRY
APPEND_ONLY = TRUE
COMMENT = 'CDC cursor feeding the transform tasks';
-- Inspect what the stream currently exposes (does NOT advance the offset)
SELECT SYSTEM$STREAM_HAS_DATA('RAW.TELEMETRY_STREAM');
SELECT VEHICLE_ID, EVENT_TS, TEMP_C, METADATA$ACTION
FROM RAW.TELEMETRY_STREAM
LIMIT 20;
Key behavior to internalise: the Stream’s offset only advances when it is consumed inside a DML statement that succeeds in a transaction — which is exactly what a Task’s INSERT ... FROM stream does. Merely SELECT-ing the stream (as above) shows you the rows but does not move the cursor, so it is safe to inspect.
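The offset rule is easy to see in a toy model. This sketch only mimics the behaviour described above (reading never moves the cursor, a consuming operation moves it only if it completes); ToyStream and its methods are hypothetical names and have nothing to do with Snowflake's implementation.

```python
class ToyStream:
    """A cursor over an append-only list that advances only on successful consumption."""

    def __init__(self, table):
        self._table = table   # the "raw table": a list that only grows
        self._offset = 0      # index of the first unconsumed row

    def peek(self):
        """Like SELECT on the stream: shows unconsumed rows, never moves the offset."""
        return list(self._table[self._offset:])

    def consume(self, dml):
        """Like INSERT ... FROM stream: run dml(rows); advance only if it does not raise."""
        end = len(self._table)
        rows = self._table[self._offset:end]
        dml(rows)             # an exception here propagates and leaves the offset untouched
        self._offset = end
        return len(rows)


if __name__ == "__main__":
    raw = ["e1", "e2", "e3"]
    stream = ToyStream(raw)
    enriched = []

    print(stream.peek())                    # inspecting is safe
    print(stream.consume(enriched.extend))  # a successful "transaction" drains the stream
    print(stream.peek())

    raw.append("e4")

    def failing_insert(rows):
        raise ValueError("poison row")

    try:
        stream.consume(failing_insert)
    except ValueError:
        pass
    print(stream.peek())                    # the failed run left e4 for the next attempt
```

The last step is the behaviour that gives exactly-once processing and also explains why one bad row can hold up every later run.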
5. Build the transform Tasks and chain them
Now the transformation plane. Task 1 drains the Stream into the enriched per-vehicle table; Task 2 runs after Task 1 (a dependency, not its own schedule) to derive cold-chain alerts. Use WHEN SYSTEM$STREAM_HAS_DATA(...) so a Task that finds an empty stream skips its run and does not consume a credit — the single most important cost control in this whole design.
USE ROLE SYSADMIN;
USE SCHEMA TELEMETRY.ENRICHED;
-- Targets
CREATE OR REPLACE TABLE ENRICHED.VEHICLE_STATE (
VEHICLE_ID STRING, EVENT_TS TIMESTAMP_NTZ, LAT FLOAT, LON FLOAT,
SPEED_KMH NUMBER(6,2), TEMP_C NUMBER(5,2), LOADED_AT TIMESTAMP_LTZ
);
CREATE OR REPLACE TABLE CURATED.COLD_CHAIN_ALERTS (
VEHICLE_ID STRING, EVENT_TS TIMESTAMP_NTZ, TEMP_C NUMBER(5,2),
THRESHOLD_C NUMBER(5,2), DETECTED_AT TIMESTAMP_LTZ
);
-- Task 1: raw stream -> enriched, runs every minute
CREATE OR REPLACE TASK ENRICHED.T_LOAD_VEHICLE_STATE
WAREHOUSE = WH_TELEMETRY_TRANSFORM
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('TELEMETRY.RAW.TELEMETRY_STREAM')
AS
INSERT INTO ENRICHED.VEHICLE_STATE
SELECT VEHICLE_ID, EVENT_TS, LAT, LON, SPEED_KMH, TEMP_C, CURRENT_TIMESTAMP()
FROM TELEMETRY.RAW.TELEMETRY_STREAM
WHERE METADATA$ACTION = 'INSERT';
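-- Task 2: depends on Task 1 (no schedule of its own), derives alerts.
-- Caveat: Snowflake documents that all tasks in one graph must live in the same
-- database and schema. If this cross-schema AFTER is rejected, create this task in
-- ENRICHED and use that name in the ALTER and DROP statements later in the guide.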
CREATE OR REPLACE TASK CURATED.T_DERIVE_COLD_CHAIN_ALERTS
WAREHOUSE = WH_TELEMETRY_TRANSFORM
AFTER ENRICHED.T_LOAD_VEHICLE_STATE
AS
INSERT INTO CURATED.COLD_CHAIN_ALERTS
SELECT VEHICLE_ID, EVENT_TS, TEMP_C, 8.00, CURRENT_TIMESTAMP()
FROM ENRICHED.VEHICLE_STATE
WHERE TEMP_C > 8.00
AND LOADED_AT > DATEADD('minute', -2, CURRENT_TIMESTAMP());
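One thing worth checking in Task 2 is the two-minute lookback on a one-minute schedule. The sketch below is a toy simulation, with hypothetical function names and made-up timestamps, suggesting that overlapping windows can select the same row in two consecutive runs, whereas a high-water mark (which is what a second Stream on ENRICHED.VEHICLE_STATE would provide) selects each row once. The second Stream is an assumption on my part, not part of the original design.

```python
def simulate_lookback(rows, run_times, window_s):
    """Each run selects rows with loaded_at in (now - window_s, now]; returns all selections."""
    selected = []
    for now in run_times:
        selected += [r for r in rows if now - window_s < r[1] <= now]
    return selected


def simulate_high_water(rows, run_times):
    """Each run selects only rows loaded after the previous run's upper bound."""
    selected = []
    last_seen = float("-inf")
    for now in run_times:
        selected += [r for r in rows if last_seen < r[1] <= now]
        last_seen = now
    return selected


if __name__ == "__main__":
    # (vehicle_id, loaded_at in seconds): illustrative values only
    rows = [("V1", 50), ("V2", 110), ("V3", 170)]
    runs = [60, 120, 180]  # one run per minute
    print("lookback selections:  ", len(simulate_lookback(rows, runs, window_s=120)))
    print("high-water selections:", len(simulate_high_water(rows, runs)))
```

If duplicate alerts matter downstream, either deduplicate on (VEHICLE_ID, EVENT_TS) or drive Task 2 from a cursor rather than a time window.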
A Task is created suspended. Resume the chain from the leaf upward (children first, root last) so the dependency graph is fully armed before the scheduler can fire the root:
USE ROLE ACCOUNTADMIN; -- EXECUTE TASK privilege
ALTER TASK CURATED.T_DERIVE_COLD_CHAIN_ALERTS RESUME;
ALTER TASK ENRICHED.T_LOAD_VEHICLE_STATE RESUME;
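For a two-task chain the order is obvious, but for larger graphs it can be derived mechanically. A minimal sketch using Python's standard graphlib follows; the dictionary layout and function names are hypothetical, and the generated statements mirror the ALTER TASK commands above.

```python
from graphlib import TopologicalSorter


def suspend_order(predecessors):
    """Root-first order. predecessors maps each task to the set of tasks it runs AFTER."""
    return list(TopologicalSorter(predecessors).static_order())


def resume_order(predecessors):
    """Children-first order: the reverse of the root-first order."""
    return list(reversed(suspend_order(predecessors)))


def alter_statements(predecessors, action):
    """Build ALTER TASK statements in the safe order for 'RESUME' or 'SUSPEND'."""
    order = resume_order(predecessors) if action == "RESUME" else suspend_order(predecessors)
    return [f"ALTER TASK {task} {action};" for task in order]


if __name__ == "__main__":
    graph = {
        "CURATED.T_DERIVE_COLD_CHAIN_ALERTS": {"ENRICHED.T_LOAD_VEHICLE_STATE"},
    }
    for statement in alter_statements(graph, "RESUME"):
        print(statement)
```

The same dictionary yields the suspend order by passing "SUSPEND", which keeps the two directions from drifting apart as the graph grows.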
To skip the dedicated warehouse entirely and let Snowflake size and bill the compute per run, drop WAREHOUSE = ... and instead set USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' — that turns it into a serverless Task, which suits spiky telemetry where a fixed warehouse would idle.
Promotion of any new Task or schema change to production goes through a ServiceNow change request, and the SQL itself is applied by the OIDC-authenticated GitHub Actions pipeline — never hand-run in production by a person.
Validation
Verify each plane independently, then end-to-end latency.
-- (a) Streaming plane: are rows actually landing, and how fresh?
SELECT COUNT(*) AS rows_total,
MAX(LOADED_AT) AS last_load,
DATEDIFF('second', MAX(EVENT_TS), MAX(LOADED_AT)) AS ingest_lag_s
FROM TELEMETRY.RAW.TELEMETRY;
-- (b) Streaming file-migration activity and credits (no warehouse involved).
--     This is an ACCOUNT_USAGE view and can lag by hours, so look back a day, not an hour.
SELECT *
FROM SNOWFLAKE.ACCOUNT_USAGE.SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY
WHERE START_TIME >= DATEADD('day', -1, CURRENT_TIMESTAMP());
-- (c) Transformation plane: did the tasks run, succeed, and consume the stream?
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())))
ORDER BY SCHEDULED_TIME DESC;
-- (d) End-to-end: an event in raw should appear curated within ~1-2 minutes
SELECT (SELECT COUNT(*) FROM ENRICHED.VEHICLE_STATE) AS enriched_rows,
(SELECT COUNT(*) FROM CURATED.COLD_CHAIN_ALERTS) AS alert_rows;
From the source side, confirm the connector reports zero failed records and a low offset lag:
curl -s http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming/status | jq '.tasks[].state'
# All "RUNNING". Then check consumer-group lag:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe \
--group connect-telemetry-snowpipe-streaming
Datadog turns these into standing alerts: it scrapes the connector’s JMX metrics (record-error-rate, buffer flush latency) and queries TASK_HISTORY for any STATE = 'FAILED', paging if ingest lag crosses 30 seconds or a Task errors twice consecutively.
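The paging rule can be written down as a small predicate. This is a sketch of the rule as stated above, not Datadog monitor configuration: the function names are hypothetical, task states are assumed to be the STATE values from TASK_HISTORY ordered oldest first, and skipping over SKIPPED runs when counting consecutive failures is a design choice of this sketch.

```python
def trailing_failures(states):
    """Count consecutive FAILED runs at the end of the history, ignoring SKIPPED runs."""
    count = 0
    for state in reversed(states):
        if state == "SKIPPED":
            continue          # a skipped tick (empty stream) neither breaks nor extends a streak
        if state != "FAILED":
            break
        count += 1
    return count


def should_page(ingest_lag_s, task_states, lag_threshold_s=30, max_failures=2):
    """Page when ingest lag exceeds the threshold or the latest runs failed back to back."""
    if ingest_lag_s > lag_threshold_s:
        return True
    return trailing_failures(task_states) >= max_failures


if __name__ == "__main__":
    print(should_page(12, ["SUCCEEDED", "FAILED", "FAILED"]))
    print(should_page(12, ["FAILED", "SUCCEEDED", "FAILED"]))
    print(should_page(45, ["SUCCEEDED"]))
```

Evaluating only the trailing streak keeps an old, already-recovered failure from paging anyone again.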
Rollback / teardown
Suspend root-first so that no new run can start mid-teardown (Snowflake expects the root Task to be suspended before any Task in the graph is modified), then drop the Tasks child-first, then the Stream, then the tables. Suspending, not dropping, is the safe pause if you only want to stop processing temporarily.
-- Pause processing without losing the stream offset:
ALTER TASK ENRICHED.T_LOAD_VEHICLE_STATE SUSPEND;
ALTER TASK CURATED.T_DERIVE_COLD_CHAIN_ALERTS SUSPEND;
-- Full teardown (children first):
DROP TASK IF EXISTS CURATED.T_DERIVE_COLD_CHAIN_ALERTS;
DROP TASK IF EXISTS ENRICHED.T_LOAD_VEHICLE_STATE;
DROP STREAM IF EXISTS RAW.TELEMETRY_STREAM;
DROP TABLE IF EXISTS CURATED.COLD_CHAIN_ALERTS;
DROP TABLE IF EXISTS ENRICHED.VEHICLE_STATE;
DROP TABLE IF EXISTS RAW.TELEMETRY; -- destroys landed data; export first if needed
Stop and remove the connector, and revoke the service identity:
curl -s -X DELETE http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming
DROP USER IF EXISTS SVC_TELEMETRY_INGEST;
DROP ROLE IF EXISTS ROLE_TELEMETRY_INGEST;
For the Terraform-managed objects, prefer terraform destroy (or remove the resources and apply) so state stays consistent, and finally vault kv delete secret/snowflake/telemetry-ingest to retire the key.
Common pitfalls
- A Stream goes stale and can no longer be read. A Stream becomes stale once its offset falls outside the table’s data-retention window (DATA_RETENTION_TIME_IN_DAYS, which Snowflake extends for unconsumed streams up to MAX_DATA_EXTENSION_TIME_IN_DAYS). If a Task is suspended for longer than that, the unconsumed changes are gone and the Stream has to be recreated. Keep the consuming Task running, and set retention on the raw table to comfortably exceed your worst-case Task outage.
- Assuming the Task consumes the Stream even on a partial failure. The offset advances only if the consuming transaction commits. If your INSERT errors, the Stream is not advanced and the rows are re-presented next run. That is good for exactly-once, but it means a poison row can stall the pipeline. Add a dead-letter WHERE filter or a TRY_CAST so one malformed event cannot block the batch.
- buffer.flush.time set too aggressively. Sub-second flushing maximises commit overhead and channel chatter. One second is the sweet spot for “near real time”; do not go lower without a measured reason.
- Resuming Tasks root-first. If you resume the root before its children, the root can fire against a not-yet-armed dependency. Always resume leaf → root, suspend root → leaf.
- Forgetting WHEN SYSTEM$STREAM_HAS_DATA. Without it, every scheduled tick resumes the warehouse and is billed for at least the 60-second minimum even when there is nothing to do. With it, empty ticks skip the run and leave the warehouse suspended.
- Schematization surprises. snowflake.enable.schematization=true auto-adds columns from the JSON payload; if upstream adds a field, your table grows columns. Pin the schema and disable schematization if you need strict control.
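The dead-letter idea from the poison-row pitfall can be sketched outside SQL as well. The helper below is only similar in spirit to TRY_CAST (it returns None instead of raising), and the function names, row layout and validity rule are hypothetical.

```python
from decimal import Decimal, InvalidOperation


def try_cast_decimal(value):
    """Return a finite Decimal, or None when the value cannot be parsed."""
    try:
        number = Decimal(str(value))
    except (InvalidOperation, ValueError):
        return None
    return number if number.is_finite() else None


def split_batch(rows):
    """Separate loadable rows from malformed ones so a bad event cannot fail the batch."""
    good, dead_letter = [], []
    for row in rows:
        temp = try_cast_decimal(row.get("TEMP_C"))
        if row.get("VEHICLE_ID") and temp is not None:
            good.append({**row, "TEMP_C": temp})
        else:
            dead_letter.append(row)
    return good, dead_letter


if __name__ == "__main__":
    batch = [
        {"VEHICLE_ID": "V1", "TEMP_C": "7.5"},
        {"VEHICLE_ID": "V2", "TEMP_C": "hot"},   # unparseable temperature
        {"VEHICLE_ID": "", "TEMP_C": "3"},       # missing vehicle id
    ]
    good, dead = split_batch(batch)
    print(len(good), "loadable,", len(dead), "dead-lettered")
```

In the pipeline itself the equivalent would be a filter in the Task's SELECT plus a second INSERT of the rejected rows into a dead-letter table, so the Stream still advances when one event is malformed.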
Security notes
Authentication is key-pair (JWT) only for the streaming client: there is no password on SVC_TELEMETRY_INGEST, and the private key lives in HashiCorp Vault and is resolved by the Connect worker when the connector starts, so it never sits in a Kafka Connect config file or a Git repo. Human access to Snowsight is Okta federated to Microsoft Entra ID with SCIM-driven role provisioning, so people and the service identity are fully separated. The ROLE_TELEMETRY_INGEST role holds nothing beyond USAGE on the database and schema, INSERT on one table, and USAGE and OPERATE on the transform warehouse. Wiz continuously checks the Snowflake account posture (over-privileged roles, network-policy drift, public exposure) and the Connect host, while CrowdStrike Falcon provides runtime threat detection on that VM. Lock the account down further with a Snowflake network policy allowlisting the Connect worker’s egress IPs, and require the OIDC-authenticated GitHub Actions pipeline plus a ServiceNow change record for any production DDL.
Cost notes
There is no warehouse in the ingestion path: Snowpipe Streaming bills by throughput (a per-connection rate plus the data volume), which is why this beats classic Snowpipe’s per-file overhead for high-frequency small events. The only warehouse cost is the transform Tasks. The WHEN SYSTEM$STREAM_HAS_DATA guard plus auto_suspend = 60 mean the XSMALL warehouse runs only when there is data and suspends after a minute of idleness, so a bursty day can cost a few credits rather than the 24 credits an always-on XSMALL warehouse consumes; during sustained traffic the Stream has data on every tick and the saving shrinks accordingly. For very spiky volume, serverless Tasks can be cheaper still because Snowflake right-sizes per run. Track it precisely in Datadog by pulling SNOWPIPE_STREAMING_* and TASK_HISTORY/WAREHOUSE_METERING_HISTORY into a cost dashboard, so the operations team that asked for the live map can see exactly what real-time visibility costs per day and tune buffer.flush.time and the Task schedule against that number.