From b7e84dda1fb2a05b565a9d790c2e1a05fe2a8a25 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 18 May 2023 11:31:47 +0100 Subject: [PATCH 1/5] Don't optimize AnalyzeExec (#6379) --- datafusion/core/src/physical_plan/planner.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 3532a6949a33..35b209c7c50b 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1192,11 +1192,9 @@ impl DefaultPhysicalPlanner { "Unsupported logical plan: Distinct should be replaced to Aggregate".to_string(), )) } - LogicalPlan::Analyze(a) => { - let input = self.create_initial_plan(&a.input, session_state).await?; - let schema = SchemaRef::new((*a.schema).clone().into()); - Ok(Arc::new(AnalyzeExec::new(a.verbose, input, schema))) - } + LogicalPlan::Analyze(_) => Err(DataFusionError::Internal( + "Unsupported logical plan: Analyze must be root of the plan".to_string(), + )), LogicalPlan::Extension(e) => { let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state).await?; @@ -1851,6 +1849,10 @@ impl DefaultPhysicalPlanner { stringified_plans, e.verbose, )))) + } else if let LogicalPlan::Analyze(a) = logical_plan { + let input = self.create_physical_plan(&a.input, session_state).await?; + let schema = SchemaRef::new((*a.schema).clone().into()); + Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema)))) } else { Ok(None) } From 991ab182b6ae66b53f0fe62a93b6a739185310c7 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 30 May 2023 16:33:05 -0400 Subject: [PATCH 2/5] Rewrite explain analyze to handle arbitrary inputs --- .../physical_optimizer/pipeline_checker.rs | 2 +- datafusion/core/src/physical_plan/analyze.rs | 158 +++++++++--------- datafusion/core/tests/sql/explain_analyze.rs | 20 ++- 3 files changed, 103 insertions(+), 77 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 1421e25be785..b12c4ef93fc8 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -387,7 +387,7 @@ mod sql_tests { }; let test2 = UnaryTestCase { source_type: SourceType::Unbounded, - expect_fail: true, + expect_fail: false, }; let case = QueryCase { sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(), diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs index 08b5bb34ed87..5b588e22e456 100644 --- a/datafusion/core/src/physical_plan/analyze.rs +++ b/datafusion/core/src/physical_plan/analyze.rs @@ -28,10 +28,12 @@ use crate::{ }, }; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; -use futures::StreamExt; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use tokio::task::JoinSet; use super::expressions::PhysicalSortExpr; -use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream}; +use super::stream::RecordBatchStreamAdapter; +use super::{Distribution, SendableRecordBatchStream}; use crate::execution::context::TaskContext; /// `EXPLAIN ANALYZE` execution plan operator. 
This operator runs its input,
@@ -73,7 +75,7 @@ impl ExecutionPlan for AnalyzeExec {
 
 /// Specifies we want the input as a single stream
 fn required_input_distribution(&self) -> Vec<Distribution> {
- vec![Distribution::SinglePartition]
+ vec![Distribution::UnspecifiedDistribution]
 }
 
 /// Specifies whether this plan generates an infinite stream of records.
@@ -121,96 +123,102 @@ impl ExecutionPlan for AnalyzeExec {
 )));
 }
 
- // should be ensured by `SinglePartition` above
- let input_partitions = self.input.output_partitioning().partition_count();
- if input_partitions != 1 {
- return Err(DataFusionError::Internal(format!(
- "AnalyzeExec invalid number of input partitions. Expected 1, got {input_partitions}"
- )));
+ // Gather futures that will run each input partition using a
+ // JoinSet to cancel outstanding futures on drop
+ let mut set = JoinSet::new();
+ let num_input_partitions = self.input.output_partitioning().partition_count();
+
+ for input_partition in 0..num_input_partitions {
+ let input_stream = self.input.execute(input_partition, context.clone());
+
+ set.spawn(async move {
+ let mut total_rows = 0;
+ let mut input_stream = input_stream?;
+ while let Some(batch) = input_stream.next().await {
+ let batch = batch?;
+ total_rows += batch.num_rows();
+ }
+ Ok(total_rows)
+ });
 }
 
- let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
+ // Turn the tasks in the JoinSet into a stream of
+ // Result<usize> representing the counts of each output
+ // partition.
+ let counts_stream = futures::stream::unfold(set, |mut set| async {
+ let next = set.join_next().await?; // returns None when the set is empty, ending the stream
+ // translate join errors (aka task panics) into ExecutionErrors
+ let next = match next {
+ Ok(res) => res,
+ Err(e) => Err(DataFusionError::Execution(format!(
+ "Join error in AnalyzeExec: {e}"
+ ))),
+ };
+ Some((next, set))
+ });
+
 let start = Instant::now();
 let captured_input = self.input.clone();
- let mut input_stream = captured_input.execute(0, context)?;
 let captured_schema = self.schema.clone();
 let verbose = self.verbose;
 
- // Task reads batches the input and when complete produce a
- // RecordBatch with a report that is written to `tx` when done
- let join_handle = tokio::task::spawn(async move {
- let start = Instant::now();
- let mut total_rows = 0;
-
- // Note the code below ignores errors sending on tx. An
- // error sending means the plan is being torn down and
- // nothing is left that will handle the error (aka no one
- // will hear us scream)
- while let Some(b) = input_stream.next().await {
- match b {
- Ok(batch) => {
- total_rows += batch.num_rows();
- }
- b @ Err(_) => {
- // try and pass on errors from input
- if tx.send(b).await.is_err() {
- // receiver hung up, stop executing (no
- // one will look at any further results we
- // send)
- return;
- }
- }
- }
- }
- let end = Instant::now();
-
- let mut type_builder = StringBuilder::with_capacity(1, 1024);
- let mut plan_builder = StringBuilder::with_capacity(1, 1024);
-
- // TODO use some sort of enum rather than strings?
- type_builder.append_value("Plan with Metrics");
+ // future that gathers the input counts into an overall output
+ // count, and makes an output batch
+ let output = counts_stream
+ // combine results from all input streams into a total count
+ .fold(Ok(0), |total_rows: Result<usize>, input_rows| async move {
+ Ok(total_rows? + input_rows?)
+ }) + // convert the total to a RecordBatch + .map(move |total_rows| { + let total_rows = total_rows?; + let end = Instant::now(); - let annotated_plan = - DisplayableExecutionPlan::with_metrics(captured_input.as_ref()) - .indent() - .to_string(); - plan_builder.append_value(annotated_plan); + let mut type_builder = StringBuilder::with_capacity(1, 1024); + let mut plan_builder = StringBuilder::with_capacity(1, 1024); - // Verbose output - // TODO make this more sophisticated - if verbose { - type_builder.append_value("Plan with Full Metrics"); + // TODO use some sort of enum rather than strings? + type_builder.append_value("Plan with Metrics"); let annotated_plan = - DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref()) + DisplayableExecutionPlan::with_metrics(captured_input.as_ref()) .indent() .to_string(); plan_builder.append_value(annotated_plan); - type_builder.append_value("Output Rows"); - plan_builder.append_value(total_rows.to_string()); + // Verbose output + // TODO make this more sophisticated + if verbose { + type_builder.append_value("Plan with Full Metrics"); - type_builder.append_value("Duration"); - plan_builder.append_value(format!("{:?}", end - start)); - } + let annotated_plan = DisplayableExecutionPlan::with_full_metrics( + captured_input.as_ref(), + ) + .indent() + .to_string(); + plan_builder.append_value(annotated_plan); - let maybe_batch = RecordBatch::try_new( - captured_schema, - vec![ - Arc::new(type_builder.finish()), - Arc::new(plan_builder.finish()), - ], - ) - .map_err(Into::into); - // again ignore error - tx.send(maybe_batch).await.ok(); - }); + type_builder.append_value("Output Rows"); + plan_builder.append_value(total_rows.to_string()); + + type_builder.append_value("Duration"); + plan_builder.append_value(format!("{:?}", end - start)); + } - Ok(RecordBatchReceiverStream::create( - &self.schema, - rx, - join_handle, - )) + RecordBatch::try_new( + captured_schema, + vec![ + Arc::new(type_builder.finish()), + Arc::new(plan_builder.finish()), + ], + ) + }) + .map_err(DataFusionError::from); + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + futures::stream::once(output), + ))) } fn fmt_as( diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 1ab933022d32..971dea81283f 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -687,13 +687,31 @@ async fn csv_explain_analyze() { // Only test basic plumbing and try to avoid having to change too // many things. 
explain_analyze_baseline_metrics covers the values
// in greater depth
- let needle = "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=";
+ let needle = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5";
 assert_contains!(&formatted, needle);
 
 let verbose_needle = "Output Rows";
 assert_not_contains!(formatted, verbose_needle);
 }
 
+#[tokio::test]
+#[cfg_attr(tarpaulin, ignore)]
+async fn csv_explain_analyze_order_by() {
+ let ctx = SessionContext::new();
+ register_aggregate_csv_by_sql(&ctx).await;
+ let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100 order by c1";
+ let actual = execute_to_batches(&ctx, sql).await;
+ let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+ .unwrap()
+ .to_string();
+
+ // Ensure that the ordering is not optimized away from the plan
+ // https://github.com/apache/arrow-datafusion/issues/6379
+ let needle =
+ "SortExec: expr=[c1@0 ASC NULLS LAST], metrics=[output_rows=100, elapsed_compute";
+ assert_contains!(&formatted, needle);
+}
+
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn parquet_explain_analyze() {

From 805f2f36c87dcaa67ca5a3fdf8f778b59c179cde Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 30 May 2023 17:25:12 -0400
Subject: [PATCH 3/5] Make it clear some inputs are ignored

---
 datafusion/core/src/physical_plan/analyze.rs | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 5b588e22e456..e2e0655c3883 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -73,23 +73,18 @@ impl ExecutionPlan for AnalyzeExec {
 vec![self.input.clone()]
 }
 
- /// Specifies we want the input as a single stream
+ /// AnalyzeExec is handled specially so this value is ignored
 fn required_input_distribution(&self) -> Vec<Distribution> {
- vec![Distribution::UnspecifiedDistribution]
+ vec![]
 }
 
 /// Specifies whether this plan generates an infinite stream of records.
 /// If the plan does not support pipelining, but its input(s) are
 /// infinite, returns an error to indicate this.
- fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
- if children[0] {
- Err(DataFusionError::Plan(
- "Analyze Error: Analysis is not supported for unbounded inputs"
- .to_string(),
- ))
- } else {
- Ok(false)
- }
+ fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+ Err(DataFusionError::Internal(
+ "Optimization not supported for ANALYZE".to_string(),
+ ))
 }
 
 /// Get the output partitioning of this plan

From aadf685ee8f858ae6421b84f652488057192d7e1 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 31 May 2023 13:41:45 -0400
Subject: [PATCH 4/5] Fix build failure: symbol init___rust_ctor___ctor is already defined (#6495)

---
 datafusion/optimizer/src/decorrelate_predicate_subquery.rs | 6 ------
 datafusion/optimizer/src/lib.rs | 7 +++++++
 datafusion/optimizer/src/scalar_subquery_to_join.rs | 6 ------
 3 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 8630c606499b..5ecafe6e3714 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -602,12 +602,6 @@ mod tests {
 assert_optimized_plan_equal(&plan, expected)
 }
 
- #[cfg(test)]
- #[ctor::ctor]
- fn init() {
- let _ = env_logger::try_init();
- }
-
 /// Test multiple correlated subqueries
 /// See subqueries.rs where_in_multiple()
 #[test]
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index b8217e0ac7a7..7930c059ea47 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -49,3 +49,10 @@ pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
 pub use utils::optimize_children;
 
 mod plan_signature;
+
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+ // Enable logging for tests
+ let _ = env_logger::try_init();
+}
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 26f86c607a22..04e0e0920b0c 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -307,12 +307,6 @@ mod tests {
 };
 use std::ops::Add;
 
- #[cfg(test)]
- #[ctor::ctor]
- fn init() {
- let _ = env_logger::try_init();
- }
-
 /// Test multiple correlated subqueries
 #[test]
 fn multiple_subqueries() -> Result<()> {

From 5521a7031aa43a8961541f56be6bad9180de7a98 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Wed, 31 May 2023 14:11:07 -0400
Subject: [PATCH 5/5] Simplify AnalyzeExec

---
 datafusion/core/src/physical_plan/analyze.rs | 144 ++++++++++---------
 1 file changed, 75 insertions(+), 69 deletions(-)

diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index e2e0655c3883..84d74c512b54 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -28,7 +28,7 @@ use crate::{
 },
 };
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
-use futures::{FutureExt, StreamExt, TryFutureExt};
+use futures::StreamExt;
 use tokio::task::JoinSet;
 
 use super::expressions::PhysicalSortExpr;
@@ -118,8 +118,9 @@ impl ExecutionPlan for AnalyzeExec {
 )));
 }
 
- // Gather futures that will run each input partition using a
- // JoinSet to cancel outstanding futures on drop
+ // Gather futures that will run each input partition in
+ // parallel (on a separate tokio task) using a JoinSet to
+ // cancel outstanding futures on drop
 let mut set = JoinSet::new();
 let num_input_partitions = self.input.output_partitioning().partition_count();
 
 for input_partition in 0..num_input_partitions {
@@ -133,82 +134,41 @@ impl ExecutionPlan for AnalyzeExec {
 let batch = batch?;
 total_rows += batch.num_rows();
 }
- Ok(total_rows)
+ Ok(total_rows) as Result<usize>
 });
 }
 
- // Turn the tasks in the JoinSet into a stream of
- // Result<usize> representing the counts of each output
- // partition.
- let counts_stream = futures::stream::unfold(set, |mut set| async {
- let next = set.join_next().await?; // returns None when the set is empty, ending the stream
- // translate join errors (aka task panics) into ExecutionErrors
- let next = match next {
- Ok(res) => res,
- Err(e) => Err(DataFusionError::Execution(format!(
- "Join error in AnalyzeExec: {e}"
- ))),
- };
- Some((next, set))
- });
-
 let start = Instant::now();
 let captured_input = self.input.clone();
 let captured_schema = self.schema.clone();
 let verbose = self.verbose;
 
- // future that gathers the input counts into an overall output
- // count, and makes an output batch
- let output = counts_stream
- // combine results from all input streams into a total count
- .fold(Ok(0), |total_rows: Result<usize>, input_rows| async move {
- Ok(total_rows? + input_rows?)
- })
- // convert the total to a RecordBatch
- .map(move |total_rows| {
- let total_rows = total_rows?;
- let end = Instant::now();
-
- let mut type_builder = StringBuilder::with_capacity(1, 1024);
- let mut plan_builder = StringBuilder::with_capacity(1, 1024);
-
- // TODO use some sort of enum rather than strings?
- type_builder.append_value("Plan with Metrics");
-
- let annotated_plan =
- DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
- .indent()
- .to_string();
- plan_builder.append_value(annotated_plan);
-
- // Verbose output
- // TODO make this more sophisticated
- if verbose {
- type_builder.append_value("Plan with Full Metrics");
-
- let annotated_plan = DisplayableExecutionPlan::with_full_metrics(
- captured_input.as_ref(),
- )
- .indent()
- .to_string();
- plan_builder.append_value(annotated_plan);
-
- type_builder.append_value("Output Rows");
- plan_builder.append_value(total_rows.to_string());
-
- type_builder.append_value("Duration");
- plan_builder.append_value(format!("{:?}", end - start));
+ // future that gathers the results from all the tasks in the
+ // JoinSet, computes the overall row count, and makes the final
+ // record batch
+ let output = async move {
+ let mut total_rows = 0;
+ while let Some(res) = set.join_next().await {
+ // translate join errors (aka task panics) into ExecutionErrors
+ match res {
+ Ok(row_count) => total_rows += row_count?,
+ Err(e) => {
+ return Err(DataFusionError::Execution(format!(
+ "Join error in AnalyzeExec: {e}"
+ )))
+ }
 }
+ }
 
- RecordBatch::try_new(
- captured_schema,
- vec![
- Arc::new(type_builder.finish()),
- Arc::new(plan_builder.finish()),
- ],
- )
- })
- .map_err(DataFusionError::from);
+ let duration = Instant::now() - start;
+ create_output_batch(
+ verbose,
+ total_rows,
+ duration,
+ captured_input,
+ captured_schema,
+ )
+ };
 
 Ok(Box::pin(RecordBatchStreamAdapter::new(
 self.schema.clone(),
@@ -234,6 +194,52 @@ impl ExecutionPlan for AnalyzeExec {
 }
 }
 
+/// Creates the output of AnalyzeExec as a RecordBatch
+fn create_output_batch(
+ verbose: bool,
+ total_rows: usize,
+ duration: std::time::Duration,
+ input: Arc<dyn ExecutionPlan>,
+ schema: SchemaRef,
+) -> Result<RecordBatch> {
+ let mut type_builder = StringBuilder::with_capacity(1, 1024);
+ let mut plan_builder = StringBuilder::with_capacity(1, 1024);
+
+ // TODO use some sort of enum rather than strings?
+ type_builder.append_value("Plan with Metrics"); + + let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref()) + .indent() + .to_string(); + plan_builder.append_value(annotated_plan); + + // Verbose output + // TODO make this more sophisticated + if verbose { + type_builder.append_value("Plan with Full Metrics"); + + let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref()) + .indent() + .to_string(); + plan_builder.append_value(annotated_plan); + + type_builder.append_value("Output Rows"); + plan_builder.append_value(total_rows.to_string()); + + type_builder.append_value("Duration"); + plan_builder.append_value(format!("{:?}", duration)); + } + + RecordBatch::try_new( + schema, + vec![ + Arc::new(type_builder.finish()), + Arc::new(plan_builder.finish()), + ], + ) + .map_err(DataFusionError::from) +} + #[cfg(test)] mod tests { use arrow::datatypes::{DataType, Field, Schema};