Skip to content

Commit

Permalink
Fix rfi from string in same node (#139)
Browse files Browse the repository at this point in the history
* Fix rfi from string in same node

* Remove unused script
  • Loading branch information
Tomperez98 authored Jan 30, 2025
1 parent e3a0341 commit 1751cf7
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 21 deletions.
41 changes: 23 additions & 18 deletions src/resonate/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@
T = TypeVar("T")


def _set_retry_policy(invocation: LFI | RFI) -> retry_policy.RetryPolicy | None:
if not isinstance(invocation.unit, Invocation):
retry_policy = None
elif isinstance(invocation.unit.fn, str):
retry_policy = invocation.opts.retry_policy or None
elif isgeneratorfunction(invocation.unit.fn):
retry_policy = invocation.opts.retry_policy or never()
else:
assert iscoroutinefunction(invocation.unit.fn) or isfunction(invocation.unit.fn)
retry_policy = invocation.opts.retry_policy or exponential(
base_delay=1,
factor=2,
max_retries=-1,
max_delay=30,
)
return retry_policy


@final
class Record(Generic[T]):
def __init__(
Expand All @@ -38,24 +56,7 @@ def __init__(
self._result: Result[T, Exception] | None = None
self.children: list[Record[Any]] = []
self.invocation: LFI | RFI = invocation
self.retry_policy: retry_policy.RetryPolicy | None
if not isinstance(invocation.unit, Invocation):
self.retry_policy = None
elif isinstance(invocation.unit.fn, str):
self.retry_policy = invocation.opts.retry_policy or None
elif isgeneratorfunction(invocation.unit.fn):
self.retry_policy = invocation.opts.retry_policy or never()
else:
assert iscoroutinefunction(invocation.unit.fn) or isfunction(
invocation.unit.fn
)
self.retry_policy = invocation.opts.retry_policy or exponential(
base_delay=1,
factor=2,
max_retries=-1,
max_delay=30,
)

self.retry_policy = _set_retry_policy(invocation=invocation)
self._attempt: int = 1
self.durable_promise: DurablePromiseRecord | None = None
self._task: TaskRecord | None = None
Expand All @@ -70,6 +71,10 @@ def __init__(
self.parent.id if self.parent else None,
)

def overwrite_invocation(self, invocation: RFI) -> None:
self.invocation = invocation
self.retry_policy = _set_retry_policy(invocation)

def get_coro(self) -> ResonateCoro[T]:
assert self._coro
return self._coro
Expand Down
2 changes: 1 addition & 1 deletion src/resonate/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ def _process_invoke_msg(
assert record.is_root
assert isinstance(record.invocation, RFI)
assert record.durable_promise is not None
record.invocation = rfi
record.overwrite_invocation(rfi)
else:
record = Record[Any](
id=invoke_msg.root_durable_promise.id,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_functionality.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ def test_golden_device_rfi() -> None:
group = "test-golden-device-rfi"

def foo_golden_device_rfi(ctx: Context, n: str) -> Generator[Yieldable, Any, str]:
p: Promise[str] = yield ctx.rfi(bar_golden_device_rfi, n).options(
p: Promise[str] = yield ctx.rfi("bar_golden_device_rfi", n).options(
id="bar", send_to=poll(group)
)
assert isinstance(p, Promise)
Expand Down Expand Up @@ -281,7 +281,7 @@ def exec_id(n: int) -> str:
def factorial_rfi(ctx: Context, n: int) -> Generator[Yieldable, Any, int]:
if n == 0:
return 1
p = yield ctx.rfi(factorial_rfi, n - 1).options(
p = yield ctx.rfi("factorial_rfi", n - 1).options(
id=exec_id(n - 1), send_to=poll(group)
)
return n * (yield p)
Expand Down

0 comments on commit 1751cf7

Please sign in to comment.