You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The following workflow works when we are not offloading literals in flytekit
```
import logging
from typing import List
from flytekit import map_task, task, workflow,LaunchPlan
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("flytekit")
logger.setLevel(logging.DEBUG)
@task(cache=True, cache_version="1.1")
def my_30mb_task(i: str) -> str:
return f"Hello world {i}" * 30 * 100 * 1024
@task(cache=True, cache_version="1.1")
def generate_strs(count: int) -> List[str]:
return ["a"] * count
@workflow
def my_30mb_wf(mbs: int) -> List[str]:
strs = generate_strs(count=mbs)
return map_task(my_30mb_task)(i=strs)
@workflow
def big_inputs_wf(input: List[str]):
noop()
@task(cache=True, cache_version="1.1")
def noop():
...
big_inputs_wf_lp = LaunchPlan.get_or_create(name="big_inputs_wf_lp", workflow=big_inputs_wf)
@workflow
def ref_wf(mbs: int):
big_inputs_wf_lp(input=my_30mb_wf(mbs))
```
Without flytekit offloading the return type is OffloadedLiteral{inferredType:{Collection{String}} and when checked against big_inputs_wf launchplan which needs Collection{String} , the LiteralTypeToLiteral returns the inferredType : Collection{String}
If we enable offloading in flytekit, the returned data from map task is
Collection{OffloadedLiteral<{inferredType:{Collection{String}}}
When passing this Input to big_inputs_wf which takes Collection{String} then the type check fails due to LiteralTypeToLiteral returning Collection{OffloadedLiteral{inferredType:{Collection{String}}} as Collection{Collection{String}}
Flytekit handles this case by special casing Collection{OffloadedLiteral} and similar special casing is needed in flyte code base
Tested this by deploying this PR changes
https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/akxs97cdmkmxhhqp228x/nodes
Earlier it would fail like this https://dogfood-gcp.cloud-staging.union.ai/console/projects/flytesnacks/domains/development/executions/ap4thjp5528kjfspcsds/nodes
```
[UserError] failed to launch workflow, caused by: rpc error: code = InvalidArgument desc = invalid input input wrong type. Expected collection_type:{simple:STRING}, but got collection_type:{collection_type:{simple:STRING}}
```
Rollout to canary and then all prod byoc and serverless tenants
Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [x] To be upstreamed to OSS
*TODO: Link Linear issue(s) using [magic words](https://linear.app/docs/github#magic-words). `fixes` will move to merged status, while `ref` will only link the PR.*
* [X] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
0 commit comments