
Stateful Streaming Analytics with Flink (exactly-once)

There is a class of problem where “we’ll batch it overnight” is not an acceptable answer, and most of them involve money moving in real time. A card authorization has roughly 200 milliseconds before the issuer must say yes or no. A fraud ring drains an account in the ninety seconds it takes a nightly job to even start. A regulator asks why a flagged transaction settled, and “our scoring pipeline runs at 2 a.m.” is the kind of sentence that ends with a fine. Stream processing exists for this gap — but the moment your stream needs to remember anything (a running balance, a velocity counter, a 30-day rolling aggregate), you have crossed from trivial stateless filtering into the genuinely hard territory of stateful streaming, where the central question becomes: when a machine dies mid-flight, did each event count exactly once, or did it count twice, or not at all? This article is a reference architecture for getting that right with Apache Flink, the engine that treats large, durable state and exactly-once semantics as first-class concerns rather than bolt-ons.

The business scenario

The recurring driver is a financial institution that has to make a stateful decision faster than a human or a batch job ever could, and then prove it was correct. Picture a mid-size payments processor — call them a card acquirer and a digital-wallet operator rolled into one — handling somewhere north of 12,000 transactions per second at peak, across millions of cardholders. Three pressures land on the same platform team at once.

Latency. Real-time fraud scoring must complete inside the authorization window. If the model needs the cardholder’s spending velocity over the last hour, the count of distinct merchants in the last ten minutes, and whether this device has been seen before, those features have to be computed as the events arrive, not looked up from a table that is an hour stale.

Correctness under failure. A fraud feature like “transactions in the last 60 seconds” is a counter. If a node crashes and the framework replays the last few seconds of input, a naïve system double-counts, the velocity feature spikes artificially, and you decline a legitimate customer at the worst possible moment — checkout. Conversely, if events are lost on recovery, a real attack slips under the threshold. Neither at-least-once nor at-most-once is good enough; you need exactly-once effect on the state.

Regulation and auditability. AML (anti-money-laundering) and card-scheme rules require that the institution can reconstruct why a decision was made and demonstrate that the same input always produces the same output. A pipeline that occasionally double-counts is not just a bug; it is an audit finding.

The naïve fixes fail predictably. Nightly batch misses the latency requirement entirely. A microservice with an in-memory counter loses all its state when the pod restarts and cannot scale its state beyond one machine’s RAM. At-least-once stream processing (the default for many systems) double-counts on replay, which silently corrupts every aggregate feature. Flink threads this needle because it was built around exactly the property these workloads need: large keyed state that survives failure, checkpointed consistently, with end-to-end exactly-once delivery into downstream systems.

The scenario scales cleanly. A small deployment scores one card program at a few hundred TPS on a handful of task slots. A large one runs dozens of pipelines — fraud, AML, real-time ledger, dispute detection — across thousands of slots and terabytes of state, with the same architectural shape. What changes is parallelism, state-backend sizing, and how many checkpoints per minute you can afford, not the diagram.

Architecture overview

[Figure: architecture diagram, Stateful Streaming Analytics with Flink (exactly-once)]

The end-to-end design has a data plane (events flowing through the Flink job) and a control plane (Flink’s coordination: checkpoints, savepoints, recovery), and the discipline of keeping them mentally separate is the first step to operating this well.

Data path, following the flow: (1) Authorization events, device telemetry, and account changes are published to Apache Kafka (managed as Confluent Cloud, or Amazon MSK / Azure Event Hubs with the Kafka surface) — Kafka is the durable, replayable log that makes exactly-once possible, because Flink can rewind to a known offset on recovery. (2) The Flink job consumes via the KafkaSource connector, which tracks consumed offsets inside Flink’s own checkpoint rather than committing them back to Kafka eagerly. (3) Events are keyed by card / account, routing every event for a given key to the same parallel sub-task, where the operator maintains keyed state — the velocity counters, the windowed merchant sets, the last-seen-device map. (4) Windowed and stateful operators compute features; the enriched event, plus a fraud score from an embedded model or a call out to a model-serving endpoint, flows downstream. (5) Results land in exactly-once sinks: scored decisions back into a Kafka topic (consumed by the authorization service), feature snapshots into a JDBC-backed store or Apache Iceberg table on object storage for the feature store and for audit, and alerts into a notification path. (6) The authorization microservice reads the decision topic and responds to the card network inside the latency window.
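Step (3) is easiest to see in miniature. The following is a plain-Java sketch, not Flink code: `KeyRouting`, `subtaskFor`, and `route` are hypothetical names, and the modulo hash is a deliberate simplification (Flink's real assignment goes through key groups and a murmur hash). It only illustrates the property the design relies on: every event for a card lands on one sub-task, so that sub-task alone owns the card's counter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for keyBy(card_id); hypothetical helper, not part of Flink. */
final class KeyRouting {
    /** The same key always maps to the same sub-task index (simplified hashing). */
    static int subtaskFor(String cardId, int parallelism) {
        return Math.floorMod(cardId.hashCode(), parallelism);
    }

    /** Routes each event to its sub-task and counts per card there; one state map per sub-task. */
    static List<Map<String, Integer>> route(List<String> cardIds, int parallelism) {
        List<Map<String, Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new HashMap<>());
        }
        for (String cardId : cardIds) {
            subtasks.get(subtaskFor(cardId, parallelism)).merge(cardId, 1, Integer::sum);
        }
        return subtasks;
    }
}
```

Because exactly one sub-task ever sees a given card, the per-card counter needs no locking or cross-node coordination.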

Control path runs continuously and is what makes the data path trustworthy. The JobManager is the coordinator; TaskManagers run the actual operators across task slots. On a fixed interval the JobManager’s checkpoint coordinator injects a barrier into the source streams; as the barrier flows through the dataflow graph, each operator snapshots its state to durable storage. The state itself lives in a state backend — for anything beyond toy sizes, the embedded RocksDB backend, which keeps state on local disk (so it can exceed RAM) and ships snapshots to durable object storage. Checkpoints are automatic and for failure recovery; savepoints are operator-triggered, portable snapshots used for upgrades, migrations, and A/B rollbacks. Everything that follows hangs off those two mechanisms.

Component breakdown

| Component | Technology | Role in the platform | Key configuration choices |
| --- | --- | --- | --- |
| Event log | Kafka (Confluent Cloud / MSK / Event Hubs) | Durable, replayable source and sink; the backbone of exactly-once | transaction.timeout.ms ≥ checkpoint interval + max recovery; partition by card key; tiered storage for replay depth |
| Stream engine | Apache Flink (1.18+) | Stateful keyed processing, event-time windows, checkpoint coordination | Exactly-once checkpointing; aligned or unaligned barriers; restart strategy with backoff |
| State backend | EmbeddedRocksDB | Large keyed state on local disk, incremental snapshots to object store | Incremental checkpoints on; managed memory tuned; SSD-backed local dirs |
| Checkpoint store | S3 / ADLS / GCS | Durable home for checkpoints and savepoints | Versioning + lifecycle; separate prefix for savepoints (retained) vs checkpoints (auto-cleaned) |
| Source connector | KafkaSource | Offset tracking inside checkpoints, not eager Kafka commits | OffsetsInitializer, bounded-or-continuous, per-partition watermarks |
| Sink connector (stream) | KafkaSink (exactly-once) | Transactional writes to downstream topics via Kafka transactions | DeliveryGuarantee.EXACTLY_ONCE; unique transactionalIdPrefix per job |
| Sink connector (table) | Iceberg / JDBC (2PC) | Audit + feature snapshots, idempotent or transactional | Iceberg commit on checkpoint; JDBC with XA two-phase or upsert-by-key |
| Orchestration | Flink Kubernetes Operator | Deploys/upgrades jobs, manages savepoints on K8s | FlinkDeployment CRD; savepoint upgrade mode; HA via Kubernetes |
| Identity | Entra ID / Okta + Vault | SSO to the Flink UI/REST; short-lived broker and DB credentials | OIDC on the gateway; Vault dynamic secrets for Kafka/JDBC |
| Observability | Datadog / Dynatrace + Prometheus | Lag, backpressure, checkpoint health, state size | Flink metric reporter → Prometheus; alert on checkpoint failures & busy-time |

A few choices deserve the why, because they are the ones teams get wrong.

Why RocksDB, not the heap state backend. Flink offers an in-heap state backend that is faster for tiny state. But fraud features over millions of cardholders with 30-day windows are gigabytes to terabytes of state — far past what fits in the JVM heap, and heap state means crippling GC pauses and OOM kills under load. The EmbeddedRocksDB backend spills state to local SSD, so state size is bounded by disk not RAM, and its incremental checkpoints ship only the changed SST files since the last snapshot rather than the whole state — the difference between a 5-second checkpoint and a 5-minute one once state is large. You pay a serialization cost on every state access (RocksDB stores bytes, not objects), which is the deliberate trade for scale.

Why offsets live in the checkpoint, not in Kafka. The instinct is to let the consumer commit offsets to Kafka as it reads. Flink deliberately does not rely on that for correctness. Instead the consumed offset is part of the operator state captured in each checkpoint. On recovery, Flink restores both the operator state and the source position from the same consistent checkpoint, then replays from exactly there. State and input position move together atomically — that is the entire trick behind exactly-once on the input side. Kafka offset commits still happen, but only as progress telemetry for monitoring, never as the source of truth.
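A toy model makes the "state and position move together" point concrete. This is a plain-Java sketch with no Flink dependency; `CheckpointedCounter`, `Snapshot`, and `runWithCrash` are hypothetical names, and the in-memory list stands in for a replayable Kafka partition. The snapshot captures the counts and the next offset in one object, so restoring it rewinds both at once.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy source + keyed counter; illustrates the idea only, not Flink's implementation. */
final class CheckpointedCounter {
    /** One consistent snapshot: operator state and source position captured together. */
    static final class Snapshot {
        final Map<String, Long> counts;
        final int nextOffset;

        Snapshot(Map<String, Long> counts, int nextOffset) {
            this.counts = new HashMap<>(counts); // defensive copy
            this.nextOffset = nextOffset;
        }
    }

    private Map<String, Long> counts = new HashMap<>();
    private int nextOffset = 0;

    /** Consumes records [nextOffset, upTo) from the replayable log. */
    void process(List<String> log, int upTo) {
        while (nextOffset < upTo) {
            counts.merge(log.get(nextOffset), 1L, Long::sum);
            nextOffset++;
        }
    }

    Snapshot checkpoint() {
        return new Snapshot(counts, nextOffset);
    }

    /** Recovery: state and input position come from the same snapshot. */
    void restore(Snapshot s) {
        counts = new HashMap<>(s.counts);
        nextOffset = s.nextOffset;
    }

    long count(String key) {
        return counts.getOrDefault(key, 0L);
    }

    /** Crash scenario: returns the final count for card "A", which appears 4 times in the log. */
    static long runWithCrash() {
        List<String> log = List.of("A", "B", "A", "A", "B", "A");
        CheckpointedCounter job = new CheckpointedCounter();
        job.process(log, 3);              // offsets 0..2
        Snapshot ckpt = job.checkpoint(); // counts {A=2, B=1}, next offset 3
        job.process(log, 5);              // offsets 3..4, then the simulated crash
        job.restore(ckpt);                // roll back state AND offset together
        job.process(log, log.size());     // replay offsets 3..5
        return job.count("A");
    }
}
```

Had the offset been committed separately at position 5 while the state rolled back to the checkpoint, offsets 3 and 4 would be skipped; had the state survived while the offset rewound, they would be counted twice.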

Why the sink is the hard half of exactly-once. Recovering input and state consistently is solved; the genuinely difficult part is not duplicating external side effects — the records you already wrote downstream before the crash. Flink solves this with a two-phase commit (2PC) protocol tied to the checkpoint lifecycle. For Kafka, KafkaSink in EXACTLY_ONCE mode opens a Kafka transaction, writes records into it, and only commits the transaction when the checkpoint that covers those records completes. If the job dies before the checkpoint, the transaction is aborted and the records are never visible to consumers (who read with read_committed isolation). For databases, the equivalent is an XA/2PC JDBC sink or an idempotent upsert keyed by a deterministic id. Get this wrong and you have exactly-once processing but at-least-once delivery — duplicated rows your auditors will find.
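The commit-on-checkpoint lifecycle can be sketched without any Kafka or Flink dependency. In this plain-Java toy, `TwoPhaseSink` and its methods are hypothetical names chosen for illustration, and the model is simplified: it treats a crash as happening before the covering checkpoint completed, whereas a real implementation must also commit, on restore, any transaction whose checkpoint did complete.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy two-phase-commit sink: records become visible only once their checkpoint completes. */
final class TwoPhaseSink {
    private final List<String> committed = new ArrayList<>(); // what read_committed consumers see
    private List<String> openTxn = new ArrayList<>();         // transaction currently being written
    private List<String> preCommitted = null;                 // sealed, awaiting checkpoint completion

    void write(String record) {
        openTxn.add(record);
    }

    /** Phase 1, at the checkpoint barrier: seal the current transaction, open a new one. */
    void preCommit() {
        preCommitted = openTxn;
        openTxn = new ArrayList<>();
    }

    /** Phase 2, once the checkpoint is confirmed: publish the sealed transaction. */
    void notifyCheckpointComplete() {
        if (preCommitted != null) {
            committed.addAll(preCommitted);
            preCommitted = null;
        }
    }

    /** Crash before the covering checkpoint completed: uncommitted records are discarded. */
    void abortOnFailure() {
        openTxn = new ArrayList<>();
        preCommitted = null;
    }

    List<String> visible() {
        return new ArrayList<>(committed);
    }

    static List<String> demo() {
        TwoPhaseSink sink = new TwoPhaseSink();
        sink.write("txn-1");
        sink.write("txn-2");
        sink.preCommit();
        sink.notifyCheckpointComplete(); // txn-1 and txn-2 become visible
        sink.write("txn-3");             // written, but no checkpoint covers it yet
        sink.abortOnFailure();           // job dies: this copy of txn-3 is never visible
        sink.write("txn-3");             // replayed after the source rewinds
        sink.preCommit();
        sink.notifyCheckpointComplete();
        return sink.visible();
    }
}
```

The replayed record appears downstream once, because the first attempt was never published.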

Why event time and watermarks, not processing time. Payments arrive out of order — a mobile transaction queued offline can land seconds after a later one. If your “last 60 seconds” window keys off wall-clock arrival time, late events fall in the wrong bucket and your velocity feature is wrong. Flink’s event-time processing uses the timestamp on the event and watermarks to reason about completeness (“we have probably seen all events up to time T”), with a bounded allowed lateness to admit stragglers. This is what makes the aggregates correct and, crucially, deterministic on replay — the same input always produces the same window result, which is precisely what the auditor demands.
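The determinism claim can be checked with a small plain-Java sketch (no Flink dependency; `EventTimeWindows`, `countPerWindow`, and `watermark` are hypothetical names). It buckets events into 60-second tumbling windows by their own timestamps and computes a bounded-out-of-orderness watermark in the simplest form, the largest timestamp seen minus the tolerated delay. Timestamps are assumed to be non-negative milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy event-time bucketing; illustrates the idea, not Flink's window operator. */
final class EventTimeWindows {
    static final long WINDOW_MS = 60_000L;

    /** Counts events per 60 s tumbling window, keyed by window start, using the event's own timestamp. */
    static Map<Long, Integer> countPerWindow(List<Long> eventTimestampsMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimestampsMs) {
            long windowStart = ts - (ts % WINDOW_MS);
            counts.merge(windowStart, 1, Integer::sum);
        }
        return counts;
    }

    /** Simplified watermark: max timestamp seen so far minus the tolerated out-of-orderness. */
    static long watermark(List<Long> seenSoFarMs, long maxOutOfOrdernessMs) {
        if (seenSoFarMs.isEmpty()) {
            return Long.MIN_VALUE; // nothing seen yet, so no completeness claim
        }
        long max = Long.MIN_VALUE;
        for (long ts : seenSoFarMs) {
            max = Math.max(max, ts);
        }
        return max - maxOutOfOrdernessMs;
    }
}
```

Feeding the same three events in arrival order 70 s, 10 s, 50 s or in timestamp order yields identical window counts, which is the replay property an auditor cares about. A processing-time window would instead depend on when each event happened to arrive.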

Implementation guidance

Pin the checkpoint configuration first; it is the contract. Everything about reliability flows from these settings, and the defaults are not production-ready. A representative Flink config:

# flink-conf.yaml (or set via StreamExecutionEnvironment)
execution.checkpointing.interval: 30s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 10s          # breathing room between checkpoints
execution.checkpointing.timeout: 10min
execution.checkpointing.unaligned: true         # ride out backpressure (see below)
execution.checkpointing.tolerable-failed-checkpoints: 3
state.backend.type: rocksdb
state.backend.incremental: true                 # ship only changed SST files
state.checkpoints.dir: s3://flink-ckpt-prod/checkpoints
state.savepoints.dir: s3://flink-ckpt-prod/savepoints
restart-strategy: exponential-delay

Two settings bite teams. The first is tolerable-failed-checkpoints: leave it low so a job that cannot checkpoint (often the real symptom of a deeper problem) fails fast and recovers rather than silently running with stale durability. The second is the Kafka producer’s transaction.timeout.ms, which must exceed your checkpoint interval plus your worst-case recovery time. If a transaction times out before the covering checkpoint commits, Kafka aborts records you needed, breaking exactly-once. Set it generously (e.g. 15 minutes); the cost of a long timeout is small, while the cost of a short one is silent data loss on recovery. Note that the broker caps this value with transaction.max.timeout.ms, which defaults to 15 minutes, so anything longer also requires raising the broker setting.
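The timeout rule is simple arithmetic, and it can be worth encoding as a deploy-time check. The sketch below is plain Java; `TxnTimeoutCheck` and `isSafe` are hypothetical names, and the recovery durations in the usage note are illustrative assumptions rather than measurements. It checks both sides: the timeout must outlive checkpoint interval plus worst-case recovery, and must not exceed the broker's transaction.max.timeout.ms (15 minutes by default in Kafka).

```java
import java.time.Duration;

/** Hypothetical pre-deploy sanity check for the producer's transaction.timeout.ms. */
final class TxnTimeoutCheck {
    /** Kafka's broker-side default for transaction.max.timeout.ms. */
    static final Duration BROKER_MAX_DEFAULT = Duration.ofMinutes(15);

    static boolean isSafe(Duration txnTimeout,
                          Duration checkpointInterval,
                          Duration worstCaseRecovery,
                          Duration brokerMax) {
        Duration required = checkpointInterval.plus(worstCaseRecovery);
        boolean outlivesRecovery = txnTimeout.compareTo(required) > 0; // strictly longer than the gap
        boolean acceptedByBroker = txnTimeout.compareTo(brokerMax) <= 0; // broker rejects larger values
        return outlivesRecovery && acceptedByBroker;
    }
}
```

With a 30-second interval and an assumed 10-minute worst-case recovery, a 15-minute timeout passes; a 5-minute timeout against a 6-minute recovery fails, and so does a 60-minute timeout against an unmodified broker.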

Keep state with the operator, key everything deliberately. Stateful logic lives in a KeyedProcessFunction (or windowed operators) after a keyBy(card_id). The partitioning is load-bearing: every event for a key reaches the same sub-task, so the per-card counter is consistent without distributed locking. A sketch of a velocity feature:

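import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Txn, Scored, and score(...) are application types assumed to be defined elsewhere.
public class VelocityFeature extends KeyedProcessFunction<String, Txn, Scored> {
  // managed keyed state: survives failure, checkpointed, scoped per card_id
  private transient ValueState<Long> count60s;

  @Override
  public void open(Configuration p) {
    ValueStateDescriptor<Long> d =
        new ValueStateDescriptor<>("count60s", Long.class);
    d.enableTimeToLive(StateTtlConfig                      // bound state growth
        .newBuilder(Time.minutes(2))
        .cleanupInRocksdbCompactFilter(1000)               // refresh the TTL clock every 1000 entries
        .build());
    count60s = getRuntimeContext().getState(d);
  }

  @Override
  public void processElement(Txn t, Context ctx, Collector<Scored> out) throws Exception {
    Long current = count60s.value();                       // null on the first event for this key
    long n = (current == null ? 0L : current) + 1;
    count60s.update(n);                                    // state access can throw, hence "throws Exception"
    ctx.timerService().registerEventTimeTimer(t.eventTimeMs() + 60_000);
    out.collect(score(t, n));                              // feature feeds the model
  }
  // onTimer decrements / expires the window edge
}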

Two non-obvious points. State TTL is mandatory at scale — unbounded keyed state grows until checkpoints slow and disks fill; expire cold keys (dormant cards) with StateTtlConfig and RocksDB compaction-filter cleanup. And use the keyed state APIs, never a plain HashMap field — instance fields are not checkpointed and are lost on recovery, a classic and silent bug.
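The velocity sketch leaves the window-edge expiry to onTimer. One way to reason about that logic, shown here as a plain-Java toy rather than a Flink operator, is to keep each card's recent timestamps and evict those older than 60 seconds before counting. `SlidingVelocity` and `record` are hypothetical names. The HashMap in this sketch only stands in for Flink keyed state (for example a ListState<Long> per card); inside a real operator it must not be an instance field, for exactly the reason given above. The sketch also assumes timestamps arrive in order per card.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy trailing-60-second counter per card; the map is a stand-in for keyed state. */
final class SlidingVelocity {
    private static final long WINDOW_MS = 60_000L;
    private final Map<String, Deque<Long>> perCard = new HashMap<>();

    /** Records one transaction and returns the card's count in the window (t - 60 s, t]. */
    int record(String cardId, long eventTimeMs) {
        Deque<Long> timestamps = perCard.computeIfAbsent(cardId, k -> new ArrayDeque<>());
        // Evict everything that has fallen out of the trailing window.
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= eventTimeMs - WINDOW_MS) {
            timestamps.pollFirst();
        }
        timestamps.addLast(eventTimeMs);
        return timestamps.size();
    }
}
```

Storing timestamps costs more state than a single counter, which is one more reason the TTL and state-size discipline matters.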

Wire the sinks transactionally. The decision topic uses the exactly-once Kafka sink:

KafkaSink<Scored> sink = KafkaSink.<Scored>builder()
    .setBootstrapServers(brokers)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("fraud-score-prod-v7")   // MUST be unique & stable per job
    .setRecordSerializer(/* ... */)
    .build();

The transactionalIdPrefix must be unique per deployed job and stable across restarts — two jobs sharing a prefix will fence each other’s transactions, and a changing prefix orphans in-flight transactions that then block consumers. For the audit/feature table, Apache Iceberg is the clean choice: its Flink sink commits a new snapshot exactly when the Flink checkpoint completes, so the table and the stream stay consistent, and you get time-travel for free — invaluable when an auditor asks for the feature values “as of” a disputed transaction.

Secrets and identity, not config files. Kafka broker credentials and JDBC passwords are dynamic, short-lived secrets from HashiCorp Vault (Kafka SASL credentials, database creds via Vault’s database secrets engine with automatic rotation), injected at TaskManager startup — never baked into the job jar or a flink-conf.yaml in git. Human access to the Flink Web UI and REST API sits behind an OIDC gateway authenticating against Microsoft Entra ID (or Okta) for SSO, because the REST API can trigger savepoints and cancel jobs and must not be open. The leak risk here is real: a Flink REST endpoint exposed without auth is effectively remote code execution on your fraud pipeline.

Enterprise considerations

Backpressure — diagnose it, do not just survive it. Backpressure is when a downstream operator (or a slow sink) cannot keep up, and the slowdown propagates upstream through Flink’s credit-based flow control until the source stops reading from Kafka. It is self-protecting — Flink will not OOM — but it is also your single most important health signal: sustained backpressure means growing Kafka consumer lag, which means your fraud scores arrive after the authorization decision is made, i.e. uselessly late. The Flink UI flags backpressured operators (busy-time / backPressuredTimeMsPerSecond metrics); the usual culprits are a slow sink (an under-provisioned database), data skew (one hot card key overwhelming one sub-task), or simply insufficient parallelism. The mitigations differ by cause: rebalance or salt hot keys for skew, scale parallelism for throughput, and async-I/O the sink for slow externals. Crucially, classic aligned checkpoints stall under heavy backpressure (barriers queue behind buffered data and checkpoints time out) — which is exactly why unaligned checkpoints (set above) exist: they let the barrier overtake in-flight buffers so checkpoints still complete while the job is backpressured, at the cost of larger snapshots.
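For the skew case, salting usually means a two-stage aggregation: spread the hot key over several sub-keys, aggregate partially, then merge. The plain-Java sketch below illustrates the shape under stated assumptions; `SaltedCount` and its methods are hypothetical names, the event sequence number used for salting is an assumption, and card ids are assumed not to contain '#'.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy two-stage count with salted keys; illustrates the pattern, not a Flink job. */
final class SaltedCount {
    /** Stage 1 key: spread one hot card over `buckets` sub-keys. */
    static String salt(String cardId, long eventSeq, int buckets) {
        return cardId + "#" + Math.floorMod(eventSeq, buckets);
    }

    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
    }

    /** Stage 1: partial counts per salted key (in a real job these land on different sub-tasks). */
    static Map<String, Long> partialCounts(List<String> cardIds, int buckets) {
        Map<String, Long> partial = new HashMap<>();
        long seq = 0;
        for (String cardId : cardIds) {
            partial.merge(salt(cardId, seq++, buckets), 1L, Long::sum);
        }
        return partial;
    }

    /** Stage 2: merge the partials back into one count per card. */
    static Map<String, Long> merge(Map<String, Long> partial) {
        Map<String, Long> total = new HashMap<>();
        partial.forEach((saltedKey, n) -> total.merge(unsalt(saltedKey), n, Long::sum));
        return total;
    }
}
```

The trade is explicit: salting gives up the "one sub-task owns the card" property, so per-card features need the second merge stage and pay an extra shuffle for it.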

Failure modes and recovery. Enumerate them and decide each:

| Failure | What Flink does | What you must ensure |
| --- | --- | --- |
| TaskManager crash | JobManager restarts the job from the last completed checkpoint | RocksDB local dirs on durable-enough disk; restart strategy with backoff |
| JobManager crash | HA (ZooKeeper or Kubernetes) elects a new leader, recovers metadata | HA configured; checkpoint metadata in durable store, not local |
| Poison message | Operator throws, job loops on restart | Dead-letter via a side output; never let one bad event halt the pipeline |
| Slow/dead sink DB | Backpressure → lag grows | Async I/O sink, circuit-break to a buffer topic, alert on lag |
| State corruption / bad code deploy | n/a (logic error) | Roll back to a prior savepoint; this is why savepoints are retained |

The throughline: Flink guarantees processing exactly-once across crashes automatically, but operational correctness — poison-pill handling, sink resilience, and code-rollback — is your design responsibility.
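The poison-message row is the one most often left unimplemented. In Flink the usual mechanism is a side output (an OutputTag plus ctx.output(...) inside a process function); the plain-Java sketch below shows only the control flow, with hypothetical names (`DeadLetterRouter`, `process`) and a deliberately trivial "parse an amount in cents" step standing in for real deserialization.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy dead-letter routing: a bad record is diverted instead of failing the pipeline. */
final class DeadLetterRouter {
    final List<Long> parsed = new ArrayList<>();        // main output
    final List<String> deadLetters = new ArrayList<>(); // stand-in for a side output / DLQ topic

    void process(String rawAmountCents) {
        if (rawAmountCents == null) {
            deadLetters.add(null);
            return;
        }
        try {
            parsed.add(Long.parseLong(rawAmountCents.trim()));
        } catch (NumberFormatException e) {
            // Divert the malformed record; throwing here would restart the job into the same record.
            deadLetters.add(rawAmountCents);
        }
    }
}
```

Keeping the raw payload in the dead-letter path preserves the evidence needed to fix the producer and replay the record later.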

Savepoints are how you change a running pipeline without breaking it. A checkpoint is for the machine to recover from; a savepoint is for you to evolve the job. To deploy a new model version or fix a feature bug, you take a savepoint (a portable, self-contained snapshot of all state and offsets), stop the job, and restart the new code from that savepoint, so state and input position are carried forward and exactly-once is preserved across the upgrade. This requires forethought: assign stable operator UIDs (.uid("velocity-feature")) so Flink can map saved state to operators in the new job graph, and evolve state schemas compatibly (Avro/POJO evolution). Where a change goes beyond what schema evolution supports, supply a custom serializer whose TypeSerializerSnapshot handles the migration, or rewrite the savepoint offline with the State Processor API. With the Flink Kubernetes Operator, this whole dance is declarative: set the FlinkDeployment upgrade mode to savepoint and the operator snapshots, stops, and restarts on every spec change. Savepoints also give you the audit-grade trick of reprocessing: to backfill a corrected feature, start a new job from an old savepoint or from a Kafka offset and replay history deterministically.
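Why stable UIDs matter is easier to see as a lookup problem. The following plain-Java toy is not Flink's restore code; `SavepointRestore` and `restore` are hypothetical names and a single Long stands in for an operator's state. It mirrors the default behaviour described in Flink's documentation: saved state that cannot be mapped to an operator in the new job fails the restore (unless non-restored state is explicitly allowed), while operators that are new to the graph start empty.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of mapping savepoint state to a new job graph by operator UID. */
final class SavepointRestore {
    static Map<String, Long> restore(Map<String, Long> savedStateByUid, List<String> newOperatorUids) {
        // Every piece of saved state must find its operator in the new graph.
        for (String uid : savedStateByUid.keySet()) {
            if (!newOperatorUids.contains(uid)) {
                throw new IllegalStateException(
                    "No operator with uid '" + uid + "' in the new job graph");
            }
        }
        Map<String, Long> restored = new HashMap<>();
        for (String uid : newOperatorUids) {
            restored.put(uid, savedStateByUid.getOrDefault(uid, 0L)); // new operators start empty
        }
        return restored;
    }
}
```

If the UID had been auto-generated from the topology, adding or reordering an operator could change it, and the restore would hit the failure branch on the first state-bearing upgrade.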

Security and Zero Trust. Identity-based access only: SSO via Entra ID / Okta to the UI/REST, Vault for all broker and database secrets with short TTLs and rotation, and TLS + SASL on every Kafka connection (no plaintext, no static passwords in jars). Encrypt checkpoints at rest (S3/ADLS SSE) — they contain raw cardholder state and are a regulated data store in their own right; treat the checkpoint bucket with the same controls as a database. Wiz runs continuous CSPM and DSPM (data security posture) over the object-storage checkpoint/Iceberg buckets and the Kafka clusters, catching a checkpoint bucket that drifts public or an over-permissive IAM role before it becomes an incident. CrowdStrike Falcon provides runtime protection on the Kubernetes nodes running TaskManagers — these are JVMs processing live financial data and are a high-value target. Network-isolate the cluster: brokers, Flink, and the checkpoint store communicate over private endpoints / VPC peering, never the public internet.

Cost. Streaming runs 24/7, so steady-state efficiency dominates. (1) Right-size parallelism to the p95, not the peak — over-provisioned task slots are money burning continuously; use reactive/autoscaling mode or the operator’s autoscaler to track load. (2) State backend on local SSD + incremental checkpoints is far cheaper than fat memory instances trying to hold state in heap. (3) State TTL directly controls cost — unbounded state means ever-larger checkpoints (object-storage writes), slower recovery, and bigger disks; expiring cold keys is a cost lever, not just a correctness one. (4) Checkpoint interval is a dial — more frequent means lower data-loss-replay on recovery but more object-storage I/O and overhead; tune to your RPO, do not just leave it tiny. (5) Tiered Kafka storage keeps deep replayability affordable by offloading old segments to object storage. A pragmatic split many teams land on: reserved/savings-plan compute for the steady baseline, with autoscaling absorbing the daily peak.

CI/CD and change governance. The job jar builds in GitHub Actions (or Jenkins) with the topology unit-tested using Flink’s MiniCluster test harness and KeyedOneInputStreamOperatorTestHarness. You can assert exactly-once and window behavior in CI by snapshotting, killing, and restoring state in tests, which is the most reliable way to catch the silent HashMap-field-not-checkpointed bug before production. Infrastructure (Kafka topics, the Flink cluster, IAM, buckets) is Terraform, so the platform is reproducible and reviewable. A production deploy (which means a stateful savepoint-restart of a live fraud pipeline) goes through a ServiceNow change request with approval, because a botched state migration can corrupt features feeding live authorizations. Datadog or Dynatrace ingests Flink’s metrics (via the Prometheus reporter), and the must-alert signals are explicit: checkpoint failures or rising checkpoint duration, consumer lag, sustained backpressure (busy-time), restart count, and state size growth. These five tell you the health of an exactly-once pipeline far better than CPU does. Where the scored decisions are served through a customer-facing API at the edge, Akamai fronts that API.

Reference enterprise example

Cardinal Pay, a fictional payments processor (~1,400 employees, operating an acquiring business and a wallet), built this platform to bring fraud scoring inside the authorization window and to satisfy an AML mandate that every flagged transaction be reconstructable. Their load: ~12,000 TPS at peak (Friday-evening retail), millions of active cards, and a hard requirement that the fraud feature pipeline never double-count under failure.

Decisions they made. They ran a single logical fraud job at parallelism 96 across a Flink-on-Kubernetes cluster (the Flink Kubernetes Operator managing it), with the EmbeddedRocksDB backend on NVMe local disks and incremental checkpoints every 30 seconds to an S3 bucket. State totaled ~1.8 TB — per-card velocity counters, 10-minute distinct-merchant sets, and a last-seen-device map — with State TTL expiring cards dormant beyond 45 days to hold state flat. Input was Confluent Cloud Kafka, keyed by card id; the scored-decision topic used the exactly-once KafkaSink with a per-job transactionalIdPrefix, and the authorization microservice consumed it with read_committed isolation. An Apache Iceberg audit table captured every feature vector, committed on each checkpoint, giving time-travel for AML reconstruction. They set transaction.timeout.ms to 15 minutes after an early incident where a 5-minute timeout aborted transactions during a slow recovery and dropped scored events. Unaligned checkpoints were enabled after Friday-peak backpressure repeatedly timed out aligned ones. Secrets came from Vault; the Flink REST/UI sat behind Okta SSO; Wiz watched the checkpoint and Iceberg buckets; CrowdStrike Falcon ran on the TaskManager nodes.

The numbers. End-to-end p99 from event-ingest to scored-decision held at ~85 ms, comfortably inside the authorization budget. Checkpoints completed in ~6–9 seconds at 1.8 TB thanks to incremental snapshots (a full checkpoint would have been minutes). Monthly run cost landed near ₹46 lakh (~$55,000): Kubernetes compute for the Flink cluster ~$31,000 (autoscaled, reserved baseline), Confluent Cloud ~$12,000, S3 checkpoint + Iceberg storage and I/O ~$5,500, observability and the rest the remainder — materially cheaper than the over-provisioned all-in-memory design they prototyped first, which needed 3x the RAM to hold state on heap and still GC-stalled.

The outcome. Fraud caught inside the authorization window rose sharply because features were now real-time rather than hour-stale; a measured drop in fraud losses paid for the platform several times over. The audit story was the unlock: when a regulator queried a specific settled transaction, the team time-traveled the Iceberg table to the exact feature vector that produced the decision and reproduced the score deterministically by replaying from a savepoint — an answer a non-exactly-once pipeline could never have given with confidence. In a game-day, they killed the JobManager and three TaskManagers mid-peak; the job recovered from the last checkpoint in ~40 seconds with zero double-counted events, verified against the Iceberg audit table.

When to use it

Use this architecture when you need decisions or aggregates computed in real time over a continuous event stream; your logic is stateful (counters, windows, joins, sessionization, dedup); a duplicated or dropped event genuinely corrupts the result (money, inventory, billing, safety); and you must survive node failure without losing or double-counting. That is the heart of real-time fraud and AML, streaming feature pipelines, real-time ledgers and reconciliation, IoT/telemetry aggregation, and dynamic pricing.

Trade-offs to accept. Flink is operationally heavier than a stateless consumer or a managed query engine: you own checkpoint tuning, state-backend sizing, savepoint-driven upgrades, and backpressure diagnosis. Exactly-once sinks add real complexity (transactions, 2PC, the transaction.timeout.ms foot-gun) and a latency cost, since transactional output only becomes visible when its checkpoint commits. Exactly-once visibility is therefore checkpoint-paced, not instantaneous: with a 30-second checkpoint interval, a read_committed consumer can wait up to roughly that long for a record. Large state means recovery is “restore terabytes,” so checkpoint and restart times must be engineered, not assumed.

Anti-patterns. (1) Holding state in plain instance fields (a HashMap) instead of Flink keyed state — silently lost on recovery. (2) Exactly-once processing with a non-transactional sink — you get duplicate downstream effects and think you are safe. (3) transaction.timeout.ms shorter than checkpoint interval + recovery — Kafka aborts records on recovery; silent loss. (4) Unbounded keyed state with no TTL — checkpoints balloon, recovery crawls, disks fill. (5) Processing-time windows for out-of-order data — wrong, non-deterministic aggregates; use event time + watermarks. (6) No stable operator UIDs — your first state-bearing upgrade fails to restore from the savepoint. (7) Ignoring backpressure as “Flink handling load” — it is your lag-and-latency alarm, and late fraud scores are useless scores.

Alternatives, and when they win. If your transformations are stateless (filter, route, reshape) and per-event, Kafka Streams or even Kafka Connect SMTs are simpler and skip a cluster. If you want SQL-first streaming without running Flink yourself, managed Flink SQL services (Confluent’s, or cloud-managed Flink like Amazon Managed Service for Apache Flink / Azure Stream Analytics for lighter needs) trade some control for less ops — and Flink SQL composes with this architecture for the analytical slices. If “real-time” actually means minutes, a micro-batch engine (Spark Structured Streaming) is a reasonable, often cheaper fit and your team may already run it. And if the workload is fundamentally analytical queries over recent data rather than continuous stateful computation, a real-time OLAP store (Apache Pinot, ClickHouse, Apache Druid) fed by Kafka may serve the dashboards better than a Flink job. The architecture here is the destination when state, latency, and provable exactly-once all matter at once — graduate to it from the simpler tools when correctness under failure stops being optional.


Vinod is a Senior Cloud Architect (22+ yrs) — available for Azure / AWS / GCP architecture, landing zones, and migrations.
