
Issue 253: Add file skipping with delta #254

Merged

Conversation

@alexeiakimov (Contributor) commented Jan 12, 2024

Description

This is a rework of the query implementation. This PR always uses the internal Delta query engine, except for queries with a sampling clause; for the latter, the Qbeast engine is used.
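To make the dispatch concrete, here is a minimal sketch of the idea, not the actual implementation: the names echo files touched by this PR (DefaultFileIndex, SamplingListFilesStrategy), but the signatures and types below are illustrative only.

// Illustrative sketch only; the real signatures in the PR differ.
import org.apache.spark.sql.catalyst.expressions.Expression

trait ListFilesStrategy {
  def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[String]
}

// DefaultFileIndex-style dispatch: use Delta's stats-based file skipping
// unless the query contains a sample clause, in which case fall back to
// the Qbeast sampling strategy (SamplingListFilesStrategy in this PR).
class DefaultFileIndexSketch(
    deltaSkipping: ListFilesStrategy,
    qbeastSampling: ListFilesStrategy) {

  def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression],
      queryHasSampleClause: Boolean): Seq[String] =
    if (queryHasSampleClause) qbeastSampling.listFiles(partitionFilters, dataFilters)
    else deltaSkipping.listFiles(partitionFilters, dataFilters)
}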

Type of change

This is an internal change; neither the API nor the data format is affected.

Checklist:

  • New feature / bug fix has been committed following the Contribution guide.
  • Add comments to the code (make it easier for the community!).
  • Change the documentation.
  • Add tests.
  • Your branch is updated to the main branch (dependent changes have been merged).

How Has This Been Tested? (Optional)

DefaultFileFormatTest has been added to test the new logic in the query engine.

codecov bot commented Jan 12, 2024

Codecov Report

Attention: 9 lines in your changes are missing coverage. Please review.

Comparison: base (e423a66) at 91.02% vs. head (4536cc2) at 90.65%.
The report is 11 commits behind head on main-1.0.0.

Files Patch % Lines
...n/scala/io/qbeast/spark/delta/EmptyFileIndex.scala 33.33% 4 Missing ⚠️
...he/spark/sql/delta/DeltaStatsCollectionUtils.scala 92.00% 2 Missing ⚠️
...scala/io/qbeast/spark/delta/DefaultFileIndex.scala 95.00% 1 Missing ⚠️
...qbeast/spark/delta/SamplingListFilesStrategy.scala 97.14% 1 Missing ⚠️
.../qbeast/spark/internal/rules/SaveAsTableRule.scala 94.11% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff               @@
##           main-1.0.0     #254      +/-   ##
==============================================
- Coverage       91.02%   90.65%   -0.37%     
==============================================
  Files              95       98       +3     
  Lines            2528     2569      +41     
  Branches          323      339      +16     
==============================================
+ Hits             2301     2329      +28     
- Misses            227      240      +13     


@osopardo1 (Member) commented

I will approve this PR, but before merging I would like to understand whether benchmarking the solution is necessary.

@fpj (Contributor) commented Mar 19, 2024

I gave this PR a shot and tested it locally with the NYC Yellow Cab trip data:

https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

I fetched the parquet files for 2022 and 2023 and ingested them as a qbeast table. The schema changes across the files, so I ended up running the following to ingest the data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.LongType

def readParquetFile(filePath: String): DataFrame =
  spark.read.parquet(filePath)

// parquetFiles: the paths of the downloaded monthly parquet files.
// Some files store these columns as Int or Double, so cast them to Long
// to keep every append consistent with the table schema.
parquetFiles.foreach { parquetFile =>
  val df = readParquetFile(parquetFile)
  val dfCasted = df
    .withColumn("VendorID", df("VendorID").cast(LongType))
    .withColumn("passenger_count", df("passenger_count").cast(LongType))
    .withColumn("RatecodeID", df("RatecodeID").cast(LongType))
    .withColumn("PULocationID", df("PULocationID").cast(LongType))
    .withColumn("DOLocationID", df("DOLocationID").cast(LongType))
  dfCasted.write
    .format("qbeast")
    .mode("append")
    .option("columnsToIndex", "fare_amount,trip_distance,passenger_count")
    .option("cubeSize", "50000")
    .save("/path/nyc-qbeast-table")
}

I executed the following test query without the changes in this PR:

"SELECT count(*) FROM table WHERE (fare_amount > 100) AND (passenger_count = 3)"

and I got bad numbers:

scala> spark.time(nyc_qbeast_query.show)
+--------+
|count(1)|
+--------+
|   12282|
+--------+

Time taken: 20671 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+
|count(1)|
+--------+
|   12282|
+--------+

Time taken: 19528 ms

With the changes in this PR, the execution time is much better:

scala> spark.time(nyc_qbeast_query.show)
+--------+                                                                      
|count(1)|
+--------+
|   12282|
+--------+

Time taken: 1283 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+
|count(1)|
+--------+
|   12282|
+--------+

Time taken: 1158 ms
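For anyone reproducing the runs above: nyc_qbeast_query is not shown in the comment, so the following setup is a sketch only; the view name "table" and the table path are placeholders, not taken from the PR.

// Sketch: load the qbeast table and register it for SQL (path and view name assumed).
val nycTable = spark.read.format("qbeast").load("/path/nyc-qbeast-table")
nycTable.createOrReplaceTempView("table")

val nyc_qbeast_query =
  spark.sql("SELECT count(*) FROM table WHERE (fare_amount > 100) AND (passenger_count = 3)")

spark.time(nyc_qbeast_query.show())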

@osopardo1 (Member) commented

Great test @fpj !!! Thanks for giving it a try 💯
I would suggest also gathering some performance numbers for the sampling operation. Something like:

SELECT count(*) FROM table TABLESAMPLE(1 PERCENT)
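For reference, a sampling query like this can also be issued from the DataFrame API; the sketch below assumes the qbeast table has been loaded as a DataFrame, and that sampling on a qbeast table follows the same pushdown path either way.

// Sketch: 1% sample via the DataFrame API (table path assumed).
val nycTable = spark.read.format("qbeast").load("/path/nyc-qbeast-table")
spark.time(println(nycTable.sample(0.01).count()))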

@fpj (Contributor) commented Mar 20, 2024

For sampling, I'm not seeing any significant difference in execution time between runs with and without the changes here. Here is what I'm seeing:

// Query: "SELECT count(*) FROM nyc_qbeast_table TABLESAMPLE(1 PERCENT)"

//
// Runs without the changes in this PR
//
scala> spark.time(nyc_qbeast_query.show)
+--------+                                                                      
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 24262 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+                                                                      
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 10625 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 8200 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 9059 ms

//
// Runs with the changes in this PR
//

scala> spark.time(nyc_qbeast_query.show)
+--------+                                                                      
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 25136 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+                                                                      
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 10750 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 8774 ms

scala> spark.time(nyc_qbeast_query.show)
+--------+                                                                      
|count(1)|
+--------+
|  830160|
+--------+

Time taken: 8725 ms

@osopardo1 Is this behavior expected?

@osopardo1 (Member) commented

Yes @fpj, the behaviour is expected, since the sampling filters are applied in the same way as before.

Now that this PR has shown an improvement for WHERE clauses and no regression for sampling, I think it's ready to merge 👍

@osopardo1 osopardo1 self-requested a review March 20, 2024 15:04
@fpj fpj changed the title 253 add file skipping with delta Issue 253: Add file skipping with delta Mar 22, 2024
@fpj fpj merged commit ba089b2 into Qbeast-io:main-1.0.0 Mar 22, 2024