Stream Joins Without Joining: How Paimon Inverts the Problem

I spent last week deep in Apache Flink and Apache Paimon, chasing a problem that sounds trivial and absolutely is not: keep one wide, always-current row per product, assembled in real time from half a dozen systems that never coordinate with each other.

The naive solution is a streaming join, and it works right up until it doesn’t. The interesting part is the fix, which inverts the whole problem: stop joining the streams, and let the storage layer assemble the row instead. This post walks through why the obvious approach collapses under its own state, and how Paimon’s partial-update merge engine, backed by LSM trees, turns a billion-row merge into something the storage layer was built to do.

The use case: one row per product

Imagine you run an online store. Every team wants a single source of truth per product, in near real time. It’s the row your storefront, your search index, and your analysts all read from:

product_id          42
product_name        "Aurora Lamp"
brand               "Lumen"
price               39.99
sale_ends           2026-07-01
discontinue_date    2027-03-31
review_cnt          128
open_question_cnt   3

Eight columns. Looks innocent. The catch: no single system holds all of it. Each field lives in a different platform, owned by a different team, updated on its own schedule.

Where the data lives

Those eight columns come from five source systems. Each one emits changes (inserts, updates, deletes) as they happen, a pattern called Change Data Capture, or CDC.

Source system               Owns these columns     Change rate
Catalog / PIM               product_name, brand    Daily-ish
Pricing service             price, sale_ends       Weekly (promotions)
Merchandising / lifecycle   discontinue_date       Quarterly
Reviews service             review_cnt             Continuous
Q&A service                 open_question_cnt      Continuous

The product_id is the shared key, and it’s the only thing these systems agree on. A product might be created in the catalog today, get its first price next week, and never receive a single customer question. The five sources never coordinate, and they never will.

The obvious approach: join the streams

The standard real-time engine for this is Apache Flink, a stream processor that reads CDC feeds, runs SQL over them, and writes results to a sink. Joining five streams on product_id is a one-page SQL job:

INSERT INTO product_360
SELECT  c.product_id, c.product_name, c.brand,
        p.price, p.sale_ends,
        l.discontinue_date, r.review_cnt, q.open_question_cnt
FROM      catalog    c
LEFT JOIN pricing    p USING (product_id)
LEFT JOIN lifecycle  l USING (product_id)
LEFT JOIN reviews    r USING (product_id)
LEFT JOIN questions  q USING (product_id);

Five CDC streams in, one wide row out. It looks elegant. It is a trap.

Why that breaks: it’s the state

A streaming join is not a batch join. To emit an updated row whenever any side changes, Flink has to remember everything on every side, indefinitely. A state TTL can evict old entries, but then a match that arrives after eviction is silently lost. That requirement quietly turns a one-page query into an operational liability in three ways.

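1. State explosion. Every join operator holds both of its inputs in keyed state (typically RocksDB). Five sources usually compile into a chain of four binary joins, and the later joins store progressively wider intermediate rows on their left side. Ten million products flowing through that chain is a lot of operator state to checkpoint and restore, and it only ever grows.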

2. Skew and late arrivals. A product is created today; its first price arrives next week. The join has to hold the catalog row in state the entire time, waiting for a match that might never come. Multiply by every slow-moving relationship in the graph.

3. CDC churn. A single update to one column (say, review_cnt ticking from 127 to 128) retracts and re-emits the entire wide row downstream. Now multiply by millions of small updates per hour.
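To put rough numbers on point 1, here is a back-of-envelope model. It assumes the planner compiles the five-way join into a left-deep chain of binary joins, that each join keeps one copy of each input row per key with no state TTL, and that every source row is about the same size. The 200-byte row size is a hypothetical figure, not a measurement.

```python
def stored_row_copies(n_sources: int) -> int:
    """Source-row copies held in state by a left-deep chain of binary joins.

    Join k (1-based) keeps its left input, an intermediate row that already
    combines k sources, plus its right input, one more source row.
    """
    return sum(k + 1 for k in range(1, n_sources))


def join_state_bytes(n_products: int, n_sources: int, avg_row_bytes: int) -> int:
    # Note what is missing: the event rate. State tracks how many keys exist
    # and how long they are kept, not how often they change.
    return n_products * stored_row_copies(n_sources) * avg_row_bytes


if __name__ == "__main__":
    copies = stored_row_copies(5)  # 2 + 3 + 4 + 5
    size = join_state_bytes(10_000_000, 5, 200)  # 200 bytes/row: hypothetical
    print(f"{copies} row copies per product, ~{size / 1e9:.0f} GB of join state")
```

Under these assumptions the chain holds about 14 source-row copies per product where a single merged row would need roughly five, and nothing in the formula shrinks when traffic is quiet.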

The summary: streaming joins scale with your join cardinality × retention, not your event rate. For a wide row that lives forever and updates from five places, the math simply doesn’t work. So if Flink shouldn’t do the joins, what should?

A detour: open table formats

To answer that, we need a quick detour into open table formats (OTFs). These put ACID transactions, time travel, and schema evolution on top of plain columnar files (Parquet/ORC) sitting in object storage.

Format       Sweet spot                          Update model                             Streaming story
Iceberg      Big analytic tables, multi-engine   Copy-on-write / merge-on-read            Batch-leaning; v3 improving
Delta Lake   Databricks ecosystem                Copy-on-write (deletion vectors newer)   Streaming via Spark Structured Streaming
Hudi         Upserts at scale                    Copy-on-write & merge-on-read            Strong upsert story; complex tuning
Paimon       Streaming-first lakehouse           LSM trees on object store                Native, built for Flink CDC

What makes Paimon stand out for this problem is that its storage layout is itself a merge engine: of the four, it’s the only one built on LSM trees. Updates are appended, not rewritten; merging rows by key is a first-class operation, not a workaround bolted on top. To see why that matters, we need thirty seconds on LSM trees.

LSM trees and compaction in 30 seconds

An LSM (Log-Structured Merge) tree buffers writes in memory, flushes them as small sorted files at the top level (L0), then periodically compacts them into fewer, larger files at deeper levels.

flowchart TD
    W([incoming writes]) -->|append new fragments| L0["L0 · small fresh runs<br/>A:v1 · A:v2 · B:v1 · A:v3"]
    L0 -->|compaction merges fragments by key| L1["L1<br/>A:v1+v2+v3 · B:v1"]
    L1 -->|further compaction into large sorted runs| L2["L2 · large sorted runs<br/>A..M merged · N..Z merged"]

Two properties fall out of this design, and they’re exactly the two we need:

  • Writes are cheap. Appending is just dropping a new fragment at L0, with no read-modify-write of the existing row. You never have to find and rewrite the current version to record a change.
  • Reads merge on the fly. A reader stitches fragments together across levels at query time. Compaction’s whole job is to keep the number of fragments small so reads stay fast.

Hold onto that first point. “Record a change by appending a fragment, and merge fragments by key later” is the entire trick.
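A toy model makes both properties concrete. This is a deliberately simplified sketch, not Paimon’s implementation: a “run” is a Python dict from key to value, L0 is a list of runs, L1 is a single compacted run, and compaction folds all of L0 into L1 with newest-wins semantics.

```python
from typing import Dict, List, Optional

Run = Dict[int, str]  # one sorted run: primary key -> value


class ToyLSM:
    def __init__(self) -> None:
        self.l0: List[Run] = []  # small fresh runs, oldest first
        self.l1: Run = {}  # one larger compacted run

    def write(self, batch: Run) -> None:
        # Cheap write: append a new sorted run; existing runs are never rewritten.
        self.l0.append(dict(sorted(batch.items())))

    def read(self, key: int) -> Optional[str]:
        # Merge on the fly: the newest L0 run holding the key wins,
        # falling back to the compacted level.
        for run in reversed(self.l0):
            if key in run:
                return run[key]
        return self.l1.get(key)

    def runs_to_check(self) -> int:
        # Read amplification: how many runs a lookup may have to consult.
        return len(self.l0) + (1 if self.l1 else 0)

    def compact(self) -> None:
        # Fold L0 into L1 oldest to newest, so newer values overwrite older ones.
        merged = dict(self.l1)
        for run in self.l0:
            merged.update(run)
        self.l1 = dict(sorted(merged.items()))
        self.l0 = []


lsm = ToyLSM()
lsm.write({42: "A:v1", 17: "B:v1"})
lsm.write({42: "A:v2"})
lsm.write({42: "A:v3"})
before = (lsm.read(42), lsm.runs_to_check())
lsm.compact()
after = (lsm.read(42), lsm.runs_to_check())
```

The read returns the same value before and after compaction; compaction only changes how many runs the read has to look at (three before, one after).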

Paimon: an LSM tree on object storage

Paimon is, in essence, an LSM tree that lives on S3/GCS/HDFS and speaks the same file-level language as the other lakehouse formats. Here’s how it lines up against them:

What’s the same as Iceberg/Delta:

  • Parquet/ORC files on object storage
  • Snapshot and manifest metadata
  • Readable from Spark, Trino, Flink, Hive, and friends
  • Time travel, schema evolution, ACID

What’s different:

  • Primary-key tables backed by an LSM tree
  • Pluggable merge engines, configured per table
  • Bucketed writes: many writers, one shared primary-key space
  • First-class changelog producers

The implication is the whole ballgame: the table itself knows how to merge rows by primary key. So we can write partial rows to it, from many independent streams, and let the table assemble them.

Here’s the inversion. Instead of asking Flink to join five streams and emit a finished row, we ask each stream to do a plain INSERT of just its own columns into one shared Paimon table. Paimon stitches the full row together by primary key, at read time and during compaction.

Flink becomes a writer. Paimon becomes the merger.

No join state. No retraction storms. Late data converges naturally, because “late” just means “a fragment that compaction hasn’t merged yet.”

The new architecture

flowchart LR
    c1[catalog CDC] --> F
    c2[pricing CDC] --> F
    c3[lifecycle CDC] --> F
    c4[reviews CDC] --> F
    c5[questions CDC] --> F
    F["Flink writer(s)<br/>stateless · just INSERT"] --> P[("Paimon table<br/>partial-update merge engine")]

The numbers that used to scare us all collapse:

  • Join state in Flink: ~0. Just sink buffers, with no keyed state for the join itself.
  • One shared target table, keyed by product_id. Each writer sets only its own columns.
  • Skew tolerance: effectively infinite. The catalog row and the first price can arrive hours, days, or weeks apart, and nothing sits in memory waiting.

Configuring the table

The whole behavior is declared on the table. The key line is 'merge-engine' = 'partial-update':

CREATE TABLE product_360 (
  product_id BIGINT,

  -- written by the catalog stream
  product_name  STRING,
  brand         STRING,
  catalog_op_ts TIMESTAMP(3),  -- per-source ordering column (see sequence groups)

  -- written by the pricing stream
  price         DECIMAL(10, 2),
  sale_ends     DATE,
  pricing_op_ts TIMESTAMP(3),

  -- lifecycle + aggregated columns
  discontinue_date  DATE,
  review_cnt        BIGINT,
  open_question_cnt BIGINT,
  questions_op_ts   TIMESTAMP(3),

  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',

  -- what a CDC delete means (covered below)
  'partial-update.remove-record-on-delete' = 'false'
);

How partial-update works

Each writer only sets its columns. A NULL means “I have no opinion; don’t clobber whatever is already there.” Paimon merges fragments by primary key, newest non-null value winning per column:

flowchart LR
    f1["catalog @ t1<br/>sets product_name, brand"] --> M
    f2["pricing @ t2 (hours later)<br/>sets price, sale_ends"] --> M
    f3["questions @ t3<br/>sets open_question_cnt"] --> M
    M{{"merge by primary key"}} --> R["assembled row · product_id 42<br/>Aurora Lamp · Lumen · 39.99<br/>sale_ends 2026-07-01 · open_question_cnt 3"]

Three independent writes, three different points in time, one assembled row. No writer ever had to read what the others wrote.
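The merge rule fits in a few lines. This sketch models a row as a Python dict and applies fragments oldest first; it ignores sequence groups, deletes, and aggregation, and it reuses column names from the product_360 schema.

```python
from typing import Any, Dict, Iterable, Optional

Row = Dict[str, Optional[Any]]


def partial_update(fragments: Iterable[Row]) -> Row:
    """Merge fragments for one primary key, oldest first.

    A non-null value overwrites the column; None means "no opinion" and leaves
    whatever an earlier fragment wrote untouched.
    """
    merged: Row = {}
    for fragment in fragments:
        for column, value in fragment.items():
            if value is not None:
                merged[column] = value
    return merged


catalog = {"product_id": 42, "product_name": "Aurora Lamp", "brand": "Lumen", "price": None}
pricing = {"product_id": 42, "product_name": None, "price": 39.99, "sale_ends": "2026-07-01"}
questions = {"product_id": 42, "open_question_cnt": 3}

row = partial_update([catalog, pricing, questions])
```

Because each writer’s NULLs never overwrite anything, arrival order between catalog and pricing fragments doesn’t matter for their columns. Order only matters between fragments that set the same column, which is exactly the problem the next section deals with.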

Sequence groups: why you need them

There’s a subtlety. Multiple streams write concurrently, and updates can arrive out of order within a single source. A late catalog update must not overwrite a newer catalog update, but it also shouldn’t be ranked against the pricing stream at all. Each source needs to advance on its own clock.

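Without sequence groups, every fragment for a row is ordered on one shared sequence: arrival order by default, or a single 'sequence.field' for the whole table. Arrival order lets a late catalog update overwrite a newer catalog update just because it was written last, and a single ordering column would have to mean something across all five sources, whose clocks are independent.

With per-source sequence groups, each column group gets its own ordering field, typically the CDC op_ts (operation timestamp) emitted by that source. A fragment whose op_ts is older than the group’s current value leaves that group’s columns alone, so late catalog data only competes against its own stream’s history.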

-- 'fields.<ordering column>.sequence-group' = '<columns it guards>'
-- the op_ts columns themselves must exist in the table schema, too
'fields.catalog_op_ts.sequence-group' = 'product_name,brand',
'fields.pricing_op_ts.sequence-group' = 'price,sale_ends'
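Here is the difference in miniature. The sketch compares plain arrival order with per-group ordering. The group-to-column mapping mirrors the catalog and pricing groups above, the integer timestamps are made up, and the rule that a fresh fragment sets every column in its group is a simplification of this model, not a statement about Paimon’s internals.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical mapping: ordering column -> the columns it guards.
GROUPS: Dict[str, Tuple[str, ...]] = {
    "catalog_op_ts": ("product_name", "brand"),
    "pricing_op_ts": ("price", "sale_ends"),
}

Fragment = Dict[str, Optional[Any]]


def merge_arrival_order(fragments: List[Fragment]) -> Fragment:
    # One shared ordering: whatever arrives last wins, stale or not.
    row: Fragment = {}
    for f in fragments:
        row.update({k: v for k, v in f.items() if v is not None})
    return row


def merge_with_groups(fragments: List[Fragment]) -> Fragment:
    # Each group advances on its own clock: a fragment only touches a group's
    # columns if its sequence value is at least the group's current one.
    row: Fragment = {}
    for f in fragments:
        for seq_field, columns in GROUPS.items():
            seq = f.get(seq_field)
            if seq is None:
                continue  # this fragment says nothing about this group
            current = row.get(seq_field)
            if current is not None and seq < current:
                continue  # stale for this group: ignore it
            row[seq_field] = seq
            for c in columns:  # in this model, a fresh fragment sets its whole group
                row[c] = f.get(c)
    return row


arrivals = [
    {"catalog_op_ts": 10, "product_name": "Aurora Lamp v2", "brand": "Lumen"},  # newer edit
    {"pricing_op_ts": 5, "price": 39.99, "sale_ends": "2026-07-01"},
    {"catalog_op_ts": 7, "product_name": "Aurora Lamp", "brand": "Lumen"},  # late, older edit
]
naive = merge_arrival_order(arrivals)
grouped = merge_with_groups(arrivals)
```

With arrival order, the late catalog edit wins and the name regresses to “Aurora Lamp”; with groups, it is ignored for the catalog columns and the pricing columns are never compared against it at all.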

Aggregated columns: two patterns

review_cnt and open_question_cnt aren’t copied from a source; they’re derived. Paimon gives you two ways to maintain them.

Pattern A: pre-aggregate in Flink. A small Flink job counts reviews (or open questions) per product_id, then partial-updates just that one count column:

INSERT INTO product_360 (product_id, open_question_cnt, questions_op_ts)
SELECT product_id, COUNT(*), MAX(op_ts)
FROM questions_stream
GROUP BY product_id;

This handles CDC deletes cleanly: when a question is deleted, the retraction decrements the count, and Flink emits the new total.

Pattern B: Paimon aggregation functions. Write +1 / −1 deltas and let Paimon sum them per row:

'fields.review_cnt.aggregate-function'        = 'sum',
'fields.open_question_cnt.aggregate-function' = 'sum'

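Cheaper to wire up, but delete semantics get tricky: you have to map every deletion to an explicit −1 event yourself. Depending on your Paimon version, aggregated columns in a partial-update table may also need to belong to a sequence group, so check the docs before relying on this pattern.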

Rule of thumb: if your sources emit CDC deletes, prefer Pattern A. The small pre-aggregation job absorbs retractions for you, and the count column then behaves like any other partial-update field.
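The delete pitfall in Pattern B is easy to reproduce. In this sketch, question events are hypothetical (op, question_id) pairs. Pattern A recounts from the set of live questions, which is effectively what the retracting GROUP BY computes; Pattern B sums deltas, once with deletes mapped to −1 and once with them dropped.

```python
from typing import List, Set, Tuple

Event = Tuple[str, int]  # ("insert" | "delete", question_id); hypothetical shape

events: List[Event] = [("insert", 1), ("insert", 2), ("insert", 3), ("delete", 2)]


def pattern_a(evts: List[Event]) -> int:
    # Pre-aggregate: keep the live set and emit its size.
    live: Set[int] = set()
    for op, qid in evts:
        if op == "insert":
            live.add(qid)
        else:
            live.discard(qid)
    return len(live)


def pattern_b(evts: List[Event], emit_minus_one: bool) -> int:
    # 'sum' over deltas: only correct if every delete becomes a -1.
    total = 0
    for op, _ in evts:
        if op == "insert":
            total += 1
        elif emit_minus_one:
            total -= 1
    return total


print(pattern_a(events), pattern_b(events, True), pattern_b(events, False))
```

Both patterns report 2 open questions when deletes are handled; forget the −1 in Pattern B and the count silently drifts to 3.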

Handling deletes

Speaking of deletes: what should a delete on the pricing stream even mean to the assembled row? Paimon makes you decide explicitly.

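Option 1: clear those columns. Keep the product row; just null out the pricing columns (there’s simply no active price right now). In a partial-update table this is what sequence groups give you: a delete arriving for the pricing group retracts only that group’s columns.

'partial-update.remove-record-on-delete' = 'false',
'fields.pricing_op_ts.sequence-group'    = 'price,sale_ends'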

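Option 2: drop the whole row. A delete removes the merged row entirely:

'partial-update.remove-record-on-delete' = 'true'

Note that this flag is a table-level setting, so it applies to deletes from every writer, not only the catalog stream. If you need whole-row removal for one source and column clearing for another, check the partial-update docs for your Paimon version.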

The right answer differs by source. A “product removed from catalog” event is usually authoritative, so drop the row. A “price deleted” event usually just means “no current price”, so clear those columns and keep everything else. Paimon lets you express both, which is exactly what you want.
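The intended semantics can be sketched as a per-source policy. POLICY, OWNED, and apply_delete are hypothetical names that model the behavior described above; they are not Paimon options, and in Paimon you reach these behaviors through the table options shown.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Optional[Any]]

# Hypothetical per-source policy and column ownership.
POLICY = {"catalog": "drop", "pricing": "clear"}
OWNED = {"catalog": ("product_name", "brand"), "pricing": ("price", "sale_ends")}


def apply_delete(table: Dict[int, Row], source: str, key: int) -> None:
    if key not in table:
        return
    if POLICY[source] == "drop":
        del table[key]  # authoritative delete: the merged row goes away
    else:
        for column in OWNED[source]:
            table[key][column] = None  # only this source's columns are cleared


table = {42: {"product_name": "Aurora Lamp", "brand": "Lumen", "price": 39.99, "sale_ends": "2026-07-01"}}
apply_delete(table, "pricing", 42)
after_price_delete = dict(table[42])
apply_delete(table, "catalog", 42)
```

After the pricing delete the row survives with empty pricing columns; after the catalog delete it is gone.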

Reading: merge-on-read

Every query path merges fragments by primary key, so readers always see one assembled row per product_id. That guarantee isn’t free, and the cost has a name: read amplification. If too many L0 fragments pile up, every read has to merge all of them, and reads slow down.

The cure is compaction, and Paimon runs it in one of three modes:

  • In-writer: small async compaction inside the Flink writer itself.
  • Dedicated job: a separate Flink or Spark job runs compaction continuously.
  • Manual: a periodic CALL sys.compact(...) on a schedule.

The deal compaction offers is straightforward: spend some CPU and I/O at the storage layer to keep file counts low and reads fast. It’s the same merge work you’d otherwise pay for in Flink keyed state, except that merging billions of rows by key is precisely what LSM trees were built for.

Downstream wants a changelog?

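For streaming reads, a partial-update table should be paired with the lookup or full-compaction changelog producer. The input producer is also accepted, but it only replays the raw fragments, not the assembled rows. If a downstream Flink job wants the merged row whenever it changes, pick one of the merging producers: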

Mode              What it emits                                              Cost
input             The raw incoming fragments                                 Cheapest; not the merged view
lookup            Merged row on every change, via a lookup before commit     Moderate; near-real-time
full-compaction   Merged row only when a full compaction runs                No lookup cost, but full compaction is expensive; highest latency

WITH ( 'changelog-producer' = 'lookup' )
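What the lookup mode buys you, sketched: before committing, look up the previously merged row for the key, apply the fragment, and emit a before/after pair downstream. This is a simplified in-memory model (no files, no snapshots) using the partial-update rule from earlier; the function names are hypothetical, and the row-kind strings follow Flink’s +I / -U / +U notation.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Optional[Any]]
Change = Tuple[str, Row]  # ("+I" | "-U" | "+U", row)


def input_changelog(fragment: Row) -> List[Change]:
    # 'input' mode: forward the raw fragment as-is.
    return [("+I", fragment)]


def lookup_changelog(state: Dict[int, Row], key: int, fragment: Row) -> List[Change]:
    # 'lookup'-style: find the previous merged row, merge, emit old and new.
    before = state.get(key)
    after = dict(before or {})
    after.update({c: v for c, v in fragment.items() if v is not None})
    state[key] = after
    if before is None:
        return [("+I", after)]
    return [("-U", before), ("+U", after)]


state: Dict[int, Row] = {}
c1 = lookup_changelog(state, 42, {"product_name": "Aurora Lamp"})
c2 = lookup_changelog(state, 42, {"price": 39.99})
```

The second fragment only carries a price, but downstream sees the full assembled row before and after, which is what a consumer of product_360 actually wants.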

How Paimon shards writes: buckets

One more piece makes the multi-writer story work. Paimon splits the primary-key space into buckets, and each bucket is its own independent LSM tree:

flowchart TD
    K["product_id · 17 · 42 · 88 · 105 · 1003"] --> H{{"hash(product_id) mod N"}}
    H --> B0["bucket 0<br/>17"]
    H --> B1["bucket 1<br/>42"]
    H --> B2["bucket 2<br/>88"]
    H --> B3["bucket 3<br/>105 · 1003"]

This buys three things:

  • A routing contract. The same primary key always hashes to the same bucket, so every update for product_id=42, from any writer, lands in one LSM tree. That’s what lets independent streams merge into a single coherent row.
  • A parallelism knob. Each bucket is written by one task at a time, so N buckets means up to N parallel writers. Size N to your throughput.
  • A sizing decision. 'bucket' = '64' pins the count; 'bucket' = '-1' grows it dynamically. Multi-writer partial-update tables must use a fixed count, because dynamic bucket mode supports only a single write job.
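The routing contract can be sketched in a few lines. The hash here is zlib.crc32 over the key’s decimal string, a stand-in chosen only because it is stable across processes; Paimon computes its own hash over the serialized key, so real bucket numbers will differ.

```python
import zlib
from collections import defaultdict
from typing import Dict, List

NUM_BUCKETS = 4  # fixed, like 'bucket' = '4'


def bucket_for(product_id: int, num_buckets: int = NUM_BUCKETS) -> int:
    # Stand-in stable hash; any deterministic function gives the same contract.
    return zlib.crc32(str(product_id).encode()) % num_buckets


# Two independent writers emitting fragments for overlapping keys.
catalog_writer = [17, 42, 88]
pricing_writer = [42, 105, 1003]

routed: Dict[int, List[str]] = defaultdict(list)
for name, keys in (("catalog", catalog_writer), ("pricing", pricing_writer)):
    for pid in keys:
        routed[bucket_for(pid)].append(f"{name}:{pid}")
```

Both fragments for product 42 land in the same bucket, so they meet in one LSM tree. If one writer used a different bucket count, bucket_for(42, 8) need not equal bucket_for(42, 4), and a single product’s fragments could end up in two trees that never merge.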

The one real caveat

It’s not all free. The sharp edge is concurrent writers plus compaction. All writers to the same partial-update table need compatible bucketing, and you should run either combined writers or a separate dedicated compaction job. Several independent writers left on default settings can collide on compaction commits.

The setup that stays out of trouble:

  • Use a fixed bucket count, sized to your product cardinality.
  • Disable in-writer compaction on the streaming writers ('write-only' = 'true').
  • Run one dedicated compaction job (Flink batch or streaming).
  • Monitor file counts and snapshot lag.

-- table option, shared by every writer
'bucket' = '64',
-- set on each streaming writer (for example via a dynamic OPTIONS hint)
'write-only' = 'true',

-- ...and a separate, dedicated compaction job:
CALL sys.compact(`table` => 'db.product_360');

Where did the work go?

Step back and notice that the total amount of merge work didn’t disappear; it moved. That’s the whole story, so it’s worth making the move explicit.

Streaming join: merge in Flink

  • An N-way keyed join operator sits in the pipeline.
  • It holds a huge RocksDB state that grows forever.
  • Late data forces retract-and-re-emit of the entire wide row.
  • You spend your time tuning checkpoints, savepoints, and state TTL.
  • The sink writes a row that was already fully merged in memory.

Paimon: merge in the table

  • Flink writers are stateless sinks; each stream just INSERTs its own columns.
  • Late data converges naturally on the primary key.
  • The merge happens at read time and during LSM compaction.
  • You spend your tuning budget on buckets, compaction cadence, and sequence groups.

The billion-row merge problem moves out of Flink’s keyed state and into Paimon’s LSM compaction, which is exactly the workload LSM trees were designed for in the first place.

Flink writes. Paimon merges. You stop fighting state.


Want the authoritative details? The Paimon docs cover the partial-update merge engine in full, including sequence groups, aggregation functions, and changelog producers.