Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ge-profiler): catch TimeoutError #12855

Merged

Conversation

sgomezvillamor
Copy link
Contributor

This fixes the following exception where a TimeoutError during profiling is not properly catch and so failing abruptly the pipeline.

[2025-02-27 04:16:58,824] ERROR    {datahub.ingestion.run.pipeline:741} - Ingestion pipeline threw an uncaught exception: 158 (of 220) futures unfinished
Traceback (most recent call last):
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/run/pipeline.py", line 466, in run
    for wu in itertools.islice(
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/source/state/stale_entity_removal_handler.py", line 72, in auto_stale_entity_removal
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/auto_work_units/auto_ensure_aspect_size.py", line 91, in ensure_aspect_size
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/auto_work_units/auto_dataset_properties_aspect.py", line 62, in auto_patch_last_modified
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 148, in auto_workunit_reporter
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 252, in auto_browse_path_v2
    for urn, batch in _batch_workunits_by_urn(stream):
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 508, in _batch_workunits_by_urn
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 415, in auto_fix_empty_field_paths
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 367, in auto_fix_duplicate_schema_field_paths
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 171, in auto_materialize_referenced_tags_terms
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/api/source_helpers.py", line 103, in auto_status_aspect
    for wu in stream:
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/source/unity/source.py", line 339, in get_workunits_internal
    yield from UnityCatalogGEProfiler(
  File "/tmp/datahub/ingest/venv-databricks-e1b1ec025dd01f46/lib/python3.10/site-packages/datahub/ingestion/source/unity/ge_profiler.py", line 86, in get_workunits
    for i, completed in enumerate(
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 241, in as_completed
    raise TimeoutError(
concurrent.futures._base.TimeoutError: 158 (of 220) futures unfinished
[2025-02-27 04:16:58,843] INFO     {datahub.ingestion.reporting.file_reporter:54} - Wrote FAILURE report successfully to <_io.TextIOWrapper name='/tmp/datahub/logs/945c0e9b-b70e-407c-a4e4-c6dea2889f70/artifacts/ingestion_report.json' mode='w' encoding='UTF-8'>
[2025-02-27 04:16:58,896] INFO     {datahub.cli.ingest_cli:144} - Finished metadata ingestion

The timeout exception raises here

            try:
                for i, completed in enumerate(
                    as_completed(futures, timeout=self.profiling_config.max_wait_secs)
                ):
                    profile_request = completed.result()
                    if profile_request is not None:
                        profile_requests.append(profile_request)
                    if i > 0 and i % 100 == 0:
                        logger.info(f"Finished table-level profiling for {i} tables")
            except TimeoutError:
                logger.warning("Timed out waiting to complete table-level profiling.")

Root cause is the exception being thrown is concurrent.futures.TimeoutError, which is different from TimeoutError.

I have reproduced the issue with this code snippet:

import time
import concurrent.futures

def slow_task():
    """Simulates a long-running task."""
    time.sleep(5)
    return "done"

def test_timeout_handling():
    futures = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # Submit 5 tasks that each take 5 seconds
        for _ in range(5):
            futures.append(executor.submit(slow_task))

        try:
            # Iterate over as_completed with a short timeout (2s)
            for future in concurrent.futures.as_completed(futures, timeout=2):
                result = future.result()
                print(f"Task completed with result: {result}")
        #except TimeoutError:
        except concurrent.futures.TimeoutError:
            print("Caught TimeoutError: Timeout occurred while waiting for futures.")

# Run the test
test_timeout_handling()

With except TimeoutError it results in exception not being catch:

➜  ~ python3 test-as-completed.py
Traceback (most recent call last):
  File "/Users/sergio/test-as-completed.py", line 26, in <module>
    test_timeout_handling()
  File "/Users/sergio/test-as-completed.py", line 19, in test_timeout_handling
    for future in concurrent.futures.as_completed(futures, timeout=2):
  File "/Users/sergio/.pyenv/versions/3.10.15/lib/python3.10/concurrent/futures/_base.py", line 241, in as_completed
    raise TimeoutError(
concurrent.futures._base.TimeoutError: 5 (of 5) futures unfinished

whereas with except concurrent.futures.TimeoutError, the exception is properly catch

➜  ~ python3 test-as-completed.py
Caught TimeoutError: Timeout occurred while waiting for futures.

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@sgomezvillamor sgomezvillamor marked this pull request as ready for review March 12, 2025 14:19
Copy link

codecov bot commented Mar 12, 2025

Codecov Report

Attention: Patch coverage is 50.00000% with 1 line in your changes missing coverage. Please review.

✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
.../src/datahub/ingestion/source/unity/ge_profiler.py 50.00% 1 Missing ⚠️

❌ Your patch status has failed because the patch coverage (50.00%) is below the target coverage (75.00%). You can increase the patch coverage or adjust the target coverage.

📢 Thoughts on this report? Let us know!

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Mar 12, 2025
@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Mar 12, 2025
@datahub-cyborg datahub-cyborg bot added pending-submitter-merge and removed needs-review Label for PRs that need review from a maintainer. labels Mar 12, 2025
@sgomezvillamor sgomezvillamor merged commit 0d54352 into master Mar 13, 2025
76 of 77 checks passed
@sgomezvillamor sgomezvillamor deleted the feature/cus-3105-fix-ge-profiler-try-except-timeout branch March 13, 2025 09:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ingestion PR or Issue related to the ingestion of metadata pending-submitter-merge
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants