Change Data Capture with DynamoDB Streams: Lambda Triggers, EventBridge Pipes, and Exactly-Once Processing

Change data capture off DynamoDB looks deceptively simple — turn on a stream, point a Lambda at it, ship the deltas somewhere. The trouble starts the first time a single poison record wedges a shard, an OpenSearch index drifts out of sync, or someone discovers that “exactly-once” was never a property the stream promised. DynamoDB Streams give you an ordered, at-least-once feed of item-level mutations. Everything downstream — ordering across reprocessing, deduplication, partial-batch recovery — is yours to engineer. This is how to build that pipeline so it survives throttles, redrives, and the occasional malformed item, using Lambda event source mappings where you need code and EventBridge Pipes where you do not.

1. Streams internals: shards, records, and view types

A DynamoDB Stream is an ordered, append-only log of item-level changes, retained for 24 hours. It is composed of shards, and shard topology mirrors the table’s partition structure: records for a given partition key always land in the same shard lineage, so they are delivered in the order the writes were committed. That per-partition ordering is the single most important property to internalize — there is no global ordering across the table, only within a partition key.

Enable the stream and choose a view type, which controls what each record carries:

aws dynamodb update-table \
  --table-name orders \
  --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES

The four view types are not cosmetic — they dictate what your consumers can compute:

| StreamViewType | Record contains | Use when |
|---|---|---|
| KEYS_ONLY | Key attributes only | You re-read the item yourself, or only need invalidation signals |
| NEW_IMAGE | Item after the write | Projecting current state to a search index or cache |
| OLD_IMAGE | Item before the write | Audit “what changed from”, soft-delete capture |
| NEW_AND_OLD_IMAGES | Both | Computing diffs, conditional routing on field transitions |

For CDC you almost always want NEW_AND_OLD_IMAGES. The diff is what lets you route on transitions (“status went from PENDING to SHIPPED”) and reconstruct deletes — on a REMOVE event there is no new image, only the old one.
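A minimal sketch of transition routing, assuming a string attribute named status as in the example above. The helper names status_transition and is_shipment are hypothetical, not part of any AWS SDK.

```python
def status_transition(record):
    """Return (old_status, new_status); either side is None when its image is absent."""
    change = record.get("dynamodb", {})
    old = change.get("OldImage", {}).get("status", {}).get("S")
    new = change.get("NewImage", {}).get("status", {}).get("S")
    return old, new


def is_shipment(record):
    """True only when this change moved status from PENDING to SHIPPED."""
    return status_transition(record) == ("PENDING", "SHIPPED")


shipped = {
    "eventName": "MODIFY",
    "dynamodb": {
        "OldImage": {"status": {"S": "PENDING"}},
        "NewImage": {"status": {"S": "SHIPPED"}},
    },
}
# A REMOVE record has no NewImage, so the new status comes back as None.
removed = {"eventName": "REMOVE", "dynamodb": {"OldImage": {"status": {"S": "SHIPPED"}}}}

print(is_shipment(shipped))        # True
print(status_transition(removed))  # ('SHIPPED', None)
```

With KEYS_ONLY or NEW_IMAGE the old side would always be None, which is why transition routing needs both images.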

A subtle trap: a stream record’s eventName is INSERT, MODIFY, or REMOVE. A TTL deletion arrives as REMOVE with userIdentity.principalId set to dynamodb.amazonaws.com and type Service. If you mirror deletes downstream, filter TTL expiries explicitly or you will replicate housekeeping you did not intend to.
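One way to apply that filter in a handler, as a sketch. It checks only the userIdentity fields described above; the function names are hypothetical.

```python
def is_ttl_delete(record):
    """True when a REMOVE was issued by the DynamoDB TTL service, not a caller."""
    identity = record.get("userIdentity") or {}
    return (
        record.get("eventName") == "REMOVE"
        and identity.get("type") == "Service"
        and identity.get("principalId") == "dynamodb.amazonaws.com"
    )


def user_deletes(records):
    """Keep only REMOVE records that a user or application issued."""
    return [
        r for r in records
        if r.get("eventName") == "REMOVE" and not is_ttl_delete(r)
    ]


records = [
    {"eventName": "REMOVE"},  # ordinary delete: no userIdentity field
    {"eventName": "REMOVE",
     "userIdentity": {"type": "Service", "principalId": "dynamodb.amazonaws.com"}},
    {"eventName": "MODIFY"},
]
print(len(user_deletes(records)))  # 1
```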

There is a second, newer option: Kinesis Data Streams for DynamoDB. It pushes changes to a Kinesis stream you own, with configurable retention up to a year and no 24-hour cap. The trade-off is that Kinesis CDC does not guarantee ordering or deduplication the way the native DynamoDB Stream does: records can be delivered more than once and out of order, so you must dedupe and reorder yourself, typically on each record's ApproximateCreationDateTime or on a version attribute you maintain in the item. Reach for it when you need long retention or multiple independent fan-out consumers; stay on native Streams when strict per-key ordering matters most.

2. Consuming with Lambda: batch size, batch window, and parallelization factor

The native stream is consumed through a Lambda event source mapping (ESM) — a managed poller AWS runs on your behalf. You do not write the polling loop; you tune it. Three knobs dominate throughput and latency:

aws lambda create-event-source-mapping \
  --function-name orders-cdc-processor \
  --event-source-arn "$STREAM_ARN" \
  --starting-position LATEST \
  --batch-size 100 \
  --maximum-batching-window-in-seconds 5 \
  --parallelization-factor 4 \
  --maximum-retry-attempts 3 \
  --bisect-batch-on-function-error \
  --maximum-record-age-in-seconds 3600 \
  --function-response-types ReportBatchItemFailures \
  --destination-config '{"OnFailure":{"Destination":"'"$DLQ_ARN"'"}}'

ParallelizationFactor deserves a warning. Ordering is preserved per partition key even with a factor above 1: the poller hashes each record’s partition key to one of the concurrent invocations, so all records for a key still go to the same invocation in order. What you lose is ordering across different keys within the same shard — which is almost always fine, because cross-key ordering was never guaranteed table-wide anyway. Use it freely as long as your processing logic only assumes per-key order.
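To see why per-key order survives, here is a toy model of key-based lane assignment. It is an illustration only: the hash function and the names lane_for and split_into_lanes are assumptions, not the poller's actual algorithm.

```python
import hashlib
from collections import defaultdict


def lane_for(partition_key, factor):
    """Map a partition key to one of `factor` lanes, deterministically."""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % factor


def split_into_lanes(records, factor):
    """Distribute (key, sequence) pairs across lanes, preserving arrival order."""
    lanes = defaultdict(list)
    for key, seq in records:
        lanes[lane_for(key, factor)].append((key, seq))
    return lanes


# Records arrive in shard order; each key's changes stay in one lane, in order.
records = [("A", 1), ("B", 1), ("A", 2), ("C", 1), ("B", 2), ("A", 3)]
for lane, items in sorted(split_into_lanes(records, 4).items()):
    print(lane, items)
```

Because the lane depends only on the key, two changes to the same item can never race each other across invocations. Changes to different keys can.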

--starting-position LATEST begins at the tip and ignores history; TRIM_HORIZON replays everything still in the 24-hour window. For a brand-new pipeline that must backfill, TRIM_HORIZON plus a one-time scan-and-load for older data is the usual combination — the stream alone cannot reach beyond 24 hours.

3. Ordering and partial-batch failures: bisect-on-error and checkpointing

Here is the failure mode that bites everyone. By default, if your function throws on a batch, the ESM retries the entire batch, and because the shard’s checkpoint has not advanced, it keeps retrying the same batch until it succeeds, the records age past MaximumRecordAgeInSeconds, or MaximumRetryAttempts is exhausted. Both limits default to unlimited (-1), so out of the box the retries continue until the records expire from the stream. A single un-processable record blocks every record behind it in that shard. That is a stalled shard, and you will see it as a climbing iterator age.

Two mechanisms fix this; use both.

Partial batch responses let the function tell the poller exactly which records failed so the rest are checkpointed and never reprocessed. Set --function-response-types ReportBatchItemFailures and return the failed sequence numbers:

def handler(event, context):
    failures = []
    for record in event["Records"]:
        seq = record["dynamodb"]["SequenceNumber"]
        try:
            process(record)
        except Exception:
            failures.append({"itemIdentifier": seq})
    return {"batchItemFailures": failures}

The contract has a sharp edge worth memorizing: because ordering matters, the poller treats the earliest reported failure as the checkpoint. Everything at or after that sequence number is retried; everything before it is considered done. So return failures honestly — do not report a late record as failed while silently dropping an earlier one, or you will skip data. Returning an empty list (or {"batchItemFailures": []}) means the whole batch succeeded.
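Since everything from the earliest failure onward is retried anyway, a handler that must apply changes strictly in order can simply stop at the first failure. A sketch of that variant, assuming the processing function is passed in as a parameter (handle_in_order and process are hypothetical names):

```python
def handle_in_order(event, process):
    """Process records in order; stop at and report the first failure."""
    for record in event["Records"]:
        try:
            process(record)
        except Exception:
            seq = record["dynamodb"]["SequenceNumber"]
            # Later records are not attempted: they would be retried regardless,
            # and skipping them avoids applying a newer change before an older one.
            return {"batchItemFailures": [{"itemIdentifier": seq}]}
    return {"batchItemFailures": []}
```

The trade-off is throughput: records for unrelated keys later in the batch wait for the retry. The continue-on-error loop shown earlier suits processing that is idempotent and insensitive to order within a batch.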

Bisect-on-error (--bisect-batch-on-function-error) handles the case where the function throws instead of reporting failures. On error the poller splits the batch in two and retries each half, recursively narrowing down to the single offending record. That record then goes to the on-failure destination once retries are exhausted, and the rest of the batch proceeds. Without bisect, one bad record can burn your entire retry budget on a 100-record batch.
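The narrowing behaviour can be modelled in a few lines. This is a toy simulation of the idea, not Lambda's implementation: it ignores retry counts and record age, and bisect_poison and process_batch are hypothetical names.

```python
def bisect_poison(batch, process_batch):
    """Return the records that still fail when retried as a batch of one."""
    try:
        process_batch(batch)
        return []
    except Exception:
        if len(batch) == 1:
            return list(batch)  # isolated: this record is the poison
        mid = len(batch) // 2
        return (bisect_poison(batch[:mid], process_batch)
                + bisect_poison(batch[mid:], process_batch))


def process_batch(batch):
    # Stand-in for the function: the whole invocation fails if any record is bad.
    if any(item == "bad" for item in batch):
        raise ValueError("cannot parse record")


batch = ["ok"] * 5 + ["bad"] + ["ok"] * 2
print(bisect_poison(batch, process_batch))  # ['bad']
```

Each split halves the blast radius, so a 100-record batch isolates one bad record in about seven rounds.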

Rule of thumb: ReportBatchItemFailures is the primary mechanism — it is precise and avoids reprocessing good records. BisectBatchOnFunctionError is the safety net for unexpected exceptions and serialization bugs you did not anticipate. Ship both.

4. EventBridge Pipes: filtering, enrichment, and routing without glue Lambdas

A large fraction of CDC “processors” are glue: filter the events you care about, reshape them, send them to a target. EventBridge Pipes does exactly that as a managed integration — source, optional filter, optional enrichment, target — with no poller code to own. The source can be a DynamoDB Stream directly.

The filter stage runs before you pay for enrichment or invocation, so push as much selectivity into it as you can. Pipes filter patterns match the stream record shape, including the DynamoDB-typed attribute values:

{
  "eventName": ["MODIFY"],
  "dynamodb": {
    "NewImage": {
      "status": { "S": ["SHIPPED"] }
    }
  }
}

The Terraform wiring is compact, and the dynamodb_stream_parameters block carries the same batch/window/retry semantics as a Lambda ESM:

resource "aws_pipes_pipe" "orders_shipped" {
  name     = "orders-shipped-to-bus"
  role_arn = aws_iam_role.pipe.arn
  source   = aws_dynamodb_table.orders.stream_arn
  target   = aws_cloudwatch_event_bus.commerce.arn

  source_parameters {
    dynamodb_stream_parameters {
      starting_position                  = "LATEST"
      batch_size                         = 100
      maximum_batching_window_in_seconds = 5
      maximum_retry_attempts             = 4
      on_partial_batch_item_failure      = "AUTOMATIC_BISECT"
      dead_letter_config {
        arn = aws_sqs_queue.pipe_dlq.arn
      }
    }
    filter_criteria {
      filter {
        pattern = jsonencode({
          eventName = ["MODIFY"]
          dynamodb  = { NewImage = { status = { S = ["SHIPPED"] } } }
        })
      }
    }
  }
}

The enrichment stage (a Lambda, Step Functions Express workflow, API Gateway, or API destination) can hydrate the event — look up a customer record, call out to a pricing service — and return a transformed payload, all without you running a poller. The target transformer then reshapes the final payload with an input template. The net effect: a filter-enrich-route pipeline that used to be 150 lines of Lambda becomes declarative infrastructure, and the only code you keep is genuine business logic in the enrichment.
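A sketch of what a Lambda enrichment might look like, assuming the pipe hands the function the filtered batch as a list of stream records and forwards the returned list to the target. The customerId attribute, the CUSTOMERS table, and lookup_customer are hypothetical stand-ins for your own lookup.

```python
# Hypothetical in-memory stand-in for a customer service or table lookup.
CUSTOMERS = {"C-1": {"name": "Ada", "tier": "gold"}}


def lookup_customer(customer_id):
    return CUSTOMERS.get(customer_id, {})


def handler(events, context):
    """Reshape each stream record and attach customer details."""
    enriched = []
    for event in events:
        change = event["dynamodb"]
        image = change.get("NewImage", {})
        customer_id = image.get("customerId", {}).get("S")
        enriched.append({
            "orderId": change["Keys"]["pk"]["S"],
            "status": image.get("status", {}).get("S"),
            "customer": lookup_customer(customer_id),
        })
    return enriched
```

Keep the enrichment free of side effects where you can: the pipe may retry a batch, and a pure lookup is trivially safe to repeat.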

Choose Pipes when the work is route and reshape. Keep a Lambda ESM when the work is compute — non-trivial transformation, transactional writes to multiple sinks, or anything needing libraries and real error handling.

5. Idempotency and exactly-once semantics in downstream sinks

DynamoDB Streams are at-least-once. Redrives, bisect retries, and poller restarts all mean a downstream sink can see the same change more than once. “Exactly-once” downstream is therefore not a delivery setting you toggle — it is an idempotent write you design. The good news is every stream record hands you the tools: a stable item key and a monotonic SequenceNumber.

The cleanest pattern is a conditional write keyed on sequence. Store the last applied sequence number per item and reject anything not strictly newer. This collapses duplicates and protects against out-of-order application during reprocessing:

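import boto3
from botocore.exceptions import ClientError

table = boto3.resource("dynamodb").Table("orders_projection")  # hypothetical sink table

def apply_idempotent(key, new_image, sequence):
    # SequenceNumber is a numeric string of 21 to 40 digits: too long for a DynamoDB
    # Number (38 digits), and raw strings of different lengths sort incorrectly.
    # Zero-padding to a fixed width makes string order match numeric order.
    seq = str(sequence).zfill(40)
    try:
        table.put_item(
            Item={**new_image, "_seq": seq},
            ConditionExpression="attribute_not_exists(#s) OR #s < :seq",
            ExpressionAttributeNames={"#s": "_seq"},
            ExpressionAttributeValues={":seq": seq},
        )
    except ClientError as e:
        if e.response["Error"]["Code"] != "ConditionalCheckFailedException":
            raise
        # Duplicate or stale replay -> safe no-op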

DynamoDB SequenceNumber values are strictly increasing within a shard for a given key, which is exactly the scope this guard needs. They arrive as numeric strings of up to 40 digits, so compare them numerically or as zero-padded strings, never as raw strings. For sinks that lack conditional writes, fall back to a deterministic idempotency key (partitionKey#sortKey#sequenceNumber) and let the sink’s natural upsert (OpenSearch _id, an S3 object key, a primary-key MERGE) absorb the duplicate. The principle is invariant: make replay a no-op, never an additive side effect.
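Building that key is a one-liner. The sketch below assumes string key attributes named pk and sk; pk matches the examples in this article, sk is hypothetical and treated as optional.

```python
def idempotency_key(record):
    """Derive a stable key: the same stream record always yields the same string."""
    change = record["dynamodb"]
    pk = change["Keys"]["pk"]["S"]
    sk = change["Keys"].get("sk", {}).get("S", "")  # empty when the table has no sort key
    return f"{pk}#{sk}#{change['SequenceNumber']}"


record = {
    "eventName": "MODIFY",
    "dynamodb": {
        "Keys": {"pk": {"S": "ORDER#42"}, "sk": {"S": "LINE#1"}},
        "SequenceNumber": "700000000000000000001",
    },
}
print(idempotency_key(record))  # ORDER#42#LINE#1#700000000000000000001
```

Used as an S3 object key or a primary key, a redelivered record overwrites its own earlier copy instead of adding a second one.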

6. Fan-out to OpenSearch, S3, and analytics targets

Real pipelines feed several sinks with different durability and freshness needs. Resist the urge to fan out from a single mega-Lambda — couple them and one slow target stalls the shard for all of them. Fan out at the bus instead: a Pipe lands changes on an EventBridge bus, and independent rules deliver to each consumer with its own retry budget and DLQ.

A single Lambda target can also do the OpenSearch fan-out directly when you want code-level control. Report per-document failures via ReportBatchItemFailures so one rejected doc does not block the batch, and treat a 409 version conflict as a stale replay that is already satisfied, not as a failure:

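from boto3.dynamodb.types import TypeDeserializer
from opensearchpy import helpers

_deserializer = TypeDeserializer()

def deserialize(image):
    # Convert DynamoDB-typed attributes ({"S": "x"}) to plain Python values.
    return {name: _deserializer.deserialize(value) for name, value in image.items()}

def to_action(record):
    key = record["dynamodb"]["Keys"]["pk"]["S"]
    if record["eventName"] == "REMOVE":
        return {"_op_type": "delete", "_index": "orders", "_id": key}
    img = deserialize(record["dynamodb"]["NewImage"])
    # An external version must fit in a signed 64-bit long; SequenceNumber (21 to 40
    # digits) does not. Assumption: items carry a writer-incremented integer "version".
    version = int(img["version"])
    return {
        "_op_type": "index", "_index": "orders", "_id": key,
        "_version": version, "_version_type": "external", "_source": img,
    }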
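The other half of that handler maps bulk results back to stream records. A sketch, assuming results is an iterable of (ok, info) tuples in the same order as the records, which is the shape opensearch-py's helpers.streaming_bulk yields when called with raise_on_error=False; failures_from_bulk is a hypothetical name.

```python
def failures_from_bulk(records, results):
    """Translate per-document bulk results into a partial batch response."""
    failures = []
    for record, (ok, info) in zip(records, results):
        if ok:
            continue
        # info looks like {"index": {"_id": "...", "status": 409, "error": {...}}}
        op_type, detail = next(iter(info.items()))
        status = detail.get("status")
        if status == 409:
            continue  # version conflict: the index already holds a newer copy
        if op_type == "delete" and status == 404:
            continue  # already deleted: the replayed delete is a no-op
        failures.append({"itemIdentifier": record["dynamodb"]["SequenceNumber"]})
    return {"batchItemFailures": failures}
```

In the handler this would be fed by something like helpers.streaming_bulk(client, map(to_action, records), raise_on_error=False, raise_on_exception=False), so that one rejected document surfaces as a result tuple instead of an exception.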

7. Handling poison records with on-failure destinations and DLQs

A poison record is one that will never succeed no matter how many times you retry — a schema your code cannot parse, a downstream constraint it will always violate. The entire point of MaximumRetryAttempts, MaximumRecordAgeInSeconds, and an on-failure destination is to evict that record so the shard keeps moving.

For a Lambda ESM, the on-failure destination (SQS or SNS) receives metadata about the failed batch, not the record bodies themselves — shard ID, the failing sequence-number range, and an error summary. That is deliberate: the records still live in the stream for 24 hours, so the destination’s job is to point you at them. Your redrive tooling reads the DynamoDBStreamArn, shard, and sequence-number window from the failure message and re-reads the offending records with GetShardIterator at AT_SEQUENCE_NUMBER. An EventBridge Pipes DLQ behaves the same way — it captures failure context after retries are exhausted.

{
  "requestContext": { "functionArn": "arn:aws:lambda:...:orders-cdc-processor" },
  "responseContext": { "statusCode": 200, "functionError": "Unhandled" },
  "DDBStreamBatchInfo": {
    "shardId": "shardId-00000001700000000000-abcdef01",
    "startSequenceNumber": "700000000000000000001",
    "endSequenceNumber": "700000000000000000099",
    "approximateArrivalOfFirstRecord": "2026-06-08T14:00:00Z",
    "batchSize": 99,
    "streamArn": "arn:aws:dynamodb:...:table/orders/stream/2026-06-08T00:00:00.000"
  }
}
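A sketch of the read side of that redrive tooling. It assumes the failure message has the shape shown above and that streams_client is a DynamoDB Streams client such as boto3.client("dynamodbstreams"); fetch_failed_records is a hypothetical name.

```python
def fetch_failed_records(streams_client, failure_message, page_size=100):
    """Re-read the records in a failed batch's sequence-number window."""
    info = failure_message["DDBStreamBatchInfo"]
    iterator = streams_client.get_shard_iterator(
        StreamArn=info["streamArn"],
        ShardId=info["shardId"],
        ShardIteratorType="AT_SEQUENCE_NUMBER",
        SequenceNumber=info["startSequenceNumber"],
    )["ShardIterator"]

    end = int(info["endSequenceNumber"])  # Python ints handle 40-digit values
    found = []
    while iterator:
        page = streams_client.get_records(ShardIterator=iterator, Limit=page_size)
        if not page["Records"]:
            break  # simplification: treat an empty page as the end of the window
        for record in page["Records"]:
            if int(record["dynamodb"]["SequenceNumber"]) > end:
                return found
            found.append(record)
        iterator = page.get("NextShardIterator")
    return found
```

Passing the client in keeps the function easy to exercise against a stub before pointing it at a real stream. It only reads: deciding what to do with the recovered records stays a separate, reviewed step.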

Alarm on the function's IteratorAge metric and on DLQ depth, and make the redrive a deliberate, reviewed action rather than an automatic retry that re-poisons the pipeline. Set MaximumRecordAgeInSeconds comfortably below 24 hours (e.g. 6 hours) so records are evicted to the DLQ while they are still readable in the stream; if a record ages past the 24-hour retention before you redrive, it is gone for good.

Verify

Confirm the guarantees, not just that data moved. Generate a known mutation and trace it end to end:

# 1. Mutate an item and capture the moment
aws dynamodb update-item \
  --table-name orders \
  --key '{"pk":{"S":"ORDER#42"}}' \
  --update-expression "SET #s = :v" \
  --expression-attribute-names '{"#s":"status"}' \
  --expression-attribute-values '{":v":{"S":"SHIPPED"}}'

# 2. Iterator age must trend to ~0; a rising value means a stalled shard
#    (BSD/macOS date syntax below; on GNU/Linux use: date -u -d '15 minutes ago' +%FT%TZ)
aws cloudwatch get-metric-statistics \
  --namespace AWS/Lambda --metric-name IteratorAge \
  --dimensions Name=FunctionName,Value=orders-cdc-processor \
  --start-time "$(date -u -v-15M +%FT%TZ)" --end-time "$(date -u +%FT%TZ)" \
  --period 60 --statistics Maximum

# 3. Confirm the upsert landed with the expected version in OpenSearch
curl -s "$OS_ENDPOINT/orders/_doc/ORDER%2342" | jq '._version, ._source.status'

# 4. Prove idempotency: replay the same record twice, assert _version is unchanged
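Step 4 can be rehearsed locally before you run it against the real sink. The sketch below models the sequence guard with a plain dict standing in for the sink; apply_once is a hypothetical name, and the real assertion would read _version or _seq back from your store.

```python
def apply_once(sink, key, image, sequence):
    """Apply a change only if it is strictly newer than what the sink holds."""
    seq = int(sequence)  # compare numerically, never as raw strings
    current = sink.get(key)
    if current is not None and current["_seq"] >= seq:
        return False  # duplicate or stale replay: no-op
    sink[key] = {**image, "_seq": seq}
    return True


sink = {}
first = apply_once(sink, "ORDER#42", {"status": "SHIPPED"}, "700000000000000000002")
replay = apply_once(sink, "ORDER#42", {"status": "SHIPPED"}, "700000000000000000002")
stale = apply_once(sink, "ORDER#42", {"status": "PENDING"}, "700000000000000000001")

print(first, replay, stale)        # True False False
print(sink["ORDER#42"]["status"])  # SHIPPED
```

The replay and the stale record both leave the sink untouched, which is the property to assert in the end-to-end check.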

Then deliberately break it: push a malformed item, confirm the batch bisects, the poison record lands in the DLQ, the shard’s iterator age recovers, and the other items in that batch still arrived. A CDC pipeline you have not watched recover from a poison record is a pipeline you do not yet trust.

Enterprise scenario

A retail platform team ran inventory on a single DynamoDB table fronting 2,000 stores, with a Lambda ESM projecting every change into an OpenSearch index that powered the storefront’s “in stock near you” search. On a normal day it was fine. During a flash sale, one SKU’s partition went hot, the ESM’s iterator age on that shard climbed past 40 minutes, and search results went stale right when traffic peaked — the exact failure that costs money. Worse, a malformed bundle item (a nested map their deserializer choked on) had been silently stuck in that shard for hours, blocking every inventory update behind it because the whole batch kept failing and retrying.

The constraint was twofold: they needed the hot shard to catch up without resharding a 2,000-store table mid-sale, and they could not let one un-parseable item hold the rest hostage. Their original ESM had BatchSize=500, ParallelizationFactor=1, no partial-batch reporting, and no bisect — so a single bad record consumed the entire retry budget on a 500-record batch, over and over.

They made three changes, nearly all of them configuration. They raised ParallelizationFactor to 8 so the hot shard drained across eight concurrent invocations (ordering held, because all records for a given SKU still hashed to one invocation). They switched the function to ReportBatchItemFailures and returned precise failed sequence numbers, the one small code change, so good records checkpointed immediately instead of being reprocessed behind the poison item. And they enabled BisectBatchOnFunctionError with MaximumRecordAgeInSeconds=21600, so the malformed bundle item was narrowed down and evicted to a DLQ within minutes instead of wedging the shard for hours. Along the way they cut BatchSize from 500 to 100, which shrinks what any single failed batch has to redo.

aws lambda update-event-source-mapping \
  --uuid "$ESM_UUID" \
  --parallelization-factor 8 \
  --batch-size 100 \
  --maximum-record-age-in-seconds 21600 \
  --bisect-batch-on-function-error \
  --function-response-types ReportBatchItemFailures \
  --destination-config '{"OnFailure":{"Destination":"'"$DLQ_ARN"'"}}'

The outcome: iterator age on the hot shard stayed under 30 seconds during the next sale, the poison bundle item surfaced in the DLQ with its exact sequence range for an engineer to fix offline, and the storefront search stopped lying about stock during the one window where accuracy mattered most. No table redesign, no resharding, just the ESM knobs used the way they were designed.
