Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for ingesting datasets having multiple dimensions in earth engine #492

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
26 changes: 26 additions & 0 deletions weather_mv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ _Command options_:
takes extra time in COG creation. Default: False.
* `--use_metrics`: A flag that allows you to add Beam metrics to the pipeline. Default: False.
* `--use_monitoring_metrics`: A flag that allows you to add Google Cloud Monitoring metrics to the pipeline. Default: False.
* `--partition_dims`: If the dataset contains other dimensions apart from latitude and longitude, partition the dataset into multiple datasets based on these dimensions. A separate COG file will be created for each partition and ingested into Earth Engine. Any unspecified dimensions will be flattened in the resulting COG.
* `--asset_name_format`: The asset name format for each partitioned COG file. It must reference only the dimensions listed in partition_dims (plus init_time and valid_time). Each dimension name must be enclosed in {} (e.g. a valid format is {init_time}_{valid_time}_{number}).
* `--forecast_dim_mapping`: A JSON string containing init_time and valid_time as keys and corresponding dimension names for each key. It is required if init_time or valid_time is used in asset_name_format.
* `--date_format`: A string containing datetime.strftime codes. It is used if the dimension mentioned in asset_name_format is a datetime. Default: %Y%m%d%H%M

Invoke with `ee -h` or `earthengine --help` to see the full range of options.

Expand Down Expand Up @@ -483,6 +487,28 @@ weather-mv ee --uris "gs://your-bucket/*.grib" \
--temp_location "gs://$BUCKET/tmp"
```

Create separate COG files for every value of time and number dimensions:

```bash
weather-mv ee --uris "gs://your-bucket/*.grib" \
--asset_location "gs://$BUCKET/assets" \ # Needed to store assets generated from *.grib
--ee_asset "projects/$PROJECT/assets/test_dir" \
--partition_dims time number \
--asset_name_format "{time}_{number}"
```

Create COG files with name of init_time and valid_time (in datetime format) with 'YYYYMMDD' format:

```bash
weather-mv ee --uris "gs://your-bucket/*.grib" \
--asset_location "gs://$BUCKET/assets" \ # Needed to store assets generated from *.grib
--ee_asset "projects/$PROJECT/assets/test_dir" \
--partition_dims time step number \ # step is in timedelta
--forecast_dim_mapping '{"init_time": "time", "valid_time": "step"}' \
--asset_name_format "{init_time}_{valid_time}_{number}" \
--date_format "%Y%m%d"
```

Limit EE requests:

```bash
Expand Down
440 changes: 327 additions & 113 deletions weather_mv/loader_pipeline/ee.py

Large diffs are not rendered by default.

106 changes: 105 additions & 1 deletion weather_mv/loader_pipeline/ee_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fnmatch
import logging
import os
import tempfile
import unittest
import xarray as xr
import numpy as np

from .ee import (
get_ee_safe_name,
ConvertToAsset
ConvertToAsset,
add_additional_attrs,
partition_dataset,
construct_asset_name
)
from .sinks_test import TestDataBase

Expand Down Expand Up @@ -107,6 +113,104 @@ def test_convert_to_table_asset__with_multiple_grib_edition(self):
# The size of tiff is expected to be more than grib.
self.assertTrue(os.path.getsize(asset_path) > os.path.getsize(data_path))

def test_convert_to_multi_image_asset(self):
    """Partitioning on (time, step) yields one COG asset per combination."""
    converter = ConvertToAsset(
        asset_location=self.tmpdir.name,
        partition_dims=['time', 'step'],
        asset_name_format='{init_time}_{valid_time}',
        forecast_dim_mapping={
            'init_time': 'time',
            'valid_time': 'step'
        }
    )
    data_path = f'{self.test_data_folder}/test_data_multi_dimension.nc'
    # os.path.join() with a single argument is a no-op; use the path directly.
    asset_path = self.tmpdir.name
    list(converter.process(data_path))

    # 3 time values x 3 step values => 9 tiff files at the target location.
    self.assertEqual(len(fnmatch.filter(os.listdir(asset_path), '*.tiff')), 9)


class PartitionDatasetTests(TestDataBase):
    """Tests for partitioning a multi-dimensional dataset into 2D slices."""

    def setUp(self):
        super().setUp()
        self.forecast_dim_mapping = {
            'init_time': 'time',
            'valid_time': 'step'
        }
        self.date_format = '%Y%m%d%H%M'
        self.ds = xr.open_dataset(f'{self.test_data_folder}/test_data_multi_dimension.nc')

    def tearDown(self):
        # Close the dataset so the underlying netCDF file handle is released.
        self.ds.close()
        super().tearDown()

    def test_add_additional_attrs(self):
        sliced_ds = self.ds.isel({'time': 0, 'step': 1})
        attrs = add_additional_attrs(sliced_ds, self.forecast_dim_mapping, self.date_format)
        attr_names = ['init_time', 'valid_time', 'start_time', 'end_time', 'forecast_seconds']

        # assertIn gives a clearer failure message than assertTrue(x in y).
        for name in attr_names:
            self.assertIn(name, attrs)

        self.assertEqual(attrs['init_time'], '202412010000')
        self.assertEqual(attrs['valid_time'], '202412010600')
        self.assertEqual(attrs['start_time'], '2024-12-01T00:00:00Z')
        self.assertEqual(attrs['end_time'], '2024-12-01T06:00:00Z')
        self.assertEqual(attrs['forecast_seconds'], 6 * 60 * 60)

    def test_construct_asset_name(self):
        asset_name_format = '{init_time}_{valid_time}_{level}'
        attrs = {
            'init_time': '202412010000',
            'valid_time': '202412010600',
            'level_value': np.array(2),
            'time_value': np.datetime64('2024-12-01'),
            'step_value': np.timedelta64(6, 'h')
        }

        self.assertEqual(construct_asset_name(attrs, asset_name_format), '202412010000_202412010600_2')

    def test_partition_dataset(self):
        partition_dims = ['time', 'step']
        asset_name_format = '{init_time}_{valid_time}'
        partition_datasets = partition_dataset(
            self.ds, partition_dims, self.forecast_dim_mapping, asset_name_format, self.date_format
        )

        # As the ds is partitioned on time(3) and step(3), there should be 9 datasets.
        self.assertEqual(len(partition_datasets), 9)

        dates = ['202412010000', '202412020000', '202412030000']
        valid_times = [
            '202412010000', '202412010600', '202412011200',
            '202412020000', '202412020600', '202412021200',
            '202412030000', '202412030600', '202412031200',
        ]

        for i, dataset in enumerate(partition_datasets):

            # The level dimension is flattened and its values folded into the
            # data-array names: 2 levels x 3 variables = 6 data arrays.
            self.assertEqual(len(dataset.data_vars), 6)
            for var_name in ('level_0_x', 'level_0_y', 'level_0_z',
                             'level_1_x', 'level_1_y', 'level_1_z'):
                self.assertIn(var_name, dataset.data_vars)

            # Only the spatial dimensions should remain after partitioning.
            # (dims is a mapping, so membership tests work on it directly.)
            self.assertEqual(len(dataset.dims), 2)
            self.assertIn('latitude', dataset.dims)
            self.assertIn('longitude', dataset.dims)

            # Make sure the data arrays are 2D (latitude x longitude).
            self.assertEqual(dataset['level_0_x'].shape, (18, 36))

            # Each partition carries the asset name derived from its coordinates.
            self.assertIn('asset_name', dataset.attrs)
            self.assertEqual(dataset.attrs['asset_name'], f'{dates[i // 3]}_{valid_times[i]}')


# Run the test suite when this module is executed directly.
if __name__ == '__main__':
    unittest.main()
2 changes: 1 addition & 1 deletion weather_mv/loader_pipeline/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def _replace_dataarray_names_with_long_names(ds: xr.Dataset):
def _to_utc_timestring(np_time: np.datetime64) -> str:
"""Turn a numpy datetime64 into UTC timestring."""
timestamp = float((np_time - np.datetime64(0, 's')) / np.timedelta64(1, 's'))
return datetime.datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%dT%H:%M:%SZ')
return datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def _add_is_normalized_attr(ds: xr.Dataset, value: bool) -> xr.Dataset:
Expand Down
33 changes: 33 additions & 0 deletions weather_mv/loader_pipeline/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import typing as t
import uuid
from functools import partial
from string import Formatter
from urllib.parse import urlparse
import apache_beam as beam
import numpy as np
Expand Down Expand Up @@ -327,6 +328,38 @@ def validate_region(output_table: t.Optional[str] = None,
signal.signal(signal.SIGINT, original_sigtstp_handler)


def get_dims_from_name_format(asset_name_format: str) -> t.List[str]:
    """Return the dimension (replacement-field) names used in the asset name format."""
    dims = []
    for _, field_name, _, _ in Formatter().parse(asset_name_format):
        if field_name:
            dims.append(field_name)
    return dims


def get_datetime_from(value: np.datetime64) -> datetime.datetime:
    """Convert a numpy datetime64 into a timezone-aware (UTC) datetime.

    Sub-second precision is truncated by the integer division.
    """
    epoch_seconds = (value - np.datetime64(0, 's')) // np.timedelta64(1, 's')
    return datetime.datetime.fromtimestamp(epoch_seconds, datetime.timezone.utc)


def convert_to_string(value: t.Any, date_format: str = '%Y%m%d%H%M', make_ee_safe: bool = False) -> str:
    """Converts a given value to string based on the type of value.

    Args:
        value: The value to render. A single-element numpy array is unwrapped
            to its scalar first.
        date_format: strftime format applied when the value is a np.datetime64.
        make_ee_safe: If True, replace every run of characters outside
            [a-zA-Z0-9-_] with a single '_' so the result is safe to use in an
            Earth Engine asset name.

    Returns:
        The string representation of the value.
    """
    def _make_ee_safe(str_val: str) -> str:
        return re.sub(r'[^a-zA-Z0-9-_]+', r'_', str_val)

    # Unwrap 0-d / single-element arrays so the type checks below see scalars.
    if isinstance(value, np.ndarray) and value.size == 1:
        value = value.item()

    if isinstance(value, float):
        # Round floats to 2 decimals to keep asset names short and stable.
        str_val = str(round(value, 2))
    elif isinstance(value, np.datetime64):
        str_val = get_datetime_from(value).strftime(date_format)
    else:
        # Fallback covers ints, strings, timedeltas, etc.
        str_val = str(value)

    return _make_ee_safe(str_val) if make_ee_safe else str_val


def _shard(elem, num_shards: int):
return (np.random.randint(0, num_shards), elem)

Expand Down
14 changes: 14 additions & 0 deletions weather_mv/loader_pipeline/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
ichunked,
make_attrs_ee_compatible,
to_json_serializable_type,
convert_to_string
)


Expand Down Expand Up @@ -251,3 +252,16 @@ def test_to_json_serializable_type_datetime(self):
self.assertEqual(self._convert(timedelta(seconds=1)), float(1))
self.assertEqual(self._convert(timedelta(minutes=1)), float(60))
self.assertEqual(self._convert(timedelta(days=1)), float(86400))


class ConvertToStringTests(unittest.TestCase):
    """Unit tests for util.convert_to_string."""

    def test_convert_scalar_to_string(self):
        # Integers render verbatim; floats are rounded to 2 decimal places.
        self.assertEqual(convert_to_string(np.array(5)), '5')
        self.assertEqual(convert_to_string(np.array(5.6789)), '5.68')

    def test_convert_datetime_to_string(self):
        ts = np.datetime64('2025-01-24T04:05:06')
        # Default format, a custom strftime format, and the EE-safe variant.
        self.assertEqual(convert_to_string(ts), '202501240405')
        self.assertEqual(convert_to_string(ts, '%Y/%m/%dT%H:%M:%S'), '2025/01/24T04:05:06')
        self.assertEqual(convert_to_string(ts, '%Y/%m/%dT%H:%M:%S', True), '2025_01_24T04_05_06')
Binary file not shown.