diff --git a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/BlockingObservableNextTest.java b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/BlockingObservableNextTest.java index bfc0abfa37..2cd59e502f 100644 --- a/src/test/java/io/reactivex/rxjava4/internal/operators/observable/BlockingObservableNextTest.java +++ b/src/test/java/io/reactivex/rxjava4/internal/operators/observable/BlockingObservableNextTest.java @@ -34,31 +34,25 @@ public class BlockingObservableNextTest extends RxJavaTest { private void fireOnNextInNewThread(final Subject o, final String value) { - new Thread() { - @Override - public void run() { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // ignore - } - o.onNext(value); + new Thread(() -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore } - }.start(); + o.onNext(value); + }).start(); } private void fireOnErrorInNewThread(final Subject o) { - new Thread() { - @Override - public void run() { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - // ignore - } - o.onError(new TestException()); + new Thread(() -> { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + // ignore } - }.start(); + o.onError(new TestException()); + }).start(); } static Iterable next(ObservableSource source) { diff --git a/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerConcurrencyTests.java b/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerConcurrencyTests.java index b0349c66ad..eadd1367e2 100644 --- a/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerConcurrencyTests.java @@ -285,36 +285,33 @@ public void recursionAndOuterUnsubscribe() throws InterruptedException { final CountDownLatch completionLatch = new CountDownLatch(1); final Worker inner = getScheduler().createWorker(); try { - Flowable obs = Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(final Subscriber subscriber) { - inner.schedule(new Runnable() { - @Override - public void run() { - subscriber.onNext(42); - latch.countDown(); + Flowable obs = Flowable.unsafeCreate((Publisher) subscriber -> { + inner.schedule(new Runnable() { + @Override + public void run() { + subscriber.onNext(42); + latch.countDown(); - // this will recursively schedule this task for execution again - inner.schedule(this); - } - }); + // this will recursively schedule this task for execution again + inner.schedule(this); + } + }); - subscriber.onSubscribe(new Subscription() { + subscriber.onSubscribe(new Subscription() { - @Override - public void cancel() { - inner.dispose(); - subscriber.onComplete(); - completionLatch.countDown(); - } + @Override + public void cancel() { + inner.dispose(); + subscriber.onComplete(); + completionLatch.countDown(); + } - @Override - public void request(long n) { + @Override + public void request(long n) { - } - }); + } + }); - } }); final AtomicInteger count = new AtomicInteger(); diff --git a/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerTests.java b/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerTests.java index e4574be0e9..42e21cd88a 100644 --- a/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/io/reactivex/rxjava4/schedulers/AbstractSchedulerTests.java @@ -277,35 +277,32 @@ public void run() { @Test public final void recursiveSchedulerInObservable() { - Flowable obs = Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(final Subscriber subscriber) { - final Scheduler.Worker inner = getScheduler().createWorker(); + Flowable obs = Flowable.unsafeCreate((Publisher) subscriber -> { + final Scheduler.Worker inner = getScheduler().createWorker(); - AsyncSubscription as = new AsyncSubscription(); - subscriber.onSubscribe(as); - as.setResource(inner); + AsyncSubscription as = new AsyncSubscription(); + subscriber.onSubscribe(as); + as.setResource(inner); - inner.schedule(new Runnable() { - int i; + inner.schedule(new Runnable() { + int i; - @Override - public void run() { - if (i > 42) { - try { - subscriber.onComplete(); - } finally { - inner.dispose(); - } - return; + @Override + public void run() { + if (i > 42) { + try { + subscriber.onComplete(); + } finally { + inner.dispose(); } + return; + } - subscriber.onNext(i++); + subscriber.onNext(i++); - inner.schedule(this); - } - }); - } + inner.schedule(this); + } + }); }); final AtomicInteger lastValue = new AtomicInteger(); diff --git a/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java b/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java index 460286c735..3651ab0f89 100644 --- a/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java +++ b/src/test/java/io/reactivex/rxjava4/schedulers/TestSchedulerTest.java @@ -186,21 +186,18 @@ public final void nestedSchedule() { final Runnable calledOp = mock(Runnable.class); Flowable poller; - poller = Flowable.unsafeCreate(new Publisher() { - @Override - public void subscribe(final Subscriber aSubscriber) { - final BooleanSubscription bs = new BooleanSubscription(); - aSubscriber.onSubscribe(bs); - inner.schedule(new Runnable() { - @Override - public void run() { - if (!bs.isCancelled()) { - calledOp.run(); - inner.schedule(this, 5, TimeUnit.SECONDS); - } + poller = Flowable.unsafeCreate((Publisher) aSubscriber -> { + final BooleanSubscription bs = new BooleanSubscription(); + aSubscriber.onSubscribe(bs); + inner.schedule(new Runnable() { + @Override + public void run() { + if (!bs.isCancelled()) { + calledOp.run(); + inner.schedule(this, 5, TimeUnit.SECONDS); } - }); - } + } + }); }); InOrder inOrder = Mockito.inOrder(calledOp);