Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rework zip fix for #537 #595

Merged
merged 2 commits into from
Feb 6, 2022
Merged

rework zip fix for #537 #595

merged 2 commits into from
Feb 6, 2022

Conversation

MainRo
Copy link
Collaborator

@MainRo MainRo commented Jan 22, 2022

The original fix for #525 breaks sequences where some observables emit
item faster than other ones. By completing too soon, the remaining
observables cannot catchup later. A very simple case is in #578 where
the two observables to zip emit their items sequentially.

We can fix both issues by completing whenever an observable completes
and there is no queued item. Otherwise we let the remaining observables
a chance to emit new items before completion.

The original fix for ReactiveX#525 breaks sequences where some observables emit
item faster than other ones. By completing too soon, the remaining
observables cannot catchup later. A very simple case is in ReactiveX#578 where
the two observables to zip emit their items sequentially.

We can fix both issues by completing whenever an observable completes
and there is no queued item. Otherwise we let the remaining observables
a chance to emit new items before completion.
@MainRo MainRo requested review from dbrattli and jcafhe January 22, 2022 22:55
@MainRo
Copy link
Collaborator Author

MainRo commented Jan 22, 2022

@MichaelSchneeberger can you review this?

@coveralls
Copy link

coveralls commented Jan 22, 2022

Coverage Status

Coverage increased (+0.01%) to 93.514% when pulling 2261a11 on MainRo:bugfix/zip into a4e84d8 on ReactiveX:master.

@MainRo MainRo changed the title partially revert #537 rework zip fix for #537 Jan 23, 2022
@MainRo MainRo requested a review from erikkemperman February 1, 2022 21:44
Copy link
Collaborator

@dbrattli dbrattli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

@MichaelSchneeberger
Copy link

I would have done it like this

def next(i):
    if all([len(q) for q in queues]):
        try:
            queued_values = [x.pop(0) for x in queues]
            res = tuple(queued_values)
        except Exception as ex:  # pylint: disable=broad-except
            observer.on_error(ex)
            return
  
        observer.on_next(res)
  
        # after sending the zipped values, complete the observer if at least one upstream observable
        # is completed and its queue has length zero 
        if any((done for queue, done in zip(queues, is_completed) if len(queue)==0)):
            observer.on_completed()

def completed(i):
    is_completed[i] = True

    if len(queues[i]) == 0:
        observer.on_completed()

@MainRo
Copy link
Collaborator Author

MainRo commented Feb 4, 2022

Thank @MichaelSchneeberger, your proposal is simpler and more readable. I updated the PR with it.

@MainRo MainRo merged commit 2f81dc3 into ReactiveX:master Feb 6, 2022
@MainRo MainRo mentioned this pull request Feb 6, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants