Add tracing spans to last value cache #26077

Open
hiltontj opened this issue Feb 27, 2025 · 0 comments

Problem statement

Variable latency has been observed when querying the last cache under higher concurrent query loads. The last cache currently emits no tracing spans, so there is no way to tell where in the scan the time is being spent or which step contributes to that latency.

Proposed solution

Extract the span context from the DataFusion Session that is provided to the TableProvider::scan implementation for the last_cache:

async fn scan(
    &self,
    ctx: &dyn Session,
    projection: Option<&Vec<usize>>,
    filters: &[Expr],
    _limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {

It would be worth wrapping a span around the following to generate traces:

Within the latter, we could break the work down into further spans:

  • Predicate evaluation of the cache key hierarchy:
    for predicate in predicates {
        if caches.is_empty() {
            return Ok(vec![]);
        }
        let mut new_caches = vec![];
        for c in caches {
            let Some(cache_key) = c.state.as_key() else {
                continue;
            };
            if let Some(pred) = predicate {
                let next_states = cache_key.evaluate_predicate(pred);
                new_caches.extend(next_states.into_iter().map(|(state, value)| {
                    let mut additional_columns = c.key_column_values.clone();
                    additional_columns.push(value);
                    ExtendedLastCacheState {
                        state,
                        key_column_values: additional_columns,
                    }
                }));
            } else {
                new_caches.extend(cache_key.value_map.iter().map(|(v, state)| {
                    let mut additional_columns = c.key_column_values.clone();
                    additional_columns.push(v);
                    ExtendedLastCacheState {
                        state,
                        key_column_values: additional_columns,
                    }
                }));
            }
        }
        caches = new_caches;
    }
  • Scanning the filtered leaf nodes:
    caches
        .into_iter()
        .map(|c| c.to_record_batch(Arc::clone(&table_def), Arc::clone(&self.schema)))
        .collect()
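
As a rough illustration of the two-span breakdown above, here is a minimal, standard-library-only sketch. Everything in it is an assumption made for illustration: `SpanLog` is a hypothetical stand-in for a real span recorder (it is not the tracing API used in this codebase), `Leaf` is a heavily simplified stand-in for the cache key hierarchy, and the span names are made up. It only shows the shape of the idea: one span around predicate evaluation and a second around scanning the filtered leaves, so that each phase reports its own duration.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a span recorder. A real implementation would
/// create child spans from the span context found on the DataFusion session.
#[derive(Debug, Default)]
struct SpanLog {
    /// Finished spans as (name, elapsed time), in completion order.
    finished: Vec<(String, Duration)>,
}

impl SpanLog {
    /// Runs `work` inside a named span and records how long it took.
    fn in_span<T>(&mut self, name: &str, work: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = work();
        self.finished.push((name.to_string(), start.elapsed()));
        out
    }
}

/// Simplified stand-in for a leaf of the cache key hierarchy: the key column
/// values on the path to the leaf, plus a single cached value.
struct Leaf {
    key_column_values: Vec<String>,
    last_value: i64,
}

/// Phase 1: keep only the leaves whose key values satisfy every predicate.
/// A `None` predicate matches any value at that level of the hierarchy.
fn evaluate_predicates<'a>(leaves: &'a [Leaf], predicates: &[Option<&str>]) -> Vec<&'a Leaf> {
    leaves
        .iter()
        .filter(|leaf| {
            predicates
                .iter()
                .zip(&leaf.key_column_values)
                .all(|(pred, value)| match pred {
                    Some(expected) => value.as_str() == *expected,
                    None => true,
                })
        })
        .collect()
}

/// Phase 2: turn each surviving leaf into an output row.
fn scan_leaves(leaves: &[&Leaf]) -> Vec<(String, i64)> {
    leaves
        .iter()
        .map(|leaf| (leaf.key_column_values.join("/"), leaf.last_value))
        .collect()
}

/// Wraps each phase in its own span so their latencies show up separately.
fn traced_scan(
    log: &mut SpanLog,
    leaves: &[Leaf],
    predicates: &[Option<&str>],
) -> Vec<(String, i64)> {
    let matched = log.in_span("last_cache::evaluate_predicates", || {
        evaluate_predicates(leaves, predicates)
    });
    log.in_span("last_cache::scan_leaves", || scan_leaves(&matched))
}
```

Calling `traced_scan` with a `SpanLog` leaves two entries in `finished`, one per phase, which is the kind of breakdown that would help show whether the latency sits in walking the key hierarchy or in building the record batches.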

Additional context

This is related to #25562
