Skip to content

Commit 4f2743c

Browse files
authored
fix: Dataproc operators fail to import without OpenLineage (apache#46561)
Signed-off-by: Kacper Muda <[email protected]>
1 parent 45d37da commit 4f2743c

File tree

6 files changed

+158
-56
lines changed

6 files changed

+158
-56
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
from __future__ import annotations
19+
20+
from typing import TYPE_CHECKING
21+
22+
from attr import define, field
23+
24+
from airflow.providers.google import __version__ as provider_version
25+
26+
if TYPE_CHECKING:
27+
from openlineage.client.generated.base import RunFacet
28+
else:
29+
try:
30+
try:
31+
from openlineage.client.generated.base import RunFacet
32+
except ImportError: # Old OpenLineage client is used
33+
from openlineage.client.facet import BaseFacet as RunFacet
34+
35+
@define
36+
class BigQueryJobRunFacet(RunFacet):
37+
"""
38+
Facet that represents relevant statistics of bigquery run.
39+
40+
:param cached: BigQuery caches query results. Rest of the statistics will not be provided for cached queries.
41+
:param billedBytes: How many bytes BigQuery bills for.
42+
:param properties: Full property tree of BigQUery run.
43+
"""
44+
45+
cached: bool
46+
billedBytes: int | None = field(default=None)
47+
properties: str | None = field(default=None)
48+
49+
@staticmethod
50+
def _get_schema() -> str:
51+
return (
52+
"https://raw.githubusercontent.com/apache/airflow/"
53+
f"providers-google/{provider_version}/airflow/providers/google/"
54+
"openlineage/BigQueryJobRunFacet.json"
55+
)
56+
except ImportError: # OpenLineage is not available
57+
58+
def create_no_op(*_, **__) -> None:
59+
"""
60+
Create a no-op placeholder.
61+
62+
This function creates and returns a None value, used as a placeholder when the OpenLineage client
63+
library is available. It represents an action that has no effect.
64+
"""
65+
return None
66+
67+
BigQueryJobRunFacet = create_no_op

providers/google/src/airflow/providers/google/cloud/openlineage/mixins.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
)
3939
from airflow.providers.google.cloud.openlineage.utils import (
4040
BIGQUERY_NAMESPACE,
41-
BigQueryJobRunFacet,
4241
get_facets_from_bq_table,
4342
get_from_nullable_chain,
4443
get_identity_column_lineage_facet,
@@ -48,6 +47,7 @@
4847

4948
if TYPE_CHECKING:
5049
from airflow.providers.common.compat.openlineage.facet import Dataset, RunFacet
50+
from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet
5151

5252

5353
class _BigQueryInsertJobOperatorOpenLineageMixin:
@@ -316,6 +316,8 @@ def _get_inputs_and_outputs_for_extract_job(
316316

317317
@staticmethod
318318
def _get_bigquery_job_run_facet(properties: dict) -> BigQueryJobRunFacet:
319+
from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet
320+
319321
job_type = get_from_nullable_chain(properties, ["configuration", "jobType"])
320322
cache_hit, billed_bytes = None, None
321323
if job_type == "QUERY":

providers/google/src/airflow/providers/google/cloud/openlineage/utils.py

-29
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,13 @@
2626
from collections.abc import Iterable
2727
from typing import TYPE_CHECKING, Any
2828

29-
from attr import define, field
30-
3129
from airflow.providers.common.compat.openlineage.facet import (
3230
ColumnLineageDatasetFacet,
3331
DatasetFacet,
3432
DocumentationDatasetFacet,
3533
Fields,
3634
Identifier,
3735
InputField,
38-
RunFacet,
3936
SchemaDatasetFacet,
4037
SchemaDatasetFacetFields,
4138
SymlinksDatasetFacet,
@@ -44,7 +41,6 @@
4441
inject_parent_job_information_into_spark_properties,
4542
inject_transport_information_into_spark_properties,
4643
)
47-
from airflow.providers.google import __version__ as provider_version
4844
from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
4945
from google.cloud.dataproc_v1 import Batch, RuntimeConfig
5046

@@ -317,31 +313,6 @@ def get_identity_column_lineage_facet(
317313
return {"columnLineage": column_lineage_facet}
318314

319315

320-
@define
321-
class BigQueryJobRunFacet(RunFacet):
322-
"""
323-
Facet that represents relevant statistics of bigquery run.
324-
325-
This facet is used to provide statistics about bigquery run.
326-
327-
:param cached: BigQuery caches query results. Rest of the statistics will not be provided for cached queries.
328-
:param billedBytes: How many bytes BigQuery bills for.
329-
:param properties: Full property tree of BigQUery run.
330-
"""
331-
332-
cached: bool
333-
billedBytes: int | None = field(default=None)
334-
properties: str | None = field(default=None)
335-
336-
@staticmethod
337-
def _get_schema() -> str:
338-
return (
339-
"https://raw.githubusercontent.com/apache/airflow/"
340-
f"providers-google/{provider_version}/airflow/providers/google/"
341-
"openlineage/BigQueryJobRunFacet.json"
342-
)
343-
344-
345316
def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
346317
"""
347318
Get object from nested structure of objects, where it's not guaranteed that all keys in the nested structure exist.

providers/google/src/airflow/providers/google/cloud/operators/dataproc.py

+61-23
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,6 @@
4949
DataprocWorkflowLink,
5050
DataprocWorkflowTemplateLink,
5151
)
52-
from airflow.providers.google.cloud.openlineage.utils import (
53-
inject_openlineage_properties_into_dataproc_batch,
54-
inject_openlineage_properties_into_dataproc_job,
55-
inject_openlineage_properties_into_dataproc_workflow_template,
56-
)
5752
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
5853
from airflow.providers.google.cloud.triggers.dataproc import (
5954
DataprocBatchTrigger,
@@ -1858,12 +1853,7 @@ def execute(self, context: Context):
18581853
project_id = self.project_id or hook.project_id
18591854
if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
18601855
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
1861-
self.template = inject_openlineage_properties_into_dataproc_workflow_template(
1862-
template=self.template,
1863-
context=context,
1864-
inject_parent_job_info=self.openlineage_inject_parent_job_info,
1865-
inject_transport_info=self.openlineage_inject_transport_info,
1866-
)
1856+
self._inject_openlineage_properties_into_dataproc_workflow_template(context)
18671857

18681858
operation = hook.instantiate_inline_workflow_template(
18691859
template=self.template,
@@ -1920,6 +1910,25 @@ def on_kill(self) -> None:
19201910
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
19211911
hook.get_operations_client(region=self.region).cancel_operation(name=self.operation_name)
19221912

1913+
def _inject_openlineage_properties_into_dataproc_workflow_template(self, context: Context) -> None:
    """Try to enrich ``self.template`` with OpenLineage Spark properties.

    Any failure — including a missing OpenLineage provider — is logged as a
    warning and leaves the template untouched, so task execution never breaks.
    """
    try:
        # Imported lazily so this operator stays importable without OpenLineage.
        from airflow.providers.google.cloud.openlineage.utils import (
            inject_openlineage_properties_into_dataproc_workflow_template,
        )

        updated_template = inject_openlineage_properties_into_dataproc_workflow_template(
            template=self.template,
            context=context,
            inject_parent_job_info=self.openlineage_inject_parent_job_info,
            inject_transport_info=self.openlineage_inject_transport_info,
        )
        self.template = updated_template
    except Exception as e:
        self.log.warning(
            "An error occurred while trying to inject OpenLineage information. "
            "Dataproc template has not been modified by OpenLineage.",
            exc_info=e,
        )
1931+
19231932

19241933
class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
19251934
"""
@@ -2017,12 +2026,8 @@ def execute(self, context: Context):
20172026
self.hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
20182027
if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
20192028
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
2020-
self.job = inject_openlineage_properties_into_dataproc_job(
2021-
job=self.job,
2022-
context=context,
2023-
inject_parent_job_info=self.openlineage_inject_parent_job_info,
2024-
inject_transport_info=self.openlineage_inject_transport_info,
2025-
)
2029+
self._inject_openlineage_properties_into_dataproc_job(context)
2030+
20262031
job_object = self.hook.submit_job(
20272032
project_id=self.project_id,
20282033
region=self.region,
@@ -2096,6 +2101,25 @@ def on_kill(self):
20962101
if self.job_id and self.cancel_on_kill:
20972102
self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id, region=self.region)
20982103

2104+
def _inject_openlineage_properties_into_dataproc_job(self, context: Context) -> None:
    """Try to enrich ``self.job`` with OpenLineage Spark properties.

    Best-effort: any failure — including a missing OpenLineage provider — is
    logged as a warning and leaves the job definition untouched.
    """
    try:
        # Imported lazily so this operator stays importable without OpenLineage.
        from airflow.providers.google.cloud.openlineage.utils import (
            inject_openlineage_properties_into_dataproc_job,
        )

        updated_job = inject_openlineage_properties_into_dataproc_job(
            job=self.job,
            context=context,
            inject_parent_job_info=self.openlineage_inject_parent_job_info,
            inject_transport_info=self.openlineage_inject_transport_info,
        )
        self.job = updated_job
    except Exception as e:
        self.log.warning(
            "An error occurred while trying to inject OpenLineage information. "
            "Dataproc job has not been modified by OpenLineage.",
            exc_info=e,
        )
2122+
20992123

21002124
class DataprocUpdateClusterOperator(GoogleCloudBaseOperator):
21012125
"""
@@ -2502,12 +2526,7 @@ def execute(self, context: Context):
25022526

25032527
if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
25042528
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
2505-
self.batch = inject_openlineage_properties_into_dataproc_batch(
2506-
batch=self.batch,
2507-
context=context,
2508-
inject_parent_job_info=self.openlineage_inject_parent_job_info,
2509-
inject_transport_info=self.openlineage_inject_transport_info,
2510-
)
2529+
self._inject_openlineage_properties_into_dataproc_batch(context)
25112530

25122531
try:
25132532
self.operation = self.hook.create_batch(
@@ -2670,6 +2689,25 @@ def retry_batch_creation(
26702689
)
26712690
return batch, batch_id
26722691

2692+
def _inject_openlineage_properties_into_dataproc_batch(self, context: Context) -> None:
    """Try to enrich ``self.batch`` with OpenLineage Spark properties.

    Best-effort: any failure — including a missing OpenLineage provider — is
    logged as a warning and leaves the batch definition untouched.
    """
    try:
        # Imported lazily so this operator stays importable without OpenLineage.
        from airflow.providers.google.cloud.openlineage.utils import (
            inject_openlineage_properties_into_dataproc_batch,
        )

        updated_batch = inject_openlineage_properties_into_dataproc_batch(
            batch=self.batch,
            context=context,
            inject_parent_job_info=self.openlineage_inject_parent_job_info,
            inject_transport_info=self.openlineage_inject_transport_info,
        )
        self.batch = updated_batch
    except Exception as e:
        self.log.warning(
            "An error occurred while trying to inject OpenLineage information. "
            "Dataproc batch has not been modified by OpenLineage.",
            exc_info=e,
        )
2710+
26732711

26742712
class DataprocDeleteBatchOperator(GoogleCloudBaseOperator):
26752713
"""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet
20+
21+
22+
def test_bigquery_job_run_facet():
    """BigQueryJobRunFacet should expose exactly the values it was built with."""
    kwargs = {"cached": True, "billedBytes": 123, "properties": "some_properties"}
    facet = BigQueryJobRunFacet(**kwargs)
    assert facet.cached is True
    assert facet.billedBytes == 123
    assert facet.properties == "some_properties"

providers/google/tests/provider_tests/google/cloud/openlineage/test_mixins.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,8 @@
3838
SchemaDatasetFacet,
3939
SchemaDatasetFacetFields,
4040
)
41+
from airflow.providers.google.cloud.openlineage.facets import BigQueryJobRunFacet
4142
from airflow.providers.google.cloud.openlineage.mixins import _BigQueryInsertJobOperatorOpenLineageMixin
42-
from airflow.providers.google.cloud.openlineage.utils import (
43-
BigQueryJobRunFacet,
44-
)
4543
from airflow.providers.openlineage.sqlparser import SQLParser
4644

4745
QUERY_JOB_PROPERTIES = {

0 commit comments

Comments
 (0)