網(wǎng)站首頁 編程語言 正文
有個需求場景是這樣的,使用redis控制scrapy運(yùn)行的數(shù)量。當(dāng)系統(tǒng)的后臺設(shè)置為4時,只允許scapry啟動4個任務(wù),多余的任務(wù)則進(jìn)行排隊。
概況
最近做了一個django + scrapy + celery + redis 的爬蟲系統(tǒng),客戶購買的主機(jī)除了跑其他程序外,還要跑我開發(fā)的這套程序,所以需要手動控制scrapy的實(shí)例數(shù)量,避免過多的爬蟲給系統(tǒng)造成負(fù)擔(dān)。
流程設(shè)計
1、爬蟲任務(wù)由用戶以請求的方式發(fā)起,所有的用戶的請求統(tǒng)一進(jìn)入到celery進(jìn)行排隊;
2、任務(wù)數(shù)量控制的執(zhí)行就交給reids,經(jīng)由celery保存到redis,包含了爬蟲啟動所需要的必要信息,從redis取一條信息即可啟動一個爬蟲;
3、通過scrapyd的接口來獲取當(dāng)前在運(yùn)行的爬蟲數(shù)量,以便決定下一步流程:如果小于4,則從redis中取相應(yīng)數(shù)量的信息來啟動爬蟲,如果大于等于4,則繼續(xù)等待;
4、如果在運(yùn)行爬蟲的數(shù)量有所減少,則及時從reids中取相應(yīng)數(shù)量的信息來啟動爬蟲。
代碼實(shí)現(xiàn)
業(yè)務(wù)代碼有點(diǎn)復(fù)雜和啰嗦,此處使用偽代碼來演示
import redis
# 實(shí)例化一個redis連接池
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='')
r = redis.Redis(connection_pool=pool)
# 爬蟲實(shí)例限制為4 即只允許4個scrapy實(shí)例在運(yùn)行
limited = 4
# 聲明redis的樂觀鎖
lock = r.Lock()
# lock.acquire中有while循環(huán),即它會線程阻塞,直到當(dāng)前線程獲得redis的lock,才會繼續(xù)往下執(zhí)行代碼
if lock.acquire():
# 1、從reids中取一條爬蟲信息
info = redis.get()
# 2、while循環(huán)監(jiān)聽爬蟲運(yùn)行的數(shù)量
while True:
req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
# 統(tǒng)計當(dāng)前有多少個爬蟲在運(yùn)行
running = req.get('running') + req.get('pending')
# 3、判斷是否等待還是要增加爬蟲數(shù)量
# 3.1 如果在運(yùn)行的數(shù)量大于等于設(shè)置到量 則繼續(xù)等待
if running >= limited:
continue
# 3.2 如果小于 則啟動爬蟲
start_scrapy(info)
# 3.3 將info從redis中刪除
redis.delete(info)
# 3.4 釋放鎖
lock.release()
break
當(dāng)前,這只是偽代碼而已,實(shí)際的業(yè)務(wù)邏輯可能是非常復(fù)雜的,如:
@shared_task
def scrapy_control(key_uuid):
r = redis.Redis(connection_pool=pool)
db = MysqlDB()
speed_limited = db.fetch_config('REPTILE_SPEED')
speed_limited = int(speed_limited[0])
keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM')
keywords_num = int(keywords_num[0])
# while True:
lock = r.lock('lock')
with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 進(jìn)入處理環(huán)節(jié)' + '\n')
try:
# acquire默認(rèn)阻塞 如果獲取不到鎖時 會一直阻塞在這個函數(shù)的while循環(huán)中
if lock.acquire():
with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖' + '\n')
# 1 從redis中獲取信息
redis_obj = json.loads(r.get(key_uuid))
user_id = redis_obj.get('user_id')
contents = redis_obj.get('contents')
# 2 使用while循環(huán)處理核心邏輯
is_hold_print = True
while True:
req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
running = req.get('running') + req.get('pending')
# 3 如果仍然有足夠的爬蟲在運(yùn)行 則hold住redis鎖,等待有空余的爬蟲位置讓出
if running >= speed_limited:
if is_hold_print:
with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬蟲在運(yùn)行,線程等待中' + '\n')
is_hold_print = False
time.sleep(1)
continue
# 4 有空余的爬蟲位置 則往下走
# 4.1 處理完所有的內(nèi)容后 釋放鎖
if len(contents) == 0:
r.delete(key_uuid)
with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任務(wù)已完成,從redis中刪除' + '\n')
lock.release()
with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 釋放鎖' + '\n')
break
# 4.2 創(chuàng)建task任務(wù)
task_uuid = str(uuid.uuid4())
article_obj = contents.pop()
article_id = article_obj.get('article_id')
article = article_obj.get('content')
try:
Task.objects.create(
task_uuid = task_uuid,
user_id = user_id,
article_id = article_id,
content = article
)
except Exception as e:
with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 創(chuàng)建Task出錯: ' + str(e) + '\n')
# finally:
# 4.3 啟動爬蟲任務(wù) 即便創(chuàng)建task失敗也會啟動
try:
task_chain(user_id, article, task_uuid, keywords_num)
except Exception as e:
with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 啟動任務(wù)鏈?zhǔn)? ' + str(e) + '\n')
# 加入sleep 防止代碼執(zhí)行速度快于爬蟲啟動速度而導(dǎo)致當(dāng)前線程啟動額外的爬蟲
time.sleep(5)
except Exception as e:
with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 獲得鎖之后的操作出錯: ' + str(e) + '\n')
lock.release()
小坑
scrapy啟動速度相對較慢,所以while循環(huán)中,代碼中執(zhí)行到了爬蟲的啟動,需要sleep一下再去通過scrapyd接口獲取爬蟲運(yùn)行的數(shù)量,如果立刻讀取,可能會造成誤判。
原文鏈接:https://www.cnblogs.com/mooremok/p/16961634.html
相關(guān)推薦
- 2022-07-12 如何利用python實(shí)現(xiàn)kmeans聚類_python
- 2022-09-24 python?繪制3D圖案例分享_python
- 2022-12-10 C++?vector與數(shù)組轉(zhuǎn)換寫入/讀出文件方式_C 語言
- 2022-01-18 移動端自適應(yīng)布局(viewport+rem)
- 2022-10-10 conda創(chuàng)建環(huán)境、安裝包、刪除環(huán)境步驟詳細(xì)記錄_python
- 2023-01-19 Scrapy中詭異xpath的匹配內(nèi)容失效問題及解決_python
- 2022-05-01 Python中的bytes類型用法及實(shí)例分享_python
- 2022-05-29 C語言?超詳細(xì)模擬實(shí)現(xiàn)單鏈表的基本操作建議收藏_C 語言
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支