首頁 > 軟體

Ruby3多執行緒並行Ractor使用方法詳解

2022-04-18 16:00:30

Ruby 3 Ractor官方手冊:https://github.com/ruby/ruby/blob/master/doc/ractor.md

在Ruby3之前,使用Thread來建立新的執行緒,但這種方式建立的多執行緒是並行而非並行的,MRI有一個全域性直譯器鎖GIL來控制同一時刻只能有一個執行緒在執行:

# main Thread

t1 = Thread.new do 
  # new Thread
  sleep 3
end
t1.join

Ruby3通過Ractor(Ruby Actor,Actor模型通過訊息傳遞的方式來修改狀態)支援真正的多執行緒並行,多個Ractor之間可並行獨立執行。

# main Ractor

# 建立一個可與main Ractor並行執行的Ractor
r = Ractor.new do
  sleep 2
  Ractor.yield "hello"
end

puts r.take

需注意,每個Ractor中至少有一個原生Ruby執行緒,但每個Ractor內部都擁有獨立的GIL,使得Ractor內部在同一時刻最多隻能有一個執行緒在執行。從這個角度來看,Ractor實際上是直譯器執行緒,每個直譯器執行緒擁有一個全域性直譯器鎖。

如果main Ractor退出,則其他Ractor也會收到退出訊號,就像main Thread退出時,其他Thread也會退出一樣。

建立Ractor

使用Ractor.new建立一個Ractor範例,建立範例時需指定一個語句塊,該語句塊中的程式碼會在該Ractor中執行。

r = Ractor.new do
  puts "new Ractor"
end

可在new方法的引數上為該Ractor範例指定名稱:

r = Ractor.new(name: "ractor1") do
  puts "new Ractor"
end

puts r.name  # ractor 1

new方法也可指定其他引數,這些引數必須在name引數之前,且這些引數將直接原樣傳遞給語句塊引數:

arr = [11, 22, 33]
r = Ractor.new(arr, name: "r1") do |arr|
  puts "arr"
end
sleep 1

關於new的引數,稍後還會有解釋。

可使用Ractor.current獲取當前的Ractor範例,使用Ractor.count獲取當前存活的Ractor範例數量。

Ractor之間傳遞訊息

Ractor傳遞訊息的方式分兩種:

  • Push方式:向某個特定的Ractor範例推播訊息,可使用r.send(Msg)或別名r << Msg向該Ractor範例傳送訊息,並在該Ractor範例內部使用Ractor.receive或別名Ractor.recv或它們的同名私有方法來接收推播進來的訊息
    • Ractor還提供了Ractor.receive_if {expr}方法,表示只在expr為true時才接收訊息,receive等價於receive_if {true}
  • Pull方式:從某個特定的Ractor範例拉取訊息,可在該Ractor範例內部使用Ractor.yield向外傳送訊息,並在需要的地方使用r.take獲取傳輸出來的訊息
    • Ractor.new的語句塊返回值,相當於Ractor.yield,它也可被r.take接收

因此,對於Push方式,要求知道訊息傳遞的目標Ractor,對於Pull方式,要求知道訊息的來源Ractor。

# yield + take
r = Ractor.new {Ractor.yield "hello"}
puts r.take

# send + receive
r1 = Ractor.new do
  # Ractor.receive或Ractor.recv
  # 或同名私有方法:receive、recv
  puts Ractor.receive
end
r1.send("hello")
r1.take    # 本次take取得r1語句塊的返回值,即puts的返回值nil

使用new方法建立Ractor範例時,可指定new的引數,這些引數會被原樣傳遞給Ractor的語句塊引數。

arr = [11, 22, 33]
r = Ractor.new(arr) { |arr| ...}

實際上,new的引數等價於在Ractor語句塊的開頭使用了Ractor.receive接收訊息:

r = Ractor.new 'ok' { |msg| msg }
r.take #=> 'ok'

# 基本等價於
r = Ractor.new do
  msg = Ractor.receive
  msg
end
r.send 'ok'
r.take #=> 'ok'

訊息埠

Ractor之間傳遞訊息時,實際上是通過Ractor的訊息埠進行傳遞的。

每個Ractor都有自己的incoming port和outgoing port:

  • incoming port:是該Ractor接收訊息的埠,r.sendRactor.receive使用該埠
    • 每個incoming port都連線到一個大小不限的佇列上
    • r.send傳入的訊息都會寫入該佇列,由於該佇列大小不限,因此r.send從不阻塞
    • Ractor.receive從該佇列彈出訊息,當佇列為空時,Ractor.receive被阻塞直到新訊息出現
    • 可使用r.close_incoming關閉incoming port,關閉該埠後,r.send將直接報錯,Ractor.receive將先從佇列中取資料,當佇列為空後,再呼叫Ractor.receive將報錯
  • outgoing port:是該Ractor向外傳出訊息的埠,Ractor.yieldr.take使用該埠
    • Ractor.yield或Ractor語句塊返回時,訊息從outgoing port流出
    • 當沒有r.take接收訊息時,r內部的Ractor.yield將被阻塞
    • 當r內部沒有Ractor.yield時,r.take將被阻塞
    • Ractor.yield從outgoing port傳出的訊息可被任意多個r.take等待,但只有一個r.take可獲取到該訊息
    • 可使用r.close_outgoing關閉outgoing port,關閉該埠後,再呼叫r.takeRactor.yield將直接報錯。如果r.take正被阻塞(等待Ractor.yield傳出訊息),關閉outgoing port操作將取消所有等待中的take並報錯

Ractor.select等待訊息

可使用Ractor.select(r1,r2,r3...)等待一個或多個Ractor範例outgoing port上的訊息(因此,select主要用於等待Ractor.yield的訊息),等待到第一個訊息後立即返回。

Ractor.select的返回值格式為[r, obj],其中:

  • r表示等待到的那個Ractor範例
  • obj表示接收到的訊息物件

例如:

r1 = Ractor.new{'r1'}
r2 = Ractor.new{'r2'}
rs = [r1, r2]
as = []

# Wait for r1 or r2's Ractor.yield
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj

# Second try (rs only contain not-closed ractors)
r, obj = Ractor.select(*rs)
rs.delete(r)
as << obj
as.sort == ['r1', 'r2'] #=> true

通常來說,會使用Ractor.select來輪詢等待多個Ractor範例的訊息,通用化的處理流程參考如下:

# 充當管道功能的Ractor:接收訊息並行送出去,並不斷迴圈
pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
# rs變數儲存了10個Ractor範例
# 每個Ractor範例都從管道pipe中取一次訊息然後由本Ractor傳送出去
rs = RN.times.map{|i|
  Ractor.new pipe, i do |pipe, i|
    msg = pipe.take
    msg # ping-pong
  end
}
# 向管道中傳送10個資料
RN.times{|i| pipe << i}

# 輪詢等待10個Ractor範例的outgoing port
# 每等待成功一次,從rs中刪除所等待到的Ractor範例,
# 然後繼續等待剩下的Ractor範例
RN.times.map{
  r, n = Ractor.select(*rs)
  rs.delete r
  n
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

此外,Ractor.select除了可等待訊息外,也可以用來yield傳遞訊息,更多用法參考官方手冊:Ractor.select

Ractor並行時如何避免競態

多個Ractor之間是可並行執行的,為了避免Ractor之間傳遞資料時出現競態問題,Ractor採取了一些措施:

  • 對於不可變物件,它們可直接在Ractor之間共用,此時傳遞它們的參照
  • 對於可變物件,它們不可直接在Ractor之間共用,此時傳遞資料時,預設先按位元組逐位元組拷貝,然後後傳遞副本
  • 也可以顯式指定行動資料,將某份資料從Ractor1移動到另一個Ractor2中,即轉移資料的所有權(參考Rust的所有權規則),轉移所有權後,原始所有者Ractor中將無法再存取該資料

傳遞可共用物件:傳遞參照

可共用的物件:自動傳遞它們的參照,效率高

  • 不可變物件可在Ractor之間直接共用(如Integer、symbol、true/false、nil),如:
    • i=123:i是可共用的
    • s="str".freeze:s是可共用的
    • h={c: Object}.freeze:h是可共用的,因為Object是一個類物件,類物件是可共用的
    • a=[1,[2],3].freeze:a不可共用,因為凍結後仍然包含可變的[2]
  • Class/Module物件,即類物件自身和模組物件自身是可共用的
  • Ractor物件自身是可共用的

例如:

i = 33
r = Ractor.new do
  m = recv
  puts m.object_id
end

r.send(i)  # 傳遞i
r.take     # 等待Ractor執行結束(語句塊返回)
puts i.object_id  # i傳遞後仍然可用
=begin
67
67
=end

值得注意的是,Ractor物件是可共用的,因此可將某個Ractor範例傳遞給另一個Ractor範例。例如:

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

RN = 10
rs = RN.times.map{|i|
  # pipe是一個Ractor範例,這裡作為引數傳遞給其他的Ractor範例
  Ractor.new pipe, i do |pipe, i|
    pipe << i
  end
}

RN.times.map{
  pipe.take
}.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

傳遞不可共用物件:傳遞副本

絕大多數物件不是可直接共用的。在Ractor之間傳遞不可共用的物件時,預設會傳遞deep-copy後的副本,即按位元組拷貝的方式拷貝該物件的每一個位元組。這種方式效率較低。

例如:

arr = [11, 22, 33]  # 陣列是可變的,不可共用
r = Ractor.new do
  m = recv
  puts "copied: #{m.object_id}"
end

r.send(arr)  # 傳遞陣列,此時將逐位元組拷貝陣列
r.take
puts "origin: #{arr.object_id}"

=begin
copied: 60
origin: 80
=end

從結果看,兩個Ractor內的arr不是同一個物件。

需注意,對於全域性唯一的物件來說(比如數值、nil、false、true、symbol),逐位元組拷貝時並不會拷貝它們。例如:

arr = %i[lang action sub]
r = Ractor.new do
  m = recv
  puts "copied: #{m.object_id}, #{m[0].object_id}, #{m[1].object_id}"
end

r.send(arr)
r.take
puts "origin: #{arr.object_id}, #{arr[0].object_id}, #{arr[1].object_id}"

=begin
copied: 60, 80, 1046748
origin: 100, 80, 1046748
=end

注意,Thread物件無法拷貝,因此無法在Ractor之間傳遞。

轉移資料所有權

還可以讓r.send(msg, move: true)Ractor.yield(msg, move: true)傳遞資料時,明確表示要移動而非拷貝資料,即轉移資料的所有權(從原來的所有者Ractor範例轉移到目標Ractor範例)。

無論是可共用還是不可共用的物件,都可以轉移所有權,只不過轉移可共用物件的所有權沒有意義,因為轉移之後,原所有者仍然擁有所有權。

因此,通常只對不可共用的資料來轉移所有權,轉移所有權後,原所有者將無法存取該資料。

str = "hello"
puts str.object_id
r = Ractor.new do
  m = recv
  puts m.object_id
end

r.send(str, move: true)  # 轉移str的所有權
r.take
#puts str.object_id  # 轉移所有權後再存取str,將報錯

=begin
60
80
=end

值得注意的是,移動的本質是記憶體拷貝,它底層也一樣是逐位元組拷貝原始資料的過程,所以移動傳遞資料的效率和傳遞副本資料的效率是類似的。移動傳遞和傳遞副本的區別之處在於所有權,移動傳遞後,原所有者Ractor範例將無法存取該資料,而拷貝傳遞方式則允許原所有者存取

注意,Thread物件無法轉移所有權,因此無法在Ractor之間傳遞。

不可共用變成可共用:Ractor.make_shareable

對於不可共用的資料obj,可通過Ractor.make_shareable(obj)方法將其轉變為可共用的資料,預設轉變的方式是逐層次地遞迴凍結obj。也可指定額外的引數Ractor.make_shareable(obj, copy: true),此時將深拷貝obj得其副本,再讓副本(逐層遞迴凍結)轉變為可共用資料。

例如:

arr = %w[lang action sub]
puts arr.object_id
r = Ractor.new do
  m = recv
  puts m.object_id
end

r.send(Ractor.make_shareable(arr))
r.take
puts arr.object_id
puts arr.frozen?

輸出:

60
60
60
true

範例

工作者執行緒池:

require 'prime'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.receive
  end
end

N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
    while n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each{|i|
  pipe << i
}

pp (1..N).map{
  _r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}

Pipeline:

# pipeline with yield/take
r1 = Ractor.new do
  'r1'
end

r2 = Ractor.new r1 do |r1|
  r1.take + 'r2'
end

r3 = Ractor.new r2 do |r2|
  r2.take + 'r3'
end

p r3.take #=> 'r1r2r3'

更多關於Ruby3多執行緒並行Ractor使用方法請檢視下面的相關連結


IT145.com E-mail:sddin#qq.com