Google open-sourced Agera, a reactive programming framework, last week, and I expect programmers will gradually become familiar with it. I am very interested in this kind of technology and have studied RxJava before, so I started learning the framework as soon as I heard that Google had released it. Here I share what I have learned. My own understanding is limited, so this article is mostly meant to get the discussion started. I hope more experienced developers will join in studying Agera and that some truly authoritative articles appear soon!
In this article you will learn:
1. What Agera is: its basic concepts and overall structure.
2. Basic usage of Agera.
3. Advanced usage of Agera.
4. Agera source code analysis.
5. How to wrap Agera.
6. Differences between Agera and RxJava.
All right, let’s officially start our journey with Agera.
What is Agera
Before answering what Agera is, you need to know what reactive programming and functional programming are. I won’t go into that here; you can look them up on Google or Wikipedia. Before Agera appeared, Java already had a well-known framework of the same kind: RxJava. RxAndroid, derived from it, is better suited to Android, and its various offspring such as RxBus and RxBinding are very impressive. So why did Google write Agera? I did not write the framework, so I cannot say, but one thing is certain: as Google’s own “son”, it has great potential on Android. I expect it to be presented at the upcoming I/O conference.
Now let’s get to Agera itself. Here is a quote from Agera’s GitHub page:
Agera is a set of classes and interfaces to help write functional, asynchronous, and reactive applications for Android. Requires Android SDK version 9 or higher.
In other words, Agera is a framework that helps Android developers write functional, asynchronous, and reactive applications, and it requires Android SDK version 9 or above.
Now that we know what Agera is, we should also note that, like RxJava, it is built on the observer pattern, so it comes with a few concepts that I will explain in this article.
Basic usage of Agera
Now that we have covered what Agera is, are you eager to try it? Let me take you through its basic usage.
First of all, since Agera is based on the observer pattern, what represents the observer and the observed?
In Agera there are two basic concepts: Observable and Updatable.
Observable & Updatable
public interface Observable {
    /**
     * Adds {@code updatable} to the {@code Observable}.
     *
     * @throws IllegalStateException if the {@link Updatable} was already added or if it was called
     * from a non-Looper thread
     */
    void addUpdatable(@NonNull Updatable updatable);

    /**
     * Removes {@code updatable} from the {@code Observable}.
     *
     * @throws IllegalStateException if the {@link Updatable} was not added
     */
    void removeUpdatable(@NonNull Updatable updatable);
}

/**
 * Called when an event has occurred. Can be added to {@link Observable}s to be notified
 * of {@link Observable} events.
 */
public interface Updatable {
    /**
     * Called when an event has occurred.
     */
    void update();
}
An Updatable is the observer in the observer pattern, and an Observable is the observed. All of Agera is built on Updatables observing Observables and Observables notifying Updatables of updates. In code, you register an Updatable with an Observable through the Observable’s addUpdatable() method, and the Observable calls the Updatable’s update() method at the appropriate time. Let’s look at a concrete example.
The UI is very simple: just a Button and a TextView. Our goal is to change the text of the TextView when the Button is clicked.
public class MainActivity extends AppCompatActivity implements Updatable {
    private TextView show;

    private Observable observable = new Observable() {
        @Override
        public void addUpdatable(@NonNull Updatable updatable) {
            updatable.update();
        }

        @Override
        public void removeUpdatable(@NonNull Updatable updatable) {
        }
    };

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        show = (TextView) findViewById(R.id.show);
    }

    public void trigger(View view) {
        observable.addUpdatable(this);
    }

    @Override
    public void update() {
        show.setText("update!!");
    }
}
Looking at the activity code, the first thing we do is make the activity implement the Updatable interface and change the text of the TextView in its update() method. Next, we create an Observable; when the Button is clicked, we call its addUpdatable() method. The Observable defined above calls update() on the Updatable instance inside addUpdatable(), which completes a simple event subscription.
Note that no data is exchanged between an Observable and an Updatable. If your Observable wants to pass data to the Updatable, it cannot, whatever you do, because none of the methods involved takes a data argument.
You might look at this and say: “That is useless! How can you talk about the observer pattern and reactive programming when you cannot even pass data?” Don’t worry. This is a deliberate choice by Google: data is detached from Observable and Updatable to achieve the “push event, pull data” model. I will come back to this in the comparison with RxJava, which uses a “push data” model.
Repository
The previous section explained why Google made this choice, but it still gave no solution for passing data. If you rushed to GitHub to open an issue at this point, the maintainers would probably reply that they never said data transfer was unsupported.
So how exactly does Agera support data transfer? Google provides an interface for this called Repository.
public interface Repository<T> extends Observable, Supplier<T> {}
As you can see, it extends Observable, which means it is an observed object. But what is this Supplier?
public interface Supplier<T> {
    /**
     * Returns an instance of the appropriate type. The returned object may or may not be a new
     * instance, depending on the implementation.
     */
    T get();
}
From this code and the name of the interface, you can guess that a Supplier is something that provides data.
To sum up, a Repository is both an observed object and a source of data for the observer.
Let’s let the code do the talking.
The UI is the same as before, a Button and a TextView, so I won’t repeat it here.
private Supplier<String> supplier = new Supplier<String>() {
    @Override
    public String get() {
        return "update!!";
    }
};

private Repository<String> repository = Repositories.repositoryWithInitialValue("a")
        .observe()
        .onUpdatesPerLoop()
        .thenGetFrom(supplier)
        .compile();

public void trigger(View view) {
    repository.addUpdatable(this);
}

@Override
public void update() {
    // Pull the current value from the repository once notified.
    show.setText(repository.get());
}
When the Button is clicked (entering the trigger(View view) method), we do the same as before: we call addUpdatable() to register our activity, which implements Updatable, with the Repository. The Repository notices that something has been registered with it and, after a series of method calls, invokes the Updatable’s update() method. There we call repository.get() to retrieve the data.
Here are a few basic but important concepts in Agera:
(1) Observable: the observed in Agera, which notifies observers to update at the appropriate time.
(2) Updatable: the observer in Agera, which observes Observables.
(3) Supplier: the interface that provides data in Agera. Its type parameter specifies the data type, and get() returns the data.
(4) Repository: an Observable that also provides data, combining Observable and Supplier.
Agera uses a “push event, pull data” model, which separates data from events.
Not clear yet? The Agera GitHub page illustrates this model.
In code, the Repository calls the Updatable’s update() method after a series of method calls. This is event passing, the “push event” part: the Updatable only receives a wake-up event. It then fetches the data itself by calling the Repository’s get() method, instead of receiving it as an argument to something like update(T value). This is the “pull data” part. The benefit is that data can be loaded lazily, which we will discuss later.
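To make the “push event, pull data” idea concrete, here is a minimal sketch in plain Java. It does not use Agera: Updatable and Supplier are redeclared locally as stand-ins, and SketchRepository, setValue() and PushEventPullDataDemo are hypothetical names invented for this illustration. The real Repository also deals with Looper threads and update frequency, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-ins for the two Agera interfaces, so the sketch runs without Android.
interface Updatable {
    void update();
}

interface Supplier<T> {
    T get();
}

// Hypothetical holder: pushes a data-free event, lets observers pull the value.
class SketchRepository<T> implements Supplier<T> {
    private final List<Updatable> updatables = new ArrayList<>();
    private T value;

    SketchRepository(T initialValue) {
        this.value = initialValue;
    }

    void addUpdatable(Updatable updatable) {
        updatables.add(updatable);
    }

    void setValue(T newValue) {
        value = newValue;
        for (Updatable updatable : updatables) {
            updatable.update(); // push: the event carries no payload
        }
    }

    @Override
    public T get() {
        return value; // pull: the observer asks for the data when it wants it
    }
}

class PushEventPullDataDemo {
    static List<String> run() {
        SketchRepository<String> repository = new SketchRepository<>("a");
        List<String> seen = new ArrayList<>();
        // The updatable receives only a wake-up call and then pulls the value itself.
        repository.addUpdatable(() -> seen.add(repository.get()));
        repository.setValue("update!!");
        repository.setValue("again");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because update() takes no argument, an observer that does not need the value right away can simply skip the get() call, which is what makes lazy loading possible.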
Advanced usage of Agera
Having covered the basic concepts of Agera, let’s look at how to use it properly.
The previous section introduced Repository, and the code there probably left most readers confused. Let’s go through it in detail.
Repository
Let’s start with an example.
private Supplier<String> strSupplier = new Supplier<String>() {
    @Override
    public String get() {
        return "value";
    }
};

private Function<String, String> transform = new Function<String, String>() {
    @Override
    public String apply(@NonNull String input) {
        return "new " + input;
    }
};

private Supplier<Integer> integerSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        return 100;
    }
};

private Merger<String, Integer, String> merger = new Merger<String, Integer, String>() {
    @Override
    public String merge(@NonNull String s, @NonNull Integer integer) {
        return s + " plus " + String.valueOf(integer);
    }
};

private Repository<String> repository;

private Updatable updatable = new Updatable() {
    @Override
    public void update() {
        Log.d("TAG", repository.get());
    }
};

repository = Repositories.repositoryWithInitialValue("default")
        .observe()
        .onUpdatesPerLoop()
        .getFrom(strSupplier)
        .transform(transform)
        .thenMergeIn(integerSupplier, merger)
        .compile();

repository.addUpdatable(updatable);
The only part of this code that I expect you to understand right away is the last line, repository.addUpdatable(updatable).
Basically, it registers an Updatable with the Repository, and the Repository notifies the Updatable after a series of method calls. You can see the output in logcat.
So what does the main part of the code mean?
private Supplier<String> strSupplier = new Supplier<String>() {
    @Override
    public String get() {
        return "value";
    }
};

private Function<String, String> transform = new Function<String, String>() {
    @Override
    public String apply(@NonNull String input) {
        return "new " + input;
    }
};

private Supplier<Integer> integerSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        return 100;
    }
};

private Merger<String, Integer, String> merger = new Merger<String, Integer, String>() {
    @Override
    public String merge(@NonNull String s, @NonNull Integer integer) {
        return s + " plus " + String.valueOf(integer);
    }
};

private Updatable updatable = new Updatable() {
    @Override
    public void update() {
        Log.d("TAG", repository.get());
    }
};

repository = Repositories.repositoryWithInitialValue("default")
        .observe()
        .onUpdatesPerLoop()
        .getFrom(strSupplier)
        .transform(transform)
        .thenMergeIn(integerSupplier, merger)
        .compile();
RxJava has many operators for transforming data, such as map, flatMap, and take. Likewise, getFrom, transform, and thenMergeIn are operators that Google packaged to help you manipulate data, and their names tell you what they do:
repositoryWithInitialValue creates a Repository and gives it an initial value.
getFrom gets data from a Supplier.
transform performs a conversion: here a Function prefixes the data the repository got from strSupplier with the string “new ”, much like map in RxJava.
The final thenMergeIn merges the data provided by integerSupplier with the data currently in the repository.
Finally, compile produces the Repository instance.
Doesn’t this look like RxJava? The whole thing can be read as a stream of operations.
But what are observe() and onUpdatesPerLoop()? And why is the last call thenMergeIn() rather than mergeIn()?
This comes from the way Agera builds a Repository, which involves the concept of states.
public interface RepositoryCompilerStates {
interface REventSource {
RFrequency observe(@NonNull Observable... observables);
}
interface RFrequency extends REventSource {
RFlow onUpdatesPer(int millis);
RFlow onUpdatesPerLoop();
}
interface RFlow> extends RSyncFlow {
RFlow getFrom( Supplier supplier);
RTermination> attemptGetFrom(
Supplier> attemptSupplier);
RFlow mergeIn( Supplier supplier,
Merger merger);
RTermination> attemptMergeIn(
Supplier supplier,
Merger> attemptMerger);
RFlow transform( Function function);
RTermination> attemptTransform(
Function> attemptFunction);
TSelf goTo(@NonNull Executor executor);
RSyncFlow goLazy();
}
interface RSyncFlow> {
RSyncFlow getFrom( Supplier supplier);
RTermination> attemptGetFrom(
Supplier> attemptSupplier);
RSyncFlow mergeIn( Supplier supplier,
Merger merger);
RTermination> attemptMergeIn(
Supplier supplier,
Merger> attemptMerger);
RSyncFlow transform( Function function);
RTermination> attemptTransform(
Function> attemptFunction);
RTermination check(@NonNull Predicate predicate);
RTermination check( @NonNull Function caseFunction, @NonNull Predicate casePredicate);
TSelf sendTo(@NonNull Receiver receiver);
TSelf bindWith(@NonNull Supplier secondValueSupplier, @NonNull Binder binder);
RConfig thenSkip();
RConfig thenGetFrom(@NonNull Supplier supplier);
RTermination> thenAttemptGetFrom(
Supplier> attemptSupplier);
RConfig thenMergeIn(@NonNull Supplier supplier, @NonNull Merger merger);
RTermination> thenAttemptMergeIn(
Supplier supplier,
Merger super TPre, ? super TAdd,
? extends Result> attemptMerger);
RConfig thenTransform( @NonNull Function function);
RTermination> thenAttemptTransform(
Function> attemptFunction);
}
interface RTermination {
TRet orSkip();
TRet orEnd(@NonNull Function valueFunction);
}
interface RConfig {
RConfig notifyIf(@NonNull Merger checker);
RConfig onDeactivation(@RepositoryConfig int deactivationConfig);
RConfig onConcurrentUpdate(@RepositoryConfig int concurrentUpdateConfig);
Repository compile();
RFrequency compileIntoRepositoryWithInitialValue(@NonNull TVal2 value);
}
}
The interface contains many methods, but if you look closely you will see that they are the same ones we used above to build a Repository and manipulate its data. The difference is that they return something other than Repository: the return value represents the state the Repository’s data processing is in.
Here is a summary of the representative states; the others inherit from one of these and represent similar states.
REventSource: the initial state. It is the return type of Repositories.repositoryWithInitialValue() and marks the point where event sources are specified.
RFrequency: the state in which the update frequency of the event sources is specified.
RFlow: the data processing flow. The methods defined here all relate to data processing, such as getFrom() and mergeIn(). getFrom() returns RFlow, which means calls can be chained, for example mergeIn() after getFrom(). The thenXXX() methods, in contrast, return RConfig (the thenAttemptXXX() variants return an RTermination first), which means that calling one of them ends the data processing flow.
RTermination: the state returned by check() and the attemptXXX() methods, in which orSkip() or orEnd() decides what happens when the flow is cut short.
RConfig: the remaining configuration, such as notifyIf(), which decides whether to notify the Updatables.
Defining states this way makes it clear which state we are in and makes the whole call sequence easier to follow.
Initialization (Repositories.repositoryWithInitialValue(...)) ->
Specify event sources (observe()) ->
Specify the update frequency (onUpdatesPerLoop() or onUpdatesPer(...)) ->
Process the data flow (the various processing methods) ->
End the data flow (thenXXX()) ->
Configure properties (notifyIf(...) etc.) ->
compile().
The whole process is one-way: you cannot call a method such as getFrom() after thenMergeIn(), because calling thenXXX() ends the data processing flow.
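The reason such a call order can be enforced is that each method returns an interface exposing only the methods that are legal next. Below is a minimal sketch of that idea in plain Java, not Agera code: FlowStage, ConfigStage, SketchCompiler and startWith() are hypothetical names, there are only two stages instead of Agera’s many, and java.util.function.Supplier stands in for the compiled result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical, heavily simplified stages (the real interface has many more).
interface FlowStage {
    FlowStage transform(UnaryOperator<String> step);       // stays in the flow stage
    ConfigStage thenTransform(UnaryOperator<String> step); // ends the flow
}

interface ConfigStage {
    Supplier<String> compile();
}

// One class implements every stage and returns itself typed as the next stage.
final class SketchCompiler implements FlowStage, ConfigStage {
    private final String initialValue;
    private final List<UnaryOperator<String>> steps = new ArrayList<>();

    private SketchCompiler(String initialValue) {
        this.initialValue = initialValue;
    }

    static FlowStage startWith(String initialValue) {
        return new SketchCompiler(initialValue);
    }

    @Override
    public FlowStage transform(UnaryOperator<String> step) {
        steps.add(step);
        return this;
    }

    @Override
    public ConfigStage thenTransform(UnaryOperator<String> step) {
        steps.add(step);
        return this;
    }

    @Override
    public Supplier<String> compile() {
        List<UnaryOperator<String>> snapshot = new ArrayList<>(steps);
        return () -> {
            String value = initialValue;
            for (UnaryOperator<String> step : snapshot) {
                value = step.apply(value);
            }
            return value;
        };
    }
}

class StagedBuilderDemo {
    static String run() {
        return SketchCompiler.startWith("value")
                .transform(s -> "new " + s)
                .thenTransform(s -> s + " plus 100")
                // Calling .transform(...) here would not compile:
                // ConfigStage does not declare it.
                .compile()
                .get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The compiler, not a runtime check, rejects an out-of-order call in this sketch. The source shown later in this article suggests Agera additionally guards the order at runtime with its checkExpect() calls.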
We have now covered the whole Repository flow, but everything above is just an abstract interface. Where is the implementation? (To give you a better understanding of Agera, we will look at some source code here, even though this section is titled advanced usage.)
Let’s go back to the beginning and look at Repositories.repositoryWithInitialValue().
public static <T> REventSource<T, T> repositoryWithInitialValue(@NonNull final T initialValue) {
    return RepositoryCompiler.repositoryWithInitialValue(initialValue);
}
It calls the RepositoryCompiler method of the same name. Let’s see what RepositoryCompiler is.
final class RepositoryCompiler implements RepositoryCompilerStates.RFrequency,
        RepositoryCompilerStates.RFlow, RepositoryCompilerStates.RTermination,
        RepositoryCompilerStates.RConfig {
    .......
}
As it turns out, it implements the state interfaces described above, which means RepositoryCompiler is the class Agera uses to manage the states of a Repository’s data processing flow. Let’s see what kind of Repository its compile() method finally produces.
public Repository compile() {
    Repository repository = compileRepositoryAndReset();
    recycle(this);
    return repository;
}

private Repository compileRepositoryAndReset() {
    checkExpect(CONFIG);
    Repository repository = CompiledRepository.compiledRepository(initialValue, eventSources,
            frequency, directives, notifyChecker, concurrentUpdateConfig, deactivationConfig);
    expect = NOTHING;
    initialValue = null;
    eventSources.clear();
    frequency = 0;
    directives.clear();
    goLazyUsed = false;
    notifyChecker = objectsUnequal();
    deactivationConfig = RepositoryConfig.CONTINUE_FLOW;
    concurrentUpdateConfig = RepositoryConfig.CONTINUE_FLOW;
    return repository;
}
You can see that it calls the compiledRepository() method of CompiledRepository.
static Repository compiledRepository(
        @NonNull final Object initialValue,
        @NonNull final List eventSources,
        final int frequency,
        @NonNull final List directives,
        @NonNull final Merger notifyChecker,
        @RepositoryConfig final int concurrentUpdateConfig,
        @RepositoryConfig final int deactivationConfig) {
    Observable eventSource = perMillisecondObservable(frequency,
            compositeObservable(eventSources.toArray(new Observable[eventSources.size()])));
    Object[] directiveArray = directives.toArray();
    return new CompiledRepository(initialValue, eventSource,
            directiveArray, notifyChecker, deactivationConfig, concurrentUpdateConfig);
}
So the Repository we actually use is a CompiledRepository.
In fact, the names of these classes already give us a good idea of the process.
The first step is to call Repositories.repositoryWithInitialValue(). Repositories is a utility class that helps us create a Repository.
The remaining state handling happens in RepositoryCompiler, which, as the name says, is the Repository compiler, created specifically to build Repositories.
The resulting CompiledRepository is a fully functional Repository.
That is all you need to know about Repository for now, and you can try writing one yourself. These data processing methods make manipulating data as convenient as in RxJava. Agera also wraps asynchronous operations, like this:
private Executor executor = Executors.newSingleThreadExecutor();

repository = Repositories.repositoryWithInitialValue("default")
        .observe()
        .onUpdatesPerLoop()
        .goTo(executor)
        .thenGetFrom(new Supplier() {
            public Object get() {
                return null;
            }
        })
        .compile();
This uses the goTo() operator, which takes the Executor on which the rest of the flow should run.
Attempt & Result
In the examples above, we replaced the original Observable with a Repository which, combined with its operators, can already meet many of our needs. But there is a problem: what if the Supplier’s get() method fails? For example:
private Supplier<Integer> strSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        return 1 / 0;
    }
};
Of course nobody would write this exact code in practice, but errors will always occur. RxJava has good error handling. Does Agera? The answer is yes: the attemptXXX() methods and the Result class.
Let’s start with a piece of code:
repository = Repositories.repositoryWithInitialValue(0)
        .observe()
        .onUpdatesPerLoop()
        .thenGetFrom(strSupplier)
        .compile();

repository.addUpdatable(this);
If we do it this way, strSupplier’s get() method evaluates 1/0 and throws. The program simply crashes, and your day is ruined. But what if you do this instead:
private Supplier<Result<Integer>> safeStrSupplier = new Supplier<Result<Integer>>() {
    @Override
    public Result<Integer> get() {
        try {
            return Result.success(1 / 0);
        } catch (ArithmeticException e) {
            return Result.failure(e);
        }
    }
};

safeRepository = Repositories.repositoryWithInitialValue(Result.<Integer>absent())
        .observe()
        .onUpdatesPerLoop()
        .attemptGetFrom(safeStrSupplier).orEnd(new Function<Throwable, Result<Integer>>() {
            @Override
            public Result<Integer> apply(@NonNull Throwable input) {
                return Result.success(2222);
            }
        })
        .thenTransform(new Function<Integer, Result<Integer>>() {
            @Override
            public Result<Integer> apply(@NonNull Integer input) {
                return Result.absentIfNull(input);
            }
        })
        .compile();

safeRepository.addUpdatable(this);
AttempGetFrom () instead of getFrom() can be attempGetFrom(), orEnd() can also be used here. The difference between orSkip() and attempGetFrom() is that if an exception is received, orSkip() still notifies Updatable of the update, and orSkip() skips it. Supplier is also different. We use the Result class in safeSupplier to wrap the data we operate on and execute success or failure by calling success() or failure().
So here, if you write code like 1/0 and throw an exception, we can safely catch it and do whatever you want. ThenTransform (), we return result.absEntifNULL (input); Indicates that if the data is empty, we return the default value.
In our daily coding, we should try to adopt such a way to prevent the occurrence of exceptions.
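The idea behind wrapping a value or a failure in one object can be sketched without Agera. The SketchResult class below is a hypothetical, stripped-down stand-in written for this illustration; it is not Agera’s Result and supports far fewer operations. AttemptDemo and its methods are likewise invented names.

```java
// Hypothetical minimal wrapper: holds either a value or the failure that prevented it.
final class SketchResult<T> {
    private final T value;
    private final Throwable failure;

    private SketchResult(T value, Throwable failure) {
        this.value = value;
        this.failure = failure;
    }

    static <T> SketchResult<T> success(T value) {
        return new SketchResult<>(value, null);
    }

    static <T> SketchResult<T> failure(Throwable failure) {
        return new SketchResult<>(null, failure);
    }

    boolean succeeded() {
        return failure == null;
    }

    // Returns the wrapped value, or the fallback when the attempt failed.
    T orElse(T fallback) {
        return succeeded() ? value : fallback;
    }
}

class AttemptDemo {
    // The exception is caught at the source and travels onward as data.
    static SketchResult<Integer> divide(int a, int b) {
        try {
            return SketchResult.success(a / b);
        } catch (ArithmeticException e) {
            return SketchResult.failure(e);
        }
    }

    static int safeDivide(int a, int b) {
        return divide(a, b).orElse(2222);
    }

    public static void main(String[] args) {
        System.out.println(safeDivide(6, 3));
        System.out.println(safeDivide(1, 0));
    }
}
```

The caller never sees the ArithmeticException: a failed attempt becomes an ordinary return value, and the fallback 2222 plays the same role as the value supplied to orEnd() in the Agera example above.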
Receiver
Result was introduced above; a Receiver can be used together with it.
private Receiver<Throwable> errorReceiver = new Receiver<Throwable>() {
    @Override
    public void accept(@NonNull Throwable value) {
        trigger.setText(value.toString());
    }
};

private Receiver<Integer> successReceiver = new Receiver<Integer>() {
    @Override
    public void accept(@NonNull Integer value) {
        trigger.setText(String.valueOf(value));
    }
};

@Override
public void update() {
    safeRepository.get()
            .ifFailedSendTo(errorReceiver)
            .ifSucceededSendTo(successReceiver);
}
As in the previous section, safeRepository’s type parameter is Result, so update() gets a Result from it. Its ifFailedSendTo() and ifSucceededSendTo() methods send the failure or the successful value, respectively, to the given object, which must implement the Receiver interface.
/**
 * A receiver of objects.
 */
public interface Receiver<T> {
    /**
     * Accepts the given {@code value}.
     */
    void accept(@NonNull T value);
}
We can then read the corresponding value in the accept() method.
Reservoir
In a nutshell, a Reservoir is a queue used in reactive programming for producer/consumer scenarios.
public interface Reservoir<T> extends Receiver<T>, Repository<Result<T>> {}
As you can see, it extends both Receiver and Repository, so it can receive data through accept() and return data through get().
We obtain a Reservoir by calling the following code.
private Reservoir provider = Reservoirs.reservoir();
Let’s trace into the source of reservoir().
public static <T> Reservoir<T> reservoir(@NonNull final Queue<T> queue) {
    return new SynchronizedReservoir<>(checkNotNull(queue));
}

private static final class SynchronizedReservoir<T> extends BaseObservable
        implements Reservoir<T> {
    private final Queue<T> queue;

    private SynchronizedReservoir(@NonNull final Queue<T> queue) {
        this.queue = checkNotNull(queue);
    }

    @Override
    public void accept(@NonNull T value) {
        boolean shouldDispatchUpdate;
        synchronized (queue) {
            boolean wasEmpty = queue.isEmpty();
            boolean added = queue.offer(value);
            shouldDispatchUpdate = wasEmpty && added;
        }
        if (shouldDispatchUpdate) {
            dispatchUpdate();
        }
    }

    @Override
    public Result<T> get() {
        T nullableValue;
        boolean shouldDispatchUpdate;
        synchronized (queue) {
            nullableValue = queue.poll();
            shouldDispatchUpdate = !queue.isEmpty();
        }
        if (shouldDispatchUpdate) {
            dispatchUpdate();
        }
        return absentIfNull(nullableValue);
    }

    @Override
    protected void observableActivated() {
        synchronized (queue) {
            if (queue.isEmpty()) {
                return;
            }
        }
        dispatchUpdate();
    }
}
SynchronizedReservoir holds a queue: accept() adds a value to the queue and get() takes one from it.
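The notification rule in that source can be tried out in isolation. The sketch below is plain Java written for this article’s example, not Agera code: SketchReservoir and ReservoirDemo are hypothetical names, a Runnable stands in for dispatchUpdate(), and poll() returns null where the original returns an absent Result.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in that mirrors only the queue and notification logic.
class SketchReservoir<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Runnable onUpdate; // plays the role of dispatchUpdate()

    SketchReservoir(Runnable onUpdate) {
        this.onUpdate = onUpdate;
    }

    void accept(T value) {
        boolean shouldNotify;
        synchronized (queue) {
            boolean wasEmpty = queue.isEmpty();
            boolean added = queue.offer(value);
            // Notify only when the queue goes from empty to non-empty.
            shouldNotify = wasEmpty && added;
        }
        if (shouldNotify) {
            onUpdate.run();
        }
    }

    T poll() {
        T value;
        boolean shouldNotify;
        synchronized (queue) {
            value = queue.poll();
            // More values are waiting, so ask the consumer to come back.
            shouldNotify = !queue.isEmpty();
        }
        if (shouldNotify) {
            onUpdate.run();
        }
        return value;
    }
}

class ReservoirDemo {
    // Returns how many notifications two accepts followed by two polls produce.
    static int run() {
        int[] notifications = {0};
        SketchReservoir<String> reservoir = new SketchReservoir<>(() -> notifications[0]++);
        reservoir.accept("a"); // empty -> non-empty: notifies
        reservoir.accept("b"); // already non-empty: silent
        reservoir.poll();      // "b" still queued: notifies
        reservoir.poll();      // queue drained: silent
        return notifications[0];
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Read this way, each notification means “there is something to take”, and the consumer takes one value per notification until the queue is empty.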
I’m sorry to say that I don’t understand Reservoir very well myself. I know how to use it but not the reasoning behind it, so I won’t say more to avoid misleading you. If you are interested, see the Agera wiki.
Using Functions
We know that Agera, like RxJava, has many operators. But suppose we have a very complex operation: do we have to write a long chain of transform() calls? That certainly works, but think about it: how would you reuse such a chain for a common operation? If five pages need it, do you write it out on each of the five?
Google obviously would not leave us in that dilemma, which is where the Functions class comes in.
As the name suggests, it is a utility class, just like Repositories, which we saw earlier. It can combine multiple Functions into one.
private Supplier<String> supplier = new Supplier<String>() {
    @Override
    public String get() {
        return "url";
    }
};

private Function<String, List<Integer>> strToList = new Function<String, List<Integer>>() {
    @Override
    public List<Integer> apply(@NonNull String input) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            data.add(i);
        }
        return data;
    }
};

private Predicate<Integer> filter = new Predicate<Integer>() {
    @Override
    public boolean apply(@NonNull Integer value) {
        return value > 5;
    }
};

private Function<Integer, String> intToStr = new Function<Integer, String>() {
    @Override
    public String apply(@NonNull Integer input) {
        return String.valueOf(input);
    }
};

private Function<List<String>, Integer> getSize = new Function<List<String>, Integer>() {
    @Override
    public Integer apply(@NonNull List<String> input) {
        return input.size();
    }
};

Function<String, Integer> finalFunc = Functions.functionFrom(String.class)
        .unpack(strToList)
        .filter(filter)
        .map(intToStr)
        .thenApply(getSize);

private Repository<String> repository;

repository = Repositories.repositoryWithInitialValue("default")
        .observe()
        .onUpdatesPerLoop()
        .getFrom(supplier)
        .transform(finalFunc)
        .thenTransform(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer input) {
                return String.valueOf(input);
            }
        })
        .compile();

repository.addUpdatable(this);
The part to focus on is:
Function<String, Integer> finalFunc = Functions.functionFrom(String.class)
        .unpack(strToList)
        .filter(filter)
        .map(intToStr)
        .thenApply(getSize);
The Functions class provides operators such as unpack() and filter() that chain individual Functions together and produce one final Function. We can plug this Function into our Repository’s data processing flow, and we can also keep finalFunc around for reuse.
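For readers who want to try the same composition idea outside Android, here is a rough equivalent using only the JDK’s java.util.function.Function and streams. It is an analogy, not Agera’s Functions API: ComposedFunctionDemo, buildFinalFunc() and keepLarge are names made up for this sketch, and andThen() plays the role of the chained operators.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ComposedFunctionDemo {
    // Builds one reusable function out of four small steps.
    static Function<String, Integer> buildFinalFunc() {
        // Step 1: turn the input into the numbers 0..9 (the input itself is ignored,
        // as in the article's strToList).
        Function<String, List<Integer>> strToList =
                input -> IntStream.range(0, 10).boxed().collect(Collectors.toList());
        // Step 2: keep only values greater than 5.
        Function<List<Integer>, List<Integer>> keepLarge =
                list -> list.stream().filter(v -> v > 5).collect(Collectors.toList());
        // Step 3: convert every element to a String.
        Function<List<Integer>, List<String>> toStrings =
                list -> list.stream().map(v -> String.valueOf(v)).collect(Collectors.toList());
        // Step 4: reduce the list to its size.
        Function<List<String>, Integer> getSize = List::size;

        return strToList.andThen(keepLarge).andThen(toStrings).andThen(getSize);
    }

    public static void main(String[] args) {
        Function<String, Integer> finalFunc = buildFinalFunc();
        System.out.println(finalFunc.apply("url"));
    }
}
```

Because the composed function is an ordinary object, it can be stored in a field or a shared helper and passed to as many flows as needed, which is the reuse benefit described above.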
That is all for the advanced usage of Agera. I can only lead you through the door; learning to use Agera well is up to you.
Agera source code analysis
Now that we have covered how to use Agera, let’s analyze its source code. As the saying goes, know yourself and know your opponent, and you will win every battle.
We have already seen that a Repository is produced by RepositoryCompiler. Let’s look at what RepositoryCompiler actually does, starting with its repositoryWithInitialValue() method.
static <TVal> RepositoryCompilerStates.REventSource<TVal, TVal> repositoryWithInitialValue(
        @NonNull final TVal initialValue) {
    checkNotNull(Looper.myLooper());
    RepositoryCompiler compiler = compilers.get();
    if (compiler == null) {
        compiler = new RepositoryCompiler();
    } else {
        // Take the cached compiler out of the ThreadLocal while it is in use.
        compilers.set(null);
    }
    return compiler.start(initialValue);
}
It looks up the current thread’s compiler in a ThreadLocal, creating one if none is cached, and calls its start() method.
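The per-thread reuse seen here can be sketched in a few lines of plain Java. PooledBuilder and ThreadLocalReuseDemo below are hypothetical illustrations, not Agera classes, and the step that puts the object back into the ThreadLocal inside build() is an assumption about what a recycle step would do, since recycle() itself is not shown in this article.

```java
// Hypothetical builder that is cached per thread and reused after each build().
final class PooledBuilder {
    private static final ThreadLocal<PooledBuilder> POOL = new ThreadLocal<>();
    private final StringBuilder parts = new StringBuilder();

    private PooledBuilder() {
    }

    static PooledBuilder obtain() {
        PooledBuilder builder = POOL.get();
        if (builder == null) {
            return new PooledBuilder();
        }
        // Remove it from the pool so a nested obtain() on this thread gets a fresh one.
        POOL.set(null);
        return builder;
    }

    PooledBuilder add(String part) {
        parts.append(part);
        return this;
    }

    String build() {
        String result = parts.toString();
        parts.setLength(0); // reset state before the instance is reused
        POOL.set(this);     // assumed recycle step: hand the instance back to the pool
        return result;
    }
}

class ThreadLocalReuseDemo {
    // Returns true when the second obtain() hands back the recycled instance.
    static boolean run() {
        PooledBuilder first = PooledBuilder.obtain();
        first.add("a").build();
        PooledBuilder second = PooledBuilder.obtain();
        return first == second;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Keeping one instance per thread avoids allocating a new builder for every chain while staying safe without locks, because no other thread can see the cached instance.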
private RepositoryCompiler start(@NonNull final Object initialValue) {
    checkExpect(NOTHING);
    expect = FIRST_EVENT_SOURCE;
    this.initialValue = initialValue;
    return this;
}
It first checks expect, which records the state the compiler is in, and then sets it to FIRST_EVENT_SOURCE, the state that follows start. This should be easy to follow after the earlier analysis.
Next, let’s look at the observe() code.
public RepositoryCompiler observe(@NonNull final Observable... observables) {
    checkExpect(FIRST_EVENT_SOURCE, FREQUENCY_OR_MORE_EVENT_SOURCE);
    for (Observable observable : observables) {
        eventSources.add(checkNotNull(observable));
    }
    expect = FREQUENCY_OR_MORE_EVENT_SOURCE;
    return this;
}
We passed no arguments to observe(), so observables is an empty array here and nothing is added; we will come back to these arguments later.
Next come the operations in the frequency state.
public RepositoryCompiler onUpdatesPer(int millis) {
    checkExpect(FREQUENCY_OR_MORE_EVENT_SOURCE);
    frequency = Math.max(0, millis);
    expect = FLOW;
    return this;
}

public RepositoryCompiler onUpdatesPerLoop() {
    return onUpdatesPer(0);
}
As you can see, onUpdatesPerLoop() simply calls onUpdatesPer(0): with millis set to 0 there is no minimum interval, so the event is fired every time.
Next come the methods of the flow state, such as getFrom() and thenTransform().
public RepositoryCompiler getFrom(@NonNull final Supplier supplier) {
    checkExpect(FLOW);
    addGetFrom(supplier, directives);
    return this;
}
It calls the addGetFrom() method.
static void addGetFrom(@NonNull final Supplier supplier, @NonNull final List directives) {
    directives.add(GET_FROM);
    directives.add(supplier);
}
The marker GET_FROM and the supplier are simply appended to a list. Note that this list is passed in by the calling method. GET_FROM is just a marker value.
private static final int END = 0;
private static final int GET_FROM = 1;
private static final int MERGE_IN = 2;
private static final int TRANSFORM = 3;
private static final int CHECK = 4;
private static final int GO_TO = 5;
private static final int GO_LAZY = 6;
private static final int SEND_TO = 7;
private static final int BIND = 8;
private static final int FILTER_SUCCESS = 9;
You can see that each operation has its own marker.
thenTransform() comes next.
public RepositoryCompiler thenTransform(@NonNull final Function function) {
    transform(function);
    endFlow(false);
    return this;
}
It simply calls transform() and endFlow().
public RepositoryCompiler transform(@NonNull final Function function) {
    checkExpect(FLOW);
    addTransform(function, directives);
    return this;
}

private void endFlow(boolean skip) {
    addEnd(skip, directives);
    expect = CONFIG;
}

static void addEnd(boolean skip, @NonNull final List directives) {
    directives.add(END);
    directives.add(skip);
}
transform() does the same thing as getFrom(): it appends a marker and the function to the list, so the list now has size 4. endFlow() works the same way.
After getFrom() and thenTransform(), we therefore have a list of size 6.
Now you can see why thenXXX() calls endFlow(): calling thenXXX() ends the data processing flow and moves the compiler on to the CONFIG state, so endFlow() is required.
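The flat “marker, argument, marker, argument” layout can be reproduced in a small standalone sketch. The add methods below follow the shape of the Agera helpers quoted above, but DirectiveSketch, buildDirectives() and especially run() are hypothetical: the article has not shown how CompiledRepository walks the list, so the loop is only one plausible way to interpret it. It assumes every directive occupies exactly two slots, which is true for the three kinds used here but is not claimed for the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class DirectiveSketch {
    // Marker values copied from the constants shown above.
    static final int END = 0;
    static final int GET_FROM = 1;
    static final int TRANSFORM = 3;

    static void addGetFrom(Supplier<?> supplier, List<Object> directives) {
        directives.add(GET_FROM);
        directives.add(supplier);
    }

    static void addTransform(Function<?, ?> function, List<Object> directives) {
        directives.add(TRANSFORM);
        directives.add(function);
    }

    static void addEnd(boolean skip, List<Object> directives) {
        directives.add(END);
        directives.add(skip);
    }

    // Equivalent of getFrom(...) followed by thenTransform(...).
    static List<Object> buildDirectives() {
        List<Object> directives = new ArrayList<>();
        addGetFrom(() -> "value", directives);
        addTransform(s -> "new " + s, directives);
        addEnd(false, directives);
        return directives;
    }

    // Assumed interpreter: reads a marker, then its argument, two slots at a time.
    @SuppressWarnings("unchecked")
    static Object run(Object initialValue, List<Object> directives) {
        Object current = initialValue;
        for (int i = 0; i < directives.size(); i += 2) {
            int marker = (Integer) directives.get(i);
            Object argument = directives.get(i + 1);
            switch (marker) {
                case GET_FROM:
                    current = ((Supplier<Object>) argument).get();
                    break;
                case TRANSFORM:
                    current = ((Function<Object, Object>) argument).apply(current);
                    break;
                case END:
                    // skip == true keeps the old value; otherwise the flow result is used.
                    return ((Boolean) argument) ? initialValue : current;
                default:
                    throw new IllegalStateException("Unknown marker " + marker);
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<Object> directives = buildDirectives();
        System.out.println(directives.size());
        System.out.println(run("default", directives));
    }
}
```

Storing the flow as data rather than as nested objects means the whole chain can be built once and replayed every time the repository needs to recompute its value.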
Finally, compile() produces our CompiledRepository.
At this point we have analyzed the whole process of creating a Repository. Next comes repository.addUpdatable().
First, let’s see what CompiledRepository is.
final class CompiledRepository extends BaseObservable
        implements Repository, Updatable, Runnable {
    .....
}
It extends BaseObservable and implements the Repository, Updatable, and Runnable interfaces.
/**
 * A partial implementation of {@link Observable} that adheres to the threading contract between
 * {@link Observable}s and {@link Updatable}s. Subclasses can use {@link #observableActivated()} and
 * {@link #observableDeactivated()} to control the activation and deactivation of this observable,
 * and to send out notifications to client updatables with {@link #dispatchUpdate()}.
 *
 * For cases where subclassing {@link BaseObservable} is impossible, for example when the
 * potential class already has a base class, consider using {@link Observables#updateDispatcher()}
 * to help implement the {@link Observable} interface.
 */
public abstract class BaseObservable implements Observable {
  private final Worker worker;

  protected BaseObservable() {
    checkState(Looper.myLooper() != null, "Can only be created on a Looper thread");
    worker = new Worker(this);
  }

  public final void addUpdatable(@NonNull final Updatable updatable) {
    checkState(Looper.myLooper() != null, "Can only be added on a Looper thread");
    worker.addUpdatable(updatable);
  }

  public final void removeUpdatable(@NonNull final Updatable updatable) {
    checkState(Looper.myLooper() != null, "Can only be removed on a Looper thread");
    worker.removeUpdatable(updatable);
  }

  /**
   * Notifies all registered {@link Updatable}s.
   */
  protected final void dispatchUpdate() {
    worker.dispatchUpdate();
  }

  /**
   * Called from the worker looper thread when this {@link Observable} is activated by transitioning
   * from having no client {@link Updatable}s to having at least one client {@link Updatable}.
   */
  protected void observableActivated() {}

  /**
   * Called from the worker looper thread when this {@link Observable} is deactivated by
   * transitioning from having at least one client {@link Updatable} to having no client
   * {@link Updatable}s.
   */
  protected void observableDeactivated() {}

  public Updatable getUpdatable() {
    return worker.getUpdatable();
  }

  /**
   * Worker and synchronization lock behind a {@link BaseObservable}.
   */
  static final class Worker {
    private static final Object[] NO_UPDATABLES_OR_HANDLERS = new Object[0];
    private final BaseObservable baseObservable;
    private final WorkerHandler handler;
    private Object[] updatablesAndHandlers;
    private int size;

    Worker(final BaseObservable baseObservable) {
      this.baseObservable = baseObservable;
      this.handler = workerHandler();
      this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS;
      this.size = 0;
    }

    public Updatable getUpdatable() {
      return (Updatable) updatablesAndHandlers[0];
    }

    synchronized void addUpdatable(@NonNull final Updatable updatable) {
      add(updatable, workerHandler());
      if (size == 1) {
        handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
      }
    }

    synchronized void removeUpdatable(@NonNull final Updatable updatable) {
      remove(updatable);
      if (size == 0) {
        handler.obtainMessage(MSG_LAST_REMOVED, this).sendToTarget();
      }
    }

    void dispatchUpdate() {
      handler.obtainMessage(MSG_UPDATE, this).sendToTarget();
    }

    private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {
      boolean added = false;
      for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
        if (updatablesAndHandlers[index] == updatable) {
          throw new IllegalStateException("Updatable already added, cannot add.");
        }
        if (updatablesAndHandlers[index] == null && !added) {
          updatablesAndHandlers[index] = updatable;
          updatablesAndHandlers[index + 1] = handler;
          added = true;
        }
      }
      if (!added) {
        final int newIndex = updatablesAndHandlers.length;
        updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers,
            Math.max(newIndex * 2, newIndex + 2));
        updatablesAndHandlers[newIndex] = updatable;
        updatablesAndHandlers[newIndex + 1] = handler;
      }
      size++;
    }

    private void remove(@NonNull final Updatable updatable) {
      for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
        if (updatablesAndHandlers[index] == updatable) {
          ((WorkerHandler) updatablesAndHandlers[index + 1]).removeMessages(
              WorkerHandler.MSG_CALL_UPDATABLE, updatable);
          updatablesAndHandlers[index] = null;
          updatablesAndHandlers[index + 1] = null;
          size--;
          return;
        }
      }
      throw new IllegalStateException("Updatable not added, cannot remove.");
    }

    synchronized void sendUpdate() {
      for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) {
        final Updatable updatable = (Updatable) updatablesAndHandlers[index];
        final WorkerHandler handler = (WorkerHandler) updatablesAndHandlers[index + 1];
        if (updatable != null) {
          if (handler.getLooper() == Looper.myLooper()) {
            updatable.update();
          } else {
            handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();
          }
        }
      }
    }

    void callFirstUpdatableAdded() {
      baseObservable.observableActivated();
    }

    void callLastUpdatableRemoved() {
      baseObservable.observableDeactivated();
    }
  }
}
Here I have quoted the Javadoc comment of BaseObservable as well. From the comment we can clearly see that it is a base class that defines a Worker and encapsulates some common operations. This is where our call to the Repository's addUpdatable() method ends up.
public final void addUpdatable(@NonNull final Updatable updatable) {
  checkState(Looper.myLooper() != null, "Can only be added on a Looper thread");
  worker.addUpdatable(updatable);
}
First, it checks that the current thread has a Looper. This is because pushing events in Agera is built on Android's Handler mechanism. Those who are not familiar with the Handler mechanism can read my blog.
The worker’s function with the same name is then called.
synchronized void addUpdatable(@NonNull final Updatable updatable) {
  add(updatable, workerHandler());
  if (size == 1) {
    handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
  }
}
The add() method is called first.
private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {
  boolean added = false;
  for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
    if (updatablesAndHandlers[index] == updatable) {
      throw new IllegalStateException("Updatable already added, cannot add.");
    }
    if (updatablesAndHandlers[index] == null && !added) {
      updatablesAndHandlers[index] = updatable;
      updatablesAndHandlers[index + 1] = handler;
      added = true;
    }
  }
  if (!added) {
    final int newIndex = updatablesAndHandlers.length;
    updatablesAndHandlers = Arrays.copyOf(updatablesAndHandlers,
        Math.max(newIndex * 2, newIndex + 2));
    updatablesAndHandlers[newIndex] = updatable;
    updatablesAndHandlers[newIndex + 1] = handler;
  }
  size++;
}
This stores the Updatable and its handler in adjacent slots of the updatablesAndHandlers array. The handler is obtained through the workerHandler() method.
private static final ThreadLocal<WeakReference<WorkerHandler>> handlers = new ThreadLocal<>();

static WorkerHandler workerHandler() {
  final WeakReference<WorkerHandler> handlerReference = handlers.get();
  WorkerHandler handler = handlerReference != null ? handlerReference.get() : null;
  if (handler == null) {
    handler = new WorkerHandler();
    handlers.set(new WeakReference<>(handler));
  }
  return handler;
}
Weak references are used to prevent memory leaks. After all, it’s a handler that might have some delayed operations.
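To see the per-thread caching idea in isolation, here is a minimal plain-Java sketch. It is only a model under my own assumptions: PerThreadCache is a made-up name, and a bare Object stands in for the WorkerHandler, so nothing here depends on Android.

```java
import java.lang.ref.WeakReference;

// Hypothetical model of workerHandler(): every thread lazily creates its own
// instance and the ThreadLocal only keeps a weak reference to it.
final class PerThreadCache {
    private static final ThreadLocal<WeakReference<Object>> CACHE = new ThreadLocal<>();

    static Object get() {
        WeakReference<Object> reference = CACHE.get();
        Object value = reference != null ? reference.get() : null;
        if (value == null) {
            // First call on this thread, or the old instance was collected.
            value = new Object();
            CACHE.set(new WeakReference<>(value));
        }
        return value;
    }
}
```

As long as someone holds a strong reference to the returned object, repeated calls on the same thread give back the same instance, while another thread gets its own.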
At the end of the add() method, size is incremented. Then let's go back to the addUpdatable() method.
synchronized void addUpdatable(@NonNull final Updatable updatable) {
  add(updatable, workerHandler());
  if (size == 1) {
    handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
  }
}
If size is 1, a message is sent through the handler. As its name suggests, MSG_FIRST_ADDED is only sent when the first Updatable is added.
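The bookkeeping described so far (paired slots, slot reuse and the size check) can be tried out without Android. The following is a simplified model, not Agera's code: PairedSlots and the String handler tags are hypothetical stand-ins, and unlike the original add() it looks for duplicates before writing into a free slot.

```java
import java.util.Arrays;

// Simplified model of the Worker bookkeeping: even indexes hold an observer,
// odd indexes hold the handler tag registered together with it.
// PairedSlots and the String tags are hypothetical stand-ins.
final class PairedSlots {
    private Object[] slots = new Object[0];
    private int size;

    // Returns true when this call added the first observer, which is the
    // condition under which the article's code sends MSG_FIRST_ADDED.
    boolean add(Object observer, String handlerTag) {
        int free = -1;
        for (int i = 0; i < slots.length; i += 2) {
            if (slots[i] == observer) {
                throw new IllegalStateException("Observer already added");
            }
            if (slots[i] == null && free < 0) {
                free = i; // remember the first reusable pair
            }
        }
        if (free < 0) {
            // No free pair: grow the array, keeping room for whole pairs.
            free = slots.length;
            slots = Arrays.copyOf(slots, Math.max(free * 2, free + 2));
        }
        slots[free] = observer;
        slots[free + 1] = handlerTag;
        size++;
        return size == 1;
    }

    // Returns true when the last observer was removed (MSG_LAST_REMOVED).
    boolean remove(Object observer) {
        for (int i = 0; i < slots.length; i += 2) {
            if (slots[i] == observer) {
                slots[i] = null;
                slots[i + 1] = null;
                size--;
                return size == 0;
            }
        }
        throw new IllegalStateException("Observer not added");
    }

    int capacity() {
        return slots.length;
    }
}
```

Removing an observer only clears its pair, so a later add() can reuse the hole instead of growing the array.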
static final class WorkerHandler extends Handler {
static final int MSG_FIRST_ADDED = 0;
static final int MSG_LAST_REMOVED = 1;
static final int MSG_UPDATE = 2;
static final int MSG_CALL_UPDATABLE = 3;
static final int MSG_CALL_MAYBE_START_FLOW = 4;
static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5;
static final int MSG_CALL_LOW_PASS_UPDATE = 6;
public void handleMessage(final Message message) {
switch (message.what) {
case MSG_UPDATE:
((Worker) message.obj).sendUpdate();
break;
case MSG_FIRST_ADDED:
((Worker) message.obj).callFirstUpdatableAdded();
break;
case MSG_LAST_REMOVED:
((Worker) message.obj).callLastUpdatableRemoved();
break;
case MSG_CALL_UPDATABLE:
((Updatable) message.obj).update();
break;
case MSG_CALL_MAYBE_START_FLOW:
((CompiledRepository) message.obj).maybeStartFlow();
break;
case MSG_CALL_ACKNOWLEDGE_CANCEL:
((CompiledRepository) message.obj).acknowledgeCancel();
break;
case MSG_CALL_LOW_PASS_UPDATE:
((LowPassFilterObservable) message.obj).lowPassUpdate();
break;
default:
}
}
}
This is the WorkerHandler, which defines the message types: MSG_FIRST_ADDED for the first addUpdatable(), MSG_UPDATE for notifying the Updatables, and so on. We enter the MSG_FIRST_ADDED case, which calls the Worker's callFirstUpdatableAdded().
void callFirstUpdatableAdded() {
baseObservable.observableActivated();
}
The observableActivated() method of BaseObservable is called.
protected void observableActivated() {}
This is an empty method in BaseObservable. It is overridden in a BaseObservable subclass, namely CompiledRepository.
protected void observableActivated() {
eventSource.addUpdatable(this);
maybeStartFlow();
}
Ignoring the eventSource, you can see that maybeStartFlow() is called.
void maybeStartFlow() {
synchronized (this) {
if (runState == IDLE || runState == PAUSED_AT_GO_LAZY) {
runState = RUNNING;
lastDirectiveIndex = -1;
restartNeeded = false;
} else {
return;
}
}
intermediateValue = currentValue;
runFlowFrom(0, false);
}
runFlowFrom() is called.
private void runFlowFrom(final int index, final boolean asynchronously) {
  final Object[] directives = this.directives;
  final int length = directives.length;
  int i = index;
  while (0 <= i && i < length) {
    int directiveType = (Integer) directives[i];
    if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) {
      synchronized (this) {
        if (checkCancellationLocked()) {
          break;
        }
        if (directiveType == GO_TO) {
          setPausedAtGoToLocked(i);
        } else if (directiveType == GO_LAZY) {
          setLazyAndEndFlowLocked(i);
          return;
        }
      }
    }
    switch (directiveType) {
      case GET_FROM:
        i = runGetFrom(directives, i);
        break;
      case MERGE_IN:
        i = runMergeIn(directives, i);
        break;
      case TRANSFORM:
        i = runTransform(directives, i);
        break;
      case CHECK:
        i = runCheck(directives, i);
        break;
      case GO_TO:
        i = runGoTo(directives, i);
        break;
      case SEND_TO:
        i = runSendTo(directives, i);
        break;
      case BIND:
        i = runBindWith(directives, i);
        break;
      case FILTER_SUCCESS:
        i = runFilterSuccess(directives, i);
        break;
      case END:
        i = runEnd(directives, i);
        break;
    }
  }
}
Another pile of cases.
int directiveType = (Integer) directives[i];
The switch operates on this directive type. Remember the cache list? It holds the data flow steps we declared earlier. We only called getFrom() and thenTransform(), so the GET_FROM, TRANSFORM and END cases are hit, which call runGetFrom(), runTransform() and runEnd() respectively.
private int runGetFrom(@NonNull final Object[] directives, final int index) {
Supplier supplier = (Supplier) directives[index + 1];
intermediateValue = checkNotNull(supplier.get());
return index + 2;
}
private int runTransform(@NonNull final Object[] directives, final int index) {
Function function = (Function) directives[index + 1];
intermediateValue = checkNotNull(function.apply(intermediateValue));
return index + 2;
}
private int runEnd(@NonNull final Object[] directives, final int index) {
boolean skip = (Boolean) directives[index + 1];
if (skip) {
skipAndEndFlow();
} else {
setNewValueAndEndFlow(intermediateValue);
}
return -1;
}
Each of these simply calls a method on the object stored next to the directive. For example, runGetFrom() calls get() on the supplier that we passed in when building the Repository.
This means:
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.getFrom(supplier)
.thenTransform(function)
.compile();
When this is executed, the corresponding operations are stored in the cache list, and when
repository.addUpdatable(this);
is executed, the CompiledRepository is notified to take the stored operations out and execute them.
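This "record first, replay later" idea can be sketched in a few lines of plain Java. The sketch is a toy under my own assumptions: MiniFlow, its opcodes and its method signatures are invented for illustration, and only the layout of "directive type followed by its argument" mirrors what we saw in runFlowFrom().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy directive interpreter. MiniFlow and its opcodes are hypothetical.
final class MiniFlow {
    private static final int GET_FROM = 0;
    private static final int TRANSFORM = 1;
    private static final int END = 2;

    private final List<Object> directives = new ArrayList<>();
    private Object currentValue;

    MiniFlow(Object initialValue) {
        this.currentValue = initialValue;
    }

    // Declaration phase: nothing runs yet, the step is only recorded.
    MiniFlow getFrom(Supplier<Object> supplier) {
        directives.add(GET_FROM);
        directives.add(supplier);
        return this;
    }

    MiniFlow thenTransform(Function<Object, Object> function) {
        directives.add(TRANSFORM);
        directives.add(function);
        directives.add(END);
        directives.add(Boolean.FALSE); // false: do not skip storing the value
        return this;
    }

    // Execution phase: replay the recorded steps in order.
    @SuppressWarnings("unchecked")
    void run() {
        Object intermediateValue = currentValue;
        int i = 0;
        while (0 <= i && i < directives.size()) {
            int type = (Integer) directives.get(i);
            Object argument = directives.get(i + 1);
            switch (type) {
                case GET_FROM:
                    intermediateValue = ((Supplier<Object>) argument).get();
                    i += 2;
                    break;
                case TRANSFORM:
                    intermediateValue =
                        ((Function<Object, Object>) argument).apply(intermediateValue);
                    i += 2;
                    break;
                case END:
                    if (!((Boolean) argument)) {
                        currentValue = intermediateValue;
                    }
                    i = -1; // a negative index ends the loop, like runEnd()
                    break;
                default:
                    throw new IllegalStateException("Unknown directive: " + type);
            }
        }
    }

    Object get() {
        return currentValue;
    }
}
```

Building the flow only fills the list. The supplier and the function are not touched until run() is called, which in Agera's case happens after the first Updatable is added.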
Then we look at the final runEnd().
private int runEnd(@NonNull final Object[] directives, final int index) {
boolean skip = (Boolean) directives[index + 1];
if (skip) {
skipAndEndFlow();
} else {
setNewValueAndEndFlow(intermediateValue);
}
return -1;
}
We did not ask to skip, so we go straight to setNewValueAndEndFlow().
private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) {
  boolean wasRunningLazily = runState == RUNNING_LAZILY;
  runState = IDLE;
  intermediateValue = initialValue;
  if (wasRunningLazily) {
    currentValue = newValue;
  } else {
    setNewValueLocked(newValue);
  }
  checkRestartLocked();
}
We did not use goLazy() here either, so setNewValueLocked() is called.
private void setNewValueLocked(@NonNull final Object newValue) {
  boolean shouldNotify = notifyChecker.merge(currentValue, newValue);
  currentValue = newValue;
  if (shouldNotify) {
    dispatchUpdate();
  }
}
Here notifyChecker.merge(currentValue, newValue) checks whether newValue, the final value produced by the runXXX() steps, differs from currentValue. If the two values are different, dispatchUpdate() is called.
protected final void dispatchUpdate() {
worker.dispatchUpdate();
}
The worker’s function of the same name is called.
void dispatchUpdate() {
handler.obtainMessage(MSG_UPDATE, this).sendToTarget();
}
Again, messages are passed through a handler.
case MSG_UPDATE:
((Worker) message.obj).sendUpdate();
break;
The Worker’s sendUpdate() is called in the handler’s case.
synchronized void sendUpdate() {
  for (int index = 0; index < updatablesAndHandlers.length; index = index + 2) {
    final Updatable updatable = (Updatable) updatablesAndHandlers[index];
    final WorkerHandler handler = (WorkerHandler) updatablesAndHandlers[index + 1];
    if (updatable != null) {
      if (handler.getLooper() == Looper.myLooper()) {
        updatable.update();
      } else {
        handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();
      }
    }
  }
}
Remember the updatablesAndHandlers array? When we call the addUpdatable() method, we add the Updatable and the [Updatable handler] to this array. Why do I emphasize the Updatable handler? Because it is easy to confuse with the Repository handler. There are in fact two handlers. One is the Repository handler, which is held by the Worker and is used to dispatch these events:
static final int MSG_FIRST_ADDED = 0;
static final int MSG_LAST_REMOVED = 1;
static final int MSG_UPDATE = 2;
static final int MSG_CALL_UPDATABLE = 3;
static final int MSG_CALL_MAYBE_START_FLOW = 4;
static final int MSG_CALL_ACKNOWLEDGE_CANCEL = 5;
static final int MSG_CALL_LOW_PASS_UPDATE = 6;
Worker( final BaseObservable baseObservable) {
this.baseObservable = baseObservable;
this.handler = workerHandler();
this.updatablesAndHandlers = NO_UPDATABLES_OR_HANDLERS;
this.size = 0;
}
You can see that it is initialized in the Worker's constructor.
The handler that we read from updatablesAndHandlers is the Updatable handler.
synchronized void addUpdatable(@NonNull final Updatable updatable) {
  add(updatable, workerHandler());
  if (size == 1) {
    handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
  }
}

private void add(@NonNull final Updatable updatable, @NonNull final Handler handler) {
  boolean added = false;
  for (int index = 0; index < updatablesAndHandlers.length; index += 2) {
    if (updatablesAndHandlers[index] == updatable) {
      throw new IllegalStateException("Updatable already added, cannot add.");
    }
    if (updatablesAndHandlers[index] == null && !added) {
      updatablesAndHandlers[index] = updatable;
      updatablesAndHandlers[index + 1] = handler;
      added = true;
    }
  }
  ...
}
It is obtained in addUpdatable(). Don't confuse the two!
Back to sendUpdate().
if (updatable != null) {
if (handler.getLooper() == Looper.myLooper()) {
updatable.update();
} else {
handler.obtainMessage(WorkerHandler.MSG_CALL_UPDATABLE, updatable).sendToTarget();
}
}
If the Looper of the Updatable handler is the same as Looper.myLooper(), that is, the Looper of the thread the Repository handler is running on, then updatable.update() is called directly. Otherwise, MSG_CALL_UPDATABLE is sent through the Updatable handler.
case MSG_CALL_UPDATABLE:
((Updatable) message.obj).update();
break;
Both paths end up calling the Updatable's update(), but they mean different things. The check if (handler.getLooper() == Looper.myLooper()) determines whether the Updatable and the Repository are on the same thread. Right now they certainly are, but suppose we change our code to something like this:
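final Updatable updatable = this; // inside run(), `this` would be the Runnable
new Thread(new Runnable() {
  public void run() {
    Looper.prepare(); // addUpdatable() must be called on a Looper thread
    repository.addUpdatable(updatable);
    Looper.loop(); // keep the thread alive so its handler can deliver update()
  }
}).start();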
This means addUpdatable() is called on a child thread. Correspondingly:
synchronized void addUpdatable(@NonNull final Updatable updatable) {
  add(updatable, workerHandler());
  if (size == 1) {
    handler.obtainMessage(WorkerHandler.MSG_FIRST_ADDED, this).sendToTarget();
  }
}

static WorkerHandler workerHandler() {
  final WeakReference<WorkerHandler> handlerReference = handlers.get();
  WorkerHandler handler = handlerReference != null ? handlerReference.get() : null;
  if (handler == null) {
    handler = new WorkerHandler();
    handlers.set(new WeakReference<>(handler));
  }
  return handler;
}
This code runs on the child thread, and the handler is taken from a ThreadLocal, so it belongs to the child thread as well. In this case the if check fails, and the Repository uses the Updatable handler to deliver the event. As we all know, handleMessage() runs on the thread whose Looper the handler is bound to, which here is the thread that created it.
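The "call directly on the same thread, otherwise post to the other thread" decision can also be modelled without Android. This is only a sketch under my own assumptions: ThreadAwareNotifier is a made-up name, a Thread stands in for the Updatable's Looper, and a java.util.concurrent.Executor stands in for the handler bound to it.

```java
import java.util.concurrent.Executor;

// Hypothetical model: a Thread stands in for the observer's Looper and an
// Executor stands in for the Handler bound to that Looper.
final class ThreadAwareNotifier {
    // Returns true if the callback ran inline, false if it was handed over.
    static boolean notifyObserver(Thread observerThread, Executor observerExecutor,
                                  Runnable callback) {
        if (Thread.currentThread() == observerThread) {
            callback.run(); // same thread: call update() directly
            return true;
        }
        observerExecutor.execute(callback); // other thread: post to its queue
        return false;
    }
}
```

With an observer that lives on the calling thread the callback runs immediately. With an observer that belongs to another thread it is only queued, and it runs whenever that thread gets around to it.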
This completes the whole process of creating a Repository and registering an Updatable. Two questions remain. First, the observe() and onUpdatesPerLoop() methods did not seem to do anything in this walkthrough. Second, if currentValue is equal to newValue, no update is dispatched and sendUpdate() is never reached, which means that registering two different Updatables on one Repository at different points in time and having both receive the same data (a bit of a mouthful) seems impossible for now. As for the first question, I won't explain it here. I want you to explore it on your own, because it involves many of the relationships between Repository, Observable and Updatable, and I think working it out yourself will give you a better understanding of Agera. As for the second problem, let me tell you the solution: use the goLazy() function.
Like this:
repository = Repositories.repositoryWithInitialValue("default")
.observe()
.onUpdatesPerLoop()
.goLazy()
.thenGetFrom(supplier)
.compile();
This way, the Repository still sends events even when the result is the same. As for why, let's look at the source code again.
public RepositoryCompiler goLazy() {
checkExpect(FLOW);
checkGoLazyUnused();
addGoLazy(directives);
goLazyUsed = true;
return this;
}
Again, addGoLazy(directives) is called.
static void addGoLazy(@NonNull final List directives) {
directives.add(GO_LAZY);
}
Then let's see how GO_LAZY affects runFlowFrom(), where the directives are actually executed.
private void runFlowFrom(final int index, final boolean asynchronously) {
  final Object[] directives = this.directives;
  final int length = directives.length;
  int i = index;
  while (0 <= i && i < length) {
    int directiveType = (Integer) directives[i];
    if (asynchronously || directiveType == GO_TO || directiveType == GO_LAZY) {
      synchronized (this) {
        if (checkCancellationLocked()) {
          break;
        }
        if (directiveType == GO_TO) {
          setPausedAtGoToLocked(i);
        } else if (directiveType == GO_LAZY) {
          setLazyAndEndFlowLocked(i);
          return;
        }
      }
    }
    switch (directiveType) {
      case GET_FROM:
        i = runGetFrom(directives, i);
        break;
      case MERGE_IN:
        i = runMergeIn(directives, i);
        break;
      case TRANSFORM:
        i = runTransform(directives, i);
        break;
      case CHECK:
        i = runCheck(directives, i);
        break;
      case GO_TO:
        i = runGoTo(directives, i);
        break;
      case SEND_TO:
        i = runSendTo(directives, i);
        break;
      case BIND:
        i = runBindWith(directives, i);
        break;
      case FILTER_SUCCESS:
        i = runFilterSuccess(directives, i);
        break;
      case END:
        i = runEnd(directives, i);
        break;
    }
  }
}
Focus on this:
else if (directiveType == GO_LAZY) {
setLazyAndEndFlowLocked(i);
return;
}
If the directive is GO_LAZY, the method returns and none of the subsequent data operations are executed! At this point you are probably confused: what is goLazy good for, then? Don't worry, let's keep reading.
private void setLazyAndEndFlowLocked(final int resumeIndex) {
lastDirectiveIndex = resumeIndex;
runState = PAUSED_AT_GO_LAZY;
dispatchUpdate();
checkRestartLocked();
}
First the index is cached with lastDirectiveIndex = resumeIndex, then dispatchUpdate() is called. But that does not seem to help, because our data flow has not been executed at all. Far from solving the problem, isn't this creating a new one?! We can't even get the data!
Can we really not get it? As noted above, dispatchUpdate() is executed, which means the update() method of each registered Updatable is called. The update() method typically calls the Repository's get() method, and the secret lies in that get().
public synchronized Object get() {
if (runState == PAUSED_AT_GO_LAZY) {
int index = lastDirectiveIndex;
runState = RUNNING_LAZILY;
runFlowFrom(continueFromGoLazy(directives, index), false);
}
return currentValue;
}
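If runState is PAUSED_AT_GO_LAZY, the logic inside the if block is executed; otherwise currentValue is returned directly. We called goLazy(), so the state is PAUSED_AT_GO_LAZY, and runFlowFrom() is executed again inside the block.
This time runState is RUNNING_LAZILY and the flow resumes from the index returned by continueFromGoLazy(), which, judging by its name, should point past the GO_LAZY directive. The GO_LAZY branch is therefore not taken again, and the remaining data flow operations run all the way to runEnd().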
private int runEnd(@NonNull final Object[] directives, final int index) {
boolean skip = (Boolean) directives[index + 1];
if (skip) {
skipAndEndFlow();
} else {
setNewValueAndEndFlow(intermediateValue);
}
return -1;
}
Again, call setNewValueAndEndFlow().
private synchronized void setNewValueAndEndFlow(@NonNull final Object newValue) {
  boolean wasRunningLazily = runState == RUNNING_LAZILY;
  runState = IDLE;
  intermediateValue = initialValue;
  if (wasRunningLazily) {
    currentValue = newValue;
  } else {
    setNewValueLocked(newValue);
  }
  checkRestartLocked();
}
Since runState was RUNNING_LAZILY, we take the if branch and assign newValue directly to currentValue, which get() then returns. Recall that without goLazy() the flow runs to completion in the first runFlowFrom(), and when currentValue and newValue are equal, setNewValueLocked() does not call dispatchUpdate() at all!
goLazy means lazy loading. Without goLazy(), the data flow runs as soon as we call addUpdatable(). With goLazy(), all data flow operations after it are deferred until get() is called. This is the essence of Agera's "push event, pull data" model.
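The difference between the two modes is easier to feel in a toy model. The sketch below is my own simplification, not Agera's implementation: LazyBox, its fields and its flowRuns() counter are hypothetical, and threading is left out entirely.

```java
import java.util.function.Supplier;

// Hypothetical model of eager versus lazy flows; not Agera's classes.
final class LazyBox {
    private final Supplier<String> supplier;
    private final boolean lazy;
    private final Runnable onUpdate;
    private String currentValue;
    private boolean pausedAtGoLazy;
    private int flowRuns;

    LazyBox(String initialValue, Supplier<String> supplier, boolean lazy, Runnable onUpdate) {
        this.currentValue = initialValue;
        this.supplier = supplier;
        this.lazy = lazy;
        this.onUpdate = onUpdate;
    }

    // Plays the role of observableActivated(): called when an observer is added.
    void activate() {
        if (lazy) {
            pausedAtGoLazy = true; // remember that the flow still has to run
            onUpdate.run();        // push the event without touching the data
            return;
        }
        String newValue = runFlow();
        boolean changed = !newValue.equals(currentValue);
        currentValue = newValue;
        if (changed) {
            onUpdate.run();        // eager mode only notifies on a new value
        }
    }

    // Pull: in lazy mode the postponed flow runs here, on demand.
    String get() {
        if (pausedAtGoLazy) {
            pausedAtGoLazy = false;
            currentValue = runFlow();
        }
        return currentValue;
    }

    // How many times the data flow has actually been executed.
    int flowRuns() {
        return flowRuns;
    }

    private String runFlow() {
        flowRuns++;
        return supplier.get();
    }
}
```

In eager mode a supplier that returns the same value as before produces no event at all. In lazy mode the event always arrives first, and the supplier is only consulted when somebody calls get().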
Agera encapsulation
After reading all that source code you are probably a little tired. To cheer you up, here is something practical. To be honest, none of it was written by me; I will just walk you through how Google itself uses Agera to wrap some features, so you can see the right way to use it.
Let’s start with a simple, encapsulated click event.
public class MainActivity extends AppCompatActivity implements Updatable {
  private Button observableBtn;
  private TextView show;
  private ClickObservable clickObservable;

  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    observableBtn = (Button) findViewById(R.id.observable_btn);
    show = (TextView) findViewById(R.id.show);
    clickObservable = new ClickObservable();
    clickObservable.addUpdatable(this);
    observableBtn.setOnClickListener(clickObservable);
  }

  public void update() {
    show.setText("update!!");
  }

  public static class ClickObservable extends BaseObservable implements View.OnClickListener {
    public void onClick(View v) {
      dispatchUpdate();
    }
  }
}
Very simple, and that is all the code. Our ClickObservable inherits from BaseObservable, which means it is an observable, and implements View.OnClickListener, which means it can handle clicks. So all we need to do is make our Activity implement Updatable and register it with the ClickObservable, which dispatches an event in onClick().
The core idea here is the difference between inheriting and implementing. If you want to wrap a feature with Agera, consider extending BaseObservable as above, which makes the class an observable, and implementing the interface that gives it the feature you need. Then use Agera's Observable/Updatable logic to deliver events wherever the feature requires.
Let's consider another scenario. What if our class already inherits from a base class? Java does not support multiple inheritance, so we cannot inherit from BaseObservable as well. Does that mean there is no way out? No. Let's illustrate with the wrapper around broadcasts.
public static final class BroadcastObservable extends BroadcastReceiver
    implements ActivationHandler, Observable {
  private final UpdateDispatcher updateDispatcher;
  private final Context context;
  private final IntentFilter filter;

  BroadcastObservable(final Context applicationContext, final String... actions) {
    this.context = checkNotNull(applicationContext);
    this.updateDispatcher = Observables.updateDispatcher(this);
    this.filter = new IntentFilter();
    for (final String action : actions) {
      this.filter.addAction(action);
    }
  }

  public void observableActivated(@NonNull final UpdateDispatcher caller) {
    context.registerReceiver(this, filter);
  }

  public void observableDeactivated(@NonNull final UpdateDispatcher caller) {
    context.unregisterReceiver(this);
  }

  public void onReceive(final Context context, final Intent intent) {
    updateDispatcher.update();
  }

  public void addUpdatable(@NonNull final Updatable updatable) {
    updateDispatcher.addUpdatable(updatable);
  }

  public void removeUpdatable(@NonNull final Updatable updatable) {
    updateDispatcher.removeUpdatable(updatable);
  }
}
This is the wrapped BroadcastObservable. Since it must inherit from BroadcastReceiver, Google came up with an approach that created the ActivationHandler interface.
public interface ActivationHandler {
  /**
   * Called when the the {@code caller} changes state from having no {@link Updatable}s to
   * having at least one {@link Updatable}.
   */
  void observableActivated(@NonNull UpdateDispatcher caller);

  /**
   * Called when the the {@code caller} changes state from having {@link Updatable}s to
   * no longer having {@link Updatable}s.
   */
  void observableDeactivated(@NonNull UpdateDispatcher caller);
}
These method names should look familiar. The interface works together with UpdateDispatcher, so let's see how UpdateDispatcher is used.
this.updateDispatcher = Observables.updateDispatcher(this);
There is this statement in the constructor of BroadcastObservable. Let's see what updateDispatcher() does.
public static UpdateDispatcher updateDispatcher(
    @NonNull final ActivationHandler activationHandler) {
  return new AsyncUpdateDispatcher(activationHandler);
}

private static final class AsyncUpdateDispatcher extends BaseObservable
    implements UpdateDispatcher {
  private final ActivationHandler activationHandler;

  private AsyncUpdateDispatcher(@Nullable ActivationHandler activationHandler) {
    this.activationHandler = activationHandler;
  }

  protected void observableActivated() {
    if (activationHandler != null) {
      activationHandler.observableActivated(this);
    }
  }

  protected void observableDeactivated() {
    if (activationHandler != null) {
      activationHandler.observableDeactivated(this);
    }
  }

  public void update() {
    dispatchUpdate();
  }
}
AsyncUpdateDispatcher inherits from BaseObservable and implements the UpdateDispatcher interface.
public interface UpdateDispatcher extends Observable, Updatable {}
Back in our BroadcastObservable class, take a look at its onReceive() method.
public void onReceive(final Context context, final Intent intent) {
updateDispatcher.update();
}
updateDispatcher.update() is called directly.
In the AsyncUpdateDispatcher class:
public void update() {
dispatchUpdate();
}
dispatchUpdate() is called directly, which notifies the Updatables registered with it. Let's look at how to register.
public class MainActivity extends AppCompatActivity implements Updatable {
  private static final String ACTION = "action";
  private TextView trigger;
  private ContentObservables.BroadcastObservable observable;

  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    trigger = (TextView) findViewById(R.id.trigger);
    observable = (ContentObservables.BroadcastObservable)
        ContentObservables.broadcastObservable(this, ACTION);
    observable.addUpdatable(this);
  }

  public void send(View view) {
    Intent intent = new Intent();
    intent.setAction(ACTION);
    sendBroadcast(intent);
  }

  public void update() {
    trigger.setText("update!!");
  }

  protected void onDestroy() {
    super.onDestroy();
    observable.removeUpdatable(this);
  }
}
observable.addUpdatable(this) is called directly to register.
public void addUpdatable(@NonNull final Updatable updatable) {
updateDispatcher.addUpdatable(updatable);
}
After calling updateDispatcher’s addUpdatable(), we know that the observableActivated() method is called.
protected void observableActivated() {
if (activationHandler != null) {
activationHandler.observableActivated(this);
}
}
The activationHandler is our BroadcastObservable!
public void observableActivated(@NonNull final UpdateDispatcher caller) {
context.registerReceiver(this, filter);
}
The broadcast receiver is registered directly. After that, when we call sendBroadcast(), its onReceive() is called back.
public void onReceive(final Context context, final Intent intent) {
updateDispatcher.update();
}
updateDispatcher.update() is called, and we know that this calls dispatchUpdate() to notify the registered Updatables.
The idea here is: since your class already inherits from some base class A, it [is] an A and cannot also be a BaseObservable. So what do you do? Implement the ActivationHandler and Observable interfaces so that it [has] the corresponding capabilities, and bring in an UpdateDispatcher by [composition]. If you want to wrap similar functionality, try this idea.
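The same "is a" versus "has a" arrangement can be tried out in plain Java. Everything in this sketch is hypothetical and heavily simplified: Listener, Activation, Dispatcher, Receiver and ReceiverObservable are names I made up to play the roles of Updatable, ActivationHandler, UpdateDispatcher, BroadcastReceiver and BroadcastObservable, and there is no threading at all.

```java
import java.util.ArrayList;
import java.util.List;

// All names below are hypothetical stand-ins; threading is left out.
interface Listener {
    void update();
}

interface Activation {
    void activated();
    void deactivated();
}

// Plays the role of UpdateDispatcher: owns the listener list and reports the
// first-added and last-removed transitions back to its owner.
final class Dispatcher {
    private final List<Listener> listeners = new ArrayList<>();
    private final Activation owner;

    Dispatcher(Activation owner) {
        this.owner = owner;
    }

    void add(Listener listener) {
        if (listeners.contains(listener)) {
            throw new IllegalStateException("Listener already added");
        }
        listeners.add(listener);
        if (listeners.size() == 1) {
            owner.activated();
        }
    }

    void remove(Listener listener) {
        if (!listeners.remove(listener)) {
            throw new IllegalStateException("Listener not added");
        }
        if (listeners.isEmpty()) {
            owner.deactivated();
        }
    }

    void update() {
        // Iterate over a copy so listeners may unregister while being notified.
        for (Listener listener : new ArrayList<>(listeners)) {
            listener.update();
        }
    }
}

// Stands in for a framework class that we are forced to extend.
class Receiver {
    void onReceive(String message) {}
}

// "Is a" Receiver by inheritance, "has" observable behaviour by composition.
final class ReceiverObservable extends Receiver implements Activation {
    private final Dispatcher dispatcher = new Dispatcher(this);
    boolean registered;

    @Override public void activated() { registered = true; }
    @Override public void deactivated() { registered = false; }
    @Override void onReceive(String message) { dispatcher.update(); }

    void addListener(Listener listener) { dispatcher.add(listener); }
    void removeListener(Listener listener) { dispatcher.remove(listener); }
}
```

The subclass never has to manage the listener list itself. It only forwards add and remove calls to the dispatcher and reacts to the two activation callbacks, which is where the real BroadcastObservable registers and unregisters the receiver.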
Agera versus RxJava
Okay, we’re finally at the last level, so let’s talk about something lighter.
What is the difference between Agera and RxJava, both reactive frameworks?
First, Agera is [lighter]. Here is a comparison of method counts:
Ignore the watermark; this is from my Twitter feed. You can see that Agera has far fewer methods than RxJava, partly because Agera has only just been open-sourced and is still iterating. But for now, Agera is indeed [more focused on Android].
The second and most important point, which I have said repeatedly, is that Agera is a "push event, pull data" model, while RxJava is a "push data" model. Because Agera separates events from data, it can offer something like goLazy, which does not process data until the get() method is executed.
For more details, see this issue.
Next up
Next up, my foot!!!! I haven't started my graduation project yet!! I haven't started my thesis yet!! I don't want to fail to graduate and have studied four years of college for nothing!!