<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
最近做了很多資料淨化以及摸底的工作,由於處理的資料很大,所以採用了spark進行輔助處理,期間遇到了很多問題,特此記錄一下,供大家學習。
由於比較熟悉python, 所以筆者採用的是pyspark,所以下面給的demo都是基於pyspark,其實其他語言指令碼一樣,重在學習思想,具體實現改改對應的API即可。
這裡儘可能的把一些坑以及實現技巧以demo的形式直白的提供出來,順序不分先後。有了這些demo,大家在實現自己各種各樣需求尤其是一些有難度需求的時候,就可以參考了,當然了有時間筆者後續還會更新一些demo,感興趣的同學可以關注下。
首先說一個最基本思想:能map絕不reduce。
換句話說當在實現某一需求時,要儘可能得用map類的運算元,這是相當快的。但是聚合類的運算元通常來說是相對較慢,如果我們最後不得不用聚合類運算元的時候,我們也要把這一步邏輯看看能不能儘可能的往後放,而把一些諸如過濾什麼的邏輯往前放,這樣最後的資料量就會越來越少,再進行聚合的時候就會快很多。如果反過來,那就得不償失了,雖然最後實現的效果是一樣的,但是時間差卻是數量級的。
這裡列一下我們最常用的運算元
rdd = rdd.filter(lambda x: fun(x)) rdd = rdd.map(lambda x: fun(x)) rdd = rdd.flatMap(lambda x: fun(x)) rdd = rdd.reduceByKey(lambda a, b: a + b)
filter: 過濾,滿足條件的返回True, 需要過濾的返回False。
map: 每條樣本做一些共同的操作。
flatMap: 一條拆分成多條返回,具體的是list。
reduceByKey: 根據key進行聚合。
一個最常見的場景就是需要對某一個欄位進行聚合:假設現在我們有一份流水錶,其每一行資料就是一個使用者的一次點選行為,那現在我們想統計一下每個使用者一共點選了多少次,更甚至我們想拿到每個使用者點選過的所有item集合。虛擬碼如下:
def get_key_value(x): user = x[0] item = x[1] return (user, [item]) rdd = rdd.map(lambda x: get_key_value(x)) rdd = rdd.reduceByKey(lambda a, b: a + b)
首先我們先通過get_key_value函數將每條資料轉化成(key, value)的形式,然後通過reduceByKey聚合運算元進行聚合,它就會把相同key的資料聚合在一起,說到這裡,大家可能不覺得有什麼?這算什麼trick!其實筆者這裡想展示的是get_key_value函數返回形式:[item] 。
為了對比,這裡筆者再列一下兩者的區別:
def get_key_value(x): user = x[0] item = x[1] return (user, [item]) def get_key_value(x): user = x[0] item = x[1] return (user, item)
可以看到第一個的value是一個列表,而第二個就是單純的item,我們看reduceByKey這裡我們用的具體聚合形式是相加,列表相加就是得到一個更大的列表即:
所以最後我們就拿到了:每個使用者點選過的所有item集合,具體的是一個列表。
在日常中我們需要抽樣出一部分資料進行資料分析或者實驗,甚至我們需要將資料等分成多少份,一份一份用(後面會說),這個時候怎麼辦呢?
當然了spark也有類似sample這樣的抽樣運算元
那其實我們也可以實現,而且可以靈活控制等分等等且速度非常快,如下:
def get_prefix(x, num): prefix = random.randint(1, num) return [x, num] def get_sample(x): prefix = x[1] if prefix == 1: return True else: return False rdd = rdd.map(lambda x: get_prefix(x, num)) rdd = rdd.filter(lambda x: get_sample(x))
假設我們需要抽取1/10的資料出來,總的思路就是先給每個樣本打上一個[1,10]的亂數,然後只過濾出打上1的資料即可。
以此類推,我們還可以得到3/10的資料出來,那就是在過濾的時候,取出打上[1,2,3]的即可,當然了[4,5,6]也行,只要取三個就行。
有的時候需要在兩個集合之間做笛卡爾積,假設這兩個集合是A和B即兩個rdd。
首先spark已經提供了對應的API即cartesian,具體如下:
rdd_cartesian = rdd_A.cartesian(rdd_B)
其更具體的用法和返回形式大家可以找找相關部落格,很多,筆者這裡不再累述。
但是其速度非常慢
尤其當rdd_A和rdd_B比較大的時候,這個時候怎麼辦呢?
這個時候我們可以藉助廣播機制,其實已經有人也用了這個trick:
https://www.jb51.net/article/203197.htm
首先說一下spark中的廣播機制,假設一個變數被申請為了廣播機制,那麼其實是快取了一個唯讀的變數在每臺機器上,假設當前rdd_A比較小,rdd_B比較大,那麼我可以把rdd_A轉化為廣播變數,然後用這個廣播變數和每個rdd_B中的每個元素都去做一個操作,進而實現笛卡爾積的效果,好了,筆者給一下pyspark的實現:
def ops(A, B): pass def fun(A_list, B): result = [] for cur_A in A_list: result.append(cur_A + B) return result rdd_A = sc.broadcast(rdd_A.collect()) rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x))
可以看到我們先把rdd_A轉化為廣播變數,然後通過flatMap,將rdd_A和所有rdd_B中的單個元素進行操作,具體是什麼操作大家可以在ops函數中自己定義自己的邏輯。
關於spark的廣播機制更多講解,大家也可以找找檔案,很多的,比如:
https://www.cnblogs.com/Lee-yl/p/9777857.html
但目前為止,其實還沒有真真結束,從上面我們可以看到,rdd_A被轉化為了廣播變數,但是其有一個重要的前提:那就是rdd_A比較小。但是當rdd_A比較大的時候,我們在轉化的過程中,就會報記憶體錯誤,當然了可以通過增加設定:
spark.driver.maxResultSize=10g
但是如果rdd_A還是極其大呢?換句話說rdd_A和rdd_B都是非常大的,哪一個做廣播變數都是不合適的,怎麼辦呢?
其實我們一部分一部分的做。假設我們把rdd_A拆分成10份,這樣的話,每一份的量級就降下來了,然後把每一份轉化為廣播變數且都去和rdd_B做笛卡爾積,最後再彙總一下就可以啦。
有了想法,那麼怎麼實現呢?
分批大家都會了,如上。但是這裡面會有另外一個問題,那就是這個廣播變數名會被重複利用,在進行下一批廣播變數的時候,需要先銷燬,再建立,demo如下:
def ops(A, B): pass def fun(A_list, B): result = [] for cur_A in A_list: result.append(cur_A + B) return result def get_rdd_cartesian(rdd_A, rdd_B): rdd_cartesian = rdd_B.flatMap(lambda x: fun(rdd_A.value, x)) return rdd_cartesian for i in range(len(rdd_A_batch)) qb_rdd_temp = rdd_A_batch[i] qb_rdd_temp = sc.broadcast(qb_rdd_temp.collect()) rdd_cartesian_batch = get_rdd_cartesian(qb_rdd_temp, rdd_B) dw.saveToTable(rdd_cartesian_batch, tdw_table, "p_" + ds, overwrite=False) qb_rdd_temp.unpersist()
可以看到,最主要的就是unpersist()
說到廣播機制,這裡就再介紹一個稍微複雜的demo,乘熱打鐵。
做演演算法的同學,可能經常會遇到向量索引這一場景:即每一個item被表徵成一個embedding,然後兩個item的相似度便可以基於embedding的餘弦相似度進行量化。向量索引是指假設來了一個query,候選池子裡面假設有幾百萬的doc,最終目的就是要從候選池子中挑選出與query最相似的n個topk個doc。
關於做大規模數量級的索引已經有很多現成好的API可以用,最常見的包比如有faiss。如果還不熟悉faiss的同學,可以先簡單搜一下其基本用法,看看demo,很簡單。
好啦,假設現在query的量級是10w,doc的量級是100w,面對這麼大的量級,我們當然是想通過spark來並行處理,加快計算流程。那麼該怎麼做呢?
這時我們便可以使用spark的廣播機制進行處理啦,而且很顯然doc應該是廣播變數,因為每一個query都要和全部的doc做計算。
廢話不多說,直接看實現
首先建立doc索引:
# 獲取index embedding,並collect,方便後續建立索引 index_embedding_list = index_embedding_rdd.collect() all_ids = np.array([row[1] for row in index_embedding_list], np.str) all_vectors = np.array([str_to_vec(row[2]) for row in index_embedding_list], np.float32) del(index_embedding_list) #faiss.normalize_L2(all_vectors) print(all_ids[:2]) print(all_vectors[:2]) print("all id size: {}, all vec shape: {}".format(len(all_ids), all_vectors.shape)) # 建立index索引,並轉化為廣播變數 faiss_index = FaissIndex(all_ids, all_vectors, self.args.fast_mode, self.args.nlist, self.args.nprobe) del(all_vectors) del(all_ids) print("broadcast start") bc_faiss_index = self.sc.broadcast(faiss_index) print("broadcast done")
這裡的index_embedding_rdd就是doc的embedding,可以看到先要collect,然後建立索引。
建立完索引後,就可以開始計算了,但是這裡會有一個問題就是query的量級也是比較大的,如果一起計算可能會OM,所以我們分批次進行即batch:
# 開始檢索 # https://blog.csdn.net/wx1528159409/article/details/125879542 query_embedding_rdd = query_embedding_rdd.repartition(300) top_n = 5 batch_size = 1000 query_sim_rdd = query_embedding_rdd.mapPartitions( lambda iters: batch_get_nearest_ids( iters, bc_faiss_index, top_n, batch_size ) )
假設query_embedding_rdd是全部query的embedding,為了實現batch,我們先將query_embedding_rdd進行分割區repartition,然後每個batch進行,可以看到核心就是batch_get_nearest_ids這個函數:
def batch_get_nearest_ids(iters, bc_faiss_index, top_n, batch_size): import mkl mkl.get_max_threads() res = list() rows = list() for it in iters: rows.append(it) if len(rows) >= batch_size: batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n) res.extend(batch_res) rows = list() if rows: batch_res = __batch_get_nearest_ids(rows, bc_faiss_index, top_n) res.extend(batch_res) return res
從這裡可以清楚的看到就是組batch,組夠一個batch後就可以給當前這個batch內的query進行計算最相似的候選啦即__batch_get_nearest_ids這個核心函數:
def __batch_get_nearest_ids(rows, bc_faiss_index, top_n): import mkl mkl.get_max_threads() import faiss embs = [str_to_vec(row[3]) for row in rows] vec = np.array(embs, np.float32) #faiss.normalize_L2(vec) similarities, dst_ids = bc_faiss_index.value.batch_search(vec, top_n) batch_res = list() for i in range(len(rows)): batch_res.append([str("\t".join([rows[i][1], rows[i][2]])), "$$$".join(["\t".join(dst.split("\t")+[str(round(sim, 2))]) for dst, sim in zip(dst_ids[i], similarities[i])])]) return batch_res
這裡就是真真的呼叫faiss的索引API進行召回啦,當然了batch_res這個就是結果,自己可以想怎麼定義都行,筆者這裡不僅返回了召回的item,還返回了query自身的一些資訊。
在map的時候,不論是self的類成員還是類方法都要放到外面,不要放到類裡面,不然會報錯
總之,在用spark做任何需求之前,一定要牢記能map就map,儘量不要聚合運算元,實在不行就儘可能放到最後。
以上就是Spark處理trick總結分析的詳細內容,更多關於Spark處理trick的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45