MacBook项目 2009年学校开始实施MacBook项目,所有师生配备一台MacBook,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
# Reset liked_count to 0 on every document that is a "post" (join_field),
# comes from platform "toutiao", and already has a liked_count field,
# across the three listed post indices.
POST post-v1_1-2021.02,post-v1_1-2021.03,post-v1_1-2021.04/_update_by_query
{
  "query": {
    "bool": {
      "must": [
        { "term": { "join_field": { "value": "post" } } },
        { "term": { "platform": { "value": "toutiao" } } },
        { "exists": { "field": "liked_count" } }
      ]
    }
  },
  "script": {
    "source": "ctx._source.liked_count=0",
    "lang": "painless"
  }
}
# Add an object field post_signature with two keyword sub-fields.
# A put-mapping body must list its fields under "properties", and an object
# field's sub-fields need their own nested "properties"; the bare
# {"post_signature": {...}} form is rejected by Elasticsearch.
PUT user_tiktok/_doc/_mapping?include_type_name=true
{
  "properties": {
    "post_signature": {
      "properties": {
        "StuClass": { "type": "keyword" },
        "post_token": { "type": "keyword" }
      }
    }
  }
}

# Add a text field user_token to user_toutiao.
PUT user_toutiao/_mapping
{
  "properties": {
    "user_token": { "type": "text" }
  }
}
# Producer module: scans TikTok user documents in Elasticsearch and sends one
# Celery task per matching document.
#
# Contents: a Redis pool/client, an Elasticsearch client, the `ES` helper base
# class with its `tiktokEsUser` binding, the Celery app `tiktok_app`
# configured by `DataEsConfig_download`, and the producers `send_post` /
# `send_sign_token`.
import logging

import arrow
import pytz
import redis
from celery import Celery
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, streaming_bulk
from kombu import Exchange, Queue, binding

# NOTE(review): the Redis password is hard-coded in source; consider loading
# it from the environment or a secrets store instead.
pool_16_8 = redis.ConnectionPool(host='10.0.3.100', port=6379, db=8,
                                 password='EfcHGSzKqg6cfzWq')
rds_16_8 = redis.StrictRedis(connection_pool=pool_16_8)

# Let the elasticsearch client library's own log records through at INFO.
_es_logger = logging.getLogger('elasticsearch')
_es_logger.disabled = False
_es_logger.setLevel(logging.INFO)

# NOTE(review): user, password and host in this URL are redacted
# placeholders; fill in real values (ideally from configuration).
es_zoo_connection = Elasticsearch('http://eswriter:e s密碼@e sip:4000',
                                  dead_timeout=10, retry_on_timeout=True)

logger = logging.getLogger(__name__)


class ES:
    """Class-level helpers around one Elasticsearch index.

    Subclasses set ``es`` (the client), ``index`` and ``doc_type``. Every
    operation is a classmethod, so the class itself is used, not instances.

    Attributes:
        index: base index name.
        doc_type: mapping type passed to the client.
        id_field: key popped from a document to obtain its id.
        source_id_field: fallback key holding the id when ``id_field`` is
            absent or empty.
        version: optional segment placed between ``index`` and the alias
            segment of derived index names.
        aliase_field: document key whose value selects a derived index.
        separator: joiner between segments of derived index names.
        aliase_func: maps the parsed ``aliase_field`` value to the last
            segment of the index name.
        es: Elasticsearch client.
        tz: timezone that integer ``aliase_field`` values are converted to.
        logger: logger used by the helpers.
        default_chunk_size: actions per bulk request when the caller of
            ``bulk_upsert`` passes no ``chunk_size``.
    """

    index = None
    doc_type = None
    id_field = '_id'
    version = ''
    source_id_field = ''
    aliase_field = ''
    separator = '-'
    aliase_func = None
    es = None
    tz = pytz.timezone('Asia/Shanghai')
    logger = logger
    # Same value as the streaming_bulk helper's own default.
    default_chunk_size = 500

    @classmethod
    def mget(cls, ids=None, index=None, **kwargs):
        """Fetch the documents with the given *ids* (index defaults to ``cls.index``)."""
        return cls.es.mget(body={'ids': ids}, doc_type=cls.doc_type,
                           index=index or cls.index, **kwargs)

    @classmethod
    def count(cls, query=None, index=None, **kwargs):
        """Return the number of documents matching *query* (0 if the response has no count)."""
        response = cls.es.count(doc_type=cls.doc_type, body=query,
                                index=index or cls.index, **kwargs)
        return response.get('count', 0)

    @classmethod
    def upsert(cls, doc, doc_id=None, index=None, doc_as_upsert=True, **kwargs):
        """Partially update one document, creating it when *doc_as_upsert* is true.

        Args:
            doc: fields to write. When *doc_id* is not given, ``id_name`` pops
                ``id_field`` out of it.
            doc_id: explicit document id; defaults to ``id_name(doc)``.
            index: explicit index; defaults to ``index_name(doc)``.
            doc_as_upsert: send ``doc_as_upsert: true`` in the update body.
            **kwargs: forwarded to ``es.update``.

        Returns:
            The client's update response.
        """
        body = {'doc': doc}
        if doc_as_upsert:
            body['doc_as_upsert'] = True
        # The id is resolved first, so a popped id_field is gone before
        # index_name inspects the document (same order as before).
        target_id = doc_id or cls.id_name(doc)
        target_index = index or cls.index_name(doc)
        # Pass everything by keyword. The positional order of ``update``
        # differs between elasticsearch-py major versions, and
        # (index, id, doc_type, body) matches neither of them.
        return cls.es.update(index=target_index, id=target_id,
                             doc_type=cls.doc_type, body=body, **kwargs)

    @classmethod
    def search(cls, index=None, query=None, **kwargs):
        """Run a search with *query* as the request body."""
        return cls.es.search(index=index or cls.index, body=query, **kwargs)

    @classmethod
    def scan(cls, query, index=None, **kwargs):
        """Iterate over every hit for *query* using the ``scan`` helper."""
        return scan(cls.es, query=query, index=index or cls.index, **kwargs)

    @classmethod
    def index_name(cls, doc):
        """Return the concrete index name to use for *doc*.

        When ``aliase_field`` is set and present in *doc*, its value is parsed:
        a string with ``arrow.get``, an int as a timestamp converted to
        ``tz``. The result is passed to ``aliase_func``. The name is then
        ``index``, optional ``version`` and that result, joined by
        ``separator``. Otherwise ``index`` is returned unchanged.
        """
        if not (cls.aliase_field and cls.aliase_field in doc):
            return cls.index
        aliase_part = doc[cls.aliase_field]
        if isinstance(aliase_part, str):
            aliase_part = arrow.get(aliase_part)
        if isinstance(aliase_part, int):
            aliase_part = arrow.get(aliase_part).astimezone(cls.tz)
        segments = [cls.index]
        if cls.version:
            segments.append(cls.version)
        segments.append(cls.aliase_func(aliase_part))
        # str() mirrors the '{}'.format(...) conversion used previously.
        return cls.separator.join(str(segment) for segment in segments)

    @classmethod
    def id_name(cls, doc):
        """Return the id for *doc*, removing ``id_field`` from it.

        ``id_field`` is always popped, so it is never sent inside the document
        body; Elasticsearch rejects metadata fields such as ``_id`` there.
        When it is absent or empty, ``source_id_field`` (left in place) is
        used instead.

        Raises:
            AssertionError: when no id can be found. An explicit raise keeps
                the historical exception type and also fires under ``python -O``.
        """
        doc_id = doc.pop(cls.id_field, None) or doc.get(cls.source_id_field)
        if not doc_id:
            cls.logger.error('document has no id: %r', doc)
            raise AssertionError('doc _id must not be None')
        return doc_id

    @classmethod
    def bulk_upsert(cls, docs, **kwargs):
        """Bulk-write *docs*; only the ``index`` and ``update`` operations are supported.

        Keyword args:
            op_type: ``'update'`` (default) or ``'index'``. Any other value
                produces no actions.
            chunk_size: actions per bulk request (default
                ``default_chunk_size``).
            upsert: ``update`` only; a missing or ``None`` value means
                ``True`` (``doc_as_upsert``).

        Returns:
            The lazy ``streaming_bulk`` generator. Nothing is sent until it
            is iterated, and per-item failures are yielded rather than raised.
        """
        op_type = kwargs.get('op_type') or 'update'
        # Passing chunk_size=None to streaming_bulk disables its per-action
        # limit, so requests would only be split by byte size.
        chunk_size = kwargs.get('chunk_size') or cls.default_chunk_size
        if op_type == 'update':
            upsert = kwargs.get('upsert')
            if upsert is None:
                upsert = True
        else:
            upsert = False
        actions = cls._gen_bulk_actions(docs, cls.index_name, cls.doc_type,
                                        cls.id_name, op_type, upsert=upsert)
        return streaming_bulk(cls.es, actions, chunk_size=chunk_size,
                              raise_on_error=False, raise_on_exception=False,
                              max_retries=5, request_timeout=25)

    @classmethod
    def _gen_bulk_actions(cls, docs, index_name, doc_type, id_name, op_type,
                          upsert=True, **kwargs):
        """Yield ``streaming_bulk`` action dicts for *docs*.

        *index_name* and *id_name* may each be a fixed value or a callable
        taking the document. For ``op_type == 'index'`` the document is the
        source. For ``'update'`` it is wrapped as ``{'doc': ...}``, plus
        ``doc_as_upsert`` when *upsert* is true. Documents with any other
        *op_type* are skipped.

        Raises:
            AssertionError: on first iteration, if *upsert* is requested with
                an op_type other than ``'update'``.
        """
        if upsert and op_type != 'update':
            raise AssertionError('upsert should use "update" as op_type')
        for doc in docs:
            index = index_name(doc) if callable(index_name) else index_name
            if op_type == 'index':
                source = doc
            elif op_type == 'update':
                source = {'doc': doc}
                if upsert:
                    source['doc_as_upsert'] = True
            else:
                continue
            # Resolved after `source` is built: id_name pops id_field from the
            # same dict object, so it is removed from the sent body as well.
            action_id = id_name(doc) if callable(id_name) else id_name
            yield {
                "_op_type": op_type,
                "_index": index,
                "_type": doc_type,
                "_id": action_id,
                "_source": source,
            }


class tiktokEsUser(ES):
    """``ES`` binding for the ``user_tiktok`` index; ids fall back to ``user_id``."""

    index = 'user_tiktok'
    doc_type = '_doc'
    id_field = '_id'
    source_id_field = 'user_id'
    es = es_zoo_connection


def data_es_route_task_spider(name, args, kwargs, options, task=None, **kw):
    """Celery router: publish every task to the ``tiktok`` topic exchange,
    using the task name as the routing key."""
    return {
        'exchange': 'tiktok',
        'exchange_type': 'topic',
        'routing_key': name,
    }


def _lazy_topic_queue(exchange, routing_key):
    """Build a queue named *routing_key*, bound to *exchange* by that same key,
    with RabbitMQ's ``x-queue-mode=lazy`` argument."""
    return Queue(
        routing_key,
        [binding(exchange, routing_key=routing_key)],
        queue_arguments={'x-queue-mode': 'lazy'},
    )


class DataEsConfig_download:
    """Celery configuration for ``tiktok_app`` (JSON tasks, results ignored)."""

    broker_url = 'amqp://使用者:密碼@ip:埠/'
    task_ignore_result = True
    task_serializer = 'json'
    accept_content = ['json']
    task_default_queue = 'default'
    task_default_exchange = 'default'
    task_default_routing_key = 'default'
    exchange = Exchange('tiktok', type='topic')
    # NOTE(review): no queue below is bound to 'tiktok.user.sign_token',
    # which send_sign_token publishes to. Unless a worker declares it
    # elsewhere, the topic exchange will drop those messages. Confirm.
    task_queues = [
        _lazy_topic_queue(exchange, 'tiktok.user_avatar.download'),
        _lazy_topic_queue(exchange, 'tiktok.post_avatar.download'),
        _lazy_topic_queue(exchange, 'tiktok.post.spider'),
        _lazy_topic_queue(exchange, 'tiktok.post.save'),
        _lazy_topic_queue(exchange, 'tiktok.user.save'),
        _lazy_topic_queue(exchange, 'tiktok.post_avatar.invalid'),
        _lazy_topic_queue(exchange, 'tiktok.user_avatar.invalid'),
        _lazy_topic_queue(exchange, 'tiktok.comment.save'),
    ]
    task_routes = (data_es_route_task_spider,)
    enable_utc = True
    timezone = "Asia/Shanghai"


# Download app.
tiktok_app = Celery(
    'tiktok',
    include=[
        'task.tasks',
    ],
)
tiktok_app.config_from_object(DataEsConfig_download)


def _send_user_tasks(query, task_name):
    """Scan ``tiktokEsUser`` with *query* and send one *task_name* task per hit,
    passing the hit's ``_source`` as the only argument."""
    hits = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100)
    for hit in hits:
        tiktok_app.send_task(task_name, args=(hit['_source'],))


def send_post(min_following=1000, region=None):
    """Send a ``tiktok.post.spider`` task for each matching user document.

    Matches users that have ``post_signature`` and ``following_num`` of at
    least *min_following*. When *region* is given, the user must also match
    ``region``. Each task receives the user's ``region``, ``sec_uid`` and
    ``post_signature``.
    """
    must = [
        {"exists": {"field": "post_signature"}},
        {"range": {"following_num": {"gte": min_following}}},
    ]
    if region is not None:
        must.append({"match": {"region": region}})
    query = {
        "query": {"bool": {"must": must}},
        "_source": ["region", "sec_uid", "post_signature"],
    }
    _send_user_tasks(query, 'tiktok.post.spider')


def send_sign_token(min_following=1000,
                    create_time_gte="2021-01-06T00:00:00",
                    create_time_lte="2021-01-06T01:00:00"):
    """Send a ``tiktok.user.sign_token`` task for each matching user document.

    Matches users that have ``post_signature``, ``following_num`` of at least
    *min_following*, and ``create_time`` within
    [*create_time_gte*, *create_time_lte*]. Each task receives the user's
    ``user_id`` and ``sec_uid``.
    """
    query = {
        "query": {
            "bool": {
                "must": [
                    {"exists": {"field": "post_signature"}},
                    {"range": {"following_num": {"gte": min_following}}},
                    {"range": {"create_time": {"gte": create_time_gte,
                                               "lte": create_time_lte}}},
                ]
            }
        },
        "_source": ["user_id", "sec_uid"],
    }
    _send_user_tasks(query, 'tiktok.user.sign_token')


if __name__ == '__main__':
    # Call send_sign_token() here instead to dispatch sign-token tasks.
    send_post()
以上就是Python實現Elasticsearch批次修改查詢及傳送MQ操作範例的詳細內容,更多關於Python實現Elasticsearch修改查詢傳送MQ的資料請關注it145.com其它相關文章!
相關文章
MacBook项目 2009年学校开始实施MacBook项目,所有师生配备一台MacBook,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合来看,Anker超能充系列的性价比很高,不仅和iPhone12/苹果MacBook很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、Mac Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌浏览器支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐浏览器支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和安卓,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是安卓手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45