Operator Introduction
Operator: Processes emitted data and sends change propagation – changes are implemented through operators and can be propagated downward
1.RxJava1 operator source analysis
Func1 interface 2.Operator interface
1.1 RxJava1 instance
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if(! subscriber.isUnsubscribed()) { subscriber.onNext("1");
subscriber.onNext("2"); subscriber.onCompleted(); }}})./ / processing
map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s)+2;
}
}).
subscribe(new Observer<Integer>() {
@Override
public void onCompleted(a) {
Log.d("kpioneer"."onCompleted:");
}
@Override
public void onError(Throwable e) {}@Override
public void onNext(Integer integer) {
Log.d("kpioneer"."onNext:" + integer + ",integer instanceOf"+ integer.getClass()); }});Copy the code
run
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:3,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onNext:4,integer instanceOfclass java.lang.Integer
06-11 10:10:20.008 14148-14148/com.haocai.rxjavademo D/kpioneer: onCompleted:
Copy the code
1.2RxJava1 operator source
RxJava1 OnSubscribeMap classes
public final class OnSubscribeMap<T.R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
@Override
public void call(final Subscriber<? super R> o) {
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T.R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted(a) {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) { actual.setProducer(p); }}}Copy the code
RxJava1 OnSubscribeLift classes
public final class OnSubscribeLift<T.R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handlingExceptions.throwIfFatal(e); st.onError(e); }}catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to uso.onError(e); }}}Copy the code
1.3 Transformation principle (core operator lift) :
1. Accept the current Operator 2 of the original OnSubscribe. Create a new OnSubscribe and return a new Observable
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
Copy the code
3. Package the old Subscriber with the new one
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
Copy the code
4. Complete the conversion in the new Subscriber and then transfer it to the old Subscriber
@Override
public void onNext(T t) {
R result;
try {
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
Copy the code
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
Copy the code
Analysis of the
The core implementation uses a proxy mechanism
2.RxJava2 operator source analysis
2.1. RxJava2 instance
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
if(! e.isDisposed()) { e.onNext("1");
e.onNext("2");
e.onComplete();
}
}
}).
map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s)+2;
}
}).
subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d("kpioneer"."onSubscribe:");
}
@Override
public void onNext(Integer value) {
Log.d("kpioneer"."onNext:" + value);
}
@Override
public void onError(Throwable e) {}@Override
public void onComplete(a) {
Log.d("kpioneer"."onComplete"); }}); Flowable. create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
if(! e.isCancelled()) { e.onNext("1");
e.onNext("2");
e.onComplete();
}
}
}, BackpressureStrategy.DROP).
map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s)+2;
}
}).
subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
Log.d("kpioneer"."onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d("kpioneer"."onNext:" + integer);
}
@Override
public void onError(Throwable t) {}@Override
public void onComplete(a) {
Log.d("kpioneer"."onComplete"); }});Copy the code
run
06-11 10:20:56. 688. 16675-16675 / com haocai. Rxjavademo D/kpioneer: onSubscribe: 06-11 10:20:56. 688. 16675-16675 / com haocai. Rxjavademo D/kpioneer: 06-11 10:20:56 onNext: 3. 698. 16675-16675 / com haocai. Rxjavademo D/kpioneer: 06-11 10:20:56 onNext: 4. 698. 16675-16675 / com haocai. Rxjavademo D/kpioneer: The onComplete 06-11 10:20:56. 758, 16675-16675 / com. Haocai. Rxjavademo D/kpioneer: 06-11 onSubscribe 10:20:56. 758, 16675-16675 / com. Haocai. Rxjavademo D/kpioneer: 06-11 10:20:56 onNext: 3. 758. 16675-16675 / com haocai. Rxjavademo D/kpioneer: 06-11 10:20:56 onNext: 4. 768. 16675-16675 / com haocai. Rxjavademo D/kpioneer: the onCompleteCopy the code
2.2.RxJava2 operator source code
Function interface
ObservableMap: No back pressure
public final class ObservableMap<T.U> extends AbstractObservableWithUpstream<T.U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T.U> extends BasicFuseableObserver<T.U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if(sourceMode ! = NONE) { actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll(a) throws Exception {
T t = qs.poll();
returnt ! =null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null; }}}Copy the code
1. ObservableMap inherited AbstractObservableWithUpstream abstract class 2. Using the AbstractObservableWithUpstream subscribeActual method in 3. Subscribe to the Observer from the original Observable
public final class ObservableLift<R.T> extends AbstractObservableWithUpstream<T.R> {
/** The actual operator. */
final ObservableOperator<? extends R, ? super T> operator;
public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
@Override
public void subscribeActual(Observer<? super R> s) {
Observer<? super T> observer;
try {
observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Disposable already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
thrownpe; } source.subscribe(observer); }}Copy the code
FlowableMap: Has back pressure
public final class FlowableMap<T.U> extends AbstractFlowableWithUpstream<T.U> {
final Function<? super T, ? extends U> mapper;
public FlowableMap(Flowable<T> source, Function<? super T, ? extends U> mapper) {
super(source);
this.mapper = mapper;
}
@Override
protected void subscribeActual(Subscriber<? super U> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new MapConditionalSubscriber<T, U>((ConditionalSubscriber<? super U>)s, mapper));
} else {
source.subscribe(newMapSubscriber<T, U>(s, mapper)); }}static final class MapSubscriber<T.U> extends BasicFuseableSubscriber<T.U> {
final Function<? super T, ? extends U> mapper;
MapSubscriber(Subscriber<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if(sourceMode ! = NONE) { actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll(a) throws Exception {
T t = qs.poll();
returnt ! =null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null; }}static final class MapConditionalSubscriber<T.U> extends BasicFuseableConditionalSubscriber<T.U> {
final Function<? super T, ? extends U> mapper;
MapConditionalSubscriber(ConditionalSubscriber<? super U> actual, Function<? super T, ? extends U> function) {
super(actual);
this.mapper = function;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if(sourceMode ! = NONE) { actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public boolean tryOnNext(T t) {
if (done) {
return false;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return true;
}
return actual.tryOnNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll(a) throws Exception {
T t = qs.poll();
returnt ! =null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null; }}Copy the code
1. FlowableMap inherited AbstractFlowableWithUpstream 2. Use of AbstractFlowableWithUpstream subscribeActual method 3. Subscribe the converted Subscriber FlowableLift with the original Flowable
public final class FlowableLift<R.T> extends AbstractFlowableWithUpstream<T.R> {
/** The actual operator. */
final FlowableOperator<? extends R, ? super T> operator;
public FlowableLift(Flowable<T> source, FlowableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
@Override
public void subscribeActual(Subscriber<? super R> s) {
try {
Subscriber<? super T> st = operator.apply(s);
if (st == null) {
throw new NullPointerException("Operator " + operator + " returned a null Subscriber");
}
source.subscribe(st);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
thrownpe; }}}Copy the code
2.3 the Operator interface
1. Implement this interface 2. Transform in subscribeActual 3. Use to extend custom operatorsCopy the code
Analysis of the
RxJava2 with and without back pressure core implementations use a proxy mechanism
3.RxJava1 operator function imitation implementation
Operator interface implementation
- The Operator interface is an abstract interface to the Operator
- The Operator interface is used to handle specific transformations
Lift operator
- The basic principle of transformation
- Each Operator implements the Operator interface and calls the lift Operator
The map operators
- The most basic operator
- As the name suggests, used for mapping
public class Caller<T> {
final OnCall<T> onCall;
public Caller(OnCall<T> onCall) {
this.onCall = onCall;
}
public static <T> Caller<T> create(OnCall<T> onCall) {
return new Caller<>(onCall);
}
public Calling call(Receiver<T> receiver) {
this.onCall.call(receiver);
return receiver;
}
public final <R> Caller<R> lift(final Operator<R, T> operator) {
return create(new OnCallLift<>(onCall, operator));
}
public final <R> Caller<R> map(Func1<T, R> func) {
return lift(new MapOperator<T, R>(func));
}
public interface OnCall<T> extends Action1<Receiver<T>> {}public interface Operator<R.T> extends Func1<Receiver<R>, Receiver<T>> {}}Copy the code
public interface Func1<T.R>{
R call(T t);
}
Copy the code
public class MapOperator<T.R> implements Caller.Operator<R.T> {
private final Func1<T, R> mapper;
public MapOperator(Func1<T, R> mapper) {
this.mapper = mapper;
}
@Override
public Receiver<T> call(Receiver<R> rReceiver) {
return new MapReceiver<>(rReceiver, this.mapper); }}Copy the code
阿鲁纳恰尔邦
public class MapReceiver<T, R> extends Receiver<T> { private final Receiver<R> actual; private final Func1<T, R> mapper; public MapReceiver(Receiver<R> actual, Func1<T, R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void onCompleted() { this.actual.onCompleted(); } @Override public void onError(Throwable t) { this.actual.onError(t); } @Override public void onReceive(T t) { R tR = this.mapper.call(t); this.actual.onReceive(tR); }}Copy the code
public class OnCallLift<T.R> implements Caller.OnCall<R> {
private final Caller.OnCall<T> parent;
private final Caller.Operator<R, T> operator;
public OnCallLift(Caller.OnCall<T> parent, Caller.Operator<R, T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Receiver<R> rReceiver) {
Receiver<T> tReceiver = this.operator.call(rReceiver);
this.parent.call(tReceiver); }}Copy the code
call
public class Lesson2_2Activity extends AppCompatActivity {
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked(a) {
Caller.
create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
if(! stringReceiver.isUnCalled()) { stringReceiver.onReceive("1");
stringReceiver.onReceive("2");
stringReceiver.onCompleted();
}
}
}).
map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s)+2;
}
}).
call(new Receiver<Integer>() {
@Override
public void onCompleted(a) {
Log.d("kpioneer"."onCompleted");
}
@Override
public void onError(Throwable t) {}@Override
public void onReceive(Integer integer) {
Log.d("kpioneer"."onReceive:"+ integer); }}); }}Copy the code
The Log output
06-11 16:45:26. 988. 10850-10850 / com haocai. Rxjavademo D/kpioneer: 06-11 16:45:26 onReceive: 3. 988. 10850-10850 / com haocai. Rxjavademo D/kpioneer: 06-11 16:45:26 onReceive: 4. 988. 10850-10850 / com haocai. Rxjavademo D/kpioneer: onCompletedCopy the code
3.RxJava2(no back pressure) operator function imitation implementation
CallerWithUpstream (similar to AbstractObservableWithUpstream)
- An abstract class
- Have callActual method
- The implementation operator implements this method
The map operators
- The most basic operator
- As the name suggests, used for mapping
CallerOperator interface
- I’m doing the transformation in callActual
- Can be used to extend operators
Related codes:
public abstract class Caller<T> {
public static <T> Caller<T> create(CallerOnCall<T> callerOnCall) {
return new CallerCreate<>(callerOnCall);
}
public void call(Callee<T> callee) {
callActual(callee);
}
protected abstract void callActual(Callee<T> callee);
public <R> Caller<R> lift(CallerOperator<R, T> operator) {
return new CallerLift<>(this, operator);
}
public <R> Caller<R> map(Function<T, R> function) {
return new CallerMap<>(this, function); }}Copy the code
public interface CallerOperator<T.R> {
Callee<R> call(Callee<T> callee);
}
Copy the code
/** * Created by Xionghu on 2018/6/11. * Desc: Return source Caller */
public interface CallerSource<T> {
Caller<T> source(a);
}
Copy the code
阿鲁纳恰尔邦
public abstract class CallerWithUpstream<T, R> extends Caller<R> implements CallerSource<T> { protected final Caller<T> source; public CallerWithUpstream(Caller<T> source) { this.source = source; } @Override public Caller<T> source() { return source; }}Copy the code
public class CallerLift<R.T> extends CallerWithUpstream<T.R> {
private final CallerOperator<R, T> mOperator;
public CallerLift(Caller<T> source, CallerOperator<R, T> mOperator) {
super(source);
this.mOperator = mOperator;
}
@Override
protected void callActual(Callee<R> callee) { Callee<T> tCallee = mOperator.call(callee); source.call(tCallee); }}Copy the code
public interface Function<T.R> {
R call(T t);
}
Copy the code
public class CallerMap<T.R> extends CallerWithUpstream<T.R> {
private Function<T, R> function;
public CallerMap(Caller<T> source, Function<T, R> function) {
super(source);
this.function = function;
}
@Override
protected void callActual(Callee<R> callee) {
source.call(new MapCallee<>(callee, function));
}
static class MapCallee<T.R> implements Callee<T> {
private final Callee<R> mCallee;
private final Function<T, R> mFunction;
public MapCallee(Callee<R> mCallee, Function<T, R> mFunction) {
this.mCallee = mCallee;
this.mFunction = mFunction;
}
@Override
public void onCall(Release release) {
mCallee.onCall(release);
}
@Override
public void onReceive(T t) {
R tR = mFunction.call(t);
mCallee.onReceive(tR);
}
@Override
public void onCompleted(a) {
mCallee.onCompleted();
}
@Override
public void onError(Throwable t) { mCallee.onError(t); }}}Copy the code
/** * Created by Xionghu on 2018/6/11. * Desc: Created by Xionghu on 2018/6/11. ** * Created by Xionghu on 2018/6/11
public class Lesson2_3Activity extends AppCompatActivity {
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked(a) {
Caller.
create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("1");
callerEmitter.onReceive("2");
callerEmitter.onCompleted();
}
}).
map(new Function<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
}).
call(new Callee<Integer>() {
@Override
public void onCall(Release release) {
Log.d("kpioneer"."onCall");
}
@Override
public void onReceive(Integer integer) {
Log.d("kpioneer"."onReceive:" + integer);
}
@Override
public void onCompleted(a) {
Log.d("kpioneer"."onCompleted");
}
@Override
public void onError(Throwable t) {}}); Caller. create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("3");
callerEmitter.onReceive("4");
callerEmitter.onCompleted();
}
}).
lift(new CallerOperator<Integer, String>() {
@Override
public Callee<String> call(final Callee<Integer> callee) {
return new Callee<String>() {
@Override
public void onCall(Release release) {
callee.onCall(release);
}
@Override
public void onReceive(String s) {
callee.onReceive(Integer.parseInt(s));
}
@Override
public void onCompleted(a) {
callee.onCompleted();
}
@Override
public void onError(Throwable t) { callee.onError(t); }}; } }). call(new Callee<Integer>() {
@Override
public void onCall(Release release) {
Log.d("kpioneer"."onCall");
}
@Override
public void onReceive(Integer integer) {
Log.d("kpioneer"."onReceive:" + integer);
}
@Override
public void onCompleted(a) {
Log.d("kpioneer"."onCompleted");
}
@Override
public void onError(Throwable t) {
Log.d("kpioneer"."onError"); }}); }}Copy the code
06-11 18:03:27. 268. 24409-24409 / com haocai. Rxjavademo D/kpioneer: 06-11 18:03:27 onCall. 268, 24409-24409 / com. Haocai. Rxjavademo D/kpioneer: 06-11 18:03:27 onReceive: 1. 268. 24409-24409 / com haocai. Rxjavademo D/kpioneer: 06-11 18:03:27 onReceive: 2. 268, 24409-24409 / com. Haocai. Rxjavademo D/kpioneer: 06-11 onCompleted 18:03:27. 268, 24409-24409 / com. Haocai. Rxjavademo D/kpioneer: 06-11 18:03:27 onCall. 268, 24409-24409 / com. Haocai. Rxjavademo D/kpioneer: 06-11 18:03:27 onReceive: 3. 268. 24409-24409 / com haocai. Rxjavademo D/kpioneer: 06-11 18:03:27 onReceive: 4. 268. 24409-24409 / com haocai. Rxjavademo D/kpioneer: onCompletedCopy the code
4.RxJava2(with back pressure) operator function imitation write implementation
TelephonerOperator interface
- I’m doing the transformation in callActual
- Can be used to extend operators
TelephonerWithUpstream (similar to AbstractObservableWithUpstream)
- An abstract class
- Have callActual method
- The implementation operator implements this method
Related to the source code
public abstract class Telephoner<T> {
public static <T> Telephoner<T> create(TelephonerOnCall<T> telephonerOnCall){
return new TelephonerCreate<>(telephonerOnCall);
}
public void call(Receiver<T> receiver) { callActual(receiver); }protected abstract void callActual(Receiver<T> receiver);
public <R> Telephoner<R> map(Function<T, R> function) {
return new TelephonerMap<>(this, function);
}
public <R> Telephoner<R> lift(TelephonerOperator<R, T> telephonerOperator) {
return new TelephonerLift<>(this, telephonerOperator); }}Copy the code
/** * Created by Xionghu on 2018/6/12. * Desc: lift operator */
public class TelephonerLift<R.T> extends TelephonerWithUpstream<T.R> {
private final TelephonerOperator<R, T> operator;
public TelephonerLift(Telephoner<T> source, TelephonerOperator<R, T> operator) {
super(source);
this.operator = operator;
}
@Override
protected void callActual(Receiver<R> receiver) { Receiver<T> tReceiver = operator.call(receiver); source.call(tReceiver); }}Copy the code
import com.haocai.mylibrary.rxJava2.Function;
/** * Created by Xionghu on 2018/6/12. * Desc: map operator */
public class TelephonerMap<T.R> extends TelephonerWithUpstream<T.R> {
private Function<T, R> trFunction;
public TelephonerMap(Telephoner<T> source, Function<T, R> trFunction) {
super(source);
this.trFunction = trFunction;
}
@Override
protected void callActual(Receiver<R> receiver) {
source.call(new MapReceiver<>(receiver, trFunction));
}
static class MapReceiver<T.R> implements Receiver<T> {
private final Receiver<R> rReceiver;
private final Function<T, R> trFunction;
public MapReceiver(Receiver<R> rReceiver, Function<T, R> trFunction) {
this.rReceiver = rReceiver;
this.trFunction = trFunction;
}
@Override
public void onCall(Drop d) {
rReceiver.onCall(d);
}
@Override
public void onReceive(T t) {
R tr = trFunction.call(t);
rReceiver.onReceive(tr);
}
@Override
public void onError(Throwable t) {
rReceiver.onError(t);
}
@Override
public void onCompleted(a) { rReceiver.onCompleted(); }}}Copy the code
/** * Created by Xionghu on 2018/6/12. * Desc: operator interface */
public interface TelephonerOperator<T.R> {
Receiver<R> call(Receiver<T> callee);
}
Copy the code
阿鲁纳恰尔邦
/** * Created by Xionghu on 2018/6/11. * Desc: Telephoner */ public interface TelephonerSource<T> {Telephoner<T> source(); }Copy the code
public abstract class TelephonerWithUpstream<T.R> extends Telephoner<R> implements TelephonerSource {
protected final Telephoner<T> source;
public TelephonerWithUpstream(Telephoner<T> source) {
this.source = source;
}
@Override
public Telephoner source(a) {
returnsource; }}Copy the code
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import com.haocai.mylibrary.rxJava2.Function;
import com.haocai.mylibrary.rxJava2.backpressure.Drop;
import com.haocai.mylibrary.rxJava2.backpressure.Receiver;
import com.haocai.mylibrary.rxJava2.backpressure.Telephoner;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerEmitter;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOnCall;
import com.haocai.mylibrary.rxJava2.backpressure.TelephonerOperator;
import com.haocai.rxjavademo.R;
import butterknife.ButterKnife;
import butterknife.OnClick;
/** * Created by Xionghu on 2018/6/11. * Desc: Created by Xionghu on 2018/6/11. ** * Created by Xionghu on 2018/6/11
public class Lesson2_4Activity extends AppCompatActivity {
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked(a) {
Telephoner.
create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("1");
telephonerEmitter.onReceive("2");
telephonerEmitter.onCompleted();
}
}).
map(new Function<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.parseInt(s);
}
}).
call(new Receiver<Integer>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE);
Log.d("kpioneer"."onCall");
}
@Override
public void onReceive(Integer integer) {
Log.d("kpioneer"."onReceive:" + integer);
}
@Override
public void onError(Throwable t) {
Log.d("kpioneer"."onError");
}
@Override
public void onCompleted(a) {
Log.d("kpioneer"."onCompleted"); }}); Telephoner. create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("3");
telephonerEmitter.onReceive("4");
telephonerEmitter.onCompleted();
}
}).
lift(new TelephonerOperator<Integer, String>() {
@Override
public Receiver<String> call(final Receiver<Integer> receiver) {
return new Receiver<String>() {
@Override
public void onCall(Drop d) {
receiver.onCall(d);
}
@Override
public void onReceive(String s) {
receiver.onReceive(Integer.parseInt(s));
}
@Override
public void onError(Throwable t) {
receiver.onError(t);
}
@Override
public void onCompleted(a) { receiver.onCompleted(); }}; } }). call(new Receiver<Integer>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE);
Log.d("kpioneer"."onCall");
}
@Override
public void onReceive(Integer integer) {
Log.d("kpioneer"."onReceive:" + integer);
}
@Override
public void onError(Throwable t) {
Log.d("kpioneer"."onError");
}
@Override
public void onCompleted(a) {
Log.d("kpioneer"."onCompleted"); }}); }}Copy the code
6-12 09:56:50. 108, 22364-22364 / com. Haocai. Rxjavademo D/kpioneer: 06-12 09:56:50 onCall. 108, 22364-22364 / com. Haocai. Rxjavademo D/kpioneer: 06-12 09:56:50 onReceive: 1. 108. 22364-22364 / com haocai. Rxjavademo D/kpioneer: 06-12 09:56:50 onReceive: 2. 108. 22364-22364 / com haocai. Rxjavademo D/kpioneer: OnCompleted 06-12 09:56:50. 108, 22364-22364 / com. Haocai. Rxjavademo D/kpioneer: 06-12 09:56:50 onCall. 108, 22364-22364 / com. Haocai. Rxjavademo D/kpioneer: 06-12 09:56:50 onReceive: 3. 108. 22364-22364 / com haocai. Rxjavademo D/kpioneer: 06-12 09:56:50 onReceive: 4. 108. 22364-22364 / com haocai. Rxjavademo D/kpioneer: onCompletedCopy the code