Spark中大量採用事件監聽方式,實現driver端的元件之間的通訊。本文就來解釋一下Spark中事件監聽是如何實現的觀察者模式和監聽器在設計模式中有一個觀察者模式,該模式建立一種
2021-07-16 03:07:08
Spark中大量採用事件監聽方式,實現driver端的元件之間的通訊。本文就來解釋一下Spark中事件監聽是如何實現的
在設計模式中有一個觀察者模式,該模式建立一種物件與物件之間的依賴關係,一個物件狀態發生改變時立即通知其他物件,其他物件就據此作出相應的反應。其中發生改變的物件稱之為觀察目標(也有叫主題的),被通知的物件稱之為觀察者,可以有多個觀察者註冊到一個觀察目標中,這些觀察者之間沒有聯絡,其數量可以根據需要增減。
Spark-Core內部的事件框架實現了基於事件的非同步化程式設計模式。它的最大好處是可以提升應用程式對物理資源的充分利用,能最大限度的壓榨物理資源,提升應用程式的處理效率。缺點比較明顯,降低了應用程式的可讀性。Spark的基於事件的非同步化程式設計框架由事件框架和非同步執行執行緒池組成,應用程式產生的Event傳送給ListenerBus,ListenerBus再把訊息廣播給所有的Listener,每個Listener收到Event判斷是否自己感興趣的Event,若是,會在Listener獨享的執行緒池中執行Event所對應的邏輯程式塊。下圖展示Event、ListenerBus、Listener、Executor的關係,從事件生成、事件傳播、事件解釋三個方面的視角來看。
我們從執行緒的視角來看,看非同步化處理。非同步化處理體現在事件傳播、事件解釋兩個階段,其中事件解釋的非同步化實現了我們的基於事件的非同步化程式設計。
Spark-Core、Spark-Streaming採用了分類的思路(分而治之)進行管理,每一大類事件都有獨自的Event、ListenerBus
Spark-Core的核心事件trait是SparkListenerEvent,Spark-Straming的核心事件trait是StreamingListenerEvent
下圖是各種事件實體類:
我們在定義事件需要注意哪些方面呢?我們以SparkListenerTaskStart為例,分析一個事件擁有哪些特徵。
Spark-Core的核心監聽triat是SparkListener,Spark-Streaming的核心監聽triat StreamingListener,兩者都代表了一類監聽的抽象
下圖是一些監聽實體類:
監聽器匯流排物件,Spark程式在運行的過程中,Driver端的很多功能都依賴於事件的傳遞和處理,而事件匯流排在這中間發揮著至關重要的紐帶作用。事件匯流排通過非同步執行緒,提高了Driver執行的效率。Listener註冊到ListenerBus物件中,然後通過ListenerBus物件來實現事件監聽(類似於計算機與周邊裝置之間的關係)
其start方法直接啟動一個dispatchThread,其核心邏輯就是不停地在一個事件佇列eventQueue裡取出事件,如果事件合法且LiverListenerBus沒有被關停,就將事件通知給所有註冊的listener中
其dispatch方法就是向事件佇列裡新增相應的事件。
ListenerBus用於管理所有的Listener,Spark-Core和Spark-Streaming公用相同的trait ListenerBus, 最終都是使用AsyncEventQueue類對Listener進行管理。
管理所有註冊的Listener,為一類Listener創建一個唯一的AsyncEventQueue,廣播Event到所有的Listener。預設可提供四類AsyncEventQueue分別為‘shared’、‘appStatus’、‘executorManagement’、‘eventLog’。目前Spark-Core並沒有放開類別設定,意謂著最多隻能有上述四類,從設計的嚴謹上來講分類並不是越多越好,每多一個類別,就會多一個AsyncEventQueue例項,每個例項中會包含一個事件傳播的執行緒,對系統的資源佔用還是比較多的。
private val listenerThread = new Thread(name) { setDaemon(true) //執行緒本身設為守護執行緒 override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { LiveListenerBus.withinListenerThread.withValue(true) { while (true) { eventLock.acquire()//不斷獲取訊號量,訊號量減一,能獲取到說明還有事件未處理 self.synchronized { processingEvent = true } try { val event = eventQueue.poll //獲取事件, remove() 和 poll() 方法都是從佇列中刪除第一個元素(head)。 if (event == null) { // 此時說明沒有事件,但還是拿到訊號量了,這說明stop方法被呼叫了 // 跳出while迴圈,關閉守護程序執行緒 if (!stopped.get) { throw new IllegalStateException("Polling `null` from eventQueue means" + " the listener bus has been stopped. So `stopped` must be true") } return } // 呼叫ListenerBus的postxToAll(event: E)方法 postxToAll(event) } finally { self.synchronized { processingEvent = false } } } } } }
private val started = new AtomicBoolean(false)private val stopped = new AtomicBoolean(false)//存放事件private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]// 表示佇列中產生和使用的事件數量的計數器,這個訊號量是為了避免消費者執行緒空跑private val eventLock = new Semaphore(0)
LiveListenerBus在SparkContext的setupAndStartListenerBus中被初始化,並呼叫start方法啟動LiveListenerBus。
def start(): Unit = { if (started.compareAndSet(false, true)) { listenerThread.start() //啟動消費者執行緒 } else { throw new IllegalStateException(s"$name already started!") }
停止LiveListenerBus,它將等待佇列事件被處理,但在停止後丟掉所有新的事件。需要注意stop可能會導致長時間的阻塞,執行stop方法的執行緒會被掛起,直到所有的AsyncEventQueue(預設四個)中的dispatch執行緒都退出後執行stop主法的執行緒才會被喚醒。
def stop(): Unit = { if (!started.get()) { throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") } if (stopped.compareAndSet(false, true)) { // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know // `stop` is called. // 釋放一個訊號量,但此時是沒有事件的,從而listenerThread會拿到一個空事件,從而知道該停止了 eventLock.release() //然後等待消費者執行緒自動關閉 listenerThread.join() } else { // Keep quiet } }
採用廣播的方式事件傳播,這個過程很快,主執行緒只需要把事件傳播給AsyncEventQueue即可,最後由AsyncEventQueue再廣播給相應的Listener
def post(event: SparkListenerEvent): Unit = { if (stopped.get) { // Drop further events to make `listenerThread` exit ASAP logError(s"$name has already stopped! Dropping event $event") return } // 在事件佇列隊尾新增事件 // add()和offer()區別:兩者都是往佇列尾部插入元素,不同的時候,當超出佇列界限的時候,add()方法是拋出異常讓你處理,而offer()方法是直接返回false val eventAdded = eventQueue.offer(event) if (eventAdded) { //如果成功加入佇列,則在訊號量中加一 eventLock.release() } else { // 如果事件佇列超過其容量,則將刪除新的事件,這些子類將被通知到刪除事件。 onDropEvent(event) droppedEventsCounter.incrementAndGet() } val droppedEvents = droppedEventsCounter.get if (droppedEvents > 0) { // Don't log too frequently 日誌不要太頻繁 // 如果上一次,佇列滿了EVENT_QUEUE_CAPACITY=1000設定的值,就丟掉,然後記錄一個時間,如果一直持續丟掉,那麼每過60秒記錄一次日誌,不然日誌會爆滿的 if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { val prevLastReportTimestamp = lastReportTimestamp lastReportTimestamp = System.currentTimeMillis() // 記錄一個warn日誌,表示這個事件,被丟棄了 logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + new java.util.Date(prevLastReportTimestamp)) } } } }
作者:luckywind_509
連結:https://my.oschina.net/whucxf/blog/5127818
相關文章
Spark中大量採用事件監聽方式,實現driver端的元件之間的通訊。本文就來解釋一下Spark中事件監聽是如何實現的觀察者模式和監聽器在設計模式中有一個觀察者模式,該模式建立一種
2021-07-16 03:07:08
昨天凌晨,蘋果剛釋出了iOS14.7 RC版(詳見iOS14.7 RC版值得升級嗎 iOS14.7 RC版體驗評測),今天凌晨蘋果又火速釋出了iOS 15的第 3 個測試版系統更新,版本號為 19A5297e,距離 7 月 1
2021-07-16 03:06:55
機床被視為「工業之母」,是現代製造業發展的基礎。小到螺釘,大到飛機、航母,生產工作都離不開機床的支援。而我國,雖然製造業發達,在國際上被譽為是世界工廠。但很長一段時間裡,中
2021-07-16 03:06:44
中國古代的教育家孔子曾提出了「因材施教」的教育理念和教學方法,即根據不同的學生認知水平和學習能力,進行有針對性的教學。傳承千年的教育理想,科大訊飛在深耕智慧教育領域的
2021-07-16 03:06:37
2021年7月15日,vivo舉辦了「你好,自然美」線上主題釋出會,正式釋出vivo S10系列新品——vivo S10及vivo S10 Pro。作為年輕人的新一代潮流自拍旗艦,vivo S10系列前置4400萬畫素
2021-07-16 03:06:22
AI的熱潮已經過去?許多關注AI的人或許會給出的判斷。這種判斷也有一些依據,AI發展三大要素之一的AI晶片,在2016年左右出現大量的初創公司,之後火熱了兩年左右時間後,又逐漸涼了下
2021-07-16 03:06:13