首頁 > 科技

死磕Spark事件匯流排——聊聊Spark中事件監聽是如何實現的

2021-07-16 03:07:08

Spark中大量採用事件監聽方式,實現driver端的元件之間的通訊。本文就來解釋一下Spark中事件監聽是如何實現的

觀察者模式和監聽器

在設計模式中有一個觀察者模式,該模式建立一種物件與物件之間的依賴關係,一個物件狀態發生改變時立即通知其他物件,其他物件就據此作出相應的反應。其中發生改變的物件稱之為觀察目標(也有叫主題的),被通知的物件稱之為觀察者,可以有多個觀察者註冊到一個觀察目標中,這些觀察者之間沒有聯絡,其數量可以根據需要增減。

事件驅動的非同步化程式設計

Spark-Core內部的事件框架實現了基於事件的非同步化程式設計模式。它的最大好處是可以提升應用程式對物理資源的充分利用,能最大限度的壓榨物理資源,提升應用程式的處理效率。缺點比較明顯,降低了應用程式的可讀性。Spark的基於事件的非同步化程式設計框架由事件框架和非同步執行執行緒池組成,應用程式產生的Event傳送給ListenerBus,ListenerBus再把訊息廣播給所有的Listener,每個Listener收到Event判斷是否自己感興趣的Event,若是,會在Listener獨享的執行緒池中執行Event所對應的邏輯程式塊。下圖展示Event、ListenerBus、Listener、Executor的關係,從事件生成、事件傳播、事件解釋三個方面的視角來看。

我們從執行緒的視角來看,看非同步化處理。非同步化處理體現在事件傳播、事件解釋兩個階段,其中事件解釋的非同步化實現了我們的基於事件的非同步化程式設計。

Spark的實現

Spark-Core、Spark-Streaming採用了分類的思路(分而治之)進行管理,每一大類事件都有獨自的Event、ListenerBus

Event

Spark-Core的核心事件trait是SparkListenerEvent,Spark-Straming的核心事件trait是StreamingListenerEvent

下圖是各種事件實體類:

我們在定義事件需要注意哪些方面呢?我們以SparkListenerTaskStart為例,分析一個事件擁有哪些特徵。

  1. 見名知義,SparkListenerTaskStart,一看名字我們就能猜到是SparkListener的一個任務啟動事件。
  2. 觸發條件,一個事件的觸發條件必須清晰,能夠清晰的描述一個行為,且行為宿主最好是唯一的。SparkListenerTaskStart事件生成的宿主是DAGScheduler,在DAGScheduler產生BeginEvent事件後生成SparkListenerTaskStart。
  3. 事件傳播,事件傳播可選擇Point-Point或者BroadCast,這個可根據業務上的需要權衡、選擇。Spark-Core、Spark-Streaming的事件框架採用BroadCast模式。
  4. 事件解釋,一個事件可以有一個或者多個解釋。Spark-Core、Spark-Streaming由於採用BroadCast模式,所以支援Listener對事件解釋,原則一個Listener對一個事件只有一種解釋。AppStatusListener、EventLoggingListener、ExecutorAllocationManager等分別對SparkListenerTaskStart做了解釋。 我們在設計事件框架上可根據實際需要借鑑以上四點,設計一個最恰當的事件框架。

Listner

Spark-Core的核心監聽triat是SparkListener,Spark-Streaming的核心監聽triat StreamingListener,兩者都代表了一類監聽的抽象

下圖是一些監聽實體類:

ListenerBus

監聽器匯流排物件,Spark程式在運行的過程中,Driver端的很多功能都依賴於事件的傳遞和處理,而事件匯流排在這中間發揮著至關重要的紐帶作用。事件匯流排通過非同步執行緒,提高了Driver執行的效率。Listener註冊到ListenerBus物件中,然後通過ListenerBus物件來實現事件監聽(類似於計算機與周邊裝置之間的關係)

其start方法直接啟動一個dispatchThread,其核心邏輯就是不停地在一個事件佇列eventQueue裡取出事件,如果事件合法且LiverListenerBus沒有被關停,就將事件通知給所有註冊的listener中

其dispatch方法就是向事件佇列裡新增相應的事件。

ListenerBus用於管理所有的Listener,Spark-Core和Spark-Streaming公用相同的trait ListenerBus, 最終都是使用AsyncEventQueue類對Listener進行管理。

LiveListenerBus:

管理所有註冊的Listener,為一類Listener創建一個唯一的AsyncEventQueue,廣播Event到所有的Listener。預設可提供四類AsyncEventQueue分別為‘shared’、‘appStatus’、‘executorManagement’、‘eventLog’。目前Spark-Core並沒有放開類別設定,意謂著最多隻能有上述四類,從設計的嚴謹上來講分類並不是越多越好,每多一個類別,就會多一個AsyncEventQueue例項,每個例項中會包含一個事件傳播的執行緒,對系統的資源佔用還是比較多的。

非同步事件處理執行緒listenerThread

  private val listenerThread = new Thread(name) {    setDaemon(true) //執行緒本身設為守護執行緒     override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {      LiveListenerBus.withinListenerThread.withValue(true) {        while (true) {          eventLock.acquire()//不斷獲取訊號量,訊號量減一,能獲取到說明還有事件未處理          self.synchronized {            processingEvent = true          }          try {            val event = eventQueue.poll  //獲取事件, remove() 和 poll() 方法都是從佇列中刪除第一個元素(head)。            if (event == null) {              // 此時說明沒有事件,但還是拿到訊號量了,這說明stop方法被呼叫了              // 跳出while迴圈,關閉守護程序執行緒              if (!stopped.get) {                throw new IllegalStateException("Polling `null` from eventQueue means" +                  " the listener bus has been stopped. So `stopped` must be true")              }              return            }            // 呼叫ListenerBus的postxToAll(event: E)方法            postxToAll(event)          } finally {            self.synchronized {              processingEvent = false            }          }        }      }    }  }

核心屬性

private val started = new AtomicBoolean(false)private val stopped = new AtomicBoolean(false)//存放事件private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]// 表示佇列中產生和使用的事件數量的計數器,這個訊號量是為了避免消費者執行緒空跑private val eventLock = new Semaphore(0)

核心方法

start

LiveListenerBus在SparkContext的setupAndStartListenerBus中被初始化,並呼叫start方法啟動LiveListenerBus。

  def start(): Unit = {    if (started.compareAndSet(false, true)) {       listenerThread.start() //啟動消費者執行緒    } else {      throw new IllegalStateException(s"$name already started!")    }

stop

停止LiveListenerBus,它將等待佇列事件被處理,但在停止後丟掉所有新的事件。需要注意stop可能會導致長時間的阻塞,執行stop方法的執行緒會被掛起,直到所有的AsyncEventQueue(預設四個)中的dispatch執行緒都退出後執行stop主法的執行緒才會被喚醒。

  def stop(): Unit = {    if (!started.get()) {      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")    }    if (stopped.compareAndSet(false, true)) {      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know      // `stop` is called.      // 釋放一個訊號量,但此時是沒有事件的,從而listenerThread會拿到一個空事件,從而知道該停止了      eventLock.release()      //然後等待消費者執行緒自動關閉      listenerThread.join()    } else {      // Keep quiet    }  }

post

採用廣播的方式事件傳播,這個過程很快,主執行緒只需要把事件傳播給AsyncEventQueue即可,最後由AsyncEventQueue再廣播給相應的Listener

def post(event: SparkListenerEvent): Unit = {    if (stopped.get) {      // Drop further events to make `listenerThread` exit ASAP      logError(s"$name has already stopped! Dropping event $event")      return    }    // 在事件佇列隊尾新增事件    // add()和offer()區別:兩者都是往佇列尾部插入元素,不同的時候,當超出佇列界限的時候,add()方法是拋出異常讓你處理,而offer()方法是直接返回false    val eventAdded = eventQueue.offer(event)    if (eventAdded) {      //如果成功加入佇列,則在訊號量中加一      eventLock.release()    } else {      // 如果事件佇列超過其容量,則將刪除新的事件,這些子類將被通知到刪除事件。      onDropEvent(event)      droppedEventsCounter.incrementAndGet()    }    val droppedEvents = droppedEventsCounter.get    if (droppedEvents > 0) {      // Don't log too frequently   日誌不要太頻繁      // 如果上一次,佇列滿了EVENT_QUEUE_CAPACITY=1000設定的值,就丟掉,然後記錄一個時間,如果一直持續丟掉,那麼每過60秒記錄一次日誌,不然日誌會爆滿的      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {        if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {          val prevLastReportTimestamp = lastReportTimestamp          lastReportTimestamp = System.currentTimeMillis()          // 記錄一個warn日誌,表示這個事件,被丟棄了          logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +            new java.util.Date(prevLastReportTimestamp))        }      }    }  }

完整流程

  1. 圖中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件來源,它們都是通過呼叫LiveListenerBus的post方法將訊息提交給事件佇列,每post一個事件,訊號量就加一。
  2. listenerThread不停的獲取訊號量,然後從事件佇列中取出事件,取到事件,則呼叫postForAll把事件分發給已註冊的監聽器,否則,就是取到空事件,它明白這是事件匯流排搞的鬼,它呼叫了stop但是每post事件,從而停止事件匯流排執行緒。

作者:luckywind_509

連結:https://my.oschina.net/whucxf/blog/5127818


IT145.com E-mail:sddin#qq.com