
Case Study 5: Reduction in DataFrame Operations

As our organization increasingly depends on Databricks for large-scale data processing, identifying and eliminating performance bottlenecks has become essential to running efficient workloads. Databricks excels at distributed computing and scalability; however, overlooking inefficient operations cost us significant performance. In this case study we showcase a specific scenario from our daily operational workflow in which we eliminated a small but critical common pitfall and cut the task’s processing time in half.

Problem

In distributed systems like Spark, bottlenecks often arise from carelessly employed I/O operations, redundant data transformations, or poor resource management. When working with Spark DataFrames, seemingly harmless patterns such as filtering the same DataFrame several times or writing results in multiple separate passes can introduce significant bottlenecks.

To illustrate the principles of optimizing Spark DataFrame operations, we review our daily data pipeline that processes and writes large volumes of data. The original implementation of the processing task contained several inefficiencies, which we address step by step to demonstrate the performance gains achieved by applying best practices.

After performing heavy processing tasks, we separated the results into two categories by filtering the DataFrame:

df_ok = df.filter("error_type IS NULL")

df_failed = df.filter("error_type IS NOT NULL")

These operations filtered the same DataFrame twice, resulting in multiple passes over the data. Additionally, writing each DataFrame separately increased the number of I/O operations, exacerbating the problem. A simple, redundant operation thus became an increasingly significant performance bottleneck as the processing logic grew more complex.
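To make the extra I/O concrete, here is a minimal sketch of the original write path; the output table names are hypothetical, but the pattern of two separate filters followed by two separate writes matches the pipeline described above:

# Each write re-executes its filter, so the source data is scanned twice
# and two independent sets of output files are produced.
df_ok.write.mode("overwrite").saveAsTable("results_ok")          # hypothetical table name
df_failed.write.mode("overwrite").saveAsTable("results_failed")  # hypothetical table name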

Challenges & Solutions

To address these issues, we redesigned the filtering and post-filtering stage in our pipeline:

from pyspark.sql.functions import col

df = df.withColumn("failed", col("error_type").isNotNull())

df = save_and_read(df, output_table, partition_by=["failed"])

This refactoring writes the DataFrame once and relies on partitioning. Partitioning by the failed column lets Spark segregate the data into one partition for rows with errors and one for rows without. This reduces the write step to a single pass, and downstream queries can leverage Spark’s partition pruning to skip the partition they do not need.
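The save_and_read helper is internal to our pipeline and not shown in full here; a minimal sketch of what it could look like, assuming an active SparkSession named spark (as in a Databricks notebook), is:

def save_and_read(df, table_name, partition_by):
    # Write the DataFrame once, partitioned by the given columns,
    # then read the persisted table back for downstream steps.
    (df.write
       .mode("overwrite")
       .partitionBy(*partition_by)
       .saveAsTable(table_name))
    return spark.table(table_name)  # `spark` is the active SparkSession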

(1) Minimize redundant DataFrame transformations:

Our refactoring combined multiple operations into a single step: by adding a flag column we eliminated the redundant filtering. The trick was to declare a conditional column instead of filtering twice, preventing multiple passes over the data and roughly halving the computational load.
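A rough way to see the difference, assuming the same DataFrame as above, is to inspect the physical plans: each filtered DataFrame carries its own plan, so writing both re-executes the upstream lineage twice (unless it is cached), while the flag-based version is a single plan executed once.

# Two-filter version: two independent plans, two executions of the upstream lineage
df.filter("error_type IS NULL").explain()
df.filter("error_type IS NOT NULL").explain()

# Flag-based version: one plan with a projected boolean column
df.withColumn("failed", col("error_type").isNotNull()).explain()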

(2) Leverage partitioning for large datasets:

Since failed is a boolean, partitioning on this column results in just two partitions. This is generally efficient, but data skew has to be kept in mind: if failures are rare, one partition will be far larger than the other. A quick check is sketched below.
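For instance, assuming the refactored DataFrame from above, a lightweight way to gauge the balance between the two partition values before writing:

# Inspect row counts per partition value to spot skew between ok and failed rows
df.groupBy("failed").count().show()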

(3) Efficient data access:

Another favourable consequence of the partitioning is more compact data management: Spark can read only the relevant partitions when executing queries, leading to faster read times and reduced computational overhead.
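As a sketch, assuming the table name stored in output_table from the snippet above, a downstream consumer that only needs the failed rows can let Spark prune the other partition:

# Filtering on the partition column lets Spark skip the failed=false partition entirely
df_failed = spark.table(output_table).filter("failed = true")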

Overall, our suboptimal DataFrame operations undermined Databricks’ and Spark’s strengths at processing large datasets. By following simple best practices such as reducing redundant operations, using partitioning, and optimizing I/O, we significantly improved both performance and scalability.

Let’s wrap it up with some conclusions.