-
Notifications
You must be signed in to change notification settings - Fork 30.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: allow infinite concurrency #48588
base: main
Are you sure you want to change the base?
stream: allow infinite concurrency #48588
Conversation
Review requested:
|
the macOS failed test is unrelated: |
{ | ||
// Allow Infinite concurrency | ||
const stream = Readable.from([1, 2]).map(async (item, { signal }) => { | ||
await setTimeout(10 - item, { signal }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure this asserts concurrency works
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test meant to make sure that concurrency: Infinity
does not throw an error, but you are right, it does not test that concurrency works, fixed it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a timer here? Can't this just be something like:
let count = 0; // or t.mock
Readable.from(Array(1000).fill()).map(() => {
count++;
return new Promise(() => {});
}, { concurrency: Infinity }).toArray().catch(common.mustNotCall());
await setImmediate(); // from timers/promises
strictEqual(count, 1000);
@ronag I'm not sure about this wdyt? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any good reason to allow this API: it's a gateway to memory leaks.
How is it different than allowing Number.MAX_SAFE_INTEGER? |
@mcollina we can log a warning in case the array reaches some size like what's happening when having a lot of listeners to event emitter |
then why add this option in the first place? |
adding this so we can use it in:
which is needed as we don't want to wait for the prev tests to finish before starting a new one (same issue as in #46132)
This has a pitfall in using
concurrency: Infinity
on infinite stream