時(shí)間:2023-04-26 19:00:02 | 來(lái)源:網(wǎng)站運(yùn)營(yíng)
時(shí)間:2023-04-26 19:00:02 來(lái)源:網(wǎng)站運(yùn)營(yíng)
# Python爬蟲--代理池維護(hù): crawler module
import requests
from pyquery import PyQuery as pq

"""
Crawl free proxies from public proxy-listing sites and return them.
"""


class Crawler(object):
    """Scrapes free proxies and returns them as 'scheme://host:port' strings."""

    def get_crawler_proxy(self):
        """Entry point: run every site crawler and return the combined list."""
        proxy_list = self.crawl_xici()
        return proxy_list

    def crawl_xici(self, pages=20, timeout=10):
        """Crawl the xicidaili.com free-proxy listing.

        Args:
            pages: fetch 1-based pages 1 .. pages-1 (the default keeps the
                original behaviour of fetching pages 1-19).
            timeout: per-request timeout in seconds. The original had no
                timeout, so one stalled page could hang the crawl forever.

        Returns:
            list[str]: proxies such as 'http://1.2.3.4:8080'.
        """
        proxy_list = []
        # Hoisted out of the loop: identical for every page.
        headers = {
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Host': 'www.xicidaili.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36'
        }
        for i in range(1, pages):
            url = 'http://www.xicidaili.com/nn/' + str(i)
            try:
                res = requests.get(url, headers=headers, timeout=timeout)
                res.raise_for_status()
            except requests.RequestException:
                # One bad page no longer aborts the whole crawl and loses
                # everything collected so far.
                continue
            doc = pq(res.text)
            for odd in doc('.odd').items():
                info_list = odd.find('td').text().split(' ')
                # A complete table row splits into 11 whitespace-separated
                # fields: [1]=IP, [2]=port, [5]=scheme (HTTP/HTTPS).
                if len(info_list) == 11:
                    proxy = (info_list[5].lower().strip() + '://' +
                             info_list[1].strip() + ':' + info_list[2].strip())
                    proxy_list.append(proxy)
        return proxy_list
import random

import redis

REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
# Key of the redis Sorted Set holding the proxies.
REDIS_KEY = 'proxies'
# Score assigned to a freshly added proxy.
INITAL_SCORE = 10
# Lowest score before a proxy is dropped from the pool.
MIN_SCORE = 0
# Score given to a proxy that has just passed a check.
MAX_SCORE = 100

"""
Store proxies in a redis Sorted Set: add, query and fetch proxies,
and adjust each proxy's priority score.
"""


class RedisClient(object):
    """Thin wrapper around a redis Sorted Set used as a scored proxy pool."""

    def __init__(self, host=REDIS_HOST, port=REDIS_PORT):
        print('redis連接成功......')
        self.redisdb = redis.StrictRedis(host=host, port=port)

    def add(self, proxy, score=INITAL_SCORE):
        """Add a proxy with the default score unless it is already stored.

        zscore returns None (not 0) for a missing member, so compare
        against None explicitly: the original truthiness test would have
        re-scored a proxy whose score had legitimately reached 0.
        """
        if self.redisdb.zscore(REDIS_KEY, proxy) is None:
            # redis-py >= 3.0 takes a {member: score} mapping; the old
            # positional (key, score, member) form raises TypeError.
            self.redisdb.zadd(REDIS_KEY, {proxy: score})

    def get_proxy(self):
        """Return one proxy: a random max-score proxy if any exist,
        otherwise a random one from the ten highest-ranked.

        Raises:
            Exception: when the pool is empty.
        """
        candidates = self.redisdb.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
        if not candidates:
            # Single round trip instead of the original's double zrevrange call.
            candidates = self.redisdb.zrevrange(REDIS_KEY, 0, 10)
        if not candidates:
            raise Exception('proxy pool is empty')
        return random.choice(candidates)

    def decrease(self, proxy):
        """Lower the proxy's score by 1; remove it once the score hits 0."""
        score = self.redisdb.zscore(REDIS_KEY, proxy)
        if score is not None and score > MIN_SCORE:
            # redis-py >= 3.0 signature: zincrby(name, amount, value);
            # the original passed (name, value, amount).
            self.redisdb.zincrby(REDIS_KEY, -1, proxy)
        else:
            self.redisdb.zrem(REDIS_KEY, proxy)

    def exist(self, proxy):
        """Return True if the proxy is in the pool (even with score 0)."""
        return self.redisdb.zscore(REDIS_KEY, proxy) is not None

    def max(self, proxy):
        """Mark the proxy as verified-usable by setting its score to 100."""
        self.redisdb.zadd(REDIS_KEY, {proxy: MAX_SCORE})

    def get_proxy_count(self):
        """Return the number of proxies stored in redis."""
        return self.redisdb.zcard(REDIS_KEY)

    def get_all_proxy(self):
        """Return every proxy in the pool, lowest score first."""
        return self.redisdb.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)
from Crawler import Crawler
from RedisClient import RedisClient

# Stop crawling once the pool holds this many proxies.
FULL_COUNT = 2000


class Getter(object):
    """Fetches proxies from the crawler and stores them in redis."""

    def __init__(self):
        self.redis_client = RedisClient()
        self.crawler = Crawler()

    def is_full(self):
        """Return True when the pool already holds FULL_COUNT proxies or more."""
        return self.redis_client.get_proxy_count() >= FULL_COUNT

    def run(self):
        """Crawl the proxy sites and push every result into redis,
        skipping the crawl entirely when the pool is already full."""
        if self.is_full():
            return
        for proxy in self.crawler.get_crawler_proxy():
            self.redis_client.add(proxy)
import RedisClient
import asyncio
import aiohttp
import traceback
import time

"""
Checker module: periodically verifies that the proxies stored in redis
still work, and adjusts their priority scores accordingly.
"""

# URL used to probe whether a proxy is usable.
test_url = 'http://www.baidu.com'


class Tester(object):
    """Validates proxies from redis and updates their priority scores."""

    def __init__(self):
        self.redisdb = RedisClient.RedisClient()

    async def test_proxy(self, proxy):
        """Probe one proxy against test_url.

        A 200 response promotes the proxy to the maximum score; any other
        status — or any exception such as a timeout or connection error —
        lowers its score.
        """
        async with aiohttp.ClientSession() as session:
            try:
                if isinstance(proxy, bytes):
                    # redis hands members back as bytes.
                    proxy = proxy.decode('utf-8')
                print('正在檢測(cè) : %s' % proxy)
                async with session.get(test_url, proxy=proxy, timeout=15) as res:
                    if res.status == 200:
                        # Proxy works: promote it to the maximum score.
                        self.redisdb.max(proxy)
                        print('代理可用 : %s' % proxy)
                    else:
                        # Proxy answered but not usable: demote it.
                        self.redisdb.decrease(proxy)
                        print('代理不可用 : %s' % proxy)
            except Exception as e:
                self.redisdb.decrease(proxy)
                print('代理不可用 : %s (%s)' % (proxy, e))

    async def _test_batch(self, batch):
        # Run one batch of probes concurrently. asyncio.gather still accepts
        # bare coroutines on Python >= 3.11, unlike asyncio.wait, which the
        # original used and which now rejects them.
        await asyncio.gather(*(self.test_proxy(p) for p in batch))

    def run(self):
        """Check every proxy currently stored in redis, 50 at a time."""
        print('啟動(dòng)檢測(cè)模塊......')
        try:
            proxies = self.redisdb.get_all_proxy()
            for i in range(0, len(proxies), 50):
                asyncio.run(self._test_batch(proxies[i:i + 50]))
                time.sleep(5)
        except Exception:
            # The original swallowed the error with a bare message and
            # discarded the reason; report it so failures are debuggable.
            print('檢測(cè)模塊出錯(cuò)！！！')
            traceback.print_exc()
from flask import Flask, g

import RedisClient

"""
Web API over the proxy pool: fetch proxies stored in redis over HTTP.

flask.g is the per-request application-context object; a RedisClient is
lazily attached to it so each request reuses a single client instance.
"""

app = Flask(__name__)


@app.route('/')
def index():
    """Landing page."""
    return '<h2>歡迎來(lái)到daacheng代理池系統(tǒng)</h2>'


def get():
    """Return the request-scoped RedisClient, creating it on first use."""
    if not hasattr(g, 'redis'):
        g.redis = RedisClient.RedisClient()
    return g.redis


@app.route('/random')
def get_random_proxy():
    # Return one proxy from the pool (redis hands back bytes;
    # Flask serves a bytes body as-is).
    redisdb = get()
    return redisdb.get_proxy()


@app.route('/count')
def count():
    # Number of proxies currently in the pool.
    redisdb = get()
    return str(redisdb.get_proxy_count())


@app.route('/all')
def get_all():
    # All proxies in the pool (the original comment here was wrongly
    # copy-pasted from /count and claimed to return the count).
    redisdb = get()
    return str(redisdb.get_all_proxy())


if __name__ == '__main__':
    app.run()
from Getter import Getter
from Tester import Tester
import multiprocessing
import time

"""
Scheduler module: runs the getter (crawl & store) and the tester
(validate) in two separate child processes.
"""


class Controller(object):
    """Starts and coordinates the proxy-pool worker processes."""

    def control_get(self):
        """Forever: crawl the proxy sites and store results in redis,
        pausing 20 seconds between rounds."""
        getter = Getter()
        while True:
            getter.run()
            time.sleep(20)

    def control_test(self):
        """Forever: re-check the proxies already stored in redis,
        pausing 20 seconds between rounds."""
        tester = Tester()
        while True:
            tester.run()
            time.sleep(20)

    def run(self):
        """Launch both workers as child processes and return immediately;
        the non-daemon children keep the program alive."""
        print('代理池開始運(yùn)行了......')
        get_process = multiprocessing.Process(target=self.control_get)
        get_process.start()
        test_process = multiprocessing.Process(target=self.control_test)
        test_process.start()


if __name__ == '__main__':
    control = Controller()
    control.run()
關(guān)鍵詞:維護(hù),代理,爬蟲
客戶&案例
營(yíng)銷資訊
關(guān)于我們
客戶&案例
營(yíng)銷資訊
關(guān)于我們
微信公眾號(hào)
版權(quán)所有 © 億企邦 1997-2025 保留一切法律許可權(quán)利。