是的,您在 .combineLatest() 中处理任意数量的 Observable,但仍有解决方法。我对这个问题很感兴趣,并提出了以下解决方案——我们可以存储有关某些数据源的信息——最后一个值和源 ID(字符串和资源 ID),并将所有数据隧道化到某个公共管道中。为此,我们可以使用 PublishSubject。我们还需要跟踪连接状态,因为我们应该在订阅时将订阅保存到每个源,并在我们从该源取消订阅时将其切断。
我们存储来自每个源的最后数据,因此我们可以告诉用户哪个源刚刚发出新值,回调将只包含源 id。用户可以通过源 ID 获取任何源的最后一个值。
我想出了以下代码:
import android.util.Log;
import android.widget.EditText;
import com.jakewharton.rxbinding.widget.RxTextView;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
public class MultiSourceCombinator {
String LOG_TAG = MultiSourceCombinator.class.getSimpleName();
/**
* We can't handle arbitrary number of sources by CombineLatest, but we can pass data along
* with information about source (sourceId)
*/
private static class SourceData{
String data = "";
Integer sourceId = 0;
}
/**
* Keep id of source, subscription to that source and last value emitted
* by source. This value is passed when source is attached
*/
private class SourceInfo{
Subscription sourceTracking;
Integer sourceId;
SourceData lastData;
SourceInfo(int sourceId, String data){
this.sourceId = sourceId;
// initialize last data with empty value
SourceData d = new SourceData();
d.data = data;
d.sourceId = sourceId;
this.lastData = d;
}
}
/**
* We can tunnel data from all sources into single pipe. Subscriber can treat it as
* Observable<SourceData>
*/
private PublishSubject<SourceData> dataDrain;
/**
* Stores all sources by their ids.
*/
Map<Integer, SourceInfo> sources;
/**
* Callback, notified whenever source emit new data. it receives source id.
* When notification is received by client, it can get value from source by using
* getLastSourceValue(sourceId) method
*/
Action1<Integer> sourceUpdateCallback;
public MultiSourceCombinator(){
dataDrain = PublishSubject.create();
sources = new HashMap<>();
sourceUpdateCallback = null;
// We have to process data, ccoming from common pipe
dataDrain.asObservable()
.subscribe(newValue -> {
if (sourceUpdateCallback == null) {
Log.w(LOG_TAG, "Source " + newValue.sourceId + "emitted new value, " +
"but used did't set callback ");
} else {
sourceUpdateCallback.call(newValue.sourceId);
}
});
}
/**
* Disconnect from all sources (sever Connection (s))
*/
public void stop(){
Log.i(LOG_TAG, "Unsubscribing from all sources");
// copy references to aboid ConcurrentModificatioinException
ArrayList<SourceInfo> t = new ArrayList(sources.values());
for (SourceInfo si : t){
removeSource(si.sourceId);
}
// right now there must be no active sources
if (!sources.isEmpty()){
throw new RuntimeException("There must be no active sources");
}
}
/**
* Create new source from edit field, subscribe to this source and save subscription for
* further tracking.
* @param editText
*/
public void addSource(EditText editText, int sourceId){
if (sources.containsKey(sourceId)){
Log.e(LOG_TAG, "Source with id " + sourceId + " already exist");
return;
}
Observable<CharSequence> source = RxTextView.textChanges(editText).skip(1);
String lastValue = editText.getText().toString();
Log.i(LOG_TAG, "Source with id " + sourceId + " has data " + lastValue);
// Redirect data coming from source to common pipe, to do that attach source id to
// data string
Subscription sourceSubscription = source.subscribe(text -> {
String s = new String(text.toString());
SourceData nextValue = new SourceData();
nextValue.sourceId = sourceId;
nextValue.data = s;
Log.i(LOG_TAG, "Source " + sourceId + "emits new value: " + s);
// save vlast value
sources.get(sourceId).lastData.data = s;
// pass new value down pipeline
dataDrain.onNext(nextValue);
});
// create SourceInfo
SourceInfo sourceInfo = new SourceInfo(sourceId, lastValue);
sourceInfo.sourceTracking = sourceSubscription;
sources.put(sourceId, sourceInfo);
}
/**
* Unsubscribe source from common pipe and remove it from list of sources
* @param sourceId
* @throws IllegalArgumentException
*/
public void removeSource(Integer sourceId) throws IllegalArgumentException {
if (!sources.containsKey(sourceId)){
throw new IllegalArgumentException("There is no source with id: " + sourceId);
}
SourceInfo si = sources.get(sourceId);
Subscription s = si.sourceTracking;
if (null != s && !s.isUnsubscribed()){
Log.i(LOG_TAG, "source " + sourceId + " is active, unsubscribing from it");
si.sourceTracking.unsubscribe();
si.sourceTracking = null;
}
// source is disabled, remove it from list
Log.i(LOG_TAG, "Source " + sourceId + " is disabled ");
sources.remove(sourceId);
}
/**
* User can get value from any source by using source ID.
* @param sourceId
* @return
* @throws IllegalArgumentException
*/
public String getLastSourceValue(Integer sourceId) throws IllegalArgumentException{
if (!sources.containsKey(sourceId)){
throw new IllegalArgumentException("There is no source with id: " + sourceId);
}
String lastValue = sources.get(sourceId).lastData.data;
return lastValue;
}
public void setSourceUpdateCallback(Action1<Integer> sourceUpdateFeedback) {
this.sourceUpdateCallback = sourceUpdateFeedback;
}
}
我们可以像这样在 UI 中使用它:
import android.app.Activity;
import android.os.Bundle;
import android.util.Log;
import android.widget.EditText;
import android.widget.Toast;
import butterknife.BindView;
import butterknife.ButterKnife;
public class EdiTextTestActivity extends Activity {
@BindView(R.id.aet_et1)
public EditText et1;
@BindView(R.id.aet_et2)
public EditText et2;
@BindView(R.id.aet_et3)
public EditText et3;
private MultiSourceCombinator multiSourceCombinator;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_edit_text_test);
ButterKnife.bind(this);
multiSourceCombinator = new MultiSourceCombinator();
multiSourceCombinator.setSourceUpdateCallback(id -> {
Toast.makeText(EdiTextTestActivity.this, "New value from source: " + id + " : " +
multiSourceCombinator.getLastSourceValue(id), Toast.LENGTH_SHORT).show();
});
}
@Override
protected void onPause() {
// stop tracking all fields
multiSourceCombinator.stop();
super.onPause();
}
@Override
protected void onResume() {
super.onResume();
// Register fields
multiSourceCombinator.addSource(et1, R.id.aet_et1);
multiSourceCombinator.addSource(et2, R.id.aet_et2);
multiSourceCombinator.addSource(et3, R.id.aet_et3);
}
}