事件总线LiveDataBus

使用LiveData实现事件总线

最近在使用谷歌官方的架构组件重构项目,在事件总线的选择方面,以前用的是EventBus,因为现在项目中使用了LiveData,想到了之前看过的美团的一篇文章Android消息总线的演进之路里面讲了使用LiveDataBus,来代替RxBus、EventBus。感觉想法非常,于是项目中开始使用LiveDataBus,使用是非常简单的,不过来需了解实现原理。

开始之前最好先了解一下LiveData可以看这两篇文章:Android Jetpack之LiveDataAndroid Jetpack之Lifecycles

LiveDdataBus的优点

  • 使用简单
  • 代码量非常少
  • 能够感知组件(Activity,Fragment,Service等)的生命周期
  • 不用取消注册也不会内存泄露
  • 使用的是谷歌亲儿子LiveData,官方提供了稳定的依赖包,并且会一直维护

OK下面开始,想要实现一个事件总线的框架,我们需要搞定下面几个东西。

  1. 消息:可以是任何类型的消息
  2. 消息通道:系统API,LiveData,不同的消息通道使用不同的名字。并且可以通过名字获取该通道
  3. 消息总线: 通过一个集合来管理消息通道,比如HashMap
  4. 发布者: 系统API, setValue和postValue这俩都是LiveData提供的方法
  5. 订阅者: 系统API,Observer类

把上面的几个对应到代码中,一个简易的事件总线框架就出来啦如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class LiveDataBus {
//消息总线HashMap 和 消息通道 MutableLiveData
private final Map<String, MutableLiveData<Object>> mBus;

private LiveDataBus() {
mBus = new HashMap<>();
}

private static class SingletonHolder {
private static final LiveDataBus DATA_BUS = new LiveDataBus();
}

public static LiveDataBus get() {
return SingletonHolder.DATA_BUS;
}

public synchronized <T> MutableLiveData<T> with(String target, Class<T> type) {
if (!mBus.containsKey(target)) {
mBus.put(target, new MutableLiveData<>());
}
return (MutableLiveData<T>) mBus.get(target);
}

public MutableLiveData<Object> with(String target) {
return with(target, Object.class);
}
}
public class LiveDataFirstActivity extends AppCompatActivity {
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_live_data_bus);
//传入订阅者和观察者
LiveDataBus.get().with("text",String.class).observe(this, new Observer<String>() {
@Override
public void onChanged(String s) {
ToastUtils.showShort(s);
}
});
}

public void sendMessage(View view) {
//发布消息
LiveDataBus.get().with("text").setValue("哈哈哈");
}

public void Jump(View view) {
Intent intent = new Intent(this,LiveDataSecondActivity.class);
startActivity(intent);
}
}
public class LiveDataSecondActivity extends AppCompatActivity {
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_live_data_bus);
LiveDataBus.get().with("text",String.class).observe(this, new Observer<String>() {
@Override
public void onChanged(String s) {
ToastUtils.showShort(s);
}
});
}
public void sendMessage(View view) {
LiveDataBus.get().with("text").setValue("我是第二个");
}

public void Jump(View view) {
}
}

效果如下:

可以看到已经收到了消息,不过发现两个个问题:

  1. 上面写的这个这个事件总线框架好像自带黏性效果,当我们从第一个Activity发送完消息之后,跳到第二个Activity,如果第二个Activity也订阅了跟第一个Activity中的消息通道中一样的消息,那它也能收到之前发布的消息
  2. 重复接收,当我们从第二个Activity发送一个消息,然后退回到第一个Activity,然后在进入第二个Activity,发现还能收到

怎么解决问题呢,首先需要先找到问题的原因

从LiveData的observe方法开始看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
assertMainThread("observe");
if (owner.getLifecycle().getCurrentState() == DESTROYED) {
// ignore
return;
}
LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
if (existing != null && !existing.isAttachedTo(owner)) {
throw new IllegalArgumentException("Cannot add the same observer"
+ " with different lifecycles");
}
if (existing != null) {
return;
}
owner.getLifecycle().addObserver(wrapper);
}

这里面创建了一个LifecycleBoundObserver对象,它是observer的包装对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class LifecycleBoundObserver extends ObserverWrapper implements LifecycleEventObserver {
@NonNull
final LifecycleOwner mOwner;

LifecycleBoundObserver(@NonNull LifecycleOwner owner, Observer<? super T> observer) {
super(observer);
mOwner = owner;
}

@Override
boolean shouldBeActive() {
return mOwner.getLifecycle().getCurrentState().isAtLeast(STARTED);
}

@Override
public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) {
if (mOwner.getLifecycle().getCurrentState() == DESTROYED) {
removeObserver(mObserver);
return;
}
activeStateChanged(shouldBeActive());
}

@Override
boolean isAttachedTo(LifecycleOwner owner) {
return mOwner == owner;
}

@Override
void detachObserver() {
mOwner.getLifecycle().removeObserver(this);
}
}

LifecycleBoundObserver继承自ObserverWrapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private abstract class ObserverWrapper {
final Observer<? super T> mObserver;
boolean mActive;
int mLastVersion = START_VERSION;

ObserverWrapper(Observer<? super T> observer) {
mObserver = observer;
}

abstract boolean shouldBeActive();

boolean isAttachedTo(LifecycleOwner owner) {
return false;
}

void detachObserver() {
}

void activeStateChanged(boolean newActive) {
if (newActive == mActive) {
return;
}
// immediately set active state, so we'd never dispatch anything to inactive
// owner
mActive = newActive;
boolean wasInactive = LiveData.this.mActiveCount == 0;
LiveData.this.mActiveCount += mActive ? 1 : -1;
if (wasInactive && mActive) {
onActive();
}
if (LiveData.this.mActiveCount == 0 && !mActive) {
onInactive();
}
if (mActive) {
dispatchingValue(this);
}
}
}

在这个ObserverWrapper的成员变量中,有一个mLastVersion,并被赋值为常量START_VERSION,START_VERSION的值为-1。ok先记下这个值。

下面在看发布消息的方法

1
2
3
4
5
6
protected void setValue(T value) {
assertMainThread("setValue");
mVersion++;
mData = value;
dispatchingValue(null);
}

首先调用了mVersion++;这个是当前的版本,也记住这个值。它是在LiveData创建的时候被赋值的如下

1
2
3
4
5
6
7
8
public LiveData(T value) {
mData = value;
mVersion = START_VERSION + 1;
}
public LiveData() {
mData = NOT_SET;
mVersion = START_VERSION;
}

前面我们知道mLastVersion的初始值为START_VERSION,所以mVersion的值刚开始的时候肯定是大于等于mLastVersion的。

然后调用了dispatchingValue方法分发消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void dispatchingValue(@Nullable ObserverWrapper initiator) {
if (mDispatchingValue) {
mDispatchInvalidated = true;
return;
}
mDispatchingValue = true;
do {
mDispatchInvalidated = false;
if (initiator != null) {
considerNotify(initiator);
initiator = null;
} else {
for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =
mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {
considerNotify(iterator.next().getValue());
if (mDispatchInvalidated) {
break;
}
}
}
} while (mDispatchInvalidated);
mDispatchingValue = false;
}

我们传进来的initiator参数是null,所以这里面会循环消息通道,找到其中的观察者,然后调用considerNotify方法通知观察者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void considerNotify(ObserverWrapper observer) {
if (!observer.mActive) {
return;
}
if (!observer.shouldBeActive()) {
observer.activeStateChanged(false);
return;
}
if (observer.mLastVersion >= mVersion) {
return;
}
observer.mLastVersion = mVersion;
//noinspection unchecked
observer.mObserver.onChanged((T) mData);
}

重点来啦,这里有个判断if (observer.mLastVersion >= mVersion)当我们的observer包装对象中的mLastVersion大于或者等于mVersion的时候就返回,返回了也就不会调用下面的observer.mObserver.onChanged((T) mData)方法通知订阅者了。反之则会通知订阅者

然而前面我们知道,当我们新建一个订阅者的包装对象的时候,它的mLastVersion成员变量会被赋值为-1,mVersion初始值会被赋值为0或者-1,当调用setValue方法的时候,mVersion的值还会++,所以if (observer.mLastVersion >= mVersion)是不成立的,因此一直都能收到消息。

现在知道原因啦,下面就是解决问题了

那解决这个问题的思路就是想办法不让它走到considerNotify方法中的observer.mObserver.onChanged((T) mData);。所以这一行前面的代码我们都可以发挥想象空间。比如:

  1. 我们可以更改mLastVersion让它跟mVersion相等,这样就可以直接返回不会再走观察者的onChanged方法了。不过想改这个mLastVersion值,因为它所在的类是个私有的抽象类,我们无法直接拿到,所以只能通过反射更改了,有点麻烦。
  2. 事件分发的时候会回调观察者的onChanged方法,我们可以自己实现一个Observer的包装类,然后在其onChanged方法之前做拦截。

现在先使用第一种方法解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 重写MutableLiveData,实现它的observe方法,在其中反射改变mLastVersion的值
* @param <T>
*/
private class BusMutableLiveData<T> extends MutableLiveData<T>{
@Override
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
super.observe(owner, observer);
//先调用super方法,将observer的包装对象放入map中在反射更改。
try {
hook(observer);
} catch (Exception e) {
e.printStackTrace();
}
}

private void hook(Observer<? super T> observer) throws Exception {
//获取LiveData的class
Class<LiveData> liveDataClass = LiveData.class;
//反射回去LiveData的成员变量mObservers
Field fileObservers = liveDataClass.getDeclaredField("mObservers");
//设置该属性可更改
fileObservers.setAccessible(true);
//get方法获取的是当前对象的实例,这里就是mObservers这个Map集合
Object objectObservers = fileObservers.get(this);
//获取map对象的类
Class<?> classObservers = objectObservers.getClass();
//获取集合的Map方法
Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
//设置get方法可以被访问
methodGet.setAccessible(true);
//执行get方法拿出当前观察者对应的对象
Object objectWrapperEntry = methodGet.invoke(objectObservers,observer);
//定义一个空对象
Object objectWrapper = null;
//判断objectWrapperEntry是否是Map.Entry类型
if(objectWrapperEntry instanceof Map.Entry){
//如果是拿出他的值,其实就是LifecycleBoundObserver
objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
}
//如果是空抛个异常
if(objectWrapper == null){
throw new RuntimeException("objectWrapper is null");
}
//因为mLastVersion在LifecycleBoundObserver的父类ObserverWrapper中,所以拿到它的父类
Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
//获取到mLastVersion字段
Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
//设置该字段可以更改
fieldLastVersion.setAccessible(true);

//获取LiveData中的mVersion值
Field fileVersion = liveDataClass.getDeclaredField("mVersion");
//设置该值可以被访问
fileVersion.setAccessible(true);
//获取mVersion的值
Object objVersion = fileVersion.get(this);
//给mLastVersion赋值
fieldLastVersion.set(objectWrapper,objVersion);
}
}

重写MutableLiveData,实现它的observe方法,在其中反射改变mLastVersion的值,然后把Map中的通道改成我们自己的BusMutableLiveData。这样就可以解决前面的问题了用法不变效果如下:

使用第二种方法解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private class BusMutableLiveData<T> extends MutableLiveData<T>{

int mCurrentVersion;
/**
* 是否需要更新数据,当主动调用setValue或者postValue的时候才触发
*/
@Override
public void setValue(T value) {
mCurrentVersion++;
super.setValue(value);
}

@Override
public void postValue(T value) {
mCurrentVersion++;
super.postValue(value);
}

@Override
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
super.observe(owner, new ObserverWrapper<T>(observer,mCurrentVersion,this));
}
}

private class ObserverWrapper<T> implements Observer<T>{

private Observer<? super T> mObserver;
private int mVersion;
private BusMutableLiveData<T> mLiveData;
public ObserverWrapper(Observer<? super T> observer,int version,BusMutableLiveData<T> liveData) {
mObserver = observer;
mVersion = version;
mLiveData = liveData;
}

@Override
public void onChanged(T t) {
if(mLiveData.mCurrentVersion>mVersion&&mObserver!=null){
mObserver.onChanged(t);
}
}
}

第二种方法,系统的version信息拿不到,我们仿照系统自己弄个mCurrentVersion,在新的观察者来的时候,吧当前mCurrentVersion传给他,setValue或者postValue的时候给通道的mCurrentVersion加一。

然后自定义一个观察者的包装类,ObserverWrapper,在其onChanged方法中判断isChangeData为true的时候才更新数据。用法不变效果跟前面那个一样。

源码位置在这里

コメント

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×