Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,31 +34,25 @@
public class BlockingObservableNextTest extends RxJavaTest {

private void fireOnNextInNewThread(final Subject<String> o, final String value) {
new Thread() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
o.onNext(value);
new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
}.start();
o.onNext(value);
}).start();
}

private void fireOnErrorInNewThread(final Subject<String> o) {
new Thread() {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
o.onError(new TestException());
new Thread(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
}.start();
o.onError(new TestException());
}).start();
}

static <T> Iterable<T> next(ObservableSource<T> source) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,36 +285,33 @@ public void recursionAndOuterUnsubscribe() throws InterruptedException {
final CountDownLatch completionLatch = new CountDownLatch(1);
final Worker inner = getScheduler().createWorker();
try {
Flowable<Integer> obs = Flowable.unsafeCreate(new Publisher<Integer>() {
@Override
public void subscribe(final Subscriber<? super Integer> subscriber) {
inner.schedule(new Runnable() {
@Override
public void run() {
subscriber.onNext(42);
latch.countDown();
Flowable<Integer> obs = Flowable.unsafeCreate((Publisher<Integer>) subscriber -> {
inner.schedule(new Runnable() {
@Override
public void run() {
subscriber.onNext(42);
latch.countDown();

// this will recursively schedule this task for execution again
inner.schedule(this);
}
});
// this will recursively schedule this task for execution again
inner.schedule(this);
}
});

subscriber.onSubscribe(new Subscription() {
subscriber.onSubscribe(new Subscription() {

@Override
public void cancel() {
inner.dispose();
subscriber.onComplete();
completionLatch.countDown();
}
@Override
public void cancel() {
inner.dispose();
subscriber.onComplete();
completionLatch.countDown();
}

@Override
public void request(long n) {
@Override
public void request(long n) {

}
});
}
});

}
});

final AtomicInteger count = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,35 +277,32 @@ public void run() {

@Test
public final void recursiveSchedulerInObservable() {
Flowable<Integer> obs = Flowable.unsafeCreate(new Publisher<Integer>() {
@Override
public void subscribe(final Subscriber<? super Integer> subscriber) {
final Scheduler.Worker inner = getScheduler().createWorker();
Flowable<Integer> obs = Flowable.unsafeCreate((Publisher<Integer>) subscriber -> {
final Scheduler.Worker inner = getScheduler().createWorker();

AsyncSubscription as = new AsyncSubscription();
subscriber.onSubscribe(as);
as.setResource(inner);
AsyncSubscription as = new AsyncSubscription();
subscriber.onSubscribe(as);
as.setResource(inner);

inner.schedule(new Runnable() {
int i;
inner.schedule(new Runnable() {
int i;

@Override
public void run() {
if (i > 42) {
try {
subscriber.onComplete();
} finally {
inner.dispose();
}
return;
@Override
public void run() {
if (i > 42) {
try {
subscriber.onComplete();
} finally {
inner.dispose();
}
return;
}

subscriber.onNext(i++);
subscriber.onNext(i++);

inner.schedule(this);
}
});
}
inner.schedule(this);
}
});
});

final AtomicInteger lastValue = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,21 +186,18 @@ public final void nestedSchedule() {
final Runnable calledOp = mock(Runnable.class);

Flowable<Object> poller;
poller = Flowable.unsafeCreate(new Publisher<Object>() {
@Override
public void subscribe(final Subscriber<? super Object> aSubscriber) {
final BooleanSubscription bs = new BooleanSubscription();
aSubscriber.onSubscribe(bs);
inner.schedule(new Runnable() {
@Override
public void run() {
if (!bs.isCancelled()) {
calledOp.run();
inner.schedule(this, 5, TimeUnit.SECONDS);
}
poller = Flowable.unsafeCreate((Publisher<Object>) aSubscriber -> {
final BooleanSubscription bs = new BooleanSubscription();
aSubscriber.onSubscribe(bs);
inner.schedule(new Runnable() {
@Override
public void run() {
if (!bs.isCancelled()) {
calledOp.run();
inner.schedule(this, 5, TimeUnit.SECONDS);
}
});
}
}
});
});

InOrder inOrder = Mockito.inOrder(calledOp);
Expand Down