<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
前言 :
上一篇文章:如何使用python生成大量資料寫入es資料庫並查詢操作
模擬學生個人資訊寫入es資料庫,包括姓名、性別、年齡、特點、科目、成績,建立時間。
在寫入資料時未提前建立索引mapping,而是每插入一條資料都包含了索引的資訊。
範例程式碼:【多執行緒寫入資料】【一次性寫入10000*1000條資料】 【本人親測耗時3266秒】
"""Bulk-load generated student records into Elasticsearch with worker threads.

Batch numbers are placed on a shared queue; each worker thread takes a batch
number, builds that batch of random documents and sends it with
``helpers.bulk``.
"""
from datetime import datetime
from queue import Empty
from queue import Queue
import random
import threading
import time

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch(hosts='http://127.0.0.1:9200')

INDEX_NAME = "personal_info_10000000"
BATCH_SIZE = 10000    # documents per bulk request
BATCH_COUNT = 1000    # number of batches placed on the queue
WORKER_COUNT = 10     # number of consumer threads

# Value pools the random documents are drawn from.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,願意虛心向他人學習',
    '願意以謙虛態度讚揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86,
          96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document shares the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def save_to_es(num):
    """Bulk-index one batch of generated records and print the time it took.

    :param num: zero-based batch number; the batch covers document ids
        ``BATCH_SIZE * num`` up to, but not including,
        ``BATCH_SIZE * (num + 1)``.
    :return: None. Exceptions raised by ``helpers.bulk`` propagate.
    """
    start = time.perf_counter()
    first_id = BATCH_SIZE * num
    # No "_type" key: mapping types are deprecated in Elasticsearch 7 and
    # rejected by Elasticsearch 8.
    actions = [
        {
            "_index": INDEX_NAME,
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(first_id, first_id + BATCH_SIZE)
    ]
    helpers.bulk(es, actions)
    elapsed = time.perf_counter() - start
    print(f"{num}耗時{elapsed}s!")


def run():
    """Worker loop: write batches until the module-level ``queue`` is empty.

    ``queue`` is created in the ``__main__`` section below before any worker
    thread is started.
    """
    while True:
        try:
            # get_nowait() rather than "qsize() > 0" followed by get():
            # another worker can take the last item between the size check
            # and a blocking get(), which would leave this thread waiting
            # forever and make join() hang.
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:
            # Report the failed batch and keep going, so that one error does
            # not silently remove this worker from the pool.
            print(f"{num}寫入失敗:{exc!r}")


if __name__ == '__main__':
    start = time.perf_counter()
    queue = Queue()
    # Enqueue every batch number before the workers start.
    for num in range(BATCH_COUNT):
        queue.put(num)
    # Start the consumer threads, then wait for all of them to finish.
    consumer_lst = [threading.Thread(target=run) for _ in range(WORKER_COUNT)]
    for consumer in consumer_lst:
        consumer.start()
    for consumer in consumer_lst:
        consumer.join()
    end = time.perf_counter()
    print('程式執行完畢!花費時間:', end - start)
執行結果:
自動建立的索引mapping:
GET personal_info_10000000/_mapping { "personal_info_10000000" : { "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "create_time" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } } }
先建立索引personal_info_5000000,確定好mapping後,再插入資料。
新建索引並設定mapping資訊:
PUT personal_info_5000000 { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 32 } } }, "sex": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 8 } } }, "age": { "type": "long" }, "character": { "type": "text", "analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "subject": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "grade": { "type": "long" }, "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
檢視新建索引資訊:
GET personal_info_5000000 { "personal_info_5000000" : { "aliases" : { }, "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } }, "analyzer" : "ik_smart" }, "create_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 32 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 8 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } }, "settings" : { "index" : { "routing" : { "allocation" : { "include" : { "_tier_preference" : "data_content" } } }, "number_of_shards" : "3", "provided_name" : "personal_info_50000000", "creation_date" : "1663471072176", "number_of_replicas" : "1", "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ", "version" : { "created" : "7170699" } } } } }
開始插入資料:
範例程式碼: 【多執行緒逐條寫入資料】【一次性寫入10000*500條資料】 【本人親測耗時7916秒】
"""Write generated student records to Elasticsearch one document at a time.

Document ids are placed on a shared queue; each worker thread takes an id,
builds one random document and indexes it with ``es.index``.
"""
from datetime import datetime
import functools
from queue import Empty
from queue import Queue
import random
import threading
import time

from elasticsearch import Elasticsearch

es = Elasticsearch(hosts='http://127.0.0.1:9200')

INDEX_NAME = "personal_info_5000000"
DOC_COUNT = 5000000   # number of document ids placed on the queue
WORKER_COUNT = 10     # number of consumer threads

# Value pools the random documents are drawn from.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,願意虛心向他人學習',
    '願意以謙虛態度讚揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86,
          96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document shares the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def timer(func):
    """Decorator that prints how long each call to *func* took.

    The printed label is the first positional argument of the call, or an
    empty string when the call has no positional arguments.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        res = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Pick the label explicitly; unpacking *args into format() only
        # works when there is exactly one positional argument.
        label = args[0] if args else ''
        print('id{}共耗時約 {:.2f} 秒'.format(label, elapsed))
        return res
    return wrapper


@timer
def save_to_es(num):
    """Index a single generated record.

    :param num: document id; also stored in the document's ``id`` field.
    :return: None. Exceptions raised by ``es.index`` propagate.
    """
    body = {
        "id": num,
        "name": random.choice(names),
        "sex": random.choice(sexs),
        "age": random.choice(age),
        "character": random.choice(character),
        "subject": random.choice(subjects),
        "grade": random.choice(grades),
        "create_time": create_time,
    }
    # The index is created on first write if it does not exist yet.
    # No doc_type argument: mapping types are deprecated in Elasticsearch 7
    # and removed in Elasticsearch 8.
    es.index(index=INDEX_NAME, id=num, document=body)


def run():
    """Worker loop: write documents until the module-level ``queue`` is empty.

    ``queue`` is created in the ``__main__`` section below before any worker
    thread is started.
    """
    while True:
        try:
            # get_nowait() rather than "qsize() > 0" followed by get():
            # another worker can take the last item between the size check
            # and a blocking get(), which would leave this thread waiting
            # forever and make join() hang.
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:
            # Report the failed document and keep going, so that one error
            # does not silently remove this worker from the pool.
            print(f"{num}寫入失敗:{exc!r}")


if __name__ == '__main__':
    start = time.perf_counter()
    queue = Queue()
    # Enqueue every document id before the workers start.
    for num in range(DOC_COUNT):
        queue.put(num)
    # Start the consumer threads, then wait for all of them to finish.
    consumer_lst = [threading.Thread(target=run) for _ in range(WORKER_COUNT)]
    for consumer in consumer_lst:
        consumer.start()
    for consumer in consumer_lst:
        consumer.join()
    end = time.perf_counter()
    print('程式執行完畢!花費時間:', end - start)
執行結果:
先建立索引personal_info_5000000_v2,確定好mapping後,再插入資料。
新建索引並設定mapping資訊:
PUT personal_info_5000000_v2 { "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "properties": { "id": { "type": "long" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 32 } } }, "sex": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 8 } } }, "age": { "type": "long" }, "character": { "type": "text", "analyzer": "ik_smart", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "subject": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "grade": { "type": "long" }, "create_time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" } } } }
檢視新建索引資訊:
GET personal_info_5000000_v2 { "personal_info_5000000_v2" : { "aliases" : { }, "mappings" : { "properties" : { "age" : { "type" : "long" }, "character" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } }, "analyzer" : "ik_smart" }, "create_time" : { "type" : "date", "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }, "grade" : { "type" : "long" }, "id" : { "type" : "long" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 32 } } }, "sex" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 8 } } }, "subject" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } }, "settings" : { "index" : { "routing" : { "allocation" : { "include" : { "_tier_preference" : "data_content" } } }, "number_of_shards" : "3", "provided_name" : "personal_info_5000000_v2", "creation_date" : "1663485323617", "number_of_replicas" : "1", "uuid" : "XBPaDn_gREmAoJmdRyBMAA", "version" : { "created" : "7170699" } } } } }
批次插入資料:
通過elasticsearch模組匯入helpers,通過helpers.bulk來批次處理大量的資料。首先將所有的資料定義成字典形式,各欄位含義如下:_index 為要寫入的索引名稱,_id 為文件的唯一id,_source 為文件本身的內容(姓名、性別、年齡等欄位)。
範例程式碼: 【程式中途異常,寫入4714000條資料】
"""Bulk-load generated student records into a pre-created Elasticsearch index.

Batch numbers are placed on a shared queue; each worker thread takes a batch
number, builds that batch of random documents as a list and sends it with
``helpers.bulk``.
"""
from datetime import datetime
import functools
from queue import Empty
from queue import Queue
import random
import threading
import time

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch(hosts='http://127.0.0.1:9200')

INDEX_NAME = "personal_info_5000000_v2"
BATCH_SIZE = 10000    # documents per bulk request
BATCH_COUNT = 500     # number of batches placed on the queue
WORKER_COUNT = 10     # number of consumer threads

# Value pools the random documents are drawn from.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,願意虛心向他人學習',
    '願意以謙虛態度讚揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86,
          96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document shares the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def timer(func):
    """Decorator that prints how long each call to *func* took.

    The printed label is the first positional argument of the call, or an
    empty string when the call has no positional arguments.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        res = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Pick the label explicitly; unpacking *args into format() only
        # works when there is exactly one positional argument.
        label = args[0] if args else ''
        print('id{}共耗時約 {:.2f} 秒'.format(label, elapsed))
        return res
    return wrapper


@timer
def save_to_es(num):
    """Bulk-index one batch of generated records.

    :param num: zero-based batch number; the batch covers document ids
        ``BATCH_SIZE * num`` up to, but not including,
        ``BATCH_SIZE * (num + 1)``.
    :return: None. Exceptions raised by ``helpers.bulk`` propagate.
    """
    first_id = BATCH_SIZE * num
    # No "_type" key: mapping types are deprecated in Elasticsearch 7 and
    # rejected by Elasticsearch 8.
    actions = [
        {
            "_index": INDEX_NAME,
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(first_id, first_id + BATCH_SIZE)
    ]
    helpers.bulk(es, actions)


def run():
    """Worker loop: write batches until the module-level ``queue`` is empty.

    ``queue`` is created in the ``__main__`` section below before any worker
    thread is started.
    """
    while True:
        try:
            # get_nowait() rather than "qsize() > 0" followed by get():
            # another worker can take the last item between the size check
            # and a blocking get(), which would leave this thread waiting
            # forever and make join() hang.
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:
            # Report the failed batch and keep going, so that one error does
            # not silently remove this worker from the pool.
            print(f"{num}寫入失敗:{exc!r}")


if __name__ == '__main__':
    start = time.perf_counter()
    queue = Queue()
    # Enqueue every batch number before the workers start.
    for num in range(BATCH_COUNT):
        queue.put(num)
    # Start the consumer threads, then wait for all of them to finish.
    consumer_lst = [threading.Thread(target=run) for _ in range(WORKER_COUNT)]
    for consumer in consumer_lst:
        consumer.start()
    for consumer in consumer_lst:
        consumer.join()
    end = time.perf_counter()
    print('程式執行完畢!花費時間:', end - start)
執行結果:
先建立索引personal_info_5000000_v3,確定好mapping後,再插入資料。
此過程是在上面批次插入的前提下進行優化,採用python生成器。
建立索引和mapping同上,直接上程式碼:
範例程式碼: 【程式中途異常,寫入3688000條資料】
"""Bulk-load generated student records, building each batch with a generator.

Batch numbers are placed on a shared queue; each worker thread takes a batch
number and passes a generator of random documents to ``helpers.bulk``.
"""
from datetime import datetime
import functools
from queue import Empty
from queue import Queue
import random
import threading
import time

from elasticsearch import Elasticsearch
from elasticsearch import helpers

es = Elasticsearch(hosts='http://127.0.0.1:9200')

INDEX_NAME = "personal_info_5000000_v3"
BATCH_SIZE = 10000    # documents per call to helpers.bulk
BATCH_COUNT = 500     # number of batches placed on the queue
WORKER_COUNT = 10     # number of consumer threads

# Value pools the random documents are drawn from.
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = [
    '自信但不自負,不以自我為中心',
    '努力、積極、樂觀、拼搏是我的人生信條',
    '抗壓能力強,能夠快速適應周圍環境',
    '敢做敢拼,腳踏實地;做事認真負責,責任心強',
    '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
    '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
    '忠實誠信,講原則,說到做到,決不推卸責任',
    '有自制力,做事情始終堅持有始有終,從不半途而廢',
    '肯學習,有問題不逃避,願意虛心向他人學習',
    '願意以謙虛態度讚揚接納優越者,權威者',
    '會用100%的熱情和精力投入到工作中;平易近人',
    '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
    '有較強的團隊精神,工作積極進取,態度認真',
]
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86,
          96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
# Computed once at start-up, so every document shares the same timestamp.
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def timer(func):
    """Decorator that prints how long each call to *func* took.

    The printed label is the first positional argument of the call, or an
    empty string when the call has no positional arguments.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        res = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Pick the label explicitly; unpacking *args into format() only
        # works when there is exactly one positional argument.
        label = args[0] if args else ''
        print('id{}共耗時約 {:.2f} 秒'.format(label, elapsed))
        return res
    return wrapper


@timer
def save_to_es(num):
    """Bulk-index one batch of generated records from a generator.

    :param num: zero-based batch number; the batch covers document ids
        ``BATCH_SIZE * num`` up to, but not including,
        ``BATCH_SIZE * (num + 1)``.
    :return: None. Exceptions raised by ``helpers.bulk`` propagate.
    """
    first_id = BATCH_SIZE * num
    # A generator expression: documents are built one at a time as
    # helpers.bulk iterates, instead of holding the whole batch in a list.
    # No "_type" key: mapping types are deprecated in Elasticsearch 7 and
    # rejected by Elasticsearch 8.
    actions = (
        {
            "_index": INDEX_NAME,
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time,
            },
        }
        for i in range(first_id, first_id + BATCH_SIZE)
    )
    helpers.bulk(es, actions)


def run():
    """Worker loop: write batches until the module-level ``queue`` is empty.

    ``queue`` is created in the ``__main__`` section below before any worker
    thread is started.
    """
    while True:
        try:
            # get_nowait() rather than "qsize() > 0" followed by get():
            # another worker can take the last item between the size check
            # and a blocking get(), which would leave this thread waiting
            # forever and make join() hang.
            num = queue.get_nowait()
        except Empty:
            break
        print(num)
        try:
            save_to_es(num)
        except Exception as exc:
            # Report the failed batch and keep going, so that one error does
            # not silently remove this worker from the pool.
            print(f"{num}寫入失敗:{exc!r}")


if __name__ == '__main__':
    start = time.perf_counter()
    queue = Queue()
    # Enqueue every batch number before the workers start.
    for num in range(BATCH_COUNT):
        queue.put(num)
    # Start the consumer threads, then wait for all of them to finish.
    consumer_lst = [threading.Thread(target=run) for _ in range(WORKER_COUNT)]
    for consumer in consumer_lst:
        consumer.start()
    for consumer in consumer_lst:
        consumer.join()
    end = time.perf_counter()
    print('程式執行完畢!花費時間:', end - start)
執行結果:
到此這篇關於使用python生成大量資料寫入es資料庫並查詢操作(2)的文章就介紹到這了,更多相關python生成 資料 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且不仅与iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45