Issue hitting max records in a batch #3119

Open · EvanJGunn opened this issue Mar 5, 2025 · 5 comments · May be fixed by #3120

Comments

@EvanJGunn

The Issue

My team is getting this invalid array length error in our consumer:
{"level":"info","msg":"consumer/broker/1395569342 disconnecting due to error processing FetchRequest: kafka: error decoding packet: invalid array length","time":"2025-02-27 20:17:36"}

I tracked the error down, and it looks like it's happening here:

numRecs, err := pd.getArrayLength()

return -1, errInvalidArrayLength

Unless I am mistaken, it appears that the maximum number of records in a batch is capped at 131070 (i.e. 2*math.MaxUint16).
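
For reference, here is a compilable sketch of the check in question (the real code lives in sarama's real_decoder.go; the struct fields and helper names below are assumptions for illustration):

	package sketch

	import (
		"encoding/binary"
		"errors"
		"math"
	)

	var (
		errInsufficientData   = errors.New("kafka: insufficient data to decode packet")
		errInvalidArrayLength = errors.New("kafka: error decoding packet: invalid array length")
	)

	// realDecoder stands in for sarama's decoder; the field and
	// helper names here are assumptions for illustration.
	type realDecoder struct {
		raw []byte
		off int
	}

	func (rd *realDecoder) remaining() int { return len(rd.raw) - rd.off }

	// getArrayLength reads a 4-byte big-endian count and rejects
	// anything above 2*math.MaxUint16 (= 131070), producing the
	// error quoted above.
	func (rd *realDecoder) getArrayLength() (int, error) {
		if rd.remaining() < 4 {
			return -1, errInsufficientData
		}
		tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
		rd.off += 4
		if tmp > rd.remaining() || tmp > 2*math.MaxUint16 {
			return -1, errInvalidArrayLength
		} else if tmp == -1 {
			tmp = 0 // Kafka encodes a null array as length -1
		}
		return tmp, nil
	}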

The Question

Is there a reason the max records in a batch is set at 131070?

More Background

My team is consuming a lot of small records from Confluent's WarpStream product. For whatever reason, Fetch Max seems to get ignored by WarpStream, so that unfortunately is not a solution for us.

@puellanivis
Contributor

Following the commits back to the first one that introduced this limit:

	if tmp > pd.avail() || tmp > 2*math.MaxUint16 {
		return -1, errors.New("kafka getArrayCount: unreasonably long array")
	}

It seems it was just an arbitrary choice to prevent trying to allocate “unreasonable” array lengths. We probably ought to limit it against how many objects could remain if the whole rest of the packet were the array (or against the number of bytes remaining). That should prevent most malicious encodings?
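
In code, that bound might look something like this (a sketch only; checkArrayLength and minElemSize are hypothetical names, not sarama API):

	package sketch

	import "errors"

	// checkArrayLength sketches the suggested bound: a claimed
	// length is rejected only if that many elements could not
	// possibly fit in the bytes remaining in the packet.
	// minElemSize is a hypothetical lower bound on one element's
	// encoded size; with minElemSize = 1 this reduces to
	// "length > bytes remaining".
	func checkArrayLength(claimed, bytesRemaining, minElemSize int) error {
		if claimed > bytesRemaining/minElemSize {
			return errors.New("kafka getArrayLength: array longer than remaining packet")
		}
		return nil
	}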

@richardartoul
Contributor

@puellanivis yeah, I think what you're suggesting makes sense. Usually an invalid encoding results in a stupendously large number anyway, so there's a lot more wiggle room, but Kafka batches can be really large in terms of record count, especially if the messages are small.
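
(For a sense of scale, with illustrative numbers: a 16 MiB fetch response made up of 100-byte records already holds roughly 167,000 records, well past the 131,070 cap.)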

@rmb938

rmb938 commented Mar 5, 2025

Hey all, I made a PR to fix this issue: #3120

rmb938 added a commit to rmb938/sarama that referenced this issue Mar 5, 2025
created `getArrayLengthNoLimit` as record batches can contain more
than `2*math.MaxUint16` records. The packet decoder will make sure
that the array length isn't greater than the number of bytes remaining
to be decoded in the packet.

Also added a test for large record counts.

Fixes: IBM#3119
Signed-off-by: Ryan Belgrave <[email protected]>
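
A sketch of what that commit message describes, reusing the sketched realDecoder from the earlier comment (the actual implementation is in PR #3120):

	// getArrayLengthNoLimit, as the commit message describes it:
	// read the count without the 2*math.MaxUint16 cap, but still
	// reject any length greater than the number of bytes left to
	// decode in the packet.
	func (rd *realDecoder) getArrayLengthNoLimit() (int, error) {
		if rd.remaining() < 4 {
			return -1, errInsufficientData
		}
		tmp := int(int32(binary.BigEndian.Uint32(rd.raw[rd.off:])))
		rd.off += 4
		if tmp > rd.remaining() {
			return -1, errInvalidArrayLength
		} else if tmp == -1 {
			tmp = 0
		}
		return tmp, nil
	}
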
@EvanJGunn
Author

I tested using this branch and it appears to work perfectly. Thank you.

@dnwe
Collaborator

dnwe commented Mar 10, 2025

My team is consuming a lot of small records from Confluent's WarpStream product. For whatever reason, Fetch Max seems to get ignored by WarpStream, so that unfortunately is not a solution for us.

I'm curious about this one, as they explicitly document that you might have to tune up your MaxBytes. From their documentation:

When Kafka clients issue Fetch requests, they specify a limit on the amount of data to be returned (in aggregate, and per-partition) by the Broker. In Apache Kafka, the brokers interpret the limit in terms of compressed bytes, but in WarpStream the Agents interpret the limit in terms of uncompressed bytes. For more details on why this decision was made, check out our more detailed documentation about compression in WarpStream. That said, the practical implication of this decision is that for some workloads, you will need to tune your Kafka consumer clients to request more data per individual Fetch request to achieve the same throughput with WarpStream.

However, my preference for fixing this right now would probably be to replace the arbitrary tmp > 2*math.MaxUint16 safety blanket with tmp > MaxResponseSize, since we already have that constant (which a user can change via sarama.MaxResponseSize = ...) and which (somewhat surprisingly) is what we use for MaxBytes in a FetchRequest (rather than the sarama.Config Consumer.Fetch settings...)
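
Concretely, the check would become something like this (a sketch; MaxResponseSize is an existing sarama package-level variable, default 100 MiB):

	// Sketch: swap the arbitrary 2*math.MaxUint16 cap for the
	// user-tunable MaxResponseSize package variable, keeping the
	// bytes-remaining check. Inside the sarama package:
	if tmp > rd.remaining() || tmp > int(MaxResponseSize) {
		return -1, errInvalidArrayLength
	}

Applications that need bigger fetches could then raise the ceiling with sarama.MaxResponseSize = ..., just as they already can for response sizes generally.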
