Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure triggering of callback in chord #397

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

s-bessing
Copy link

@s-bessing s-bessing commented Jul 3, 2023

If one task for whatever reason is executed twice the callback is never triggered because the chord will be deleted before the last task is executed. The last task will then raise a warning that the chord can't be found.

s-bessing added 2 commits July 3, 2023 16:13
If one task for whatever reason is executed twice the callback is never triggered because the chord will be deleted and the before the last task is executed. The last task then will raise a warning that the chord cant be found.
@auvipy auvipy self-requested a review July 6, 2023 05:19
@auvipy auvipy closed this Nov 8, 2023
@auvipy auvipy reopened this Nov 8, 2023
Copy link
Contributor

@AllexVeldman AllexVeldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, you could consider refrasing the warning so it indicates why it's emitted instead of what broke. Something like "Chord %s executed more tasks than expected"

Copy link
Member

@auvipy auvipy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

beside the review comments, we need to make sure the CI is green

@auvipy auvipy closed this Apr 27, 2024
@auvipy auvipy reopened this Apr 27, 2024
@s-bessing
Copy link
Author

beside the review comments, we need to make sure the CI is green

The tests seem to be wrong. The counter goes to 0 but the group was not done.

@s-bessing s-bessing requested a review from auvipy April 30, 2024 15:45
Comment on lines +865 to +866
chord_counter.refresh_from_db()
assert chord_counter.count == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct, we have a header of 2, 1 is done, the other fails.
If a task in the header fails the chord is done and the callback is not called.

If this is the case, a ChordCounter object will remain in the db for every failed chord.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AllexVeldman but if you have retry logic (on code or worker level) all task eventually are successful, but the callback is never triggered.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.celeryq.dev/en/latest/reference/celery.result.html#celery.result.AsyncResult.ready would suggest deps.ready() would be False in case of a retry, True in case of a failure like in this test.

So this test should call the path all the way to trigger_callback() and fail the Chord, otherwise a Chord can never fail.
It suprises me that this is not the case with your proposed changes, since trigger_callback should be True, deps.ready() should be True so both trigger_callback() and chord_counter.delete() should have been called.

Copy link
Contributor

@AllexVeldman AllexVeldman Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it took some time but I figured out why this change makes the test pass (which is still not correct).

It's because the MagicMock used for the request parameter to mark_as_done() and mark_as_failure() does not have ignore_result = False set. This in turn would make request.ignore_result evaluate to True, skipping saving the result, which is needed to have deps.ready() work properly. Since we did not rely on deps.ready() in the previous state this never surfaced as an issue.

Add ignore_result = False to the MagicMock and reverting the test changes will fix the test.

@AllexVeldman
Copy link
Contributor

AllexVeldman commented Feb 28, 2025

@s-bessing I started to dig a little into how retries work, and on_chord_part_return is not called when a Task is marked for retry..

This works both on your branch and on main:

"""
Smoke test some Chord setups, including retries and failures.
"""

import os

from celery import Celery, chord
import django
from django.core.management import call_command

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'smoke.settings')

app = Celery(broker='redis://localhost:6379/2')
app.config_from_object('django.conf:settings', namespace='CELERY')


tries = 0

@app.task(name="add")
def add(x, y):
    return x + y

@app.task(name="fail", bind=True, max_retries=3)
def fail(self):
    """Retry 2x, return 5 on the third try"""
    global tries
    tries += 1
    if tries < 3:
        self.retry(countdown=5)
    return 5

@app.task(name="callback")
def callback(numbers):
    return sum(numbers)

if __name__ == "__main__":

    django.setup()
    call_command("migrate")
    from django_celery_results.models import ChordCounter

    header = [
        add.s(1,2),
        add.s(3,4),
        fail.s(),
    ]
    result = chord(header)(callback.s())
    assert ChordCounter.objects.count() == 1
    assert result.get(propagate=False) == 15
    assert ChordCounter.objects.count() == 0

So this PR adds nothing in the case of retries.

Could you elaborate a bit more on how you expect a task (with the same ID) to be executed twice, causing the callback to never be executed?

@AllexVeldman
Copy link
Contributor

For completeness, this is the unittest I added to prove retries work on main:

    def test_on_chord_part_return_retry(self):
        """Test if the callback is executed if a task is retried"""
        gid = uuid()
        tid1 = uuid()
        tid2 = uuid()
        subtasks = [AsyncResult(tid1), AsyncResult(tid2)]
        group = GroupResult(id=gid, results=subtasks)
        self.b.apply_chord(group, self.add.s())

        chord_counter = ChordCounter.objects.get(group_id=gid)
        assert chord_counter.count == 2

        request = mock.MagicMock()
        request.id = subtasks[0].id
        request.group = gid
        request.task = "my_task"
        request.args = ["a", 1, "password"]
        request.kwargs = {"c": 3, "d": "e", "password": "password"}
        request.argsrepr = "argsrepr"
        request.kwargsrepr = "kwargsrepr"
        request.hostname = "celery@ip-0-0-0-0"
        request.periodic_task_name = "my_periodic_task"
        request.ignore_result = False
        result = {"foo": "baz"}

        self.b.mark_as_done(tid1, result, request=request)

        chord_counter.refresh_from_db()
        assert chord_counter.count == 1

        self.b.mark_as_retry(tid2, result, request=request)

        chord_counter.refresh_from_db()
        assert chord_counter.count == 1

        self.b.mark_as_done(tid2, result, request=request)

        with pytest.raises(ChordCounter.DoesNotExist):
            ChordCounter.objects.get(group_id=gid)

        request.chord.delay.assert_called_once()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants