【发布时间】:2018-06-20 21:45:29
【问题描述】:
我有两个遗留类 A 和 B,它们都需要设置一个回调,然后通过调用 execute() 函数来执行它。如果执行成功,将调用 onSuccess(),否则调用 onError()。但是B类的执行依赖于A类的执行结果。下面是sn-p的代码。
我知道如何将 A 和 B 分别转换为 Observable,因此我编写了一个 asObservable() 函数。但是,我不知道如何将它们与A到B的顺序结合起来。任何人都知道该怎么做,谢谢。
package rxjava2.exercise.com.example;
import org.reactivestreams.Publisher;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
public class RxJava2 {
public static void main(String[] args) {
A a = new A();
a.setCallback(new A.ACallback() {
@Override
public void onSuccess(final String text) {
final B b = new B(text);
b.setCallback(new B.BCallback() {
@Override
public void onSuccess(int result) {
System.out.println(result);
}
@Override
public void onError() {
}
});
b.execute();
}
@Override
public void onError() {
}
});
a.execute();
}
static class A {
private ACallback mCallback;
void setCallback(ACallback callback) {
mCallback = callback;
}
void execute() {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
mCallback.onSuccess("5566");
}
}).start();
}
public interface ACallback {
void onSuccess(String text);
void onError();
}
Observable<String> asObservable() {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
setCallback(new ACallback() {
@Override
public void onSuccess(String text) {
emitter.onNext(text);
emitter.onComplete();
}
@Override
public void onError() {
emitter.onError(new IllegalArgumentException());
}
});
execute();
}
});
}
}
static class B {
private BCallback mCallback;
private String mText;
B(String text) {
mText = text;
}
void setCallback(BCallback callback) {
mCallback = callback;
}
void execute() {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
mCallback.onSuccess(Integer.parseInt(mText + 7788));
}
}).start();
}
interface BCallback {
void onSuccess(int result);
void onError();
}
Observable<Integer> asObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(final ObservableEmitter<Integer> emitter) throws Exception {
setCallback(new B.BCallback() {
@Override
public void onSuccess(int result) {
emitter.onNext(result);
emitter.onComplete();
}
@Override
public void onError() {
emitter.onError((new IllegalArgumentException()));
}
});
execute();
}
});
}
}
}
【问题讨论】:
标签: rx-java2