Skip to content

Rule PERF007

DataFrame used 2 or more times without caching

Severity

🟡 MEDIUM — Moderate performance impact.

Experimental rule

This rule is experimental. Detection is limited to .join(), .union(), and .unionByName() calls — other DataFrame operations (e.g. show, collect, count) are intentionally ignored. False positives and false negatives are possible; review every finding before acting on it.

PySpark version

Compatible with PySpark 1.3 and later.

Information

Spark uses lazy evaluation: every time you trigger an action on a DataFrame, Spark walks back up the entire lineage graph and re-executes every transformation from the source. If the same DataFrame is used as input in two or more places without being cached, all of that upstream work is repeated for each branch.

source ──► filter ──► join  ──► write A
                 └──► agg   ──► write B

Without caching filter's result, Spark reads the source and applies the filter twice — once for write A and once for write B.

Consequences: - Doubled (or worse) read and compute costs — every uncached fork multiplies the upstream work - Non-determinism with non-idempotent sources (Kafka, random sampling, etc.) — the two re-executions may return different data - Longer wall-clock time in iterative pipelines, ML feature pipelines, and any workflow where a cleaned or filtered base DataFrame feeds multiple outputs

Adding .cache() (or .persist(StorageLevel.…) for explicit memory/disk control) after the DataFrame is ready materialises it once, and all downstream consumers read from the cached copy.

Best practices

Cache the DataFrame immediately after the last transformation that all downstream consumers share:

# Bad — df is computed twice
df = df.filter(col('country') == 'US')
df2 = df.join(cities, 'city_id')          # full DAG re-executed
df3 = df.groupBy('city').count()          # full DAG re-executed again
# Good — df is computed once
df = df.filter(col('country') == 'US')
df = df.cache()                           # materialise once
df2 = df.join(cities, 'city_id')          # reads from cache
df3 = df.groupBy('city').count()          # reads from cache

Use .persist(StorageLevel.MEMORY_AND_DISK) (or another explicit level) when the DataFrame may be too large to fit entirely in memory, or when you need the data to survive executor failures.

from pyspark import StorageLevel

df = df.filter(col('country') == 'US')
df = df.persist(StorageLevel.MEMORY_AND_DISK)
df2 = df.join(cities, 'city_id')
df3 = df.groupBy('city').count()

# ... when done ...
df.unpersist()

Rule of thumb: If the same DataFrame feeds more than one downstream computation, cache it.