<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
鋼鐵知識庫,一個學習python爬蟲、資料分析的知識庫。人生苦短,快用python。
之前我們使用requests庫爬取某個站點的時候,每發出一個請求,程式必須等待網站返回響應才能接著執行,而在整個爬蟲過程中,整個爬蟲程式是一直在等待的,實際上沒有做任何事情。
像這種佔用磁碟/記憶體IO、網路IO的任務,大部分時間是CPU在等待的操作,就叫IO密集型任務。對於這種情況有沒有優化方案呢,當然有,那就是使用aiohttp庫實現非同步爬蟲。
我們在使用requests請求時,只能等一個請求先出去再回來,才會傳送下一個請求。明顯效率不高阿,這時候如果換成非同步請求的方式,就不會有這個等待。一個請求發出去,不管這個請求什麼時間響應,程式通過await掛起協程物件後直接進行下一個請求。
解決方法就是通過 aiohttp + asyncio,什麼是aiohttp?一個基於 asyncio 的非同步 HTTP 網路模組,可用於實現非同步爬蟲,速度明顯快於 requests 的同步爬蟲。
區別就是一個同步一個是非同步。話不多說直接上程式碼看效果。
pip install aiohttp
#!/usr/bin/env python # -*- coding: utf-8 -*- # author: 鋼鐵知識庫 import time import requests # 同步請求 def main(): start = time.time() for i in range(5): res = requests.get('http://httpbin.org/delay/2') print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status_code}') print(f'requests同步耗時:{time.time() - start}') if __name__ == '__main__': main() ''' 當前時間:2022-09-05 15:44:51.991685, status_code = 200 當前時間:2022-09-05 15:44:54.528918, status_code = 200 當前時間:2022-09-05 15:44:57.057373, status_code = 200 當前時間:2022-09-05 15:44:59.643119, status_code = 200 當前時間:2022-09-05 15:45:02.167362, status_code = 200 requests同步耗時:12.785893440246582 '''
可以看到5次請求總共用12.7秒,再來看同樣的請求非同步多少時間。
#!/usr/bin/env python # file: day6-9同步和非同步.py # author: 鋼鐵知識庫 import asyncio import time import aiohttp async def async_http(): # 宣告一個支援非同步的上下文管理器 async with aiohttp.ClientSession() as session: res = await session.get('http://httpbin.org/delay/2') print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status}') tasks = [async_http() for _ in range(5)] start = time.time() # Python 3.7 及以後,不需要顯式宣告事件迴圈,可以使用 asyncio.run()來代替最後的啟動操作 asyncio.run(asyncio.wait(tasks)) print(f'aiohttp非同步耗時:{time.time() - start}') ''' 當前時間:2022-09-05 15:42:32.363966, status_code = 200 當前時間:2022-09-05 15:42:32.366957, status_code = 200 當前時間:2022-09-05 15:42:32.374973, status_code = 200 當前時間:2022-09-05 15:42:32.384909, status_code = 200 當前時間:2022-09-05 15:42:32.390318, status_code = 200 aiohttp非同步耗時:2.5826876163482666 '''
兩次對比可以看到執行過程,時間一個是順序執行,一個是同時執行。這就是同步和非同步的區別。
接下來我們會詳細介紹aiohttp庫的用法和爬取實戰。aiohttp 是一個支援非同步請求的庫,它和 asyncio 配合使用,可以使我們非常方便地實現非同步請求操作。asyncio模組,其內部實現了對TCP、UDP、SSL協定的非同步操作,但是對於HTTP請求,就需要aiohttp實現了。
aiohttp分為兩部分,一部分是Client,一部分是Server。下面來說說aiohttp使用者端部分的用法。
先寫一個簡單的案例
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 import asyncio import aiohttp async def get_api(session, url): # 宣告一個支援非同步的上下文管理器 async with session.get(url) as response: return await response.text(), response.status async def main(): async with aiohttp.ClientSession() as session: html, status = await get_api(session, 'http://httpbin.org/delay/2') print(f'html: {html[:50]}') print(f'status : {status}') if __name__ == '__main__': # Python 3.7 及以後,不需要顯式宣告事件迴圈,可以使用 asyncio.run(main())來代替最後的啟動操作 asyncio.get_event_loop().run_until_complete(main()) ''' html: { "args": {}, "data": "", "files": {}, status : 200 Process finished with exit code 0 '''
aiohttp請求的方法和之前有明顯區別,主要包括如下幾點:
注意:Python3.7及以後的版本中,可以使用asyncio.run(main())代替最後的啟動操作。
對於URL引數的設定,我們可以藉助params設定,傳入一個字典即可,範例如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 import aiohttp import asyncio async def main(): params = {'name': '鋼鐵知識庫', 'age': 23} async with aiohttp.ClientSession() as session: async with session.get('https://www.httpbin.org/get', params=params) as res: print(await res.json()) if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main()) ''' {'args': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=鋼鐵知識庫&age=23'} '''
可以看到實際請求的URL後面帶了字尾,這就是params的內容。
除了get請求,aiohttp還支援其它請求型別,如POST、PUT、DELETE等,和requests使用方式類似。
session.post('http://httpbin.org/post', data=b'data') session.put('http://httpbin.org/put', data=b'data') session.delete('http://httpbin.org/delete') session.head('http://httpbin.org/get') session.options('http://httpbin.org/get') session.patch('http://httpbin.org/patch', data=b'data')
要使用這些方法,只需要把對應的方法和引數替換一下。用法和get類似就不再舉例。
對於響應來說,我們可以用如下方法分別獲取其中的響應情況。狀態碼、響應頭、響應體、響應體二進位制內容、響應體JSON結果,範例如下:
#!/usr/bin/env python # @Author : 鋼鐵知識庫 import aiohttp import asyncio async def main(): data = {'name': '鋼鐵知識庫', 'age': 23} async with aiohttp.ClientSession() as session: async with session.post('https://www.httpbin.org/post', data=data) as response: print('status:', response.status) # 狀態碼 print('headers:', response.headers) # 響應頭 print('body:', await response.text()) # 響應體 print('bytes:', await response.read()) # 響應體二進位制內容 print('json:', await response.json()) # 響應體json資料 if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main())
''' status: 200 headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')> body: { "args": {}, "data": "", "files": {}, "form": { "age": "23", "name": "u94a2u94c1u77e5u8bc6u5e93" }, "headers": { "Accept": "*/*", "Accept-Encoding": "gzip, deflate", "Content-Length": "57", "Content-Type": "application/x-www-form-urlencoded", "Host": "www.httpbin.org", "User-Agent": "Python/3.8 aiohttp/3.8.1", "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1" }, "json": null, "origin": "122.55.11.188", "url": "https://www.httpbin.org/post" } bytes: b'{n "args": {}, n "data": "", n "files": {}, n "form": {n "age": "23", n "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"n }, n "headers": {n "Accept": "*/*", n "Accept-Encoding": "gzip, deflate", n "Content-Length": "57", n "Content-Type": "application/x-www-form-urlencoded", n "Host": "www.httpbin.org", n "User-Agent": "Python/3.8 aiohttp/3.8.1", n "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"n }, n "json": null, n "origin": "122.5.132.196", n "url": "https://www.httpbin.org/post"n}n' json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'} '''
可以看到有些欄位前面需要加await,因為其返回的是一個協程物件(如async修飾的方法),那麼前面就要加await。
我們可以藉助ClientTimeout
物件設定超時,例如要設定1秒的超時時間,可以這麼實現:
#!/usr/bin/env python # @Author : 鋼鐵知識庫 import aiohttp import asyncio async def main(): # 設定 1 秒的超時 timeout = aiohttp.ClientTimeout(total=1) data = {'name': '鋼鐵知識庫', 'age': 23} async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get('https://www.httpbin.org/delay/2', data=data) as response: print('status:', response.status) # 狀態碼 if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main()) ''' Traceback (most recent call last): ####中間省略#### raise asyncio.TimeoutError from None asyncio.exceptions.TimeoutError '''
這裡設定了超時1秒請求延時2秒,發現丟擲異常asyncio.TimeoutError
,如果正常則響應200。
aiohttp可以支援非常高的並行量,但面對高並行網站可能會承受不住,隨時有掛掉的危險,這時需要對並行進行一些控制。現在我們藉助asyncio 的Semaphore來控制並行量,範例如下:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 import asyncio from datetime import datetime import aiohttp # 宣告最大並行量 semaphore = asyncio.Semaphore(2) async def get_api(): async with semaphore: print(f'scrapting...{datetime.now()}') async with session.get('https://www.baidu.com') as response: await asyncio.sleep(2) # print(f'當前時間:{datetime.now()}, {response.status}') async def main(): global session session = aiohttp.ClientSession() tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)] await asyncio.gather(*tasks) await session.close() if __name__ == '__main__': asyncio.get_event_loop().run_until_complete(main()) ''' scrapting...2022-09-07 08:11:14.190000 scrapting...2022-09-07 08:11:14.292000 scrapting...2022-09-07 08:11:16.482000 scrapting...2022-09-07 08:11:16.504000 scrapting...2022-09-07 08:11:18.520000 scrapting...2022-09-07 08:11:18.521000 '''
在main方法裡,我們宣告了1000個task,如果沒有通過Semaphore進行並行限制,那這1000放到gather方法後會被同時執行,並行量相當大。有了號誌的控制之後,同時執行的task數量就會被控制,這樣就能給aiohttp限制速度了。
接下來我們通過非同步方式練手一個小說爬蟲,需求如下:
需求頁面:https://dushu.baidu.com/pc/detail?gid=4308080950
目錄介面:https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4308080950"}
詳情介面:
https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}
關鍵引數:book_id
:小說ID、cid
:章節id
採集要求:使用協程方式寫入,資料存放進mongo
需求分析:點開需求頁面,通過F12抓包可以發現兩個介面。一個目錄介面,一個詳情介面。
首先第一步先請求目錄介面拿到cid章節id,然後將cid傳遞給詳情介面拿到小說資料,最後存入mongo即可。
話不多說,直接上程式碼:
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Author : 鋼鐵知識庫 # 不合適就是不合適,真正合適的,你不會有半點猶豫。 import asyncio import json,re import logging import aiohttp import requests from utils.conn_db import ConnDb # 紀錄檔格式 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') # 章節目錄api b_id = '4308080950' url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}' headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/104.0.0.0 Safari/537.36" } # 並行宣告 semaphore = asyncio.Semaphore(5) async def download(title,b_id, cid): data = { "book_id": b_id, "cid": f'{b_id}|{cid}', } data = json.dumps(data) detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data) async with semaphore: async with aiohttp.ClientSession(headers=headers) as session: async with session.get(detail_url) as response: res = await response.json() content = { 'title': title, 'content': res['data']['novel']['content'] } # print(title) await save_data(content) async def save_data(data): if data: client = ConnDb().conn_motor_mongo() db = client.baidu_novel collection = db.novel logging.info('saving data %s', data) await collection.update_one( {'title': data.get('title')}, {'$set': data}, upsert=True ) async def main(): res = requests.get(url, headers=headers) tasks = [] for re in res.json()['data']['novel']['items']: # 拿到某小說目錄cid title = re['title'] cid = re['cid'] tasks.append(download(title, b_id, cid)) # 將請求放到列表裡,再通過gather執行並行 await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(main())
至此,我們就使用aiohttp完成了對小說章節的爬取。
要實現非同步處理,得先要有掛起操作,當一個任務需要等待 IO 結果的時候,可以掛起當前任務,轉而去執行其他任務,這樣才能充分利用好資源,要實現非同步,需要了解 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權。當協程執行的時候遇到 await,時間迴圈就會將本協程掛起,轉而去執行別的協程,直到其他的協程掛起或執行完畢。
await 後面的物件必須是如下格式之一:
以上就是藉助協程async和非同步aiohttp兩個主要模組完成非同步爬蟲的內容,
aiohttp 以非同步方式爬取網站的耗時遠小於 requests 同步方式,以上列舉的例子希望對你有幫助。
注意,執行緒和協程是兩個概念,後面找機會我們再聊聊程序和執行緒、執行緒和協程的關係
更多關於python aiohttp非同步爬蟲的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45