diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index 57efac10e0..df3a8a270a 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -5663,8 +5663,8 @@ public final T blockingFirst(@NonNull T defaultItem) {
* sequence.
*
* - Backpressure:
- * - The operator consumes the source {@code Flowable} in an unbounded manner
- * (i.e., no backpressure applied to it).
+ * - The operator requests {@link Flowable#bufferSize()} upfront, then 75% of this
+ * amount when 75% is received.
* - Scheduler:
* - {@code blockingForEach} does not operate by default on a particular {@link Scheduler}.
* - Error handling:
@@ -5676,14 +5676,57 @@ public final T blockingFirst(@NonNull T defaultItem) {
* @param onNext
* the {@link Consumer} to invoke for each item emitted by the {@code Flowable}
* @throws RuntimeException
- * if an error occurs
+ * if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
+ * as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
* @see ReactiveX documentation: Subscribe
* @see #subscribe(Consumer)
+ * @see #blockingForEach(Consumer, int)
*/
- @BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
+ @BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingForEach(@NonNull Consumer super T> onNext) {
- Iterator it = blockingIterable().iterator();
+ blockingForEach(onNext, bufferSize());
+ }
+
+ /**
+ * Consumes the upstream {@code Flowable} in a blocking fashion and invokes the given
+ * {@code Consumer} with each upstream item on the current thread until the
+ * upstream terminates.
+ *
+ *
+ *
+ * Note: the method will only return if the upstream terminates or the current
+ * thread is interrupted.
+ *
+ * This method executes the {@code Consumer} on the current thread while
+ * {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the
+ * sequence.
+ *
+ * - Backpressure:
+ * - The operator requests the given {@code prefetch} amount upfront, then 75% of this
+ * amount when 75% is received.
+ * - Scheduler:
+ * - {@code blockingForEach} does not operate by default on a particular {@link Scheduler}.
+ * - Error handling:
+ * - If the source signals an error, the operator wraps a checked {@link Exception}
+ * into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
+ * {@link Error}s are rethrown as they are.
+ *
+ *
+ * @param onNext
+ * the {@link Consumer} to invoke for each item emitted by the {@code Flowable}
+ * @param bufferSize
+ * the number of items to prefetch upfront, then 75% of it after 75% received
+ * @throws RuntimeException
+ * if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
+ * as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
+ * @see ReactiveX documentation: Subscribe
+ * @see #subscribe(Consumer)
+ */
+ @BackpressureSupport(BackpressureKind.FULL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final void blockingForEach(@NonNull Consumer super T> onNext, int bufferSize) {
+ Iterator it = blockingIterable(bufferSize).iterator();
while (it.hasNext()) {
try {
onNext.accept(it.next());
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index eb33792f34..1933a6db0b 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -5170,11 +5170,50 @@ public final T blockingFirst(@NonNull T defaultItem) {
* if an error occurs
* @see ReactiveX documentation: Subscribe
* @see #subscribe(Consumer)
+ * @see #blockingForEach(Consumer, int)
*/
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final void blockingForEach(@NonNull Consumer super T> onNext) {
- Iterator it = blockingIterable().iterator();
+ blockingForEach(onNext, bufferSize());
+ }
+
+ /**
+ * Consumes the upstream {@code Observable} in a blocking fashion and invokes the given
+ * {@code Consumer} with each upstream item on the current thread until the
+ * upstream terminates.
+ *
+ *
+ *
+ * Note: the method will only return if the upstream terminates or the current
+ * thread is interrupted.
+ *
+ * This method executes the {@code Consumer} on the current thread while
+ * {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the
+ * sequence.
+ *
+ * - Scheduler:
+ * - {@code blockingForEach} does not operate by default on a particular {@link Scheduler}.
+ * - Error handling:
+ * - If the source signals an error, the operator wraps a checked {@link Exception}
+ * into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
+ * {@link Error}s are rethrown as they are.
+ *
+ *
+ * @param onNext
+ * the {@link Consumer} to invoke for each item emitted by the {@code Observable}
+ * @param capacityHint
+ * the number of items expected to be buffered (allows reducing buffer reallocations)
+ * @throws RuntimeException
+ * if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
+ * as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
+ * @see ReactiveX documentation: Subscribe
+ * @see #subscribe(Consumer)
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public final void blockingForEach(@NonNull Consumer super T> onNext, int capacityHint) {
+ Iterator it = blockingIterable(capacityHint).iterator();
while (it.hasNext()) {
try {
onNext.accept(it.next());