首頁 > 軟體

Ruby多執行緒庫(Thread)使用方法詳解

2022-04-18 16:00:34

Thread是Ruby的執行緒庫,Thread庫已經內建在Ruby中,但如果想要使用執行緒安全的Queue、Mutex以及條件變數等,則需要手動require 'thread'

主執行緒main

預設情況下,每個Ruby程序都具備一個主執行緒main,如果沒有建立新的執行緒,所有的程式碼都將在這個主執行緒分支中執行。

使用Thread.main()類方法可獲取當前執行緒組的主執行緒,使用Thread.current()可以獲取當前正在執行的執行緒分支。使用Thread.list()可獲取當前行程群組中所有存活的執行緒。

p Thread.main
p Thread.current
p Thread.main == Thread.current
=begin
#<Thread:0x0000000001d9ae58 run>
#<Thread:0x0000000001d9ae58 run>
true
=end

可見,執行緒其實是一個Thread類的範例物件。

建立Ruby執行緒

使用Thread庫的new()、start()、fork()可建立執行緒,它們幾乎等價,且後兩者是別名關係。

建立執行緒時需傳遞一個程式碼塊或Proc物件引數, 它們是要執行的任務,它們將在新的執行緒分支中執行。如果需要,可以為程式碼塊或Proc物件傳遞引數。

arr=[]
a,b,C=1,2,3
Thread.new(a,b,c) { |d,e,f| arr << d << e << f }
sleep 1
p arr   #=> [1,2,3]

如果主執行緒先執行完成,主執行緒將直接退出,主執行緒的退出將會終止程序,使得其它執行緒也會退出。

Thread.new {puts "hello"}
puts "world"

上述程式碼幾乎總是會輸出world,然後退出,主執行緒的退出使得子執行緒不會輸出"hello"。之所以總是會輸出world而不是輸出hello,這和Ruby的執行緒排程有關,在後面的文章中會詳細解釋Ruby中的執行緒排程。

join()和value()等待執行緒

如果想要等待某個執行緒先執行完成,可使用t.join(),如果執行緒t尚未退出,則join()會阻塞。可以在任意執行緒中呼叫t.join(),誰呼叫誰等待。

t = Thread.new { puts "I am Child" }
t.join  # 等待子執行緒執行完成
puts "I am Parent"

還可以將多個執行緒物件放進陣列,然後執行遍歷join,另一種常見的做法是使用map{}.each(&:join)的方式:

threads = []
3.times do |i|
  # 將多個執行緒加入到陣列中
  threads << Thread.new { puts "Thread #{i}" }
end

# 在main執行緒中join每個執行緒,
# 因此只有3個執行緒全都完成後,main執行緒才會繼續,即退出
threads.each(&:join)
=begin
Thread 1
Thread 0
Thread 2
=end

# 另一種常見方式
3.times.map {|i| Thread.new { puts "Thread #{i}" } }.each(&:join)
Array.new(3) {|i| Thread.new { puts "Thread #{i}" } }.each(&:join)

t.value()t.join()類似,不同之處在於t.value()在內部呼叫t.join()等待執行緒t之後,還會在等待成功時取得該執行緒的返回值。

a = Thread.new { 2 + 2 }
p a.value   #=> 4

注意,對於Ruby來說,無論是否執行join()操作,任務執行完成的執行緒都會馬上被作業系統回收(從OS執行緒表中刪除),但被回收的執行緒仍然能夠使用value()方法來獲取被回收執行緒的返回值。之所以會這樣,我個人猜想,也許是因為Ruby內部已經幫我們執行了join操作並將執行緒返回值儲存在Ruby內部,這樣對於使用者來說就更加安全,而且使用者執行join()或value()操作,可能是在等待Ruby內部的這個值的出現。

執行緒的例外處理

預設情況下,當某個非main執行緒中丟擲異常後,該執行緒將因異常而終止,但是它的終止不會影響其它執行緒。

t = Thread.new {raise "hello"}    # 丟擲異常
sleep 1    # 仍然睡眠1秒後退出

如果使用了t.join()t.value()去等待丟擲異常的執行緒t,異常將會傳播給呼叫這兩個方法的執行緒。例如主執行緒呼叫t.join,如果t會丟擲一次異常,那麼主執行緒在等待過程中還會丟擲一次異常。

t = Thread.new {raise "hello"}    # 丟擲異常
t.join()    # 子執行緒拋異常後,main執行緒也拋異常

如果想要讓任意執行緒出現異常時終止整個程式,可設定類方法Thread.abort_on_exception為true,它會在任意子執行緒丟擲異常後自動傳播給main執行緒,從而終止程序:

Thread.abort_on_exception = true
Thread.new { raise "Error" }
sleep 1   # 不會睡眠完1秒,而是子執行緒異常後立即異常退出

如果想要讓某個特定的執行緒出現異常時終止整個程式,可設定同名的實體方法t.abort_on_exception為true,只有t執行緒異常時才會終止程式。

t1 = Thread.new { raise "Error from t1" }
t1.abort_on_exception = true
sleep 1

另外,執行緒實體方法t.raise()可以直接線上程t丟擲異常。

需注意,Ruby執行緒有一個巨大的缺點:無論是raise丟擲異常還是各種終止(比如kill、exit),都不會執行ensure子句。

執行緒的狀態和生命週期

Ruby中的執行緒具有5種狀態,可通過t.status()檢視,該方法有5種對應的返回值:

- run: 執行緒正在執行(running)或可執行(runnable)  
- sleep: 執行緒處於睡眠態,比如阻塞(如sleep,mutex,io block)  
- false: 執行緒正常退出後的狀態,包括執行完流程、手動退出(t.exit)、訊號終止(t.kill)  
- nil: 執行緒因丟擲異常(比如raise)而退出的狀態  
- aborting: 執行緒被完全kill之前的過渡狀態,不考慮這種狀態的存在

另外,還有兩種統稱狀態:

  • alive:存活的執行緒,等價於run + sleep
  • stop:已停止的執行緒,等價於sleep + dead(false+nil)

可分別使用alive?()stop?()來判斷執行緒是否屬於這兩種統稱狀態。

此外:

Kernel.sleep:讓當前執行緒睡眠指定時長,無引數則永久睡眠,執行緒將進入睡眠佇列
Thread.stop:讓當前執行緒睡眠,進入睡眠佇列,等價於無引數的sleep  
Thread.pass:轉讓CPU,當前執行緒進入就緒佇列而不是睡眠佇列  
t.run:喚醒執行緒t使其進入就緒佇列,同時讓當前執行緒放棄CPU,排程程式將重新排程  
t.wakeup:喚醒執行緒t使其進入就緒佇列,但不會讓當前執行緒放棄CPU,排程程式將不會立即重新排程  

Thread.kill:終止指定執行緒,它將不再被排程  
Thread.exit:終止當前執行緒,它將不再被排程  
t.exit,t.kill,t.terminate:終止執行緒t,t將不再被排程

幾個注意事項:

  • 這裡5個終止執行緒的方式效果上是完全等價的,三個實體方法是別名關係,而兩個類方法的內部也都是呼叫執行緒物件的kill
  • 最好要不加區分地看待run和wakeup
  • 對於Thread.pass,除了知道它轉讓CPU的行為是確定的,不要對它假設任何額外的行為,比如不要認為出讓CPU後一定會排程到其它Ruby執行緒,很有可能會在排程其它一些非Ruby執行緒後再次先排程到本執行緒而非其它Ruby執行緒
  • 需注意,無論是raise丟擲異常還是各種終止(比如kill、exit),都不會執行ensure子句

執行緒私有變數和區域性變數

Ruby程序內的所有執行緒共用程序的虛擬地址空間,所以共用了一些資料。

但執行緒是語句塊或者Proc物件,所以語句塊內部建立的變數是在當前執行緒棧內部的,是每個執行緒私有的變數。

# 主執行緒中的變數
a = 1

# 子執行緒
t1 = Thread.new(3) do |x|
  a += 1
  b=3
  x=4
end

# 主執行緒
t1.join
p a   # 2
#p b  # 報錯,b不存在
#p x  # 報錯,x不存在

Ruby為執行緒提供了區域性變數共用的概念,每個執行緒物件都可以有自己的區域性資料空間(即執行緒本地變數),執行緒物件的區域性空間互不影響,比如兩個執行緒中同時進行正則匹配,兩個執行緒的$~是不一樣且互不影響的。

執行緒物件t的區域性資料空間是t[key]=value,即一個名為t的hash結構,因為物件t是可以共用的,所以它的區域性空間也是共用的。

t1 = Thread.new do
  t = Thread.current
  t[:name] = "junmajinlong"
  t[:age] = 23
end

t1.join

p t1.keys          # [:name, :age]
p t1.key? :gender  # false
p t1[:name]        # "junmajinlong"
t1[:age] = 24  
p t1[:age]         # 24

所以,有這麼幾個方法:

t[key]
t[key]=
t.keys
t.key?

此外還有一個fetch()方法,類似於Hash的fetch(),預設情況下存取不存在的key會異常,可指定預設值或通過語句塊返回預設值。

嚴格來說,從Ruby 1.9出現Fiber之後,t[]不再是執行緒本地變數(thread-local),而是纖程(Fiber)本地變數(fiber-local)。但也支援使用執行緒本地變數:

t.thread_variables
t.thread_variable?
t.thread_variable_get
t.thread_variable_set

執行緒組

預設情況下,所有執行緒都在預設的執行緒組中,這個預設執行緒組是Ruby程式啟動時建立的。可使用ThreadGroup::Default獲取預設執行緒組。

t1 = Thread.new do
  Thread.stop
end

p t1.group
p Thread.current.group
p ThreadGroup::Default
=begin
#<ThreadGroup:0x00000000019bcb60>
#<ThreadGroup:0x00000000019bcb60>
#<ThreadGroup:0x00000000019bcb60>
=end
  • 使用ThreadGroup.new可建立一個自定義的執行緒組
  • 使用tg.add(t)可將執行緒t加入執行緒組tg,這將會從原來的執行緒組移除t再加入新組tg
  • 使用tg.list可列出執行緒組tg中的所有執行緒
  • 使用t.group可獲取執行緒t所屬的執行緒組
  • 子執行緒會繼承父執行緒的執行緒組,即子執行緒也會加入父執行緒所在的執行緒組
tg = ThreadGroup.new
t1 = Thread.new { Thread.stop }
t2 = Thread.new { Thread.stop }
tg.add t1
tg.add t2
pp tg.list
pp t1.group
=begin
[#<Thread:0x000000000196c480 a.rb:4 sleep_forever>,
 #<Thread:0x000000000196c3b8 a.rb:5 sleep_forever>]
#<ThreadGroup:0x000000000196c520>
=end

執行緒組還有一個功能:可使用tg.enclose封閉執行緒組tg,封閉後的執行緒組將不允許內部執行緒移出加入其它組,也不允許外界執行緒加入該組,只允許在該組中建立新執行緒。使用tg.enclosed?測試執行緒組tg是否已封閉。

其實,使用執行緒組可以將多個執行緒分類統一管理,執行緒組本質是一個執行緒陣列加一些額外屬性。比如,可以為執行緒組定義一些額外的針對執行緒組中所有執行緒的功能:wakeup組中的所有執行緒、join所有執行緒、kill所有執行緒。

class ThreadGroup
  def wakeup
    list.each(&:wakeup)
  end
  def join
    list.each { |th| th.join if th != Thread.current }
  end
  def kill
    list.each(&:kill)
  end
end

更多關於Ruby多執行緒知識請檢視下面的相關連結


IT145.com E-mail:sddin#qq.com