A payments fraud team runs a stateful streaming job that scores every card authorization against a 30-minute sliding window of velocity features. It has lived on a single hand-rolled Flink session cluster on three VMs for two years, and it has started to hurt: when the JobManager VM rebooted for patching last month the job restarted from scratch, replayed six hours of Kafka, and the fraud model went blind during the replay window — exactly when an attacker would want it blind. Worse, every Flink version bump is a white-knuckle manual savepoint-and-pray, and nobody can tell the auditor where the job state physically lives or who can reach it. The mandate from the platform lead is concrete: move this to Kubernetes, make JobManager failure a non-event, persist state to S3 with point-in-time recovery, and make a version upgrade a reviewable, rollback-able pipeline step. This guide is that migration, done properly with the Flink Kubernetes Operator — a FlinkDeployment with HA, incremental checkpointing to S3, and savepoint-based upgrades that lose nothing.
This is an advanced, hands-on guide. It assumes you are comfortable with kubectl, Helm, and Flink’s checkpoint/savepoint model, and it produces a running production-shaped job, not a toy.
Prerequisites
- A Kubernetes cluster ≥ 1.27 (EKS, GKE, or AKS) with at least 3 worker nodes and the cluster autoscaler enabled. Commands below assume EKS, but only the S3/IAM and DNS bits are cloud-specific.
- kubectl, helm ≥ 3.12, and the flink CLI from a matching Flink 1.18 distribution on your workstation.
- cert-manager installed in the cluster (the operator’s admission webhook needs it for TLS).
- An S3 bucket (or GCS/ABFS equivalent) for checkpoints, savepoints, and HA metadata, plus an IAM role you can attach to a Kubernetes ServiceAccount via IRSA (IAM Roles for Service Accounts) so pods get S3 credentials with no static keys.
- A Kafka cluster reachable from the cluster (the example job reads card-auth and writes fraud-scores).
- HashiCorp Vault reachable in-cluster (used below to inject the Kafka SASL credential, never a Kubernetes Secret in git).
- Cluster-admin for the one-time operator install; namespace-scoped RBAC after that.
Target topology
The shape is deliberately boring, which is the point. The Flink Kubernetes Operator runs in its own flink-operator namespace and watches FlinkDeployment/FlinkSessionJob custom resources in the namespaces it is configured for (Step 2 restricts it to payments). Each application job is one FlinkDeployment in the payments namespace, which the operator reconciles into a JobManager Deployment plus TaskManager pods in Application mode (one cluster per job, fully isolated, with no noisy-neighbor session cluster). The JobManager runs in Kubernetes HA mode: leader election and the pointers to the JobGraph and the latest checkpoint live in ConfigMaps, while the metadata they point to sits in S3, so a JobManager pod can die and a standby resumes from the last checkpoint with no human in the loop. State durability lives entirely off-cluster in S3, as checkpoints (automatic, frequent, for failure recovery) and savepoints (deliberate, portable, for upgrades), reached through IRSA so there is not a single AWS key on disk. Identity for the humans who operate it flows from Okta (the workforce IdP) federated to Entra ID, gating both the cluster’s RBAC and the Flink Web UI behind SSO; secrets the job needs (the Kafka SASL/SCRAM password) come from HashiCorp Vault via the Agent injector. Everything is shipped by Argo CD from git, observed by Datadog, and changed through a ServiceNow gate.
1. Provision the S3 state backend and IRSA role
Flink needs one durable object store for three things: checkpoints, savepoints, and HA metadata. Create the bucket with versioning on (so a fat-fingered lifecycle rule cannot vaporize your only savepoint) and lay out clear prefixes.
export REGION=ap-south-1
export BUCKET=kv-flink-state-payments-prod
aws s3api create-bucket \
--bucket "$BUCKET" --region "$REGION" \
--create-bucket-configuration LocationConstraint="$REGION"
aws s3api put-bucket-versioning \
--bucket "$BUCKET" \
--versioning-configuration Status=Enabled
# Three logical roots the FlinkDeployment will reference
aws s3api put-object --bucket "$BUCKET" --key checkpoints/
aws s3api put-object --bucket "$BUCKET" --key savepoints/
aws s3api put-object --bucket "$BUCKET" --key ha/
Create an IAM policy scoped to this bucket only, then bind it to a Kubernetes ServiceAccount with IRSA. Least privilege matters here: the job identity should never be able to read another team’s state bucket.
cat > /tmp/flink-s3-policy.json <<'JSON'
{
"Version": "2012-10-17",
"Statement": [
{ "Effect": "Allow",
"Action": ["s3:ListBucket"],
"Resource": "arn:aws:s3:::kv-flink-state-payments-prod" },
{ "Effect": "Allow",
"Action": ["s3:GetObject","s3:PutObject","s3:DeleteObject"],
"Resource": "arn:aws:s3:::kv-flink-state-payments-prod/*" }
]
}
JSON
aws iam create-policy --policy-name kv-flink-state-payments \
--policy-document file:///tmp/flink-s3-policy.json
# Create the IRSA-bound ServiceAccount via eksctl (handles the trust policy)
eksctl create iamserviceaccount \
--cluster kv-prod --namespace payments \
--name flink \
--attach-policy-arn arn:aws:iam::<ACCOUNT_ID>:policy/kv-flink-state-payments \
--approve --region "$REGION"
The flink ServiceAccount in payments now assumes an S3-scoped role automatically — no aws.access-key/secret-key anywhere. (On GKE use Workload Identity to a GCS bucket; on AKS, a federated identity credential to ABFS — the rest of this guide is unchanged.)
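Before deploying anything on top of this identity, it can help to confirm the binding actually landed. The sketch below is a hypothetical helper (check_irsa_annotation is not part of any tool) that only checks whether a value looks like an IAM role ARN. It assumes eksctl wrote the standard eks.amazonaws.com/role-arn annotation onto the ServiceAccount.

```shell
# Hypothetical helper: succeeds only if the given value looks like an IAM role ARN.
check_irsa_annotation() {
  role_arn="$1"
  case "$role_arn" in
    arn:aws:iam::*:role/*)
      echo "IRSA role bound: $role_arn"
      ;;
    *)
      echo "no IRSA role annotation on the ServiceAccount" >&2
      return 1
      ;;
  esac
}

# Usage against a live cluster (assumes kubectl access to the payments namespace):
#   check_irsa_annotation "$(kubectl -n payments get sa flink \
#     -o jsonpath='{.metadata.annotations.eks\.amazonaws\.com/role-arn}')"
```

An empty annotation at this point is the same root cause as the HA failure described under Common pitfalls, so it is cheaper to catch here than after the first JobManager restart.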
2. Install cert-manager and the Flink Kubernetes Operator
The operator’s webhook validates and defaults your CRs, and it needs cert-manager for its serving certificate. Install that first, then the operator from the official Apache Helm chart.
# cert-manager (skip if already present)
helm repo add jetstack https://charts.jetstack.io
helm repo update
helm install cert-manager jetstack/cert-manager \
--namespace cert-manager --create-namespace \
--version v1.15.3 --set crds.enabled=true
# Flink Kubernetes Operator 1.10 (supports Flink 1.18/1.19/1.20)
helm repo add flink-operator-repo \
https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/
helm install flink-kubernetes-operator \
flink-operator-repo/flink-kubernetes-operator \
--namespace flink-operator --create-namespace \
--set watchNamespaces='{payments}'
Pinning watchNamespaces keeps the operator from reconciling CRs cluster-wide — important in a shared cluster so the payments operator does not touch another team’s jobs. Confirm it is healthy before going further.
kubectl -n flink-operator get pods
kubectl -n flink-operator logs deploy/flink-kubernetes-operator -c flink-kubernetes-operator | tail -20
kubectl get crd | grep flink
# flinkdeployments.flink.apache.org
# flinksessionjobs.flink.apache.org
3. Build and publish the application image
Application mode bakes your job JAR into an image that extends the official Flink base, with the S3 filesystem plugin enabled. Build it in GitHub Actions (your CI) so the digest is reproducible and signed, and push to your registry.
# Dockerfile
FROM flink:1.18.1-java17
# Enable the S3 filesystem (presto for checkpoints, hadoop also fine)
RUN mkdir -p /opt/flink/plugins/s3-fs-presto && \
cp /opt/flink/opt/flink-s3-fs-presto-1.18.1.jar \
/opt/flink/plugins/s3-fs-presto/
# Your shaded job jar
COPY target/fraud-scoring-1.4.0.jar /opt/flink/usrlib/fraud-scoring.jar
docker build -t <REGISTRY>/payments/fraud-scoring:1.4.0 .
docker push <REGISTRY>/payments/fraud-scoring:1.4.0
In a real pipeline this is a GitHub Actions job that builds, runs the Flink job’s unit tests, scans the image with Wiz / Wiz Code (container and IaC scanning — it fails the build on a critical CVE or a misconfigured manifest before anything reaches the cluster), and pushes the immutable tag. The cluster never builds; it only pulls a vetted digest.
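If the pipeline is meant to deploy only vetted digests, one option is a small guard that rejects tag-only image references before the manifest is committed. This is a minimal sketch: is_digest_pinned is a hypothetical helper, and IMAGE_REF stands for whatever variable your CI uses for the image reference.

```shell
# Hypothetical CI guard: accept only image references pinned by a sha256 digest.
is_digest_pinned() {
  printf '%s\n' "$1" | grep -Eq '@sha256:[0-9a-f]{64}$'
}

# Usage in a pipeline step (IMAGE_REF is an assumed variable name):
#   is_digest_pinned "$IMAGE_REF" || {
#     echo "refusing tag-only image: $IMAGE_REF" >&2
#     exit 1
#   }
```

The check is purely syntactic. It does not verify that the digest exists in the registry or that the image is signed.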
4. Wire the Kafka credential through Vault (no Secrets in git)
The job authenticates to Kafka with SASL/SCRAM. That password must not live in a Kubernetes Secret committed to the GitOps repo. Store it in HashiCorp Vault and let the Vault Agent injector mount it into the pods as a file the job reads at startup.
# Put the secret in Vault (one time, by an operator, never in git)
vault kv put secret/payments/kafka \
username='fraud-scoring' password='<scram-password>'
# Bind a Vault role to the flink ServiceAccount via the Kubernetes auth method
vault write auth/kubernetes/role/flink-payments \
bound_service_account_names=flink \
bound_service_account_namespaces=payments \
policies=payments-kafka-read ttl=1h
You then add vault.hashicorp.com/* annotations to the pod templates (Step 5) so the Agent injects /vault/secrets/kafka.properties. The credential is leased, short-lived, and never written to a Kubernetes Secret or the git repo — which is the whole point.
5. Define the highly available FlinkDeployment
This is the core artifact. One FlinkDeployment describes the whole job-specific cluster: HA, the S3 state backend, checkpoint cadence, resources, and the upgrade strategy. Read every block — the comments explain the why.
# fraud-scoring.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: fraud-scoring
  namespace: payments
spec:
  image: <REGISTRY>/payments/fraud-scoring:1.4.0
  flinkVersion: v1_18
  serviceAccount: flink # the IRSA-bound SA from Step 1
  flinkConfiguration:
    # --- High availability: JobManager failure must be a non-event ---
    high-availability.type: kubernetes
    high-availability.storageDir: s3://kv-flink-state-payments-prod/ha/
    # --- State backend + checkpointing to S3 ---
    state.backend.type: rocksdb # large keyed state spills to disk
    state.backend.incremental: "true" # only ship changed RocksDB SSTs
    state.checkpoints.dir: s3://kv-flink-state-payments-prod/checkpoints/
    state.savepoints.dir: s3://kv-flink-state-payments-prod/savepoints/
    execution.checkpointing.interval: "30 s"
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.timeout: "10 min"
    execution.checkpointing.min-pause: "10 s"
    execution.checkpointing.max-concurrent-checkpoints: "1"
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    # --- Restart strategy so transient blips self-heal ---
    restart-strategy: exponential-delay
    restart-strategy.exponential-delay.max-backoff: "5 min"
    # --- Metrics out to Datadog via the Datadog HTTP reporter ---
    metrics.reporter.dghttp.factory.class: org.apache.flink.metrics.datadog.DatadogHttpReporterFactory
    metrics.reporter.dghttp.apikey: "${DD_API_KEY}"
    metrics.reporter.dghttp.tags: "service:fraud-scoring,env:prod"
  jobManager:
    replicas: 2 # active + standby, leader-elected (set on the spec so the operator applies it)
    resource: { memory: "2048m", cpu: 1 }
    podTemplate:
      metadata:
        annotations:
          vault.hashicorp.com/agent-inject: "true"
          vault.hashicorp.com/role: "flink-payments"
          vault.hashicorp.com/agent-inject-secret-kafka.properties: "secret/payments/kafka"
  taskManager:
    resource: { memory: "4096m", cpu: 2 }
    podTemplate:
      metadata:
        annotations:
          vault.hashicorp.com/agent-inject: "true"
          vault.hashicorp.com/role: "flink-payments"
          vault.hashicorp.com/agent-inject-secret-kafka.properties: "secret/payments/kafka"
  job:
    jarURI: local:///opt/flink/usrlib/fraud-scoring.jar
    parallelism: 6
    upgradeMode: savepoint # <-- redeploys take a savepoint first (Step 8)
    state: running
Three settings do the real work. high-availability.type: kubernetes makes JobManager loss survivable. state.backend.incremental: true on RocksDB means each checkpoint uploads only the SST files that changed since the previous one, so even a 200 GB keyed state can checkpoint in seconds when only a small slice of it changes between checkpoints. upgradeMode: savepoint is what turns a version bump from “stop and replay” into a clean savepoint-restore. Apply it:
kubectl apply -f fraud-scoring.yaml
kubectl -n payments get flinkdeployment fraud-scoring -w
In practice you do not kubectl apply by hand in production — this YAML lives in a git repo that Argo CD syncs to the cluster, so the desired state is auditable and every change is a reviewed pull request. The kubectl apply above is the manual equivalent for a first run. Terraform provisions everything underneath the job (the bucket, IAM/IRSA, the EKS node groups, Vault policies); Argo CD owns everything inside the cluster from that point on.
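To get a feel for what the exponential-delay restart strategy in the FlinkDeployment does during a flapping dependency, the delays can be sketched with plain arithmetic. This is a simplified model, not Flink’s implementation: it ignores jitter and the backoff reset threshold. The 1 s initial backoff and multiplier of 2 are assumed values; only the 5 min cap comes from the configuration above.

```shell
# Print the first ATTEMPTS restart delays (in seconds) for an exponential backoff
# that starts at INITIAL, multiplies by MULT after each attempt, and is capped at MAX.
backoff_schedule() {
  initial="$1"; mult="$2"; max="$3"; attempts="$4"
  delay="$initial"
  out=""
  i=0
  while [ "$i" -lt "$attempts" ]; do
    out="${out:+$out }$delay"
    delay=$(( delay * mult ))
    if [ "$delay" -gt "$max" ]; then
      delay="$max"
    fi
    i=$(( i + 1 ))
  done
  echo "$out"
}

# Assumed: 1 s initial backoff, multiplier 2; 300 s is the 5 min max-backoff above.
backoff_schedule 1 2 300 11   # prints: 1 2 4 8 16 32 64 128 256 300 300
```

Under these assumptions the job reaches the 5 minute ceiling after nine failed attempts, which is worth knowing when you set the paging threshold on numRestarts.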
6. Expose the Flink UI behind SSO
The Flink Web UI shows job graph, checkpoint history, and backpressure — operators need it, but it must not be open. Front it with an ingress that requires authentication, federated from Okta to Entra ID via an OIDC proxy (oauth2-proxy here), so only on-call engineers in the right group reach it and every session is tied to a real identity.
kubectl -n payments port-forward svc/fraud-scoring-rest 8081:8081
# Production: an oauth2-proxy ingress in front of fraud-scoring-rest,
# OIDC issuer = Entra ID (workforce SSO federated from Okta),
# allowed group = payments-flink-operators
Edge traffic to the UI and to any externally exposed Flink endpoint terminates at Akamai for TLS, WAF, and bot mitigation before it reaches the ingress, the same perimeter every other KloudVin service sits behind. Hosts outside the cluster (a bastion, or a legacy connector virtual appliance bridging to the on-prem fraud system) are enrolled in CrowdStrike Falcon for runtime threat detection that feeds the SOC, and they report to the same Datadog account so you have one pane across pods and appliances.
7. Trigger and inspect a manual savepoint
Checkpoints happen automatically every 30 seconds for failure recovery. A savepoint is a deliberate, self-contained, version-portable snapshot you take before an upgrade or as a point-in-time record for an audit. With the operator you do not call the CLI; you patch the CR, and the operator drives Flink.
# Ask the operator to take a savepoint to s3://.../savepoints/
kubectl -n payments patch flinkdeployment fraud-scoring --type merge \
-p '{"spec":{"job":{"savepointTriggerNonce": 1}}}'
# Watch the operator record it on the resource status
kubectl -n payments get flinkdeployment fraud-scoring \
-o jsonpath='{.status.jobStatus.savepointInfo.lastSavepoint.location}{"\n"}'
# s3://kv-flink-state-payments-prod/savepoints/savepoint-abc123-...
Bump savepointTriggerNonce to a new integer each time you want a fresh savepoint. The returned S3 path is the exact artifact you would hand an auditor, or restore from in a disaster.
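Bumping the nonce by hand is easy to get wrong when several people share a runbook, so a small wrapper can compute the next value from whatever is on the resource. A minimal sketch follows; savepoint_patch is a hypothetical helper that only builds the patch body and does not talk to the cluster itself.

```shell
# Build the merge patch that asks the operator for a new savepoint.
# $1 is the nonce currently on the resource (empty or missing counts as 0).
savepoint_patch() {
  current="${1:-0}"
  next=$(( current + 1 ))
  printf '{"spec":{"job":{"savepointTriggerNonce": %d}}}\n' "$next"
}

# Usage against a live cluster (assumes kubectl access to the payments namespace):
#   current="$(kubectl -n payments get flinkdeployment fraud-scoring \
#     -o jsonpath='{.spec.job.savepointTriggerNonce}')"
#   kubectl -n payments patch flinkdeployment fraud-scoring --type merge \
#     -p "$(savepoint_patch "$current")"
```

In a GitOps flow the same increment would be a one-line change in the manifest rather than a live patch.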
8. Perform a zero-data-loss version upgrade
This is the workflow the whole migration exists to make safe. Because the job uses upgradeMode: savepoint, redeploying with a new image makes the operator do the right dance automatically: take a fresh savepoint, stop the job, start the new image, and restore from that savepoint. No Kafka replay, no blind window.
# Bump the image (and/or job logic) — in GitOps this is a PR edit
kubectl -n payments patch flinkdeployment fraud-scoring --type merge \
-p '{"spec":{"image":"<REGISTRY>/payments/fraud-scoring:1.5.0"}}'
# Operator sequence (observe it on status.lifecycleState):
# SUSPENDED(savepoint taken) -> UPGRADING -> DEPLOYED -> STABLE
kubectl -n payments get flinkdeployment fraud-scoring \
-o jsonpath='{.status.lifecycleState}{" reconcile="}{.status.reconciliationStatus.state}{"\n"}'
The real production path for this patch is a pull request that Argo CD syncs, gated by a ServiceNow change request — the deploy is blocked until the change ticket is approved, so the auditor sees who authorized the upgrade and when, and an incident auto-raises in ServiceNow if a post-deploy health check fails. Datadog monitors the numRestarts, checkpoint duration, and consumer lag through the cutover; a spike pages on-call. That is the difference between the old “savepoint-and-pray” and a reviewable, observable, rollback-able pipeline step.
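A post-deploy health check of the kind described above could key off the lifecycleState the operator reports. The sketch below maps a state to a verdict. upgrade_verdict, its return codes, and the polling parameters are hypothetical choices; the state names are the ones the operator exposes on status.lifecycleState.

```shell
# Map the operator's lifecycleState to a verdict for a deploy gate.
# Returns 0 when the upgrade has settled, 1 when it failed, 2 while still in progress.
upgrade_verdict() {
  case "$1" in
    STABLE) echo "done"; return 0 ;;
    FAILED|ROLLED_BACK) echo "failed"; return 1 ;;
    *) echo "in-progress"; return 2 ;;
  esac
}

# Usage: poll until the verdict is final (60 tries and a 10 s sleep are arbitrary).
#   tries=0
#   while [ "$tries" -lt 60 ]; do
#     state="$(kubectl -n payments get flinkdeployment fraud-scoring \
#       -o jsonpath='{.status.lifecycleState}')"
#     rc=0; upgrade_verdict "$state" || rc=$?
#     [ "$rc" -ne 2 ] && break
#     tries=$(( tries + 1 )); sleep 10
#   done
```

A real gate should also wait until the operator has picked up the new spec, since the resource can briefly still report STABLE for the previous version right after the patch.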
Validation
Prove the three properties you came for — running, recoverable, upgrade-safe — before you call it done.
# 1) Job is RUNNING with the expected parallelism
kubectl -n payments get flinkdeployment fraud-scoring \
-o jsonpath='{.status.jobStatus.state}{"\n"}' # RUNNING
# 2) Checkpoints are actually completing to S3 (not silently failing)
aws s3 ls s3://kv-flink-state-payments-prod/checkpoints/ --recursive | tail
# ... expect a fresh chk-<n>/_metadata every ~30s
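# 3) HA works: kill JobManager pods and confirm no full restart.
# Note: this selector matches every running JobManager of this job, leader and
# standby alike. To exercise standby takeover specifically, delete only the
# current leader pod by name instead.
kubectl -n payments delete pod -l app=fraud-scoring,component=jobmanager \
  --field-selector status.phase=Running --grace-period=0
# Watch: a JobManager takes leadership, job resumes from last checkpoint.
# numRestarts should NOT jump to a full job restart from offset 0.
kubectl -n payments logs -l app=fraud-scoring,component=jobmanager --tail=50 | grep -i "leader\|restored"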
The decisive HA test is check 3: after the active JobManager is deleted, a surviving or replacement JobManager must resume the job from the last checkpoint rather than replaying from the earliest Kafka offset. If you see a full restart, your high-availability.storageDir is most likely unreachable (usually IRSA not actually attached); fix that before trusting the cluster.
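For check 2, eyeballing the tail of an S3 listing is error-prone, so a small filter can pull out the highest completed checkpoint number instead. This sketch assumes the usual listing shape from aws s3 ls --recursive, where each line ends with the object key, and treats a chk-<n> directory as complete only if it contains a _metadata object. latest_completed_checkpoint is a hypothetical helper name.

```shell
# Read an `aws s3 ls --recursive` listing on stdin and print the highest
# checkpoint number that has a _metadata object (prints nothing if none exist).
latest_completed_checkpoint() {
  sed -n 's#.*/chk-\([0-9][0-9]*\)/_metadata$#\1#p' | sort -n | tail -n 1
}

# Usage against the real bucket (run twice about a minute apart; the number should grow):
#   aws s3 ls s3://kv-flink-state-payments-prod/checkpoints/ --recursive \
#     | latest_completed_checkpoint
```

If the number does not advance between two runs, checkpoints are failing or timing out even though the job state still reads RUNNING.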
Rollback / teardown
A bad upgrade rolls back to a known-good savepoint; a full teardown removes the job but, deliberately, keeps the state in S3.
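# Roll back to a specific prior savepoint (e.g. after a bad 1.5.0).
# Changing savepointRedeployNonce is what makes the operator restore from
# initialSavepointPath; without it the path is only honored on first deploy
# and the rollback would restore from a fresh savepoint of the bad version.
# Use 1 if the nonce was never set, otherwise increment the current value.
kubectl -n payments patch flinkdeployment fraud-scoring --type merge -p '{
  "spec": {
    "image": "<REGISTRY>/payments/fraud-scoring:1.4.0",
    "job": { "initialSavepointPath":
      "s3://kv-flink-state-payments-prod/savepoints/savepoint-abc123-...",
      "savepointRedeployNonce": 1,
      "upgradeMode": "savepoint" }
  }
}'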
# Stop the job but RETAIN externalized state (RETAIN_ON_CANCELLATION did this)
kubectl -n payments delete flinkdeployment fraud-scoring
# Full uninstall of the operator (only when decommissioning the platform)
helm -n flink-operator uninstall flink-kubernetes-operator
helm -n cert-manager uninstall cert-manager
# S3 state survives all of the above — delete it only by intent:
# aws s3 rm s3://kv-flink-state-payments-prod/ --recursive
Never let teardown delete the bucket. Your savepoints are the only thing standing between a bad day and a six-hour Kafka replay; keep them until you have consciously decided the job is gone for good.
Common pitfalls
- HA storage dir not actually writable. The job “works” but a JobManager restart replays from offset 0. The cause is almost always IRSA not bound (the SA name in the FlinkDeployment does not match the one eksctl created) so the pod cannot write ha/. Verify with aws sts get-caller-identity from inside a TaskManager pod.
- Wrong S3 filesystem plugin. Installing neither flink-s3-fs-hadoop nor flink-s3-fs-presto gives UnsupportedFileSystemException, and installing both makes it unclear which one handles s3:// paths, since each registers that scheme. Pick one (presto for checkpoints is the common choice) and copy exactly that jar into plugins/.
- upgradeMode: stateless left on by accident. Every redeploy then silently drops state and replays, which is the exact failure you migrated to escape. For any stateful job it must be savepoint (or last-state if you accept restoring from the latest checkpoint instead of a fresh savepoint).
- Checkpoints too aggressive for the state size. With max-concurrent-checkpoints: 1, checkpoints cannot overlap, so a 10-second interval on 200 GB of state means each slow checkpoint delays the next and the job ends up checkpointing almost continuously. Start at 30 s, watch checkpoint duration in Datadog, and only then tune down.
- RocksDB without incremental. Full checkpoints of large keyed state saturate the network and time out. state.backend.incremental: true is non-optional at scale.
- Operator watching the wrong namespace. If watchNamespaces does not include payments, your FlinkDeployment is accepted by the API server but never reconciled; it just sits there. Check the operator logs.
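The checkpoint-cadence pitfall is easier to reason about with a small model of how the interval and min-pause settings interact. The sketch below is a simplification, not Flink’s scheduler: it assumes the next checkpoint starts at the later of (previous start + interval) and (previous end + min-pause), with every value in whole seconds. next_checkpoint_start is a hypothetical helper.

```shell
# Simplified model of when the next checkpoint can start.
# Arguments: previous start time, checkpoint duration, interval, min-pause (seconds).
next_checkpoint_start() {
  prev_start="$1"; duration="$2"; interval="$3"; min_pause="$4"
  by_interval=$(( prev_start + interval ))
  by_pause=$(( prev_start + duration + min_pause ))
  if [ "$by_pause" -gt "$by_interval" ]; then
    echo "$by_pause"
  else
    echo "$by_interval"
  fi
}

# With the guide's 30 s interval and 10 s min-pause:
next_checkpoint_start 0 5 30 10    # prints 30: a fast checkpoint, the interval governs
next_checkpoint_start 0 45 30 10   # prints 55: a slow checkpoint, min-pause governs
```

Under this model, once a checkpoint takes longer than the interval minus the min-pause (20 s here), the effective cadence becomes duration plus min-pause, so shortening the interval further buys nothing.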
Security notes
Identity is end to end: human access to the cluster and the Flink UI federates Okta → Entra ID with SSO and conditional access, scoped by group, so only on-call payments engineers reach the job and every action ties to a named user. The job’s S3 access uses IRSA — a scoped, keyless IAM role on the flink ServiceAccount — so there is no AWS credential on disk to leak, and the policy is locked to this one bucket. The Kafka SASL password is leased from HashiCorp Vault and injected as a short-lived file, never a Kubernetes Secret in git. Images are scanned by Wiz / Wiz Code in CI (container CVEs and manifest misconfigurations) and only signed digests deploy; any non-pod hosts — a bastion or a connector virtual appliance to the on-prem fraud system — carry CrowdStrike Falcon for runtime detection into the SOC. Enable encryption-at-rest on the bucket (SSE-KMS) and TLS on the Kafka and S3 paths; Akamai fronts any externally reachable endpoint with WAF and TLS termination.
Cost notes
The big lever is right-sizing TaskManager memory and CPU to the state and throughput, then letting the cluster autoscaler add nodes only under real load rather than statically provisioning for peak; Application mode’s one-cluster-per-job isolation makes that per-job sizing honest. Incremental checkpointing also directly cuts cost: shipping only changed RocksDB SST files means far fewer S3 PUT requests and far less data transferred than full checkpoints every 30 seconds. Apply an S3 lifecycle policy to expire old externalized checkpoints (but not savepoints, which you keep deliberately) so the bucket does not grow unbounded, and scope it to checkpoint prefixes of jobs that are no longer running, because a live incremental checkpoint can still reference files written by much older ones. Watch checkpoint duration, TaskManager CPU, and S3 request counts in Datadog, and use the operator’s job autoscaler to scale parallelism with Kafka lag so you pay only for slots you are actually using. If this job is part of a wider data-platform enablement track for engineers, fold the runbook and the upgrade procedure into Moodle so on-call rotation onboarding is consistent and the savepoint-upgrade dance is documented once, not relearned each incident.
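The gap between full and incremental checkpointing can be put in rough numbers. The sketch below is back-of-envelope arithmetic only: the share of state rewritten between checkpoints (1 percent here) is an assumed figure, not a measurement, and RocksDB compaction can push the real upload volume higher. daily_upload_gb is a hypothetical helper.

```shell
# Rough daily upload volume in GB for full vs incremental checkpoints.
# Arguments: total state in GB, assumed percent of state changed between
# checkpoints, checkpoint interval in seconds.
daily_upload_gb() {
  state_gb="$1"; changed_pct="$2"; interval_s="$3"
  per_day=$(( 86400 / interval_s ))
  full=$(( state_gb * per_day ))
  incremental=$(( state_gb * changed_pct * per_day / 100 ))
  echo "full=${full} incremental=${incremental}"
}

# 200 GB of state, an assumed 1% change per checkpoint, 30 s interval:
daily_upload_gb 200 1 30   # prints: full=576000 incremental=5760
```

The full-checkpoint figure is not achievable in practice at this interval, which is the point: at this state size only the incremental mode is viable, and the changed-state share is the number to measure before tuning the interval.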