Data Platform

Configure Snowpipe Streaming with Streams and Tasks for Near-Real-Time Ingestion

A logistics company runs 40,000 GPS-and-telemetry-emitting vehicles, and the operations desk wants a live map plus exception alerts (“this refrigerated trailer breached 8 °C twelve minutes ago”) instead of the hourly batch they have today. Telemetry arrives as a Kafka topic at roughly 18,000 events/second at peak. The data team’s mandate is to land every event in Snowflake within a couple of seconds, transform it into clean per-vehicle and per-alert tables continuously, and do it without standing up a Spark cluster or paying for an always-on warehouse that idles between bursts. This guide builds exactly that: Snowpipe Streaming for sub-second row-level ingestion into a raw table, then Streams and Tasks to incrementally transform that raw table into curated, queryable models — the serverless, no-files, near-real-time pattern Snowflake is built for.

The shape that makes this work: Snowpipe Streaming writes rows directly through a channel (no staged files, no per-file overhead, billed by throughput), a Stream is a change-tracking cursor over the raw table that exposes only new rows since you last consumed it, and a Task runs on a schedule or trigger to drain that Stream into the next layer. Chained together you get a continuous medallion pipeline (raw → enriched → curated) with offset-exactly-once semantics and second-scale latency.

Prerequisites

Target topology

Configure Snowpipe Streaming with Streams and Tasks for Near-Real-Time Ingestion — topology

Two planes share the warehouse but run on different clocks. The ingestion plane is push-driven and serverless: Kafka Connect opens a Snowpipe Streaming channel per topic-partition and streams rows straight into RAW.TELEMETRY with no compute warehouse involved — Snowflake bills the streaming throughput directly. The transformation plane is change-driven: a Stream on RAW.TELEMETRY tracks unconsumed rows, and a chain of Tasks (running on a small dedicated warehouse, or serverless) drains the Stream every minute into ENRICHED.VEHICLE_STATE and CURATED.COLD_CHAIN_ALERTS.

Around the data path sit the enterprise controls the platform team actually operates: Okta → Entra ID federates human SSO into Snowsight with SCIM-provisioned roles; HashiCorp Vault issues the connector’s private key and Snowflake login so no secret lands in a Kafka Connect config file; Terraform provisions every Snowflake object (database, schemas, roles, warehouse, tasks) declaratively; Datadog scrapes the Kafka Connector JMX metrics and Snowflake’s SNOWPIPE_STREAMING_* views for lag and cost dashboards; Wiz runs posture checks against the Snowflake account and the connector’s host; CrowdStrike Falcon protects the Kafka Connect VM at runtime; GitHub Actions applies the Terraform and dbt-style SQL through an OIDC-authenticated pipeline; and ServiceNow is the change gate before a new Task or schema goes live in production.

1. Provision the Snowflake objects with Terraform

Create the database, schemas, a dedicated transform warehouse, and a least-privilege role. Keep this in version control; the GitHub Actions pipeline applies it.

# main.tf — Snowflake-Labs/snowflake provider
terraform {
  required_providers {
    snowflake = { source = "Snowflake-Labs/snowflake", version = "~> 0.95" }
  }
}

resource "snowflake_database" "telemetry" {
  name = "TELEMETRY"
}

resource "snowflake_schema" "raw"      { database = snowflake_database.telemetry.name, name = "RAW" }
resource "snowflake_schema" "enriched" { database = snowflake_database.telemetry.name, name = "ENRICHED" }
resource "snowflake_schema" "curated"  { database = snowflake_database.telemetry.name, name = "CURATED" }

# A small, dedicated warehouse for the transform Tasks (separate from BI/ad-hoc)
resource "snowflake_warehouse" "transform" {
  name                = "WH_TELEMETRY_TRANSFORM"
  warehouse_size      = "XSMALL"
  auto_suspend        = 60      # seconds; suspend fast between task runs
  auto_resume         = true
  initially_suspended = true
}

# Role the Kafka connector and the tasks run as
resource "snowflake_role" "ingest" { name = "ROLE_TELEMETRY_INGEST" }

resource "snowflake_grant_privileges_to_account_role" "db_usage" {
  account_role_name = snowflake_role.ingest.name
  privileges        = ["USAGE"]
  on_account_object { object_type = "DATABASE", object_name = snowflake_database.telemetry.name }
}

resource "snowflake_grant_privileges_to_account_role" "wh_usage" {
  account_role_name = snowflake_role.ingest.name
  privileges        = ["USAGE", "OPERATE"]
  on_account_object { object_type = "WAREHOUSE", object_name = snowflake_warehouse.transform.name }
}

Apply it:

terraform init
terraform plan  -out=telemetry.plan
terraform apply telemetry.plan

The auto_suspend = 60 matters: the transform warehouse should resume only when a Task fires and suspend the instant it idles, so you pay for seconds of compute per minute, not a running warehouse.

2. Create the raw landing table and the ingest user

Snowpipe Streaming writes into an ordinary table — you do not pre-create a “stream object” to write to. Define the raw table with the columns your events carry plus an ingest timestamp, then create the service user the connector authenticates as.

USE ROLE SYSADMIN;
USE DATABASE TELEMETRY;
USE SCHEMA RAW;

CREATE OR REPLACE TABLE RAW.TELEMETRY (
  RECORD_METADATA  VARIANT,            -- Kafka connector writes offset/partition/topic here
  VEHICLE_ID       STRING,
  EVENT_TS         TIMESTAMP_NTZ,
  LAT              FLOAT,
  LON              FLOAT,
  SPEED_KMH        NUMBER(6,2),
  TEMP_C           NUMBER(5,2),
  RAW_PAYLOAD      VARIANT,
  LOADED_AT        TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);

Create the user with key-pair auth. Generate the key first (the private key goes into HashiCorp Vault, never to disk on the connector host long-term):

# Generate an encrypted RSA private key + public key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
# Store the private key in Vault; the connector pulls it at startup
vault kv put secret/snowflake/telemetry-ingest \
  private_key=@rsa_key.p8 \
  account="kv-org.eu-west-1" user="SVC_TELEMETRY_INGEST"

Register the public key on the Snowflake user:

USE ROLE ACCOUNTADMIN;
CREATE USER SVC_TELEMETRY_INGEST
  DEFAULT_ROLE = ROLE_TELEMETRY_INGEST
  RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...<contents of rsa_key.pub, no header/footer>...'
  COMMENT = 'Snowpipe Streaming service account (key-pair auth, Vault-held key)';

GRANT ROLE ROLE_TELEMETRY_INGEST TO USER SVC_TELEMETRY_INGEST;

-- The ingest role needs INSERT on the landing table
GRANT USAGE ON SCHEMA TELEMETRY.RAW TO ROLE ROLE_TELEMETRY_INGEST;
GRANT INSERT ON TABLE TELEMETRY.RAW.TELEMETRY TO ROLE ROLE_TELEMETRY_INGEST;

Human access is a separate path: analysts reach Snowsight through Okta federated to Entra ID (SAML SSO with SCIM provisioning), so no person ever shares the SVC_TELEMETRY_INGEST credential.

3. Deploy the Kafka Connector in Snowpipe Streaming mode

The Snowflake Kafka Connector, told to use the streaming ingestion method, opens one Snowpipe Streaming channel per topic-partition and pushes rows continuously. The connector config pulls its private key from Vault at deploy time rather than hardcoding it.

{
  "name": "telemetry-snowpipe-streaming",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "8",
    "topics": "vehicle.telemetry",
    "snowflake.url.name": "kv-org.eu-west-1.snowflakecomputing.com:443",
    "snowflake.user.name": "SVC_TELEMETRY_INGEST",
    "snowflake.role.name": "ROLE_TELEMETRY_INGEST",
    "snowflake.private.key": "${vault:secret/snowflake/telemetry-ingest:private_key}",

    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.streaming.enable.single.buffer": "true",
    "snowflake.enable.schematization": "true",

    "snowflake.database.name": "TELEMETRY",
    "snowflake.schema.name": "RAW",
    "snowflake.topic2table.map": "vehicle.telemetry:TELEMETRY",

    "buffer.flush.time": "1",
    "buffer.count.records": "10000",
    "buffer.size.bytes": "20000000",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Deploy it to the Connect worker:

curl -s -X PUT -H "Content-Type: application/json" \
  --data @telemetry-snowpipe-streaming.json \
  http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming/config | jq .

# Confirm it is RUNNING and tasks are healthy
curl -s http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming/status | jq .

The three knobs that control latency-vs-cost: buffer.flush.time = 1 second is the dominant lever (lower latency, more frequent commits), while buffer.count.records and buffer.size.bytes cap the buffer so a quiet partition still flushes on the timer. With tasks.max=8 over an 8-partition topic, you get eight parallel channels. The CrowdStrike Falcon sensor on the Connect VM and Wiz posture scanning of the host run independently of this config.

4. Create the Stream on the raw table

A Stream is a change-tracking object: it records an offset into the table and, when queried, returns only the rows inserted (or changed) since that offset was last advanced. For an append-only landing table, an append-only stream is cheaper and simpler than a standard one because it ignores deletes/updates and never needs to compute them.

USE ROLE SYSADMIN;
USE SCHEMA TELEMETRY.RAW;

CREATE OR REPLACE STREAM RAW.TELEMETRY_STREAM
  ON TABLE RAW.TELEMETRY
  APPEND_ONLY = TRUE
  COMMENT = 'CDC cursor feeding the transform tasks';

-- Inspect what the stream currently exposes (does NOT advance the offset)
SELECT SYSTEM$STREAM_HAS_DATA('RAW.TELEMETRY_STREAM');
SELECT VEHICLE_ID, EVENT_TS, TEMP_C, METADATA$ACTION
FROM   RAW.TELEMETRY_STREAM
LIMIT 20;

Key behavior to internalise: the Stream’s offset only advances when it is consumed inside a DML statement that succeeds in a transaction — which is exactly what a Task’s INSERT ... FROM stream does. Merely SELECT-ing the stream (as above) shows you the rows but does not move the cursor, so it is safe to inspect.

5. Build the transform Tasks and chain them

Now the transformation plane. Task 1 drains the Stream into the enriched per-vehicle table; Task 2 runs after Task 1 (a dependency, not its own schedule) to derive cold-chain alerts. Use WHEN SYSTEM$STREAM_HAS_DATA(...) so a Task that finds an empty stream skips its run and does not consume a credit — the single most important cost control in this whole design.

USE ROLE SYSADMIN;
USE SCHEMA TELEMETRY.ENRICHED;

-- Targets
CREATE OR REPLACE TABLE ENRICHED.VEHICLE_STATE (
  VEHICLE_ID STRING, EVENT_TS TIMESTAMP_NTZ, LAT FLOAT, LON FLOAT,
  SPEED_KMH NUMBER(6,2), TEMP_C NUMBER(5,2), LOADED_AT TIMESTAMP_LTZ
);

CREATE OR REPLACE TABLE CURATED.COLD_CHAIN_ALERTS (
  VEHICLE_ID STRING, EVENT_TS TIMESTAMP_NTZ, TEMP_C NUMBER(5,2),
  THRESHOLD_C NUMBER(5,2), DETECTED_AT TIMESTAMP_LTZ
);

-- Task 1: raw stream -> enriched, runs every minute
CREATE OR REPLACE TASK ENRICHED.T_LOAD_VEHICLE_STATE
  WAREHOUSE = WH_TELEMETRY_TRANSFORM
  SCHEDULE  = '1 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('TELEMETRY.RAW.TELEMETRY_STREAM')
AS
  INSERT INTO ENRICHED.VEHICLE_STATE
  SELECT VEHICLE_ID, EVENT_TS, LAT, LON, SPEED_KMH, TEMP_C, CURRENT_TIMESTAMP()
  FROM   TELEMETRY.RAW.TELEMETRY_STREAM
  WHERE  METADATA$ACTION = 'INSERT';

-- Task 2: depends on Task 1 (no schedule of its own), derives alerts
CREATE OR REPLACE TASK CURATED.T_DERIVE_COLD_CHAIN_ALERTS
  WAREHOUSE = WH_TELEMETRY_TRANSFORM
  AFTER ENRICHED.T_LOAD_VEHICLE_STATE
AS
  INSERT INTO CURATED.COLD_CHAIN_ALERTS
  SELECT VEHICLE_ID, EVENT_TS, TEMP_C, 8.00, CURRENT_TIMESTAMP()
  FROM   ENRICHED.VEHICLE_STATE
  WHERE  TEMP_C > 8.00
    AND  LOADED_AT > DATEADD('minute', -2, CURRENT_TIMESTAMP());

A Task is created suspended. Resume the chain from the leaf upward (children first, root last) so the dependency graph is fully armed before the scheduler can fire the root:

USE ROLE ACCOUNTADMIN;  -- EXECUTE TASK privilege
ALTER TASK CURATED.T_DERIVE_COLD_CHAIN_ALERTS RESUME;
ALTER TASK ENRICHED.T_LOAD_VEHICLE_STATE      RESUME;

To skip the dedicated warehouse entirely and let Snowflake size and bill the compute per run, drop WAREHOUSE = ... and instead set USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL' — that turns it into a serverless Task, which suits spiky telemetry where a fixed warehouse would idle.

Promotion of any new Task or schema change to production goes through a ServiceNow change request, and the SQL itself is applied by the OIDC-authenticated GitHub Actions pipeline — never hand-run in production by a person.

Validation

Verify each plane independently, then end-to-end latency.

-- (a) Streaming plane: are rows actually landing, and how fresh?
SELECT COUNT(*) AS rows_total,
       MAX(LOADED_AT) AS last_load,
       DATEDIFF('second', MAX(EVENT_TS), MAX(LOADED_AT)) AS ingest_lag_s
FROM TELEMETRY.RAW.TELEMETRY;

-- (b) Streaming billing/throughput (no warehouse involved)
SELECT * FROM TABLE(INFORMATION_SCHEMA.SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY(
  DATE_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())));

-- (c) Transformation plane: did the tasks run, succeed, and consume the stream?
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM   TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
         SCHEDULED_TIME_RANGE_START => DATEADD('hour', -1, CURRENT_TIMESTAMP())))
ORDER  BY SCHEDULED_TIME DESC;

-- (d) End-to-end: an event in raw should appear curated within ~1-2 minutes
SELECT (SELECT COUNT(*) FROM ENRICHED.VEHICLE_STATE)        AS enriched_rows,
       (SELECT COUNT(*) FROM CURATED.COLD_CHAIN_ALERTS)     AS alert_rows;

From the source side, confirm the connector reports zero failed records and a low offset lag:

curl -s http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming/status | jq '.tasks[].state'
# All "RUNNING". Then check consumer-group lag:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe \
  --group connect-telemetry-snowpipe-streaming

Datadog turns these into standing alerts: it scrapes the connector’s JMX metrics (record-error-rate, buffer flush latency) and queries TASK_HISTORY for any STATE = 'FAILED', paging if ingest lag crosses 30 seconds or a Task errors twice consecutively.

Rollback / teardown

Tear down child-first for Tasks (you cannot drop or suspend a root Task that has resumed dependents cleanly otherwise), then the Stream, then the objects. Suspending — not dropping — is the safe pause if you only want to stop processing temporarily.

-- Pause processing without losing the stream offset:
ALTER TASK ENRICHED.T_LOAD_VEHICLE_STATE      SUSPEND;
ALTER TASK CURATED.T_DERIVE_COLD_CHAIN_ALERTS SUSPEND;

-- Full teardown (children first):
DROP TASK   IF EXISTS CURATED.T_DERIVE_COLD_CHAIN_ALERTS;
DROP TASK   IF EXISTS ENRICHED.T_LOAD_VEHICLE_STATE;
DROP STREAM IF EXISTS RAW.TELEMETRY_STREAM;
DROP TABLE  IF EXISTS CURATED.COLD_CHAIN_ALERTS;
DROP TABLE  IF EXISTS ENRICHED.VEHICLE_STATE;
DROP TABLE  IF EXISTS RAW.TELEMETRY;        -- destroys landed data; export first if needed

Stop and remove the connector, and revoke the service identity:

curl -s -X DELETE http://kafka-connect:8083/connectors/telemetry-snowpipe-streaming
DROP USER IF EXISTS SVC_TELEMETRY_INGEST;
DROP ROLE IF EXISTS ROLE_TELEMETRY_INGEST;

For the Terraform-managed objects, prefer terraform destroy (or remove the resources and apply) so state stays consistent, and finally vault kv delete secret/snowflake/telemetry-ingest to retire the key.

Common pitfalls

Security notes

Authentication is key-pair (JWT) only for the streaming client — there is no password on SVC_TELEMETRY_INGEST, and the private key lives in HashiCorp Vault, injected into the connector at runtime so it never sits in a Kafka Connect config file or a Git repo. Human access to Snowsight is Okta federated to Microsoft Entra ID with SCIM-driven role provisioning, so people and the service identity are fully separated and the ROLE_TELEMETRY_INGEST role holds nothing beyond INSERT on one table plus warehouse USAGE. Wiz continuously checks the Snowflake account posture (over-privileged roles, network-policy drift, public exposure) and the Connect host, while CrowdStrike Falcon provides runtime threat detection on that VM. Lock the account down further with a Snowflake network policy allowlisting the Connect worker’s egress IPs, and require the OIDC-authenticated GitHub Actions pipeline plus a ServiceNow change record for any production DDL.

Cost notes

There is no warehouse in the ingestion path — Snowpipe Streaming bills by throughput (a per-connection rate plus the data volume), which is why this beats classic Snowpipe’s per-file overhead for high-frequency small events. The only compute cost is the transform Tasks, and the WHEN SYSTEM$STREAM_HAS_DATA guard plus auto_suspend = 60 mean the XSMALL warehouse runs only when there is data and suspends in a minute — typically a few credits a day rather than a credit-hour for an always-on warehouse. For very spiky volume, serverless Tasks can be cheaper still because Snowflake right-sizes per run. Track it precisely in Datadog by pulling SNOWPIPE_STREAMING_* and TASK_HISTORY/WAREHOUSE_METERING_HISTORY into a cost dashboard, so the operations team that asked for the live map can see exactly what real-time visibility costs per day and tune buffer.flush.time and the Task schedule against that number.

SnowflakeSnowpipe StreamingStreams and TasksData EngineeringStreamingCDC
Need this built for real?

Vinod is a Senior Cloud Architect (22+ yrs) — available for Azure / AWS / GCP architecture, landing zones, and migrations.

Work with me

Comments

Keep Reading