首頁 > 軟體

Android實現訊息匯流排的幾種方式詳解

2022-06-15 14:00:26

前言

訊息匯流排又叫事件匯流排,為什麼我們需要一個訊息匯流排呢?是因為隨著專案變大,頁面變多,我們可能出現跨頁面、跨元件、跨執行緒、跨程序傳遞訊息與資料,為了更方便的直接通知到指定的頁面實現具體的邏輯,我們需要訊息匯流排來實現。

從最基本的 BroadcastReceiver 到 EventBus 再到RxBus ,後來官方出了AndroidX jetpack 我們開始使用LiveDataBus,最後到Kotlin的流行出來了FlowBus。我們看看他們是怎麼一步一步演變的。

一、BroadcastReceiver 廣播

我們再初入 Android 的時候都應該學過廣播接收者,分為靜態廣播和動態註冊廣播,在高版本的 Android 中限制了我們一些靜態廣播的使用,不過我們還是能通過動態註冊的方式獲取一些系統的狀態改變。像常用的電量變化、網路狀態變化、簡訊傳送接收的狀態等等。

比如網路變化的監聽:

    IntentFilter intentFilter = new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION);
    application.getApplicationContext().registerReceiver(InstanceHolder.INSTANCE, intentFilter);

在訊息中線中,我們可以使用本地廣播來實現 LocalBroadcastManager 訊息的通知。

    LocalBroadcastManager mLocalBroadcastManager = LocalBroadcastManager.getInstance(mContext);
    
    BroadcastReceiver  mLoginReceiver = new LoginSuccessReceiver();
    mLocalBroadcastManager.registerReceiver(mLoginReceiver, new IntentFilter(Constants.ACTION_LOGIN_SUCCESS));

    private class LoginSuccessReceiver extends BroadcastReceiver {
        @Override
        public void onReceive(Context context, Intent intent) {
            //重新整理Home介面
            refreshHomePage();
        
            //重新整理未讀資訊
            requestUnreadNum();
        }
    }

    //記得要解綁對應的接收器
    mLocalBroadcastManager.unregisterReceiver(mLoginReceiver);

這樣就可以實現一個訊息通知了。相比 EventBus 它的效能和空間的消耗都是較大的,並且只能固定在主執行緒執行。

二、EventBus

EventBus最大的特點就是簡潔、解耦,可以直接傳遞我們自定義的訊息Message。EventBus簡化了應用程式內各元件間、元件與後臺執行緒間的通訊。記得2015年左右是非常火爆的。

EventBus的排程靈活,不依賴於 Context,使用時無需像廣播一樣關注 Context 的注入與傳遞。可繼承、優先順序、粘滯,是 EventBus 比之於廣播的優勢。幾乎可以滿足我們全部的需求。

最初的EventBus其實就是一個方法的集合與查詢,核心是通過register方法把帶有@Subscrib註解的方法和引數之類的東西全部放入一個List集合,然後通過post方法去這個list迴圈查詢到符合條件的方法去執行。

如何使用EventBus,一共分5步:

  @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_event_bus);
        EventBus.getDefault().register(MainActivity.this);  //1.註冊廣播
    }
  @Override
    protected void onDestroy() {
        super.onDestroy();
        EventBus.getDefault().unregister(MainActivity.this); //2.解註冊廣播
    }
/**
 * 3.傳遞什麼型別的。定義一個訊息類
 */
public class MessageEvent {
    public String name;

    public MessageEvent(String name) {
        this.name = name;
    }
}
    @OnClick({R.id.bt_eventbus_send_main, R.id.bt_eventbus_send_sticky})
    public void onClick(View view) {
        switch (view.getId()) {
            case R.id.bt_eventbus_send_main:
                //4.傳送訊息
                EventBus.getDefault().post(new MessageEvent("我是主頁面傳送過來的訊息"));
                finish();
                break;
        }
    }
   /**
     * 5.接受到訊息。需要註解
     *
     * @param event
     */
    @Subscribe(threadMode = ThreadMode.MAIN)   //主執行緒執行
    public void MessageEventBus(MessageEvent event) {
        //5。顯示接受到的訊息
        mTvEventbusResult.setText(event.name);
    }

EventBus的效能開銷其實不大,EventBus2.4.0 版是利用反射來實現的,後來改成 APT 實現之後會好很多。主要問題是需要定義很多的訊息物件,訊息太多之後就感覺管理起來很麻煩。當訊息太多之後容器內部的查詢會出現效能瓶頸。

就算如此 EventBus 也是值得大家使用的。

三、RxBus

RxBus是基於RxJava實現的,強大是強大,但是學習成本比較高,需要額外匯入RxJava RxAndroid等庫,這些庫體積還是較大的。可以實現非同步的訊息等。

本身的實現是很簡單的:

public class RxBus {
    private volatile static RxBus mDefaultInstance;
    private final Subject<Object> mBus;

    private RxBus() {
        mBus = PublishSubject.create().toSerialized();
    }

    public static RxBus getInstance() {
        if (mDefaultInstance == null) {
            synchronized (RxBus.class) {
                if (mDefaultInstance == null) {
                    mDefaultInstance = new RxBus();
                }
            }
        }
        return mDefaultInstance;
    }

    /**
     * 傳送事件
     */
    public void post(Object event) {
        mBus.onNext(event);
    }

    /**
     * 根據傳遞的 eventType 型別返回特定型別(eventType)的 被觀察者
     */
    public <T> Observable<T> toObservable(final Class<T> eventType) {
        return mBus.ofType(eventType);
    }

    /**
     * 判斷是否有訂閱者
     */
    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    public void reset() {
        mDefaultInstance = null;
    }
}

定義訊息物件:

public class MsgEvent {
    private String msg;

    public MsgEvent(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

傳送與接收:

RxBus.getInstance().toObservable(MsgEvent.class).subscribe(new Observer<MsgEvent>() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }

            @Override
            public void onNext(MsgEvent msgEvent) {
                //處理事件
            }

            @Override
            public void onError(Throwable e) {
                  
            }

            @Override
            public void onComplete() {

            }
        });
RxBus.getInstance().post(new MsgEvent("Java"));

缺點是容易記憶體洩露,我們需要使用rxlifecycle 或者使用CompositeDisposable 自己對生命週期進行處理解綁。

四、LiveDataBus

官方出了AndroidX jetpack 內部包含LiveData,它可以感知並遵循Activity、Fragment或Service等元件的生命週期。

為什麼要使用LiveDataBus,正是基於LiveData對元件生命週期可感知的特點,因此可以做到僅在元件處於生命週期的啟用狀態時才更新UI資料。

一個簡單的LiveDataBus的實現:

public final class LiveDataBus {
    private final Map<String, BusMutableLiveData<Object>> bus;
 
    private LiveDataBus() {
        bus = new HashMap<>();
    }
 
    private static class SingletonHolder {
        private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
    }
 
    public static LiveDataBus get() {
        return SingletonHolder.DEFAULT_BUS;
    }
 
    public <T> MutableLiveData<T> with(String key, Class<T> type) {
        if (!bus.containsKey(key)) {
            bus.put(key, new BusMutableLiveData<>());
        }
        return (MutableLiveData<T>) bus.get(key);
    }
 
    public MutableLiveData<Object> with(String key) {
        return with(key, Object.class);
    }
 
    private static class ObserverWrapper<T> implements Observer<T> {
 
        private Observer<T> observer;
 
        public ObserverWrapper(Observer<T> observer) {
            this.observer = observer;
        }
 
        @Override
        public void onChanged(@Nullable T t) {
            if (observer != null) {
                if (isCallOnObserve()) {
                    return;
                }
                observer.onChanged(t);
            }
        }
 
        private boolean isCallOnObserve() {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            if (stackTrace != null && stackTrace.length > 0) {
                for (StackTraceElement element : stackTrace) {
                    if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
                            "observeForever".equals(element.getMethodName())) {
                        return true;
                    }
                }
            }
            return false;
        }
    }
 
    private static class BusMutableLiveData<T> extends MutableLiveData<T> {
 
        private Map<Observer, Observer> observerMap = new HashMap<>();
 
        @Override
        public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
            super.observe(owner, observer);
            try {
                hook(observer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
 
        @Override
        public void observeForever(@NonNull Observer<T> observer) {
            if (!observerMap.containsKey(observer)) {
                observerMap.put(observer, new ObserverWrapper(observer));
            }
            super.observeForever(observerMap.get(observer));
        }
 
        @Override
        public void removeObserver(@NonNull Observer<T> observer) {
            Observer realObserver = null;
            if (observerMap.containsKey(observer)) {
                realObserver = observerMap.remove(observer);
            } else {
                realObserver = observer;
            }
            super.removeObserver(realObserver);
        }
 
        private void hook(@NonNull Observer<T> observer) throws Exception {
            //get wrapper's version
            Class<LiveData> classLiveData = LiveData.class;
            Field fieldObservers = classLiveData.getDeclaredField("mObservers");
            fieldObservers.setAccessible(true);
            Object objectObservers = fieldObservers.get(this);
            Class<?> classObservers = objectObservers.getClass();
            Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
            methodGet.setAccessible(true);
            Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
            Object objectWrapper = null;
            if (objectWrapperEntry instanceof Map.Entry) {
                objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
            }
            if (objectWrapper == null) {
                throw new NullPointerException("Wrapper can not be bull!");
            }
            Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
            Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
            fieldLastVersion.setAccessible(true);
            //get livedata's version
            Field fieldVersion = classLiveData.getDeclaredField("mVersion");
            fieldVersion.setAccessible(true);
            Object objectVersion = fieldVersion.get(this);
            //set wrapper's version
            fieldLastVersion.set(objectWrapper, objectVersion);
        }
    }
}

註冊與傳送:

LiveDataBus.get()
        .with("key_test", String.class)
        .observe(this, new Observer<String>() {
            @Override
            public void onChanged(@Nullable String s) {
            }
        });

LiveDataBus.get().with("key_test").setValue(s);

LiveDataBus已經算是很好用的,自動註冊解綁,根據Key傳遞泛型T物件,容易查詢對應的接收者,也可以實現可見的觸發和直接觸發,可以實現跨程序,

LiveData有幾點不足,只能在主執行緒更新資料,操作符無法轉換資料,基於 Android Api 實現的,換一個平臺無法適應,基於這幾點又開發出了FlowBus。

五、FlowBus

很多人都說Flow 的出現導致 LiveData 沒那麼重要了,就是因為 LiveData 的場景 都可以使用 Flow 平替了,還能更為的強大和靈活。

StateFlow 可以 替代ViewModel中傳遞資料,SharedFlow 可以實現事件匯流排。(這兩者的異同如果大家有興趣,我可以單獨開一篇講下)。

SharedFlow 就是一種熱流,可以實現一對多的關係,其構造方法支援天然支援普通的訊息傳送與粘性的訊息傳送。一般我們FlowBus都是基於 SharedFlow 來實現:

object FlowBus {
    private val busMap = mutableMapOf<String, EventBus<*>>()
    private val busStickMap = mutableMapOf<String, StickEventBus<*>>()

    @Synchronized
    fun <T> with(key: String): EventBus<T> {
        var eventBus = busMap[key]
        if (eventBus == null) {
            eventBus = EventBus<T>(key)
            busMap[key] = eventBus
        }
        return eventBus as EventBus<T>
    }

    @Synchronized
    fun <T> withStick(key: String): StickEventBus<T> {
        var eventBus = busStickMap[key]
        if (eventBus == null) {
            eventBus = StickEventBus<T>(key)
            busStickMap[key] = eventBus
        }
        return eventBus as StickEventBus<T>
    }

    //真正實現類
    open class EventBus<T>(private val key: String) : LifecycleObserver {

        //私有物件用於傳送訊息
        private val _events: MutableSharedFlow<T> by lazy {
            obtainEvent()
        }

        //暴露的公有物件用於接收訊息
        val events = _events.asSharedFlow()

        open fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST)

        //主執行緒接收資料
        fun register(lifecycleOwner: LifecycleOwner, action: (t: T) -> Unit) {
            lifecycleOwner.lifecycle.addObserver(this)
            lifecycleOwner.lifecycleScope.launch {
                events.collect {
                    try {
                        action(it)
                    } catch (e: Exception) {
                        e.printStackTrace()
                        YYLogUtils.e("FlowBus - Error:$e")
                    }
                }
            }
        }

        //協程中傳送資料
        suspend fun post(event: T) {
            _events.emit(event)
        }

        //主執行緒傳送資料
        fun post(scope: CoroutineScope, event: T) {
            scope.launch {
                _events.emit(event)
            }
        }

        //自動銷燬
        @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
        fun onDestroy() {
            YYLogUtils.w("FlowBus - 自動onDestroy")
            val subscriptCount = _events.subscriptionCount.value
            if (subscriptCount <= 0)
                busMap.remove(key)
        }
    }

    class StickEventBus<T>(key: String) : EventBus<T>(key) {
        override fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(1, 1, BufferOverflow.DROP_OLDEST)
    }
}

傳送與接收訊息

    // 主執行緒-傳送訊息
    FlowBus.with<String>("test-key-01").post(this@Demo11OneFragment2.lifecycleScope, "Test Flow Bus Message")
    // 接收訊息
    FlowBus.with<String>("test-key-01").register(this) {
            LogUtils.w("收到FlowBus訊息 - " + it)
        }

傳送粘性訊息

 FlowBus.withStick<String>("test-key-02").post(lifecycleScope, "Test Stick Message")
   FlowBus.withStick<String>("test-key-02").register(this){
            LogUtils.w("收到粘性訊息:$it")
        }

Log如下:

總結

其實這麼多訊息匯流排框架,我更推薦EventBus LiveDataBus FlowBus這三種。

總的來說,我們儘量不依賴第三方的框架來實現,那麼 FlowBus 是語言層級的,基於Kotlin的特性實現,比較推薦了,我本人是喜歡用LiveDataBus,基於 Android 開發場景來看,幾乎能滿足全部要求了。

如果大家有原始碼方面的需求可以看看這裡,上面的原始碼也都貼出來了,不過更推薦大家根據不同的型別自行去 Github 上面找對應的實現封裝,功能會更多,健壯性更好。

到此這篇關於Android實現訊息匯流排的幾種方式的文章就介紹到這了,更多相關Android訊息匯流排實現內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com