
Designing a Lakehouse with Medallion Architecture and Unified Streaming-Batch Ingestion

Every data platform I have inherited eventually grew two pipelines for the same data: a batch path that loaded the warehouse overnight and a streaming path bolted on later when someone needed fresher numbers. They drifted. The batch job applied a dedup rule the stream did not, a late-arriving correction landed in one and not the other, and by Tuesday the dashboard and the warehouse disagreed by a number large enough to start a meeting. The lakehouse exists to collapse that duplication: one set of tables, one transformation graph, fed by both streaming and batch through the same code, with ACID guarantees so a reader never sees a half-written partition.

This article builds that architecture concretely on open table formats over object storage. The running assumption is that you have raw data arriving – some as files dropped in a bucket, some as a Kafka or Event Hubs stream – and you need a governed, queryable, correct set of tables out the other end. I use Delta Lake for the worked examples because its tooling is the most complete, but the patterns – medallion layering, idempotent ingestion, MERGE-based upserts, change feeds – apply equally to Apache Iceberg and Hudi.

What makes it a lakehouse: open table formats over object storage

Strip away the marketing and a lakehouse is one idea: put a transactional metadata layer on top of columnar files in cheap object storage, so the storage behaves like a database table instead of a directory of Parquet.

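Plain Parquet in a bucket has no notion of a transaction. A reader scanning s3://lake/orders/ while a writer is replacing files sees a partial result – some new files, some deleted, no atomic cutover. Open table formats fix this by maintaining a transaction log alongside the data. Delta Lake writes an ordered sequence of JSON commit files, periodically checkpointed to Parquet, under _delta_log/. Apache Iceberg keeps a tree of metadata files, manifest lists, and manifests, and commits by atomically swapping the catalog's pointer to the current metadata file. Apache Hudi records every action on a timeline in the table's .hoodie/ directory.

The load-bearing property all three formats give you is snapshot isolation via optimistic concurrency. Writers stage new data files, then attempt to commit a new log or metadata version; if another writer committed first, the commit is retried or fails cleanly. Readers pin a snapshot and never observe an in-flight write. This is what lets streaming and batch hit the same table without corrupting each other – and it is exactly what a directory of Parquet cannot offer.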
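To make the commit protocol concrete, here is a toy, in-memory model of an optimistic-concurrency transaction log. It is a sketch under simplifying assumptions, not how any of the three formats is implemented: TransactionLog, try_commit, and CommitConflict are hypothetical names, a commit is just a list of add/remove file actions, and the "claim the next version" check stands in for the put-if-absent write or catalog compare-and-swap a real format relies on.

```python
from dataclasses import dataclass, field


class CommitConflict(Exception):
    """Raised when another writer already claimed the version we tried to commit."""


@dataclass
class TransactionLog:
    # commits[v] is the list of ("add" | "remove", path) actions in version v
    commits: list = field(default_factory=list)

    def latest_version(self) -> int:
        return len(self.commits) - 1  # -1 means an empty table

    def snapshot(self, version: int) -> set:
        """Replay commits 0..version to get the data files live at that version."""
        files = set()
        for actions in self.commits[: version + 1]:
            for op, path in actions:
                if op == "add":
                    files.add(path)
                else:
                    files.discard(path)
        return files

    def try_commit(self, read_version: int, actions: list) -> int:
        """Claim version read_version + 1 only if nobody else has claimed it."""
        if self.latest_version() != read_version:
            raise CommitConflict(
                f"read v{read_version}, but the log is already at v{self.latest_version()}"
            )
        self.commits.append(list(actions))
        return self.latest_version()
```

If two writers both read version 0, only the first can claim version 1; the second gets CommitConflict and, in a real engine, would re-read the log, check whether the winner's changes actually conflict with its own, and retry. A reader that pinned version 0 keeps seeing the files of version 0 the whole time.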

Everything below depends on this guarantee. Without atomic commits, “MERGE into silver” and “stream into bronze” would be a race; with it, they are ordinary transactions.

Step 1 – Lay out the medallion: bronze, silver, gold

The medallion architecture assigns each table to one of three layers with a single, defensible responsibility. The discipline is what keeps the platform debuggable two years in.

Layer | Responsibility | Schema | Mutability
Bronze | Raw ingest, append-only, faithful to source | As received, plus ingest metadata | Append-only
Silver | Cleaned, conformed, deduplicated, typed | Enforced, validated | Upserts, SCD
Gold | Business aggregates, serving, dimensional models | Curated for consumers | Recomputed or merged

Bronze is the immutable landing zone. You write source data exactly as it arrived plus provenance columns – ingest timestamp, source file or offset, a batch id. The cardinal rule: never clean in bronze. If a downstream rule turns out wrong, bronze is your replay source of truth, and you can rebuild silver and gold from it without re-reading the source system.

-- Bronze: append-only, raw payload preserved, provenance attached
CREATE TABLE IF NOT EXISTS lake.bronze.orders (
  raw_payload   STRING,
  source_file   STRING,
  ingest_ts     TIMESTAMP,
  batch_id      STRING
) USING DELTA
LOCATION 's3://lake/bronze/orders'
TBLPROPERTIES (delta.appendOnly = 'true');

Silver is where data becomes trustworthy: parsed from the raw payload, typed, deduplicated, and conformed to a stable schema. This is the layer most analytics joins against. Silver tables are mutable – upserts apply corrections and late-arriving updates, and dimensions track history with SCD logic.

Gold is purpose-built for consumption: pre-aggregated metrics, star-schema facts and dimensions, or feature tables. Gold is recomputed or incrementally merged from silver and is the only layer most BI tools and ML pipelines should touch. Keep gold thin and numerous – one table per consumption use case is healthier than one wide table everyone fights over.

Step 2 – Idempotent ingestion with the unified streaming-plus-batch pattern

The whole point is one transformation, fed two ways. Spark Structured Streaming makes this real because a streaming query and a batch query share the same DataFrame API – the only difference is read versus readStream (and write versus writeStream). Write the logic once and run it as a continuous stream for freshness or as a scheduled Trigger.AvailableNow run for batch-like cost.

For file ingestion, Databricks Auto Loader (cloudFiles) incrementally discovers new files and records in its checkpoint which ones it has already processed, so re-running never double-ingests:

from pyspark.sql import functions as F

# Read newline-delimited JSON as plain text so each record lands in bronze
# exactly as received; parsing and typing happen on the way to silver.
bronze_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "text")          # one row per line, in column "value"
    .load("s3://landing/orders/")
    .select(
        F.col("value").alias("raw_payload"),
        F.col("_metadata.file_path").alias("source_file"),
        F.current_timestamp().alias("ingest_ts"),
        # batch_id is omitted here; Delta writes NULL for a missing nullable column
    )
)

(bronze_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://lake/_checkpoints/bronze_orders")
    .trigger(availableNow=True)   # drain all new files, then stop (batch economics)
    .toTable("lake.bronze.orders"))

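The two idempotency mechanisms doing the work are the streaming checkpoint, which records which files and micro-batches have already been committed so a restart resumes instead of replaying, and, for plain batch writes outside a stream, Delta's idempotent-write options txnAppId and txnVersion: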

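# Idempotent batch write: Delta records the highest txnVersion committed for each
# txnAppId and skips any write whose version is not greater, so re-running is a no-op.
# (df and batch_version come from your loader.)
(df.write.format("delta").mode("append")
   .option("txnAppId", "orders_loader")
   .option("txnVersion", batch_version)   # monotonic, derived from source watermark
   .saveAsTable("lake.bronze.orders"))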
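The skip rule behind txnAppId and txnVersion can be modeled in a few lines. This is a hypothetical in-memory sketch (IdempotentTable is not a Delta API): the table remembers the highest version committed per application id and ignores any write that does not exceed it. In Delta, that bookkeeping lives in the transaction log and is checked in the same atomic commit as the data, which this model does not attempt to reproduce.

```python
class IdempotentTable:
    """Hypothetical model of Delta's txnAppId/txnVersion duplicate-write check."""

    def __init__(self):
        self.rows = []
        self.app_versions = {}  # txnAppId -> highest txnVersion committed so far

    def append(self, rows, txn_app_id, txn_version):
        """Append rows unless this (app id, version) was already applied.

        Returns True if the rows were written, False if the write was skipped.
        """
        last = self.app_versions.get(txn_app_id, -1)
        if txn_version <= last:
            return False  # a retry or replay of an already-committed batch: no-op
        self.rows.extend(rows)
        self.app_versions[txn_app_id] = txn_version
        return True
```

A loader that crashes after committing batch 5 and re-runs it gets False and writes nothing; batch 6 then goes through normally. This is why batch_version must be monotonic: a version that goes backwards is silently dropped.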

Flip from streaming to batch by changing one thing: the trigger. trigger(processingTime="30 seconds") for near-real-time, trigger(availableNow=True) for cheap micro-batch on a schedule. The transformation code, the target table, and the checkpoint are identical. That single-source-of-truth property is the entire reason the lakehouse beats the lambda architecture’s twin pipelines – you can no longer have a streaming bug that the batch path silently masks.
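Why one code path removes drift can be shown without Spark. In this sketch, parse_orders is a hypothetical stateless transform; run_batch feeds it everything at once and run_stream feeds it the same records in micro-batches. For a stateless transform the output is identical wherever the batch boundaries fall. Stateful steps such as dedup and aggregation are where Spark's checkpointed state and the silver MERGE have to carry that guarantee instead.

```python
import json


def parse_orders(raw_lines):
    """Hypothetical stateless silver transform: parse JSON, keep positive amounts."""
    out = []
    for line in raw_lines:
        rec = json.loads(line)
        if rec.get("amount", 0) > 0:
            out.append({"order_id": rec["order_id"], "amount": rec["amount"]})
    return out


def run_batch(raw_lines):
    # Batch mode: the whole input in one call
    return parse_orders(raw_lines)


def run_stream(raw_lines, batch_size):
    # Streaming mode: the same function applied to successive micro-batches
    results = []
    for start in range(0, len(raw_lines), batch_size):
        results.extend(parse_orders(raw_lines[start:start + batch_size]))
    return results
```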

Step 3 – ACID upserts and slowly changing dimensions with MERGE

Bronze is append-only; silver needs upserts. MERGE INTO is the ACID primitive that makes this correct under concurrency. It matches incoming rows against the target on a key and inserts, updates, or deletes atomically – the whole MERGE is one transaction, so readers see all of it or none.

MERGE INTO lake.silver.orders AS t
USING staged_orders AS s
ON t.order_id = s.order_id
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND s.updated_ts > t.updated_ts THEN UPDATE SET *
WHEN NOT MATCHED AND s.op <> 'delete' THEN INSERT *;
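The clause ordering above is easy to misread, so here is a plain-Python model of the same MERGE over a dict keyed by order_id. It is an illustrative sketch with merge_orders as a hypothetical name: the first matching WHEN clause wins, a stale update matches no clause and changes nothing, and a delete for a key absent from the target is ignored. It rejects any duplicate source key, which is stricter than Delta, where the error occurs only when several source rows match the same existing target row.

```python
def merge_orders(target, staged):
    """Apply staged change rows to target (dict: order_id -> row) like the MERGE above.

    Each staged row has order_id, updated_ts, op, plus payload columns.
    """
    seen = set()
    for s in staged:
        key = s["order_id"]
        if key in seen:
            # Stricter than Delta: reject any duplicate source key up front
            raise ValueError(f"multiple source rows for order_id={key}")
        seen.add(key)
        t = target.get(key)
        if t is not None:                              # WHEN MATCHED ...
            if s["op"] == "delete":                    # ... AND s.op = 'delete' THEN DELETE
                del target[key]
            elif s["updated_ts"] > t["updated_ts"]:    # ... AND newer THEN UPDATE SET *
                target[key] = dict(s)
            # else: a stale change matches no clause and the newer row survives
        elif s["op"] != "delete":                      # WHEN NOT MATCHED AND op <> 'delete'
            target[key] = dict(s)
    return target
```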

Two details that separate a correct MERGE from a corrupting one:

  1. Deduplicate the source first. MERGE raises an error if more than one source row matches the same target key, and rightly so – which update wins would be undefined. Collapse to one row per key before merging, keeping the latest by event time:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank each key's change rows newest-first and keep only the top one
w = Window.partitionBy("order_id").orderBy(F.col("updated_ts").desc())
staged = (changes
    .withColumn("rn", F.row_number().over(w))
    .filter("rn = 1").drop("rn"))
staged.createOrReplaceTempView("staged_orders")
  2. Guard the UPDATE with an event-time predicate (s.updated_ts > t.updated_ts above). Without it, a late-arriving old version of a row overwrites a newer one. This is the single most common silent corruption I find in lakehouse silver layers.
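The dedup step reduces to "keep the newest row per key". This sketch (latest_per_key is a hypothetical helper) does that in plain Python and shows the property the two rules above protect together: given distinct event times, the surviving row does not depend on arrival order. One caveat carries over to Spark: if two rows share the same updated_ts, row_number() picks between them arbitrarily, so add a tie-breaker such as a source offset or sequence number to the orderBy.

```python
def latest_per_key(rows, key="order_id", ts="updated_ts"):
    """Keep one row per key: the one with the greatest event time.

    Plain-Python equivalent of the row_number() window above. Assumes event
    times are distinct per key; on a tie this keeps the first row seen.
    """
    best = {}
    for row in rows:
        k = row[key]
        if k not in best or row[ts] > best[k][ts]:
            best[k] = row
    return list(best.values())
```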

For dimensions that must retain history, implement SCD Type 2: instead of overwriting, close the current row and insert a new version. The idiom is a MERGE that expires the old record and a follow-on insert of the new one, tracked with is_current, effective_from, and effective_to:

-- SCD2: expire the changed current row; the new version is inserted separately
MERGE INTO lake.silver.dim_customer AS t
USING customer_changes AS s
ON t.customer_id = s.customer_id AND t.is_current = true
WHEN MATCHED AND t.hash <> s.hash THEN
  UPDATE SET t.is_current = false, t.effective_to = s.change_ts;

A row hash over the tracked attributes lets you ignore no-op updates cheaply – if t.hash = s.hash, nothing changed and you skip the version churn.
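The two SCD Type 2 steps, expiring the current row and opening a new version, plus the hash-based no-op check, can be sketched in plain Python. Everything here is an assumption for illustration: TRACKED lists hypothetical tracked attributes, the hash is SHA-256 over a delimiter-joined string, and apply_scd2 processes changes in order, so feed it at most one change per customer per batch or sort by change_ts first. Unlike the expire-only MERGE above, the sketch also opens the first version for a customer it has never seen.

```python
import hashlib

TRACKED = ("name", "city")  # hypothetical tracked attributes


def row_hash(rec):
    """Hash of the tracked attributes; equal hashes mean nothing meaningful changed."""
    joined = "\x1f".join(str(rec[c]) for c in TRACKED)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def apply_scd2(dim, changes):
    """dim: list of version rows; changes: dicts with customer_id, change_ts, TRACKED."""
    current = {r["customer_id"]: r for r in dim if r["is_current"]}
    for ch in changes:
        h = row_hash(ch)
        cur = current.get(ch["customer_id"])
        if cur is not None and cur["hash"] == h:
            continue  # no-op update: skip the version churn
        if cur is not None:
            # Step 1 (the MERGE above): expire the current version
            cur["is_current"] = False
            cur["effective_to"] = ch["change_ts"]
        # Step 2 (the follow-on insert): open a new current version
        new = {"customer_id": ch["customer_id"],
               **{c: ch[c] for c in TRACKED},
               "hash": h, "is_current": True,
               "effective_from": ch["change_ts"], "effective_to": None}
        dim.append(new)
        current[ch["customer_id"]] = new
    return dim
```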

Step 4 – Schema enforcement, evolution, and quality expectations

Open table formats enforce schema on write by default: a write whose columns or types do not match the table is rejected, not silently coerced. That rejection is a feature – it stops a malformed upstream change from poisoning the table. When a schema should change, you opt in explicitly:

# Allow additive schema evolution for this write (new columns only, by default)
(df.write.format("delta").mode("append")
   .option("mergeSchema", "true")
   .saveAsTable("lake.bronze.orders"))

Set mergeSchema (or spark.databricks.delta.schema.autoMerge.enabled for MERGE) only on bronze ingestion where you want to absorb new source fields. Keep silver and gold under strict enforcement so a schema drift surfaces as a controlled failure you triage, not a column that quietly appears in a serving table.
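A simplified model of what enforcement and mergeSchema decide on each write, as a sketch: check_write_schema and SchemaMismatch are hypothetical names, schemas are plain {column: type} dicts, and any type change is rejected. Real Delta additionally allows a few safe type changes under schema evolution, which this model ignores.

```python
class SchemaMismatch(Exception):
    """Raised when a write does not fit the table schema."""


def check_write_schema(table_schema, incoming, merge_schema=False):
    """Return the table schema after the write, or raise SchemaMismatch.

    Existing columns must keep their type; new columns are accepted only when
    merge_schema is True; columns missing from the write are allowed (NULL).
    """
    for col, typ in incoming.items():
        if col in table_schema and table_schema[col] != typ:
            raise SchemaMismatch(f"{col}: table has {table_schema[col]}, write has {typ}")
    new_cols = {c: t for c, t in incoming.items() if c not in table_schema}
    if new_cols and not merge_schema:
        raise SchemaMismatch(f"unexpected columns: {sorted(new_cols)}")
    return {**table_schema, **new_cols}
```

Enabling merge_schema only widens what the first check lets through for new columns; a type clash on an existing column still fails, which is the controlled failure you want surfacing at silver and gold.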

Enforcement guarantees structure, not correctness. For that, attach data-quality expectations that quarantine or drop bad rows. In Delta Live Tables (Lakeflow Declarative Pipelines), expectations are declarative constraints with an explicit failure action:

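import dlt

# parse_and_type is your own DataFrame -> DataFrame function (not part of dlt)
# that parses raw_payload and casts fields to their silver types.
@dlt.table(name="silver_orders")
@dlt.expect_or_drop("valid_amount", "amount > 0")
@dlt.expect_or_fail("has_order_id", "order_id IS NOT NULL")
@dlt.expect("recent", "order_ts > '2020-01-01'")
def silver_orders():
    return dlt.read_stream("bronze_orders").transform(parse_and_type)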

The three actions map to three policies: expect warns and records a metric, expect_or_drop removes the offending rows, expect_or_fail halts the pipeline. A pragmatic default is drop-and-quarantine – route failing rows to a _rejected table so you can inspect them rather than lose them.
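The three policies and the quarantine route fit in one function. This is not the DLT API, just a plain-Python sketch of the semantics: apply_expectations and ExpectationFailed are hypothetical, each expectation is a (name, predicate, action) tuple mirroring the decorators above, dropped rows come back as a rejected list you would write to a _rejected table, and a fail-level violation raises immediately.

```python
class ExpectationFailed(Exception):
    """Raised when a row violates an expectation whose action is "fail"."""


def apply_expectations(rows, expectations):
    """expectations: list of (name, predicate, action), action in {"warn", "drop", "fail"}.

    Returns (kept, rejected, warn_counts).
    """
    kept, rejected = [], []
    warn_counts = {}
    for row in rows:
        drop = False
        for name, pred, action in expectations:
            if pred(row):
                continue
            if action == "fail":
                raise ExpectationFailed(f"{name} violated by {row!r}")
            if action == "drop":
                drop = True
            else:  # "warn": keep the row, record a metric
                warn_counts[name] = warn_counts.get(name, 0) + 1
        (rejected if drop else kept).append(row)
    return kept, rejected, warn_counts
```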

Treat schema and quality as a contract enforced at the layer boundary, not a wish. The cheapest place to stop bad data is the bronze-to-silver hop, before it fans out into every gold table and every dashboard built on them. A row dropped at silver costs one alert; the same row discovered in a board-level metric costs a quarter of trust.

Step 5 – Partitioning, clustering, compaction, and file size

Object storage rewards few large files and punishes many small ones. Streaming ingestion produces the opposite – a flood of tiny files, one or more per micro-batch – so file management is not optional housekeeping; it is what keeps queries fast.

Partition coarsely. Partition by a low-cardinality column you filter on, typically a date. Over-partitioning (for example by customer_id) creates millions of tiny directories and destroys performance. A rule of thumb: only partition a column if each partition value holds at least ~1 GB of data.
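The ~1 GB rule of thumb is simple enough to apply mechanically before choosing a partition column. In this sketch, partition_candidates is a hypothetical helper, the per-value sizes are assumed to come from your own table statistics, and "each partition value" is read literally, as a minimum over all values.

```python
GB = 10**9  # decimal gigabyte, matching the "~1 GB" rule of thumb


def partition_candidates(sizes_by_column, min_bytes=GB):
    """Return columns whose every partition value holds at least min_bytes.

    sizes_by_column maps column -> {partition_value: bytes}.
    """
    return sorted(
        col
        for col, sizes in sizes_by_column.items()
        if sizes and min(sizes.values()) >= min_bytes
    )
```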

Cluster for selectivity within partitions. For high-cardinality predicates, rely on data skipping: Delta's OPTIMIZE ... ZORDER BY co-locates related values so the engine can skip files whose statistics show they cannot match. Newer Delta versions also support liquid clustering, which replaces both partitioning and Z-ordering on a table: you can change the clustering keys later without rewriting existing data, and it handles skew better:

-- Liquid clustering: declarative, evolvable, no physical repartition lock-in.
-- Applies to unpartitioned tables only; it cannot be combined with partitioning or ZORDER.
ALTER TABLE lake.silver.orders CLUSTER BY (customer_id, order_date);
OPTIMIZE lake.silver.orders;

-- Classic alternative on partitioned tables:
OPTIMIZE lake.silver.orders
  WHERE order_date >= '2026-06-01'
  ZORDER BY (customer_id);
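Data skipping works on per-file minimum and maximum statistics, which is why clustering matters: co-locating values narrows each file's range. This sketch (FileStats and files_to_scan are hypothetical names, and it models a single string column) compares how many files a point lookup must read when every file spans the whole key range versus when clustering has given each file its own slice.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    """Per-file min/max statistics for one column (hypothetical, single column)."""
    path: str
    min_val: str
    max_val: str


def files_to_scan(files, value):
    """Return the files whose [min_val, max_val] range could contain value.

    Every other file is skipped without being opened.
    """
    return [f.path for f in files if f.min_val <= value <= f.max_val]
```

With four unclustered files each spanning c000 to c999, a lookup for c600 reads all four; after clustering into four contiguous ranges it reads one.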

Compact small files. OPTIMIZE bin-packs small files into larger ones (Delta targets ~1 GB by default; tune with delta.targetFileSize). Schedule it on hot partitions. Then reclaim storage from the now-unreferenced old files with VACUUM – but mind the retention window, because it is also what time travel depends on:

-- Reclaim files older than the retention threshold (default 7 days).
-- Do NOT shorten below your time-travel / CDF retention needs.
VACUUM lake.silver.orders RETAIN 168 HOURS;
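Compaction is at heart a bin-packing problem. The sketch below uses greedy first-fit-decreasing, a generic heuristic and not necessarily the exact algorithm OPTIMIZE implements; plan_compaction is a hypothetical name, and the 1 GiB default stands in for the ~1 GB target mentioned above. It leaves files that are already large alone and drops bins holding a single file, since rewriting one file into one file gains nothing.

```python
GIB = 1024**3


def plan_compaction(file_sizes, target=GIB):
    """Group small files into rewrite bins of at most `target` bytes.

    Greedy first-fit-decreasing: place each small file, largest first, into
    the first bin with room, opening a new bin when none fits.
    """
    small = sorted((s for s in file_sizes if s < target), reverse=True)
    bins = []  # each bin: [total_bytes, [file sizes]]
    for size in small:
        for b in bins:
            if b[0] + size <= target:
                b[0] += size
                b[1].append(size)
                break
        else:
            bins.append([size, [size]])
    # A bin with one file would just rewrite that file unchanged; skip it
    return [files for _, files in bins if len(files) > 1]
```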

On Databricks, predictive optimization can run OPTIMIZE and VACUUM automatically; if you are self-managing on open-source Delta or Iceberg, make compaction a first-class scheduled job. An un-compacted streaming table degrades quietly until a query that was sub-second is suddenly scanning a hundred thousand files.

Step 6 – Time travel, incremental processing, and the change data feed

Because every change is a versioned commit, you get time travel for free – query the table as of a version or timestamp:

SELECT * FROM lake.silver.orders VERSION AS OF 42;
SELECT * FROM lake.silver.orders TIMESTAMP AS OF '2026-06-07T00:00:00Z';

This is operationally priceless: reproduce yesterday’s report exactly, diff two versions to find what a bad job changed, or RESTORE a table to a pre-incident version after a faulty MERGE.

For incremental processing, you do not want to rescan all of silver to update gold. Enable the Change Data Feed (CDF) and consume only the rows that changed, with their change type:

-- SQL: enable the feed (only commits made after this point carry change data)
ALTER TABLE lake.silver.orders SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

# Python: read only what changed since version 42, with row-level change metadata.
# startingVersion must not predate the commit that enabled the feed.
changes = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 42)
    .table("lake.silver.orders"))
# Yields _change_type in {insert, update_preimage, update_postimage, delete}

This is the engine of efficient gold materialization: stream silver's CDF, keep the change types your gold table needs (insert and update_postimage are enough to propagate upserts, but additive aggregates must also subtract update_preimage and delete rows), and MERGE those deltas into gold instead of recomputing from scratch. Iceberg offers the analogous capability via incremental reads between snapshots. The pattern – bronze appends, silver MERGEs, gold consumes CDF – is what turns the medallion into an incrementally maintained graph rather than a nightly full rebuild.
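Why the change types matter for aggregates: a gold sum must subtract the old value of an updated row, not just add the new one. In this sketch, apply_cdf_to_revenue is hypothetical and customer_id and amount are assumed silver column names; each CDF row contributes +amount for insert and update_postimage and -amount for update_preimage and delete. Because the signed contributions telescope, the result stays correct even when one batch of changes spans several versions of the same row.

```python
from collections import defaultdict

# Sign of each CDF change type's contribution to an additive aggregate
SIGN = {"insert": 1, "update_postimage": 1, "update_preimage": -1, "delete": -1}


def apply_cdf_to_revenue(gold, changes):
    """Fold CDF rows into gold revenue per customer (gold: customer_id -> total).

    Each change row carries _change_type plus the assumed silver columns
    customer_id and amount.
    """
    delta = defaultdict(int)
    for row in changes:
        delta[row["customer_id"]] += SIGN[row["_change_type"]] * row["amount"]
    for customer, d in delta.items():
        gold[customer] = gold.get(customer, 0) + d
    return gold
```

In Spark the same idea would become a groupBy over the signed amounts followed by a MERGE that adds each customer's delta to the existing gold row, or inserts it when the customer is new.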

Step 7 – Governance, lineage, and cost across the layers

A lakehouse without governance is a data swamp with better file formats. Centralize three controls:

Verify

Confirm the pipeline behaves under the failure modes that actually occur, not just the happy path.

Enterprise scenario

A retail analytics platform ran a classic lambda split: a nightly Spark batch loaded the warehouse from S3, and a separately maintained Flink job pushed “live” order counts to a real-time dashboard. The constraint that broke them was returns. A return event arrived hours after the original order, and the two pipelines handled it differently – the batch job’s dedup keyed on (order_id, event_type) and kept both, while the stream’s keyed on order_id and overwrote. By morning the dashboard’s revenue and the warehouse’s revenue diverged by enough to be escalated, every single day. Reconciliation was a standing 8 a.m. firefight.

They collapsed it onto a Delta lakehouse with one transformation graph. Raw order and return events landed append-only in bronze via Auto Loader. A single silver transformation – run as a 1-minute stream for the dashboard and re-used verbatim as an AvailableNow batch for the warehouse load – applied one dedup rule and one MERGE, with an event-time guard so a late return could not be overwritten by a stale order row. Gold revenue tables were maintained incrementally off silver’s change data feed.

-- One MERGE, one truth: event-time guard makes late returns safe.
MERGE INTO silver.orders AS t
USING staged AS s
ON t.order_id = s.order_id
WHEN MATCHED AND s.event_ts > t.event_ts THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

The divergence went to zero because there was no longer a second code path to diverge from – batch and streaming were literally the same logic over the same ACID table. The 8 a.m. reconciliation meeting was deleted from the calendar. Storage cost actually fell once they lifecycle-expired bronze after 30 days, since it was fully replayable from the source bucket if they ever needed to rebuild.
