Rule S015
first() or last() inside .agg() without orderBy() after .agg() — non-deterministic result
Severity
🟡 MEDIUM — Non-deterministic results across runs.
PySpark version
Compatible with PySpark 1.3 and later.
Information
Using first() or last() as aggregation functions inside .groupBy().agg() without an explicit ordering is non-deterministic. After a groupBy(), Spark performs a hash-based shuffle that redistributes rows across partitions. The order in which rows arrive at each executor depends on:
- The number of partitions and the hash function output
- Network timing and executor load
- Data locality and memory pressure
Because of this, the "first" or "last" row within each group is effectively random and can change between runs, even on the same cluster with the same data. This leads to:
- Silent correctness bugs — the pipeline produces different results each time without any error
- Unreproducible analytics — dashboards, reports, or ML features that shift between runs
- Hard-to-debug issues — the non-determinism only manifests at scale; local tests with small data often appear stable because a single partition preserves insertion order
Why does a shuffle break ordering?
Run 1                          Run 2
┌─────────────┐                ┌─────────────┐
│ Partition 0 │                │ Partition 0 │
│ row A  ◄── first()           │ row C  ◄── first()
│ row B                        │ row A
│ row C                        │ row B
└─────────────┘                └─────────────┘
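Rows with the same key always hash to the same partition, but the order in which they arrive there can differ from run to run. Without an explicit ordering, first() picks whichever row happens to arrive first in the partition, which may be a different row on each run.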
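The effect can be reproduced without a cluster. The sketch below is plain Python, not Spark: it treats every permutation of a small group as a possible arrival order and compares a "first value seen" aggregate with an order-independent one. The sample rows and the helper names (first_per_group, earliest_per_group) are made up for illustration.

```python
from itertools import permutations

# Made-up sample rows for one group: (user_id, created_at, email)
ROWS = [
    (1, "2024-01-01", "a@example.com"),
    (1, "2024-01-02", "b@example.com"),
    (1, "2024-01-03", "c@example.com"),
]


def first_per_group(rows):
    """Keep the first value seen per key, like first() without an ordering."""
    seen = {}
    for user_id, _created_at, email in rows:
        seen.setdefault(user_id, email)
    return seen


def earliest_per_group(rows):
    """Keep the email with the smallest (created_at, email) per key."""
    best = {}
    for user_id, created_at, email in rows:
        candidate = (created_at, email)
        if user_id not in best or candidate < best[user_id]:
            best[user_id] = candidate
    return {user_id: value[1] for user_id, value in best.items()}


# Every permutation stands in for one possible arrival order after a shuffle.
first_results = {first_per_group(order)[1] for order in permutations(ROWS)}
earliest_results = {earliest_per_group(order)[1] for order in permutations(ROWS)}

print("first-seen results:", sorted(first_results))
print("earliest results:  ", sorted(earliest_results))
```

The first-seen aggregate returns a different email depending on arrival order, while the aggregate that compares created_at returns the same email for every order.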
Best practices
- Add an explicit orderBy() or sort() after the .agg() to guarantee a deterministic ordering of the aggregated result
- Alternatively, replace first()/last() with a deterministic aggregate such as min(), max(), or a Window function with an explicit orderBy
Rule of thumb: Never rely on first() or last() after a groupBy() without an explicit sort after the .agg() — the result is a coin flip at cluster scale.
Example
Bad:
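from pyspark.sql import functions as F
from pyspark.sql.functions import first, last

# Non-deterministic: first() picks an arbitrary row per group
df.groupBy("user_id").agg(first("email"))
# Same issue with last()
df.groupBy("user_id").agg(last("login_date"), first("email"))
# F-qualified calls are also flagged
df.groupBy("user_id").agg(F.first("email"))
# orderBy before groupBy does NOT help: the shuffle does not guarantee
# that the ordering survives
df.orderBy("created_at").groupBy("user_id").agg(first("email"))
# orderBy between groupBy and agg does NOT help either; GroupedData has no
# orderBy() method, so this line also raises AttributeError at runtime
df.groupBy("user_id").orderBy("created_at").agg(first("email"))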
Good:
from pyspark.sql import functions as F
from pyspark.sql.functions import first
from pyspark.sql.window import Window

# orderBy after .agg() fixes the order of the aggregated result; sort on a
# column that survives the aggregation (created_at does not)
df.groupBy("user_id").agg(first("email")).orderBy("user_id")
# sort() after .agg() works the same way
df.groupBy("user_id").agg(first("email")).sort("user_id")
# Use a deterministic aggregate instead (F.min avoids shadowing Python's built-in min)
df.groupBy("user_id").agg(F.min("email"))
# Use a Window function with explicit ordering
w = Window.partitionBy("user_id").orderBy("created_at")
df.withColumn("first_email", first("email").over(w))