
How To Read A Trino Distributed EXPLAIN Plan

The main rule:

Trino usually prints the root fragment first.
Data usually flows from source fragments toward fragment 0.

So the first pass over a distributed plan should not be:

Fragment 0, then Fragment 1, then Fragment 2

The better habit is:

Find each RemoteSource.
Use it to rebuild the dependency chain.
Read the plan from the source fragments back to the root.

For this note, the example is a row-level Iceberg DELETE. It is more useful than a one-fragment read query because it shows a real distributed shape:

source scan
  -> worker-side writer
  -> coordinator-side commit

The Iceberg details matter, but they are not the main lesson. The main lesson is how to read fragments, RemoteSource, LocalExchange, estimates, and runtime facts without mixing them together.

If I can read this delete plan, a normal SELECT plan should be easier. A SELECT uses the same fragment and exchange model, but it usually ends in result-producing operators instead of MergeWriter and TableCommit.

The example query is:

EXPLAIN (TYPE DISTRIBUTED)
DELETE FROM iceberg.write_trace.orders_delta
WHERE orderkey = 1;

The table is an Iceberg table. The predicate is on orderkey, not the partition column, so this is the row-level delete path rather than a simple metadata delete.

The plan shape can be summarized as:

Fragment 0 [COORDINATOR_ONLY]
  TableCommit
    LocalExchange[SINGLE]
      RemoteSource[sourceFragmentIds = [1]]

Fragment 1 [MERGE [insert = HASH]]
  MergeWriter
    LocalExchange[MERGE ...]
      RemoteSource[sourceFragmentIds = [2]]

Fragment 2 [SOURCE]
  ScanFilterProject
    table = iceberg.write_trace.orders_delta
    filterPredicate = orderkey = 1
    operation := DELETE
    field := $merge_row_id(...)

Printed order:

Fragment 0
Fragment 1
Fragment 2

Data-flow order:

Fragment 2
  -> Fragment 1
  -> Fragment 0

That reversal is the first thing to notice.

A fragment is one distributed section of the query plan. At runtime, each fragment is executed as a stage. The stage runs tasks on one or more nodes, each task runs drivers, and each driver runs a pipeline of operators.

A fragment is not a file, a row group, a single worker, or a single thread.

It is a plan section with an input shape and an output shape.

Trino creates a fragment boundary wherever pages need to move between tasks through a remote exchange. Common reasons include:

  • source scan output needs to feed another stage
  • rows need to be repartitioned for a join, aggregation, or writer
  • distributed writer output needs to be gathered for a final commit
  • final results need to be gathered for the client

In this example:

Fragment 2:
  reads table data

Fragment 1:
  writes delete information from matching rows

Fragment 0:
  commits the Iceberg metadata change

The labels help:

[SOURCE]:
  starts from connector splits and table scan work

[MERGE [insert = HASH]]:
  row-change writer fragment with merge-style partitioning

[COORDINATOR_ONLY]:
  work that runs once on the coordinator
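When scripting over plan text, the label is the bracketed part of the fragment header, and it can contain its own brackets. A minimal sketch, assuming headers look exactly like the ones printed in this note ("Fragment <id> [<label>]"); the function name is hypothetical:

```python
import re
from typing import Optional, Tuple

# Assumed header shape, taken from the plan text in this note:
#   "Fragment <id> [<label>]", where <label> may itself contain brackets.
HEADER = re.compile(r"^Fragment (\d+) \[(.*)\]\s*$")


def parse_fragment_header(line: str) -> Optional[Tuple[int, str]]:
    """Return (fragment_id, label), or None if the line is not a fragment header."""
    match = HEADER.match(line.strip())
    if match is None:
        return None
    # The greedy (.*) runs up to the LAST ']' on the line, so a nested label
    # such as "MERGE [insert = HASH]" is kept intact.
    return int(match.group(1)), match.group(2)


print(parse_fragment_header("Fragment 1 [MERGE [insert = HASH]]"))
```

Matching to the last closing bracket is the design choice that matters here; a non-greedy match would cut `MERGE [insert = HASH]` short.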

RemoteSource is the most important line when reading a distributed plan.

This line:

RemoteSource[sourceFragmentIds = [2]]

means:

the current fragment reads pages produced by fragment 2

In the example:

Fragment 1 has RemoteSource[sourceFragmentIds = [2]]

So fragment 1 depends on fragment 2.

And:

Fragment 0 has RemoteSource[sourceFragmentIds = [1]]

So fragment 0 depends on fragment 1.

That gives the actual dependency chain:

Fragment 2
  -> Fragment 1
  -> Fragment 0
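The same edges can be pulled out of the plan text mechanically. This is a sketch, assuming the `RemoteSource[sourceFragmentIds = [...]]` spelling shown in this note; real output also carries tree-drawing characters, which the regex search skips over. The function name `dependencies` is hypothetical:

```python
import re
from typing import Dict, List, Optional

FRAGMENT = re.compile(r"^\s*Fragment (\d+) \[")
REMOTE = re.compile(r"RemoteSource\[sourceFragmentIds = \[([\d, ]+)\]\]")


def dependencies(plan_text: str) -> Dict[int, List[int]]:
    """Map each fragment id to the fragment ids its RemoteSource nodes read from."""
    deps: Dict[int, List[int]] = {}
    current: Optional[int] = None
    for line in plan_text.splitlines():
        header = FRAGMENT.match(line)
        if header:
            current = int(header.group(1))
            deps[current] = []
            continue
        if current is None:
            continue  # text before the first fragment header
        for match in REMOTE.finditer(line):
            # "[1]" or "[1, 2]": int() tolerates the surrounding spaces.
            deps[current].extend(int(x) for x in match.group(1).split(","))
    return deps
```

For the delete plan summarized earlier this yields `{0: [1], 1: [2], 2: []}`, which is the dependency chain above written as data.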

This is the habit to keep:

1. Find Fragment 0.
2. Find its RemoteSource.
3. Jump to that source fragment.
4. Repeat until reaching SOURCE fragments.
5. Read the data flow from the deepest source back to Fragment 0.
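The five steps are a depth-first walk from the root followed by a post-order read. A minimal sketch, assuming the dependencies are already available as a dict from fragment id to source fragment ids (a hypothetical input shape):

```python
from typing import Dict, List, Set


def data_flow_order(deps: Dict[int, List[int]], root: int = 0) -> List[int]:
    """Follow RemoteSource edges from the root to the source fragments,
    then return the fragments in source-to-root order."""
    order: List[int] = []
    seen: Set[int] = set()

    def visit(fragment: int) -> None:
        if fragment in seen:
            return
        seen.add(fragment)
        # Steps 2-4: jump to each source fragment named by a RemoteSource.
        for source in deps.get(fragment, []):
            visit(source)
        # A fragment is recorded only after everything it reads from.
        order.append(fragment)

    visit(root)   # Step 1: start at the root fragment.
    return order  # Step 5: deepest source first, root last.


print(data_flow_order({0: [1], 1: [2], 2: []}))  # [2, 1, 0]
```

With a join-shaped plan such as `{0: [1], 1: [2, 3], 2: [], 3: []}`, the walk gives `[2, 3, 1, 0]`: both source fragments come before the fragment that combines them.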

Fragment numbers are useful labels. They are not the execution story by themselves.

RemoteSource crosses fragment boundaries.

LocalExchange reshapes pages inside a fragment.

In fragment 0:

LocalExchange[partitioning = SINGLE]

This gathers writer output into one stream before TableCommit.

That makes sense because an Iceberg commit is a single metadata update. Workers can write data or delete artifacts in parallel, but the final table metadata commit should happen once.

In fragment 1:

LocalExchange[partitioning = MERGE [insert = HASH],
              arguments = [operation::tinyint, orderstatus::varchar]]

This arranges pages for the merge writer. The details are writer-specific, but the general pattern is:

RemoteSource:
  bring pages from another fragment

LocalExchange:
  gather, repartition, or arrange pages locally before the next operator

So when I see both together, I read them as two different movement layers:

RemoteSource:
  inter-fragment page movement

LocalExchange:
  intra-fragment page movement
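To make the LocalExchange side concrete, here is a toy model of two local partitionings. It is only an illustration: `SINGLE` concatenates every incoming page into one stream, and the hash-style version routes rows by the two argument columns shown in fragment 1. The real MERGE partitioning is writer-specific, and `zlib.crc32` is a stand-in for Trino's own hashing (an assumption); all names here are hypothetical.

```python
import zlib
from typing import Dict, List, Sequence

Row = Dict[str, object]
Page = List[Row]


def exchange_single(pages: Sequence[Page]) -> List[List[Row]]:
    """SINGLE: gather every incoming page into one local stream."""
    stream: List[Row] = []
    for page in pages:
        stream.extend(page)
    return [stream]


def exchange_hash(pages: Sequence[Page], keys: Sequence[str], streams: int) -> List[List[Row]]:
    """Hash-style: route each row by its key columns, so rows with equal
    keys always land in the same local stream."""
    out: List[List[Row]] = [[] for _ in range(streams)]
    for page in pages:
        for row in page:
            key = repr(tuple(row[k] for k in keys)).encode()
            # crc32 is deterministic; it stands in for Trino's hash function.
            out[zlib.crc32(key) % streams].append(row)
    return out


pages = [[{"operation": 2, "orderstatus": "O"}, {"operation": 2, "orderstatus": "F"}],
         [{"operation": 2, "orderstatus": "O"}]]
print(exchange_single(pages))
print(exchange_hash(pages, ["operation", "orderstatus"], streams=4))
```

Neither function sends anything over the network. Both only reshape pages that are already inside the fragment, which is the intra-fragment layer.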

For this plan, the source fragment is:

Fragment 2 [SOURCE]
  ScanFilterProject[
      table = iceberg:write_trace.orders_delta$data@...,
      filterPredicate = (orderkey = integer '1')]

This is where table data enters the plan.

ScanFilterProject tells me three things are fused together in the source work:

scan:
  read rows from the connector

filter:
  keep rows where orderkey = 1

project:
  output the symbols needed by the rest of the plan
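"Fused" means the three steps happen in one pass without materializing intermediate results between them. As a row-at-a-time analogy (Trino actually works on pages with generated code), a Python generator shows the shape. The column names and the function name are assumptions for illustration:

```python
from typing import Dict, Iterable, Iterator, List

Row = Dict[str, object]


def scan_filter_project(rows: Iterable[Row], orderkey: int, columns: List[str]) -> Iterator[Row]:
    """Scan, filter, and project in a single pass over the input rows."""
    for row in rows:                          # scan: read rows from the "connector"
        if row["orderkey"] != orderkey:       # filter: orderkey = <value>
            continue
        yield {c: row[c] for c in columns}    # project: keep only the needed symbols


table = [{"orderkey": 1, "orderstatus": "O", "totalprice": 10.0},
         {"orderkey": 2, "orderstatus": "F", "totalprice": 20.0}]
print(list(scan_filter_project(table, 1, ["orderkey", "orderstatus"])))
# [{'orderkey': 1, 'orderstatus': 'O'}]
```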

For a row-level delete, the output is not only user table columns. Trino also needs control columns:

operation := tinyint '2'
case_number := integer '0'
insert_from_update := tinyint '0'
field := $merge_row_id(...)

The important ones here:

operation := tinyint '2'
  marks the row as a delete operation

field := $merge_row_id(...)
  carries the row identity needed by the Iceberg connector

The $merge_row_id contains fields such as:

"_file"
"_pos"
"partition_spec_id"
"partition_data"
"source_row_id"

That is the clue that this is not an in-place Parquet edit. Trino reads the matching row, carries the file and row-position identity forward, and lets the Iceberg write path commit delete metadata.
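One way to picture what fragment 2 emits per matching row is a small record type. This is a hypothetical mirror of the fields listed above, not a Trino or Iceberg type. The field names follow the plan, while the Python types (for example `partition_data` as a string, `source_row_id` as an optional integer) are assumptions:

```python
from dataclasses import dataclass
from typing import Optional

DELETE_OPERATION = 2  # from the plan: operation := tinyint '2'


@dataclass(frozen=True)
class MergeRowId:
    """Hypothetical mirror of the $merge_row_id fields (types assumed)."""
    file: str                  # "_file": data file that holds the row
    pos: int                   # "_pos": row position inside that file
    partition_spec_id: int
    partition_data: str
    source_row_id: Optional[int] = None


@dataclass(frozen=True)
class DeleteRow:
    """Control columns the plan projects for a DELETE, plus the row id."""
    operation: int
    case_number: int
    insert_from_update: int
    row_id: MergeRowId


def to_delete_row(row_id: MergeRowId) -> DeleteRow:
    # Values mirror the projections: operation 2, case_number 0, insert_from_update 0.
    return DeleteRow(operation=DELETE_OPERATION, case_number=0,
                     insert_from_update=0, row_id=row_id)


print(to_delete_row(MergeRowId("data/file-a.parquet", 17, 0, "{}")))
```

The record carries where the row lives (file and position), not a modified copy of the row, which is why the delete can be committed as metadata.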

For the purpose of reading EXPLAIN, the bigger point is:

Fragment 2 produces pages with both user columns and hidden control columns.
Fragment 1 consumes those pages through RemoteSource[sourceFragmentIds = [2]].

Fragment 1 consumes the output of fragment 2:

Fragment 1 [MERGE [insert = HASH]]
  MergeWriter
    LocalExchange[partitioning = MERGE [insert = HASH],
                  arguments = [operation::tinyint, orderstatus::varchar]]
      RemoteSource[sourceFragmentIds = [2]]

The operator to notice is:

MergeWriter

For this delete, MergeWriter receives pages containing:

the operation code
the row id
the columns needed by the writer

It sends that information into the connector’s row-change write path.

For Iceberg:

format v2:
  row-level deletes are represented with position delete files

format v3:
  row-level deletes can use deletion vectors

That storage detail belongs to the connector. From the Trino plan-reading angle, I only need this summary:

Fragment 1 is the worker-side write fragment.
It does not commit the table by itself.
It produces writer output for the root commit fragment.
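For the format v2 case, the delete information reduces to (data file, row position) pairs; the Iceberg spec stores position deletes as `file_path` and `pos` rows sorted by file, then position. The grouping below is only an illustration of that shape, assuming the row ids arrive as `(_file, _pos)` tuples. It is not Trino's MergeWriter or the Iceberg writer code, and the function name is hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def position_deletes(row_ids: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Group (file, position) identities into sorted, de-duplicated positions per data file."""
    by_file: Dict[str, Set[int]] = defaultdict(set)
    for file_path, pos in row_ids:
        by_file[file_path].add(pos)
    return {path: sorted(positions) for path, positions in sorted(by_file.items())}


print(position_deletes([("b.parquet", 5), ("a.parquet", 9), ("a.parquet", 2)]))
# {'a.parquet': [2, 9], 'b.parquet': [5]}
```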

Fragment 0 is printed first because it is the root:

Fragment 0 [COORDINATOR_ONLY]
  Output[columnNames = [rows]]
    TableCommit[target = iceberg:write_trace.orders_delta$data@...]
      LocalExchange[partitioning = SINGLE]
        RemoteSource[sourceFragmentIds = [1]]

This fragment depends on fragment 1:

RemoteSource[sourceFragmentIds = [1]]

Then it gathers the writer output:

LocalExchange[partitioning = SINGLE]

Then it runs:

TableCommit

This is the final connector commit. For the Iceberg row-level delete path, the commit is a RowDelta. Iceberg records delete metadata in a new snapshot instead of modifying the original Parquet file in place.

The useful write-query mental model:

workers:
  write data files, delete files, or other connector artifacts

coordinator:
  commits the final table metadata update

That is why write plans often have a worker-side writer fragment and a coordinator-only commit fragment.
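A toy model of that split, with threads standing in for workers and a list standing in for table snapshots. Everything here (`WriterOutput`, `Table`, `run_delete`) is hypothetical and does no real I/O; it is not the Iceberg API. It only shows many parallel writers feeding exactly one commit.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List


@dataclass
class WriterOutput:
    """Hypothetical writer-fragment output: one written artifact."""
    worker: int
    artifact: str


def write_on_worker(worker: int) -> WriterOutput:
    # Stand-in for worker-side writing of data or delete files.
    return WriterOutput(worker, f"delete-file-{worker}")


class Table:
    def __init__(self) -> None:
        self.snapshots: List[List[str]] = []

    def commit(self, outputs: List[WriterOutput]) -> None:
        # One metadata update that references every artifact at once.
        self.snapshots.append(sorted(o.artifact for o in outputs))


def run_delete(table: Table, workers: int) -> None:
    with ThreadPoolExecutor(max_workers=workers) as pool:
        outputs = list(pool.map(write_on_worker, range(workers)))  # parallel writes
    table.commit(outputs)  # gathered into one stream, committed once


t = Table()
run_delete(t, 3)
print(t.snapshots)  # [['delete-file-0', 'delete-file-1', 'delete-file-2']]
```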

EXPLAIN shows the planned shape. It does not read the data.

Lines like this are planner estimates:

Estimates: {rows: ..., cpu: ..., memory: ..., network: ...}

They are useful for understanding what the optimizer thinks might happen, but they are not proof of actual work performed.
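If estimates are collected by a script, it helps to keep them as raw planner strings rather than converting them into numbers that look like measurements. A sketch, assuming the `Estimates: {name: value, ...}` layout above and assuming that unknown values are printed starting with `?`; values that themselves contain ", " would need a smarter split. The function name is hypothetical.

```python
import re
from typing import Dict, Optional

ESTIMATES = re.compile(r"Estimates: \{(.*)\}")


def parse_estimates(line: str) -> Dict[str, Optional[str]]:
    """Split an Estimates line into raw planner guesses; '?' (unknown) becomes None."""
    match = ESTIMATES.search(line)
    if match is None:
        return {}
    result: Dict[str, Optional[str]] = {}
    for part in match.group(1).split(", "):
        name, _, value = part.partition(": ")
        value = value.strip()
        # Kept as strings on purpose: these are estimates, not runtime facts.
        result[name.strip()] = None if value.startswith("?") else value
    return result


print(parse_estimates("Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}"))
# {'rows': None, 'cpu': None, 'memory': '0B', 'network': '0B'}
```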

The separation to remember:

EXPLAIN:
  planned operators, fragments, exchanges, symbols, estimates

EXPLAIN ANALYZE:
  runs the query and adds runtime measurements

For a read query, EXPLAIN ANALYZE can show facts such as:

Input rows
Physical input bytes
CPU time
Scheduled time
Split count
Splits generation wait time

So this line in a plan:

TableScan[... constraint on [o_orderstatus]]

means the planned scan has a connector constraint.

It does not by itself prove:

how many bytes were read
how many splits ran
how long the scan took
how many rows actually flowed through the operator

Use EXPLAIN for plan shape. Use EXPLAIN ANALYZE for runtime evidence.

For writes, be careful: EXPLAIN ANALYZE executes the write. It is not a dry run.
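A tool that wraps statements in EXPLAIN ANALYZE can enforce this with a conservative guard. The helper below is hypothetical: it only checks the first keyword, treats the listed statement kinds as writes (an assumption, not an exhaustive list), and will not catch every case, such as statements preceded by comments.

```python
import re

# Hypothetical, conservative list of statement kinds treated as writes.
WRITE_KEYWORDS = ("INSERT", "DELETE", "UPDATE", "MERGE", "CREATE")


def explain_analyze(sql: str, allow_writes: bool = False) -> str:
    """Wrap a statement in EXPLAIN ANALYZE, refusing writes unless explicitly allowed."""
    first_word = re.match(r"\s*(\w+)", sql)
    keyword = first_word.group(1).upper() if first_word else ""
    if keyword in WRITE_KEYWORDS and not allow_writes:
        raise ValueError(f"EXPLAIN ANALYZE would execute this {keyword}; it is not a dry run")
    return "EXPLAIN ANALYZE " + sql


print(explain_analyze("SELECT count(*) FROM iceberg.write_trace.orders_delta"))
```

Making the write path opt-in keeps the dangerous case explicit in the calling code.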

Plan text, and how to read it:

Fragment N [...]:
  One distributed plan section that becomes stage work at runtime.

[SOURCE]:
  Fragment starts from connector scan/split work.

[COORDINATOR_ONLY]:
  Fragment runs only on the coordinator. Common for final commit work.

RemoteSource[sourceFragmentIds = [N]]:
  Current fragment reads pages produced by fragment N.

LocalExchange[partitioning = SINGLE]:
  Gather pages into one local stream.

LocalExchange[partitioning = HASH/MERGE/...]:
  Repartition or arrange pages inside the fragment.

TableScan:
  Connector table data enters the engine.

ScanFilterProject:
  Scan, filter, and projection are represented together.

MergeWriter:
  Worker-side row-change writer path.

TableCommit:
  Final connector commit path.

The delete example is intentionally a harder plan because it includes a write path and a final commit. A normal SELECT plan usually uses the same reading routine, but the operators are more familiar:

SELECT operator, and how to read it:

TableScan:
  Data enters from a connector table.

ScanFilterProject:
  Scan, filter, and projection are combined near the source.

Filter:
  Rows are removed by a predicate.

Project:
  Columns or expressions are selected or computed.

Aggregation / HashAggregation:
  Rows are grouped and aggregate state is built.

Join / LookupJoin / HashJoin:
  Rows from two inputs are matched.

Sort:
  Rows are ordered.

TopN:
  Ordered rows are limited, often for ORDER BY ... LIMIT.

Limit:
  The output row count is capped.

Window:
  Window functions are evaluated over partitions or ordered rows.

Exchange / RemoteSource:
  Pages move between fragments.

LocalExchange:
  Pages are gathered or repartitioned inside a fragment.

Output:
  Final result symbols are returned to the client.

So the comparison is:

DELETE:
  scan rows -> write delete artifacts -> commit table metadata

SELECT:
  scan rows -> filter/project/join/aggregate/sort -> return result rows

The fragment-reading habit is the same. The write-specific operators disappear.

When I read a distributed EXPLAIN, I want to do this:

1. Identify the root fragment, usually Fragment 0.
2. Follow each RemoteSource[sourceFragmentIds = [...]] dependency.
3. Reconstruct the source-to-root data flow.
4. Find where table data enters: TableScan or ScanFilterProject.
5. Find where pages move between fragments: RemoteSource.
6. Find where pages are rearranged inside a fragment: LocalExchange.
7. Use Layout and symbol mappings to understand the data flowing between operators.
8. Treat Estimates as planner guesses.
9. Switch to EXPLAIN ANALYZE only when runtime evidence is needed.
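Several of these steps can be run mechanically over saved plan text. A sketch, assuming the plan spelling used in this note (real output adds tree-drawing characters, which the regexes skip). `FragmentSummary` and `summarize` are hypothetical names. The script covers steps 1, 2, 4, 5, and 6; steps 7 to 9 still need a human reader.

```python
import re
from dataclasses import dataclass, field
from typing import Dict, List, Optional

HEADER = re.compile(r"^Fragment (\d+) \[(.*)\]$")
REMOTE = re.compile(r"RemoteSource\[sourceFragmentIds = \[([\d, ]+)\]\]")
LOCAL = re.compile(r"LocalExchange\[(?:partitioning = )?([A-Z_]+)")
ENTRY_OPERATORS = {"TableScan", "ScanFilterProject"}


@dataclass
class FragmentSummary:
    label: str
    entry: List[str] = field(default_factory=list)  # step 4: where table data enters
    reads: List[int] = field(default_factory=list)  # steps 2 and 5: RemoteSource inputs
    local: List[str] = field(default_factory=list)  # step 6: LocalExchange partitioning


def summarize(plan_text: str) -> Dict[int, FragmentSummary]:
    fragments: Dict[int, FragmentSummary] = {}
    current: Optional[FragmentSummary] = None
    for raw in plan_text.splitlines():
        line = raw.strip()
        header = HEADER.match(line)
        if header:
            current = FragmentSummary(label=header.group(2))
            fragments[int(header.group(1))] = current
            continue
        if current is None:
            continue
        # First identifier on the line, after any tree-drawing characters.
        op = re.match(r"\W*(\w+)", line)
        if op and op.group(1) in ENTRY_OPERATORS:
            current.entry.append(op.group(1))
        for m in REMOTE.finditer(line):
            current.reads.extend(int(x) for x in m.group(1).split(","))
        for m in LOCAL.finditer(line):
            current.local.append(m.group(1))
    return fragments
```

Run over fragments 0 to 2 of the delete plan, it reports `ScanFilterProject` as the entry in fragment 2, reads `[2]` with a `MERGE` local exchange in fragment 1, and reads `[1]` with a `SINGLE` local exchange in fragment 0.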

For the delete example, the compact reading is:

Fragment 2:
  scan Iceberg table rows where orderkey = 1
  create delete operation rows with $merge_row_id

Fragment 1:
  receive those rows through RemoteSource[2]
  arrange them for MergeWriter
  write delete information through the connector

Fragment 0:
  receive writer output through RemoteSource[1]
  gather it into one stream
  commit the Iceberg RowDelta

That is the part worth remembering. The printed plan starts at fragment 0, but the data story starts at the source fragment.

Questions to answer without looking back:

  • Why is fragment 0 often printed before the source fragment?
  • What does RemoteSource[sourceFragmentIds = [2]] mean?
  • What is the difference between RemoteSource and LocalExchange?
  • Why is Fragment 0 [COORDINATOR_ONLY] common in write plans?
  • What does ScanFilterProject tell me?
  • Why does a row-level Iceberg delete need $merge_row_id?
  • Why are Estimates not runtime proof?
  • When is EXPLAIN ANALYZE useful, and why is it dangerous for write queries?
