diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
index 1421e25be785..b12c4ef93fc8 100644
--- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs
+++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs
@@ -387,7 +387,7 @@ mod sql_tests {
         };
         let test2 = UnaryTestCase {
             source_type: SourceType::Unbounded,
-            expect_fail: true,
+            expect_fail: false,
         };
         let case = QueryCase {
             sql: "EXPLAIN ANALYZE SELECT * FROM test".to_string(),
diff --git a/datafusion/core/src/physical_plan/analyze.rs b/datafusion/core/src/physical_plan/analyze.rs
index 08b5bb34ed87..84d74c512b54 100644
--- a/datafusion/core/src/physical_plan/analyze.rs
+++ b/datafusion/core/src/physical_plan/analyze.rs
@@ -29,9 +29,11 @@ use crate::{
 };
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
 use futures::StreamExt;
+use tokio::task::JoinSet;
 
 use super::expressions::PhysicalSortExpr;
-use super::{stream::RecordBatchReceiverStream, Distribution, SendableRecordBatchStream};
+use super::stream::RecordBatchStreamAdapter;
+use super::{Distribution, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
 
 /// `EXPLAIN ANALYZE` execution plan operator. This operator runs its input,
@@ -71,23 +73,18 @@ impl ExecutionPlan for AnalyzeExec {
         vec![self.input.clone()]
     }
 
-    /// Specifies we want the input as a single stream
+    /// AnalyzeExec is handled specially so this value is ignored
     fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition]
+        vec![]
     }
 
     /// Specifies whether this plan generates an infinite stream of records.
     /// If the plan does not support pipelining, but its input(s) are
     /// infinite, returns an error to indicate this.
-    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
-        if children[0] {
-            Err(DataFusionError::Plan(
-                "Analyze Error: Analysis is not supported for unbounded inputs"
-                    .to_string(),
-            ))
-        } else {
-            Ok(false)
-        }
+    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
+        Err(DataFusionError::Internal(
+            "Optimization not supported for ANALYZE".to_string(),
+        ))
     }
 
     /// Get the output partitioning of this plan
@@ -121,96 +118,62 @@ impl ExecutionPlan for AnalyzeExec {
             )));
         }
 
-        // should be ensured by `SinglePartition` above
-        let input_partitions = self.input.output_partitioning().partition_count();
-        if input_partitions != 1 {
-            return Err(DataFusionError::Internal(format!(
-                "AnalyzeExec invalid number of input partitions. Expected 1, got {input_partitions}"
-            )));
+        // Gather futures that will run each input partition in
+        // parallel (on a separate tokio task) using a JoinSet to
+        // cancel outstanding futures on drop
+        let mut set = JoinSet::new();
+        let num_input_partitions = self.input.output_partitioning().partition_count();
+
+        for input_partition in 0..num_input_partitions {
+            let input_stream = self.input.execute(input_partition, context.clone());
+
+            set.spawn(async move {
+                let mut total_rows = 0;
+                let mut input_stream = input_stream?;
+                while let Some(batch) = input_stream.next().await {
+                    let batch = batch?;
+                    total_rows += batch.num_rows();
+                }
+                Ok(total_rows) as Result<usize>
+            });
         }
 
-        let (tx, rx) = tokio::sync::mpsc::channel(input_partitions);
-
+        let start = Instant::now();
         let captured_input = self.input.clone();
-        let mut input_stream = captured_input.execute(0, context)?;
         let captured_schema = self.schema.clone();
         let verbose = self.verbose;
 
-        // Task reads batches the input and when complete produce a
-        // RecordBatch with a report that is written to `tx` when done
-        let join_handle = tokio::task::spawn(async move {
-            let start = Instant::now();
+        // Future that gathers the results from all the tasks in the
+        // JoinSet and computes the overall row count and final
+        // record batch
+        let output = async move {
             let mut total_rows = 0;
-
-            // Note the code below ignores errors sending on tx. An
-            // error sending means the plan is being torn down and
-            // nothing is left that will handle the error (aka no one
-            // will hear us scream)
-            while let Some(b) = input_stream.next().await {
-                match b {
-                    Ok(batch) => {
-                        total_rows += batch.num_rows();
-                    }
-                    b @ Err(_) => {
-                        // try and pass on errors from input
-                        if tx.send(b).await.is_err() {
-                            // receiver hung up, stop executing (no
-                            // one will look at any further results we
-                            // send)
-                            return;
-                        }
+            while let Some(res) = set.join_next().await {
+                // translate join errors (aka task panics) into ExecutionErrors
+                match res {
+                    Ok(row_count) => total_rows += row_count?,
+                    Err(e) => {
+                        return Err(DataFusionError::Execution(format!(
+                            "Join error in AnalyzeExec: {e}"
+                        )))
                     }
                 }
             }
-            let end = Instant::now();
-
-            let mut type_builder = StringBuilder::with_capacity(1, 1024);
-            let mut plan_builder = StringBuilder::with_capacity(1, 1024);
-
-            // TODO use some sort of enum rather than strings?
-            type_builder.append_value("Plan with Metrics");
-
-            let annotated_plan =
-                DisplayableExecutionPlan::with_metrics(captured_input.as_ref())
-                    .indent()
-                    .to_string();
-            plan_builder.append_value(annotated_plan);
-
-            // Verbose output
-            // TODO make this more sophisticated
-            if verbose {
-                type_builder.append_value("Plan with Full Metrics");
-
-                let annotated_plan =
-                    DisplayableExecutionPlan::with_full_metrics(captured_input.as_ref())
-                        .indent()
-                        .to_string();
-                plan_builder.append_value(annotated_plan);
-
-                type_builder.append_value("Output Rows");
-                plan_builder.append_value(total_rows.to_string());
-
-                type_builder.append_value("Duration");
-                plan_builder.append_value(format!("{:?}", end - start));
-            }
-            let maybe_batch = RecordBatch::try_new(
+            let duration = Instant::now() - start;
+            create_output_batch(
+                verbose,
+                total_rows,
+                duration,
+                captured_input,
                 captured_schema,
-                vec![
-                    Arc::new(type_builder.finish()),
-                    Arc::new(plan_builder.finish()),
-                ],
             )
-            .map_err(Into::into);
-            // again ignore error
-            tx.send(maybe_batch).await.ok();
-        });
-
-        Ok(RecordBatchReceiverStream::create(
-            &self.schema,
-            rx,
-            join_handle,
-        ))
+        };
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            self.schema.clone(),
+            futures::stream::once(output),
+        )))
     }
 
     fn fmt_as(
@@ -231,6 +194,52 @@ impl ExecutionPlan for AnalyzeExec {
     }
 }
 
+/// Creates the output of AnalyzeExec as a RecordBatch
+fn create_output_batch(
+    verbose: bool,
+    total_rows: usize,
+    duration: std::time::Duration,
+    input: Arc<dyn ExecutionPlan>,
+    schema: SchemaRef,
+) -> Result<RecordBatch> {
+    let mut type_builder = StringBuilder::with_capacity(1, 1024);
+    let mut plan_builder = StringBuilder::with_capacity(1, 1024);
+
+    // TODO use some sort of enum rather than strings?
+    type_builder.append_value("Plan with Metrics");
+
+    let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
+        .indent()
+        .to_string();
+    plan_builder.append_value(annotated_plan);
+
+    // Verbose output
+    // TODO make this more sophisticated
+    if verbose {
+        type_builder.append_value("Plan with Full Metrics");
+
+        let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
+            .indent()
+            .to_string();
+        plan_builder.append_value(annotated_plan);
+
+        type_builder.append_value("Output Rows");
+        plan_builder.append_value(total_rows.to_string());
+
+        type_builder.append_value("Duration");
+        plan_builder.append_value(format!("{:?}", duration));
+    }
+
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(type_builder.finish()),
+            Arc::new(plan_builder.finish()),
+        ],
+    )
+    .map_err(DataFusionError::from)
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
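
NOTE (editorial sketch, not part of the patch): the analyze.rs change above rests on two
patterns: fan-out/fan-in over a tokio::task::JoinSet (one task per input partition, with
outstanding tasks aborted when the set is dropped), and exposing the single summary future
as a one-element stream, which is what RecordBatchStreamAdapter plus futures::stream::once
do in the real operator. The following minimal, self-contained Rust sketch shows the same
shape; the Vec<Vec<u64>> "partitions" and the boxed error alias are illustrative stand-ins,
not code from this PR.

    // Assumed dependencies: tokio (features "macros", "rt-multi-thread") and futures.
    use futures::StreamExt;
    use tokio::task::JoinSet;

    type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Stand-in for the plan's partitioned input.
        let partitions: Vec<Vec<u64>> = vec![vec![1, 2, 3], vec![4, 5], vec![6]];

        // Fan out: one tokio task per partition. Dropping the JoinSet aborts
        // any task still running, which is the cancel-on-drop behavior the
        // PR comment describes.
        let mut set = JoinSet::new();
        for partition in partitions {
            set.spawn(async move {
                // The real operator drains a RecordBatch stream here;
                // this sketch just counts elements.
                Ok(partition.len()) as Result<usize>
            });
        }

        // Fan in: one future that joins all tasks (in completion order)
        // and produces the final summary value.
        let output = async move {
            let mut total_rows = 0;
            while let Some(res) = set.join_next().await {
                // Outer ? surfaces task panics/cancellation; inner ?
                // surfaces per-partition execution errors.
                total_rows += res??;
            }
            Ok(total_rows) as Result<usize>
        };

        // Expose the single summary as a stream, analogous to wrapping the
        // output future in futures::stream::once + RecordBatchStreamAdapter.
        let mut stream = Box::pin(futures::stream::once(output));
        while let Some(total) = stream.next().await {
            println!("total rows: {}", total?);
        }
        Ok(())
    }
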
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 3532a6949a33..35b209c7c50b 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1192,11 +1192,9 @@ impl DefaultPhysicalPlanner {
                     "Unsupported logical plan: Distinct should be replaced to Aggregate".to_string(),
                 ))
             }
-            LogicalPlan::Analyze(a) => {
-                let input = self.create_initial_plan(&a.input, session_state).await?;
-                let schema = SchemaRef::new((*a.schema).clone().into());
-                Ok(Arc::new(AnalyzeExec::new(a.verbose, input, schema)))
-            }
+            LogicalPlan::Analyze(_) => Err(DataFusionError::Internal(
+                "Unsupported logical plan: Analyze must be root of the plan".to_string(),
+            )),
             LogicalPlan::Extension(e) => {
                 let physical_inputs = self.create_initial_plan_multi(e.node.inputs(), session_state).await?;
@@ -1851,6 +1849,10 @@ impl DefaultPhysicalPlanner {
                 stringified_plans,
                 e.verbose,
             ))))
+        } else if let LogicalPlan::Analyze(a) = logical_plan {
+            let input = self.create_physical_plan(&a.input, session_state).await?;
+            let schema = SchemaRef::new((*a.schema).clone().into());
+            Ok(Some(Arc::new(AnalyzeExec::new(a.verbose, input, schema))))
         } else {
             Ok(None)
         }
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 1ab933022d32..971dea81283f 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -687,13 +687,31 @@ async fn csv_explain_analyze() {
     // Only test basic plumbing and try to avoid having to change too
    // many things. explain_analyze_baseline_metrics covers the values
     // in greater depth
-    let needle = "CoalescePartitionsExec, metrics=[output_rows=5, elapsed_compute=";
+    let needle = "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))], metrics=[output_rows=5";
     assert_contains!(&formatted, needle);
 
     let verbose_needle = "Output Rows";
     assert_not_contains!(formatted, verbose_needle);
 }
 
+#[tokio::test]
+#[cfg_attr(tarpaulin, ignore)]
+async fn csv_explain_analyze_order_by() {
+    let ctx = SessionContext::new();
+    register_aggregate_csv_by_sql(&ctx).await;
+    let sql = "EXPLAIN ANALYZE SELECT c1 FROM aggregate_test_100 order by c1";
+    let actual = execute_to_batches(&ctx, sql).await;
+    let formatted = arrow::util::pretty::pretty_format_batches(&actual)
+        .unwrap()
+        .to_string();
+
+    // Ensure that the ordering is not optimized away from the plan
+    // https://github.com/apache/arrow-datafusion/issues/6379
+    let needle =
+        "SortExec: expr=[c1@0 ASC NULLS LAST], metrics=[output_rows=100, elapsed_compute";
+    assert_contains!(&formatted, needle);
+}
+
 #[tokio::test]
 #[cfg_attr(tarpaulin, ignore)]
 async fn parquet_explain_analyze() {
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index 8630c606499b..5ecafe6e3714 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -602,12 +602,6 @@ mod tests {
         assert_optimized_plan_equal(&plan, expected)
     }
 
-    #[cfg(test)]
-    #[ctor::ctor]
-    fn init() {
-        let _ = env_logger::try_init();
-    }
-
     /// Test multiple correlated subqueries
     /// See subqueries.rs where_in_multiple()
     #[test]
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index b8217e0ac7a7..7930c059ea47 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -49,3 +49,10 @@ pub use optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
 pub use utils::optimize_children;
 
 mod plan_signature;
+
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+    // Enable logging for tests
+    let _ = env_logger::try_init();
+}
diff --git a/datafusion/optimizer/src/scalar_subquery_to_join.rs b/datafusion/optimizer/src/scalar_subquery_to_join.rs
index 26f86c607a22..04e0e0920b0c 100644
--- a/datafusion/optimizer/src/scalar_subquery_to_join.rs
+++ b/datafusion/optimizer/src/scalar_subquery_to_join.rs
@@ -307,12 +307,6 @@ mod tests {
     };
     use std::ops::Add;
 
-    #[cfg(test)]
-    #[ctor::ctor]
-    fn init() {
-        let _ = env_logger::try_init();
-    }
-
     /// Test multiple correlated subqueries
     #[test]
     fn multiple_subqueries() -> Result<()> {
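
NOTE (editorial sketch, not part of the patch): the last three hunks replace duplicated
per-module env_logger setup in two optimizer test modules with a single crate-wide
#[ctor::ctor] initializer in datafusion/optimizer/src/lib.rs. A minimal standalone
version of that pattern follows; the add function and the test are hypothetical
examples, not PR code.

    // Assumed (dev-)dependencies: ctor, env_logger, log.

    pub fn add(a: i32, b: i32) -> i32 {
        a + b
    }

    #[cfg(test)]
    #[ctor::ctor]
    fn init() {
        // Runs once before the test harness starts; try_init() tolerates
        // a logger having already been installed.
        let _ = env_logger::try_init();
    }

    #[cfg(test)]
    mod tests {
        #[test]
        fn logging_is_enabled() {
            // Visible with: RUST_LOG=debug cargo test -- --nocapture
            log::debug!("computing 2 + 2");
            assert_eq!(super::add(2, 2), 4);
        }
    }
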