Left Hand R, Right Hand Python Series — Multiprocess/Multithreaded Data Scraping and Web Requests

Original post: zhuanlan.zhihu.com

This installment looks at using multiprocessing/multithreading in the web-request stage, which raises two important issues. First, concurrent requests face a much sharper anti-scraping risk than sequential ones. Second, scraping page data means the return values must be collected and assembled into a single relational table (a data frame) — unlike the binary file downloads in the previous post, which only had to execute a statement block without collecting any return values.

The R version uses RCurl + XML; the Python version uses urllib + lxml.

library("RCurl")
library("XML")
library("magrittr")

Scheme 1 — a hand-rolled explicit loop:

Getjobs <- function(){
    fullinfo <- data.frame()
    headers <- c("Referer"="https://www.hellobi.com/jobs/search",
            "User-Agent"="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
             )
    d <- debugGatherer()
    handle <- getCurlHandle(debugfunction=d$update,followlocation=TRUE,cookiefile="",verbose = TRUE)
    i = 0
    while (i < 10){
        i = i+1
        url <- sprintf("https://www.hellobi.com/jobs/search?page=%d",i)
        tryCatch({
        content    <- getURL(url,.opts=list(httpheader=headers),.encoding="utf-8",curl=handle) %>% htmlParse() 
        job_item   <- content %>% xpathSApply(.,"//div[@class='job_item_middle pull-left']/h4/a",xmlValue)
        job_links  <- content %>% xpathSApply(.,"//div[@class='job_item_middle pull-left']/h4/a",xmlGetAttr,"href")
        job_info   <- content %>% xpathSApply(.,"//div[@class='job_item_middle pull-left']/h5",xmlValue,trim = TRUE) 
        job_salary <- content %>% xpathSApply(.,"//div[@class='job_item-right pull-right']/h4",xmlValue,trim = TRUE) 
        job_origin <- content %>% xpathSApply(.,"//div[@class='job_item-right pull-right']/h5",xmlValue,trim = TRUE)
        myresult   <- data.frame(job_item,job_links,job_info,job_salary,job_origin)
        fullinfo   <- rbind(fullinfo,myresult)
        cat(sprintf("Page %d scraped!",i),sep = "\n")
        },error = function(e){
        cat(sprintf("第【%d】页抓取失败!",i),sep = "\n")
        })
    }
    cat("all page is OK!!!")
    return (fullinfo)
}
system.time(mydata1 <- Getjobs())

The whole run took 11.03 seconds.

Scheme 2 — a vectorized function:

Getjobs <- function(i){
    headers <- c("Referer"="https://www.hellobi.com/jobs/search",
            "User-Agent"="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36"
             )
    d <- debugGatherer()
    handle <- getCurlHandle(debugfunction=d$update,followlocation=TRUE,cookiefile="",verbose = TRUE)
    url <- sprintf("https://www.hellobi.com/jobs/search?page=%d",i)
    content    <- getURL(url,.opts=list(httpheader=headers),.encoding="utf-8",curl=handle) %>% htmlParse() 
    job_item   <- content %>% xpathSApply(.,"//div[@class='job_item_middle pull-left']/h4/a",xmlValue)
    job_links  <- content %>% xpathSApply(.,"//div[@class='job_item_middle pull-left']/h4/a",xmlGetAttr,"href")
    job_info   <- content %>% xpathSApply(.,"//div[@class='job_item_middle pull-left']/h5",xmlValue,trim = TRUE) 
    job_salary <- content %>% xpathSApply(.,"//div[@class='job_item-right pull-right']/h4",xmlValue,trim = TRUE) 
    job_origin <- content %>% xpathSApply(.,"//div[@class='job_item-right pull-right']/h5",xmlValue,trim = TRUE)
    data.frame(job_item,job_links,job_info,job_salary,job_origin)
}

system.time(mydata <- plyr::ldply(1:10,Getjobs,.progress = "text"))

The whole run took 9.07 seconds.

Scheme 3 — using parallel (multiprocess) packages:

system.time({
  library("doParallel")
  library("foreach")
  cl <- makeCluster(4)   # spin up a cluster of 4 workers
  registerDoParallel(cl)
  mydata2 <- foreach(i=1:10,
                      .combine=rbind,
                      .packages = c("RCurl", "XML","magrittr")
                      ) %dopar% Getjobs(i)
  stopCluster(cl)
  })

Total time: 5.14 seconds.

This also explains why yesterday's multiprocess PDF download showed no speedup: for a network-I/O-bound task where bandwidth is the bottleneck, the transfer itself takes so long (the PDFs averaged 5 MB each) that it swamps whatever time multiprocessing saves — parallel downloads sharing an already saturated pipe finish in roughly the same wall time as sequential ones.

The Python version:

The Python examples use the urllib and lxml packages.

from urllib.request import urlopen,Request
import pandas as pd
import time
import threading
from lxml import etree

Scheme 1 — scraping with an explicit loop:

def getjobs(pages):
    myresult = {
              "job_item":[],
              "job_links":[],
              "job_info":[],
              "job_salary":[],
              "job_origin":[]
              }
    header ={
             'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
             'Referer':'https://www.hellobi.com/jobs/search'
              }
    for i in pages:
        url = "https://www.hellobi.com/jobs/search?page={}".format(i)
        pagecontent = urlopen(Request(url,headers=header)).read().decode('utf-8')
        result = etree.HTML(pagecontent)
        myresult["job_item"].extend(result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/text()'))
        myresult["job_links"].extend(result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/@href'))
        myresult["job_info"].extend([ text.xpath('string(.)').strip() for text in  result.xpath('//div[@class="job_item_middle pull-left"]/h5')])
        myresult["job_salary"].extend(result.xpath('//div[@class="job_item-right pull-right"]/h4/span/text()'))
        myresult["job_origin"].extend(result.xpath('//div[@class="job_item-right pull-right"]/h5/span/text()'))
        time.sleep(1)
        print("正在抓取第【{}】页".format(i))
    print("everything is OK")
    return pd.DataFrame(myresult)
    
if __name__ == "__main__":
    t0 = time.time()
    mydata1 = getjobs(range(1,11))
    t1 = time.time()
    total = t1 - t0
    print("消耗时间:{}".format(total))

The total came to nearly 19 seconds; since the code sleeps 1 second per page, the net scraping time is roughly 9 seconds.

Scheme 2 — multithreaded scraping:

def executeThread(i):
    myresult = {
              "job_item":[],
              "job_links":[],
              "job_info":[],
              "job_salary":[],
              "job_origin":[]
              }
    header ={
              'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
              'Referer':'https://www.hellobi.com/jobs/search'
              }
    url = "https://www.hellobi.com/jobs/search?page={}".format(i)
    try:
        pagecontent=urlopen(Request(url,headers=header)).read().decode('utf-8')
        result = etree.HTML(pagecontent)
        myresult["job_item"].extend(result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/text()'))
        myresult["job_links"].extend(result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/@href'))
        myresult["job_info"].extend([ text.xpath('string(.)').strip() for text in  result.xpath('//div[@class="job_item_middle pull-left"]/h5')])
        myresult["job_salary"].extend(result.xpath('//div[@class="job_item-right pull-right"]/h4/span/text()'))
        myresult["job_origin"].extend(result.xpath('//div[@class="job_item-right pull-right"]/h5/span/text()'))
    except Exception:
        # swallow per-page failures so one bad page doesn't kill the thread
        pass
    # NOTE: concurrent appends to one file can interleave rows and misplace the
    # header; see the safer collect-then-concat variant after Scheme 3.
    with open('D:/Python/File/hellolive.csv', 'a+') as f:
        pd.DataFrame(myresult).to_csv(f, index = False, header = (i == 1))

def main():
    threads = []
    for i in range(1,11):
        thread = threading.Thread(target=executeThread,args=(i,))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    
if __name__ == '__main__':
    t0 = time.time()
    main()
    t1 = time.time()
    total = t1 - t0
    print("消耗时间:{}".format(total))

The multithreaded run above took only 1.64 seconds — compared with single-threaded scraping, the gain in efficiency is obvious.

Scheme 3 — multiprocess scraping:

import multiprocessing
from multiprocessing import Pool
from urllib.request import urlopen,Request
import pandas as pd
import time
from lxml import etree
def executeThread(i):
    myresult = {
              "job_item":[],
              "job_links":[],
              "job_info":[],
              "job_salary":[],
              "job_origin":[]
              }
    header ={
             'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
             'Referer':'https://www.hellobi.com/jobs/search'
              }
    url = "https://www.hellobi.com/jobs/search?page={}".format(i)
    try:
        pagecontent=urlopen(Request(url,headers=header)).read().decode('utf-8')
        result = etree.HTML(pagecontent)
        myresult["job_item"].extend(result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/text()'))
        myresult["job_links"].extend(result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/@href'))
        myresult["job_info"].extend([ text.xpath('string(.)').strip() for text in  result.xpath('//div[@class="job_item_middle pull-left"]/h5')])
        myresult["job_salary"].extend(result.xpath('//div[@class="job_item-right pull-right"]/h4/span/text()'))
        myresult["job_origin"].extend(result.xpath('//div[@class="job_item-right pull-right"]/h5/span/text()'))    except:        pass
    with open('D:/Python/File/hellolive.csv', 'a+') as f:
        pd.DataFrame(myresult).to_csv(f, index = False,header= False if i > 1 else True)

def shell():
    # fan the 10 pages out across one worker process per CPU core
    pool = Pool(multiprocessing.cpu_count())
    pool.map(executeThread,list(range(1,11)))
    pool.close()
    pool.join()
    
if __name__ == "__main__":
    # start timing
    t0 = time.time()
    shell()
    t1 = time.time()
    total = t1 - t0
    print("消耗时间:{}".format(total))

This multiprocess run also finishes in roughly 1.5 seconds. Note that because Windows has no fork (multiprocessing must spawn fresh interpreters that re-import the main module), the code cannot be run directly inside many editors or interactive sessions — save it as a .py file and execute that file from cmd or PowerShell.
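
One more caveat about Schemes 2 and 3: every worker appends to the same CSV, so rows from different pages can interleave and the header can land mid-file. Below is a minimal sketch of a safer pattern — my own variant, not the original code (the function name scrape_page is hypothetical, and the same .py-file caveat applies on Windows): let pool.map return each page's DataFrame and do a single write in the parent process.

import multiprocessing
from multiprocessing import Pool
from urllib.request import urlopen, Request

import pandas as pd
from lxml import etree

HEADER = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
    'Referer': 'https://www.hellobi.com/jobs/search'
}

def scrape_page(i):
    # Scrape one listing page and return it as a DataFrame; no file I/O here.
    # Assumes the five xpath queries return equal-length lists, as in the original code.
    url = "https://www.hellobi.com/jobs/search?page={}".format(i)
    pagecontent = urlopen(Request(url, headers=HEADER)).read().decode('utf-8')
    result = etree.HTML(pagecontent)
    return pd.DataFrame({
        "job_item":   result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/text()'),
        "job_links":  result.xpath('//div[@class="job_item_middle pull-left"]/h4/a/@href'),
        "job_info":   [t.xpath('string(.)').strip() for t in result.xpath('//div[@class="job_item_middle pull-left"]/h5')],
        "job_salary": result.xpath('//div[@class="job_item-right pull-right"]/h4/span/text()'),
        "job_origin": result.xpath('//div[@class="job_item-right pull-right"]/h5/span/text()'),
    })

if __name__ == "__main__":
    with Pool(multiprocessing.cpu_count()) as pool:
        frames = pool.map(scrape_page, range(1, 11))  # one DataFrame per page
    pd.concat(frames, ignore_index=True).to_csv('D:/Python/File/hellolive.csv', index=False)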

Today's examples show that for network-I/O-bound tasks, multithreading and multiprocessing can genuinely improve efficiency. But faster scraping also means heavier anti-scraping pressure: in a concurrent setting you need more thorough disguises, such as rotating random User-Agents and proxy IPs, to avoid getting banned early.
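
For example, here is a minimal sketch of per-request User-Agent rotation, assuming a small illustrative UA pool (a real crawler would keep a much longer list, and proxy IPs can be rotated the same way via urllib's ProxyHandler):

import random
from urllib.request import urlopen, Request

# A tiny illustrative pool; a production crawler would maintain a much longer list.
USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8',
    'Mozilla/5.0 (X11; Linux x86_64; rv:52.0) Gecko/20100101 Firefox/52.0',
]

def fetch(url):
    # Pick a fresh User-Agent for every request.
    header = {
        'User-Agent': random.choice(USER_AGENTS),
        'Referer': 'https://www.hellobi.com/jobs/search'
    }
    return urlopen(Request(url, headers=header)).read().decode('utf-8')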

For the online course, follow the original-post link at the end:

Hellobi Live | January 16, 2018 — R web-scraping case studies: NetEase Cloud Classroom, Zhihu Live, Jinri Toutiao, Bilibili videos

For the data behind past case studies, see my GitHub:

github.com/ljtyduyu/Da…