Rule S015

first() or last() inside .agg() without orderBy() after .agg() — non-deterministic result

Severity

🟡 MEDIUM — Non-deterministic results across runs.

PySpark version

Compatible with PySpark 1.3 and later.

Information

Using first() or last() as aggregation functions inside .groupBy().agg() without an explicit ordering is non-deterministic. After a groupBy(), Spark performs a hash-based shuffle that redistributes rows across partitions. The order in which rows arrive at each executor depends on:

  • The number of partitions and the hash function output
  • Network timing and executor load
  • Data locality and memory pressure

Because of this, the "first" or "last" row within each group is effectively random and can change between runs, even on the same cluster with the same data. This leads to:

  • Silent correctness bugs — the pipeline produces different results each time without any error
  • Unreproducible analytics — dashboards, reports, or ML features that shift between runs
  • Hard-to-debug issues — the non-determinism only manifests at scale; local tests with small data often appear stable because a single partition preserves insertion order

Why does a shuffle break ordering?

Run 1                              Run 2
┌─────────────┐                    ┌─────────────┐
│ Partition 0 │                    │ Partition 0 │
│  row A      │ ◄── first()        │  row C      │ ◄── first()
│  row B      │                    │  row A      │
│  row C      │                    │  row B      │
└─────────────┘                    └─────────────┘

Each run can deliver a group's rows in a different order. Without an explicit ordering, first() returns whichever row happens to land first in the partition, so the value can differ from run to run.
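The effect can be imitated without a cluster. The following is a pure-Python sketch, not Spark code: `arrival_order`, `simulate_group_first`, and `simulate_group_min` are hypothetical helpers, the sample rows are made up, and shuffling a list with a seeded `random.Random` stands in for rows arriving in a different order on each run.

```python
import random

# Made-up sample rows: (user_id, email)
ROWS = [
    ("u1", "a@example.com"),
    ("u1", "b@example.com"),
    ("u1", "c@example.com"),
    ("u2", "x@example.com"),
    ("u2", "y@example.com"),
]


def arrival_order(rows, seed):
    """Return the rows in a seed-dependent order, imitating one shuffle."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    return shuffled


def simulate_group_first(rows):
    """Keep the first value seen per key, like first() without an ordering."""
    result = {}
    for key, value in rows:
        result.setdefault(key, value)
    return result


def simulate_group_min(rows):
    """Keep the smallest value per key, like min(); arrival order is irrelevant."""
    result = {}
    for key, value in rows:
        result[key] = value if key not in result else min(result[key], value)
    return result


# Twenty simulated runs over the same data
runs = [arrival_order(ROWS, seed) for seed in range(20)]
first_results = {tuple(sorted(simulate_group_first(r).items())) for r in runs}
min_results = {tuple(sorted(simulate_group_min(r).items())) for r in runs}

print(len(first_results))  # distinct "first" outcomes across the runs
print(len(min_results))    # distinct "min" outcomes across the runs
```

The "first" variant yields several distinct outcomes for identical input, while the "min" variant yields exactly one, which is the property the rule is trying to protect.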

Best practices

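  • Add an explicit orderBy() or sort() after the .agg() so that the aggregated result has a deterministic row order; the sort column must be present in the aggregated output. Note that this orders the output rows only; which value first() or last() picks within a group is controlled by the alternatives in the next bullet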
  • Alternatively, replace first() / last() with a deterministic aggregate such as min(), max(), or a Window function with an explicit orderBy

Rule of thumb: Never rely on first() or last() after a groupBy() without an explicit sort after the .agg() — the result is a coin flip at cluster scale.

Example

Bad:

from pyspark.sql import functions as F
from pyspark.sql.functions import first, last

# Non-deterministic: first() picks an arbitrary row per group
df.groupBy("user_id").agg(first("email"))

# Same issue with last()
df.groupBy("user_id").agg(last("login_date"), first("email"))

# F-qualified calls are also flagged
df.groupBy("user_id").agg(F.first("email"))

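# orderBy before groupBy does NOT help: the ordering is not guaranteed
# to survive the groupBy shuffle
df.orderBy("created_at").groupBy("user_id").agg(first("email"))
# orderBy between groupBy and agg is flagged too; it also fails at runtime,
# because GroupedData has no orderBy method
df.groupBy("user_id").orderBy("created_at").agg(first("email"))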

Good:

# orderBy after .agg() satisfies the rule; sort on a column that exists in the
# aggregated result (created_at is no longer available after the aggregation)
df.groupBy("user_id").agg(first("email")).orderBy("user_id")

# sort() after .agg() works the same way
df.groupBy("user_id").agg(first("email")).sort("user_id")

# Use a deterministic aggregate instead
# (F.min avoids shadowing Python's built-in min)
df.groupBy("user_id").agg(F.min("email"))

# Use a Window function with explicit ordering
from pyspark.sql.window import Window
w = Window.partitionBy("user_id").orderBy("created_at")
df.withColumn("first_email", first("email").over(w))