There's a line in Spark Structured Streaming that every engineer eventually learns the hard way:
foreachBatch delivers your data to the function. What happens inside is your problem.
That single distinction is the most important thing to understand about wiring Structured Streaming to any non-native sink — including LanceDB. This post is about exactly where that line sits, why it matters, and what you build to protect yourself on the other side of it.
What foreachBatch Actually Does
Structured Streaming processes data in micro-batches. Each micro-batch is a discrete DataFrame representing the records that arrived since the last trigger. foreachBatch hands you that DataFrame and a batch ID, then steps back.
def write_to_lance(batch_df, batch_id):
    # Spark's job is done here
    # Everything below is yours
    records = batch_df.toPandas()
    records["vector"] = embed(records)
    table.add(records)
Spark knows the batch was delivered to the function. Beyond that, it sees only whether the function returned or raised. If embedding fails and the exception is swallowed, Spark doesn't know. If the LanceDB write times out and the function returns anyway, Spark doesn't know. If the function exits cleanly after writing half the records, Spark doesn't know that either.
This isn't a flaw in foreachBatch. It's the design. The escape hatch is intentionally minimal — flexibility requires responsibility.
The Two Failure Modes That Will Find You
Duplicate writes on retry. Spark's at-least-once guarantee means a batch whose function throws will be retried: the query fails, and on restart Spark replays that batch from the checkpoint with the same batch ID. That's the correct behavior. The problem is that a plain table.add() is append-only: it has no awareness of whether a batch was already written. A retry that succeeds after a partial write produces duplicate vectors in your index, and duplicate vectors produce retrieval noise that silently degrades answer quality. No errors. Just wrong answers with high confidence scores.
Silent record loss on write failure. If your function catches an exception internally and exits cleanly rather than re-raising, Spark marks the batch as successful and advances the checkpoint. Those records are gone. The stream continues. Nothing alerts. You discover the gap when someone queries for something that should be there and isn't.
Re-raising exceptions is not optional. If something fails inside foreachBatch, Spark needs to know.
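The difference between the two behaviors can be seen without Spark at all. The sketch below is illustrative only: the extra write argument and the failing_write helper are hypothetical stand-ins for the LanceDB call, injected so the example runs on its own.

```python
import logging

log = logging.getLogger("lance_sink")


def swallowing_writer(batch_df, batch_id, write):
    """Anti-pattern: the failure is logged, then hidden from the caller."""
    try:
        write(batch_df)
    except Exception:
        log.exception("Batch %s failed", batch_id)
        # Returning normally here looks like success to the caller.


def reraising_writer(batch_df, batch_id, write):
    """Log for diagnostics, then let the exception propagate."""
    try:
        write(batch_df)
    except Exception:
        log.exception("Batch %s failed", batch_id)
        raise


def failing_write(_batch_df):
    # Hypothetical stand-in for a LanceDB write that times out.
    raise TimeoutError("simulated write timeout")
```

Calling swallowing_writer([], 1, failing_write) returns normally, which is all Spark would ever see. Calling reraising_writer with the same arguments raises TimeoutError, which is the signal Spark needs.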
The Fix: Idempotency via Batch ID Tracking
The fix for both failure modes fits in one function: track processed batch IDs in a small Delta table, check before writing, and re-raise on failure.
def write_to_lance(batch_df, batch_id):
    if is_already_processed(batch_id):
        return  # safe skip on retry
    try:
        records = batch_df.toPandas()
        records["vector"] = embed(records)
        table.add(records)
        mark_processed(batch_id)
    except Exception as e:
        log.error(f"Batch {batch_id} failed: {e}")
        raise  # re-raise so Spark retries correctly
The deduplication table is intentionally minimal — batch ID, timestamp, status. Three columns. The overhead is negligible. The protection is not.
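Two things are worth noting about this pattern. First, mark_processed only gets called after a successful LanceDB write, never before. The order matters: marking first would turn a failed write into a skipped batch on retry. It also leaves a small window, since a crash between the write and mark_processed still replays the batch, so keep that gap as short as possible. Second, re-raising the exception after logging is what keeps Spark's retry behavior intact. Swallowing it is the mistake.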
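For a feel of what the two helpers might look like, here is a minimal sketch. It assumes SQLite as a local stand-in for the Delta table so the example is self-contained; the BatchLedger class, the make_writer factory, and the processed_batches table name are hypothetical, and a real deployment would swap in Delta reads and writes.

```python
import sqlite3
import time


class BatchLedger:
    """Tracks completed batch IDs. SQLite stands in for the Delta table here."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        # Three columns, as described: batch ID, timestamp, status.
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS processed_batches ("
            "batch_id INTEGER PRIMARY KEY, processed_at REAL, status TEXT)"
        )
        self.conn.commit()

    def is_already_processed(self, batch_id):
        row = self.conn.execute(
            "SELECT 1 FROM processed_batches "
            "WHERE batch_id = ? AND status = 'done'",
            (batch_id,),
        ).fetchone()
        return row is not None

    def mark_processed(self, batch_id):
        self.conn.execute(
            "INSERT OR REPLACE INTO processed_batches VALUES (?, ?, 'done')",
            (batch_id, time.time()),
        )
        self.conn.commit()


def make_writer(ledger, write):
    """Build a foreachBatch-style function around a write callable."""

    def write_batch(batch_df, batch_id):
        if ledger.is_already_processed(batch_id):
            return  # replayed batch: skip the write
        write(batch_df)  # raises on failure, so the ledger is not updated
        ledger.mark_processed(batch_id)

    return write_batch
```

With ledger = BatchLedger() and a list standing in for the sink, calling the writer twice with the same batch ID appends the records once. If write raises, the batch ID is never recorded, so the next attempt goes through.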
The Hidden Bottleneck: Embedding Throughput
Idempotency solves the correctness problem. Embedding throughput is the performance problem, and it's the one that surprises people most.
Embedding is CPU- or GPU-bound, and it doesn't parallelize the way a Spark transformation does. When foreachBatch hands you a DataFrame and you call .toPandas() to embed it, the whole batch is collected onto the driver: you've left the distributed execution model and entered single-machine territory.
The consequence is straightforward: if your trigger interval is shorter than your embedding time, data arrives faster than batches clear. Spark runs one micro-batch at a time, so unprocessed input piles up at the source and each trigger fires late. Eventually the stream falls far behind or fails.
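To put rough numbers on that, here is a minimal sketch of how lag accumulates. It assumes each micro-batch covers one trigger interval's worth of input (for example, because batch size is capped) and takes a fixed time to embed. The function and parameter names are illustrative, not part of Spark.

```python
def lag_after(batches, embed_seconds, trigger_seconds):
    """Seconds behind after `batches` micro-batches.

    Assumes each batch covers `trigger_seconds` of input and takes
    `embed_seconds` to process; a batch that finishes early just waits.
    """
    per_batch_deficit = max(0.0, float(embed_seconds - trigger_seconds))
    return batches * per_batch_deficit


# 40 s of embedding against a 30 s trigger loses 10 s per batch.
print(lag_after(batches=100, embed_seconds=40, trigger_seconds=30))  # 1000.0
# 20 s of embedding against a 30 s trigger keeps up.
print(lag_after(batches=100, embed_seconds=20, trigger_seconds=30))  # 0.0
```

Under these assumptions the deficit is linear, so even a small mismatch adds up to many minutes of lag over a working day.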
The fix has three parts.
Profile before you commit. Run your embedding step against realistic batch sizes in your actual environment before you set a trigger interval. The number you need is embedding time per batch at your expected record volume. Don't guess at it.
Set the trigger interval to at least 1.5x your observed embedding time. If embedding 500 records takes 20 seconds, your trigger interval should be at least 30 seconds. That buffer absorbs variance without letting batches queue.
Use maxFilesPerTrigger to cap batch size. This option applies to file-based sources; Kafka sources have the analogous maxOffsetsPerTrigger. During normal operation it controls throughput. During catchup scenarios (a stream restart after downtime, a large backlog of files) it prevents a single oversized batch from overwhelming your embedding step.
# on the file-source reader
.option("maxFilesPerTrigger", 10)
# on the stream writer
.trigger(processingTime="30 seconds")
The specific numbers are placeholders. The point is that both settings should be derived from profiling, not from defaults or intuition.
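One way to turn the profiling step into a number is sketched below. The helper names are hypothetical, embed_fn stands for whatever embedding call you actually use, and taking the slowest run rather than the mean is a conservative assumption of this sketch.

```python
import math
import time


def profile_embedding(embed_fn, batch, runs=5):
    """Return the slowest wall-clock time (seconds) over `runs` calls."""
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        embed_fn(batch)
        timings.append(time.perf_counter() - start)
    return max(timings)


def trigger_interval_seconds(embed_seconds, headroom=1.5):
    """Apply the headroom factor and round up to whole seconds."""
    return math.ceil(embed_seconds * headroom)


# The worked example from the text: 20 s of embedding, 1.5x headroom.
interval = trigger_interval_seconds(20)
print(f"{interval} seconds")  # 30 seconds
```

The resulting string is in the form that processingTime accepts. Run the profile against a batch of the size your cap actually produces, on the hardware the driver will run on.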
What to Instrument
Instrument these four things from day one:
Batch size — record count per micro-batch. Tells you if maxFilesPerTrigger is doing its job and surfaces catchup scenarios early.
Embedding time — wall clock time for the embed call per batch. This is your ground truth for trigger interval calibration and the first place to look when the stream falls behind.
Write time — LanceDB table.add() duration. Usually fast, but worth tracking separately so embedding and write latency don't get conflated in your logs.
Batch ID — log it on entry, on successful completion, and on failure. When something goes wrong you want to know exactly which batch it was and whether the idempotency check fired.
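The four measurements can be collected with a thin wrapper around the embed and write steps. This is a minimal sketch, not a prescribed layout: instrumented_write, embed_fn, and add_fn are hypothetical names, and records is assumed to support len(), as a pandas DataFrame or a list does.

```python
import logging
import time

log = logging.getLogger("lance_sink")


def instrumented_write(records, batch_id, embed_fn, add_fn):
    """Embed and write one batch, returning the four metrics as a dict."""
    log.info("batch %s: start", batch_id)
    metrics = {"batch_id": batch_id, "batch_size": len(records)}
    try:
        start = time.perf_counter()
        embedded = embed_fn(records)
        metrics["embed_seconds"] = time.perf_counter() - start

        start = time.perf_counter()
        add_fn(embedded)
        metrics["write_seconds"] = time.perf_counter() - start
    except Exception:
        # Log what was measured before the failure, then propagate.
        log.exception("batch %s: failed, metrics so far: %s", batch_id, metrics)
        raise
    log.info("batch %s: done %s", batch_id, metrics)
    return metrics
```

Timing the two steps separately keeps embedding and write latency from being conflated, and the except branch re-raises so the retry behavior stays intact.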
The Seam in Context
foreachBatch is where the Spark execution model ends and the LanceDB write model begins. Understanding that boundary clearly — what Spark guarantees, what it doesn't, and what you're responsible for on the other side — is the difference between a streaming pipeline that holds up in production and one that produces subtle, hard-to-diagnose problems at the worst possible moment.
Idempotency and throughput profiling aren't advanced optimizations. They're table stakes for any production streaming job writing to a non-transactional sink. Build them in from the start.
Next: Part 4 — LanceDB in Production. Append-only realities, fragment accumulation, and what optimize cadence actually looks like when real data is moving.
Clarity through the chaos.