阅读 2629

使用WebMagic+ActiveMQ+Quartz实现全国城镇天气自动更新的API接口开发

一 简介

我在之前的某个项目中需要用到天气接口,但是遍观网上的天气API要么是收费的要么有使用次数或者频率的限制。因此我决定根据网上的专业天气网站结合爬虫技术自己开发一套天气自动定时抓取更新的API接口

(1)技术依赖:

  • SSM(Spring+Spring MVC+Mybatis):项目基本架构
  • WebMagic:轻量型爬虫框架,用于抓取每个城镇的天气以及抓取免费代理IP
  • ActiveMQ:消息中间件,用于在定时更新全国城镇天气时将每个城镇天气的更新任务压入消息队列中,之后再使用多个消费者去消费这些消息(PS:多个消息消费者同时更新这些城镇的天气)
  • Quartz:定时调度,用于设置每隔多久更新一次天气;每隔多久抓取一次代理IP;每隔多久检测一下数据库中的代理IP是否仍然有效
  • Apache CXF:用于对外发布SOAP风格和RESTFul风格的接口

(2)环境依赖:

  • JDK7及以上
  • Tomcat7及以上
  • ActiveMQ-5.14.1及以上
  • MYSQL5及以上

注:项目中使用到的SQL文件:raw.githubusercontent.com/zifangsky/W…

(3)项目运行:

i)源码下载:

全部代码已经开源,项目地址是:github.com/zifangsky/W…
PS:希望感兴趣的朋友给我来一波star,谢谢!

ii)配置依赖环境:

首先编译源代码,生成war包并将war包放到Tomcat中。接着分别启动运行MYSQL和ActiveMQ

修改配置文件中对应的MYSQL和ActiveMQ的连接参数以及定时任务的更新频率,配置文件的路径是:src/main/env/dev/

iii)运行项目:

启动Tomcat之后,根据前面设置的定时任务将会在指定时间开始执行天气更新任务、代理IP获取任务以及代码IP的可用性检测任务

(4)RESTful接口:

除了一些基本的SOAP风格的接口之外,我还对外发布了4个RESTful风格的接口。它们分别是:

i)随机返回一个可用的代理IP:

http://localhost:7080/WeatherSpider/services/rest/proxyIpService/getRandomOne

ii)返回当前所有可用的代理IP:

http://localhost:7080/WeatherSpider/services/rest/proxyIpService/getAll
其效果如下:

当前所有可用代理IP

注:可以在这个网站对json字符串进行格式化:json.cn/

iii)根据城镇CODE返回一个城镇天气:

http://localhost:7080/WeatherSpider/services/rest/weatherService/getWeatherByStationCode?stationCode=101060404
其效果如下:

根据CODE查询天气详情

iv)根据城镇名称模糊查询,返回所有匹配的城镇天气:

http://localhost:7080/WeatherSpider/services/rest/weatherService/getWeatherByStationName?stationName=朝阳
其效果如下:

根据关键字模糊查询天气详情

在上面我介绍了这个天气API小项目的一些基本情况,以及如何根据源代码在本地运行,最后还介绍了几个对外发布的RESTful风格的接口。在接下来的篇幅中我将介绍如何来开发这样的天气API以及代理IP池API,感兴趣的同学可以跟我继续往下看下去


二 全国城镇天气自动更新API开发

(1)天气数据源选取:

要想实现用爬虫抓取全国城镇的天气,那么我们首先需要选取一个合适的天气数据来源。在经过了简单对比之后,我最终选择了中国天气网作为我爬虫的数据源

随便查询一个地区的天气,我们可以发现它的天气详情页面一般是这种格式:
http://www.weather.com.cn/weather/101010300.shtml

对于后面的那串数字我暂且称作每个城镇的CODE,其值为:省CODE+市CODE +县(区)CODE

关于这个数字是如何组合起来的,接下来我们一起来分析:

打开这个网站的某个分站,如:http://bj.weather.com.cn/。从页面可以看出有一个三级联动的天气查询下拉框,根据我以往的经验这个页面应该会异步请求一些数据接口。通过在浏览器中按F12之后,观察Network界面,果然发现了数据请求链接:

天气数据源选取

总结一下它的规律就是:

i)全国省、直辖市列表:

http://js.weather.com.cn/data/city3jdata/china.html
其返回值如下:
{"10101":"北京","10102":"上海","10103":"天津","10104":"重庆","10105":"黑龙江","10106":"吉林","10107":"辽宁","10108":"内蒙古","10109":"河北","10110":"山西","10111":"陕西","10112":"山东","10113":"新疆","10114":"西藏","10115":"青海","10116":"甘肃","10117":"宁夏","10118":"河南","10119":"江苏","10120":"湖北","10121":"浙江","10122":"安徽","10123":"福建","10124":"江西","10125":"湖南","10126":"贵州","10127":"四川","10128":"广东","10129":"云南","10130":"广西","10131":"海南","10132":"香港","10133":"澳门","10134":"台湾"}

ii)某个省的市级列表:

比如吉林省它的市级列表的请求接口是:http://js.weather.com.cn/data/city3jdata/provshi/10106.html
其返回值如下:
{"01":"长春","02":"吉林","03":"延边","04":"四平","05":"通化","06":"白城","07":"辽源","08":"松原","09":"白山"}

iii)某个市的县(区)级列表:

市级CODE为省级CODE 加上上面对应的CODE
比如长春市它的县级列表的请求接口是:http://js.weather.com.cn/data/city3jdata/station/1010601.html
其返回值如下:
{"01":"长春","02":"农安","03":"德惠","04":"九台","05":"榆树","06":"双阳"}

iv)某个县(区)天气详情:

很显然,某个县(区)的天气详情的请求地址是:http://www.weather.com.cn/weather/101060101.shtml

关于全国所有县(区)的CODE 我们根据上面的规律写一段简单的代码循环遍历即可获得,最后需要将这些所有获取到的CODE保存到数据库中。每当我们更新全国所有地区天气时,我们使用爬虫循环请求对应的地址获取数据并保存到数据库中即可

(2)使用webmagic获取天气详情:

关于天气数据的选择,我初步选择了获取最近7天的天气以及24小时内分时段的天气,它们在页面中的位置如下图所示:

天气数据详情页面

接下来就是使用webmagic爬虫框架,采用XPath这种HTML节点定位方式获取上图中指定的数据即可

注:关于webmagic的一些基本使用可以参考我之前写的这篇文章:www.zifangsky.cn/853.html

关键部分代码如下:

package cn.zifangsky.spider;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import cn.zifangsky.model.WeatherWeather;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.selector.Html;
import us.codecraft.webmagic.selector.Selectable;

public class WeatherSpider implements PageProcessor{

    private Site site = Site.me().setTimeOut(20000).setRetryTimes(3)
            .setSleepTime(2000).setCharset("UTF-8");

    @Override
    public Site getSite() {
        Set<Integer> acceptStatCode = new HashSet<>();
        acceptStatCode.add(200);
        site = site.setAcceptStatCode(acceptStatCode).addHeader("Accept-Encoding", "/")
                .setUserAgent(UserAgentUtils.radomUserAgent());

        return site;
    }

    @Override
    public void process(Page page) {
        //最近7天天气
        Selectable sevenStr = page.getHtml().xpath("//div[@id='7d']/ul[@class='t clearfix']");
        //分时段天气
        Selectable hourStr = page.getHtml().xpath("//div[@id='7d']/script");
        //最近24小时整体情况
//        Selectable t24Str = page.getHtml().xpath("//div[@class='left fl']/script");

        WeatherWeather weather = new WeatherWeather();
        weather.setHour(handleHourStr(hourStr));;

        List<String> list = handleSevenDays(sevenStr);
        if(list != null && list.size() == 7){
            weather.setToday(list.get(0));
            weather.setNextday(list.get(1));
            weather.setNext2day(list.get(2));
            weather.setNext3day(list.get(3));
            weather.setNext4day(list.get(4));
            weather.setNext5day(list.get(5));
            weather.setNext6day(list.get(6));
        }
        page.putField("weather", weather);
        page.putField("stationCode", page.getUrl().regex("(\\d+).shtml",1));
    }

    /**
     * 处理分时段天气
     * @param hourStr
     * @return 
     */
    private String handleHourStr(Selectable hourStr) {
        String result = hourStr.regex("1d.*?(\\[.*?\\])",1).replace("&quot;", "\"").toString();

        if(result != null){
            return result;
        }else{
            return "";
        }
    }

    /**
     * 处理最近7天天气
     * @param sevenStr
     * @return 最近7天天气格式化之后的集合
     */
    private List<String> handleSevenDays(Selectable sevenStr) {
        List<String> sevenDays = sevenStr.xpath("//ul[@class='t clearfix']/li").all();
        List<String> result = new ArrayList<>();

        if(sevenDays != null && sevenDays.size() > 0){
            for(String day : sevenDays){
                Html temp = Html.create(day);
                StringBuffer stringBuffer = new StringBuffer();
                stringBuffer.append(temp.xpath("//h1/text()").toString());
                stringBuffer.append("," + temp.xpath("//p[@class='wea']/text()").toString());
                stringBuffer.append("," + temp.xpath("//p[@class='tem']/allText()").toString());

                List<String> windList = temp.xpath("//p[@class='win']/em/span").all();
                String windStr = ",";
                if(windList !=null && windList.size() > 0){
                    for(String win : windList){
                        Html winHtml = Html.create(win);
                        windStr = windStr + winHtml.xpath("//span/@title") + "/";
                    }
                }
                stringBuffer.append(windStr.substring(0, windStr.length()-1));
                stringBuffer.append("," + temp.xpath("//p[@class='win']/i/text()").toString());

                result.add(stringBuffer.toString());
            }
        }

        return result;
    }
}复制代码

在获取到需要的数据之后,接下来做的工作就是数据的持久化。根据数据库中是否存在该城市的天气数据来决定是插入还是更新天气数据:

package cn.zifangsky.spider;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import cn.zifangsky.manager.WeatherStationManager;
import cn.zifangsky.manager.WeatherWeatherManager;
import cn.zifangsky.model.WeatherWeather;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

/**
 * 自定义Pipeline处理抓取的数据
 * @author zifangsky
 *
 */
@Component("customPipeline")
public class CustomPipeline implements Pipeline {
    @Autowired
    private WeatherStationManager stationManager;

    @Resource(name="weatherWeatherManager")
    private WeatherWeatherManager weatherManager;

    /**
     * 保存数据
     */
    @Override
    public void process(ResultItems resultItems, Task task) {
        WeatherWeather weather = resultItems.get("weather");
        Long stationId = stationManager.selectIdByCode(resultItems.get("stationCode").toString());
        if(stationId != null){
            weather.setStationId(stationId);
            WeatherWeather oldWeather = weatherManager.selectByStationId(stationId);
            if(oldWeather == null){
                weatherManager.insertSelective(weather);
            }else{
                weather.setId(oldWeather.getId());
                weatherManager.updateByPrimaryKeySelective(weather);
            }

        }

    }

}复制代码

注:到了这一步我们就可以写一个简单的单元测试来测试上面的爬虫代码是否如我们预期那样抓取到了指定的数据:

package cn.zifangsky.test.spider;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import cn.zifangsky.manager.CrawlManager;
import cn.zifangsky.spider.CustomPipeline;
import cn.zifangsky.spider.WeatherSpider;
import us.codecraft.webmagic.model.OOSpider;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:context/context.xml","classpath:context/context_activemq.xml"})
public class TestSpider{
    @Resource(name="crawlManager")
    private CrawlManager crawlManager;

    @Resource(name="customPipeline")
    private CustomPipeline customPipeline;

    @Test
    public void testWeatherCrawl(){
        OOSpider.create(new WeatherSpider()).addPipeline(customPipeline)
        .addUrl("http://www.weather.com.cn/weather/101060101.shtml")
        .thread(1)
        .run();
        crawlManager.weatherCrawl("101060101");
    }

}复制代码

为了方便后面使用异步消息更新全国所有城市的天气,因此我将上面的爬虫代码封装成了一个天气获取方法:

package cn.zifangsky.manager;

public interface CrawlManager {

    /**
     * 天气爬虫
     * @param stationCode 县城(区)的CODE
     */
    public void weatherCrawl(String stationCode);

    ...
}复制代码

其实现类是:

package cn.zifangsky.manager.impl;

import javax.annotation.Resource;

import org.springframework.stereotype.Service;

import cn.zifangsky.manager.CrawlManager;
import cn.zifangsky.spider.CustomPipeline;
import cn.zifangsky.spider.WeatherSpider;
import us.codecraft.webmagic.model.OOSpider;

@Service("crawlManager")
public class CrawlManagerImpl implements CrawlManager {

    @Resource(name="customPipeline")
    private CustomPipeline customPipeline;

    ...

    @Override
    public void weatherCrawl(String stationCode) {
        OOSpider.create(new WeatherSpider()).addPipeline(customPipeline)
        .addUrl("http://www.weather.com.cn/weather/" + stationCode + ".shtml")
        .thread(1)
        .run();
    }

    ...
}复制代码

(3)使用异步消息更新全国所有城镇天气:

在上面的代码中我们实现了如何抓取指定城镇的天气并实现其数据的持久化。那么当我们设置定时任务后,需要在一天的指定时间点更新全国所有城镇的天气该怎么做呢?

在这里一个很容易想到的思路是:首先从数据库中取出所有的城镇CODE,接着遍历调用上面的天气更新方法逐个更新每个城镇的天气。但是从中国天气网获取到的全国城镇一共有2600多个,如果使用单线程逐个更新的话那将变得非常缓慢。相反,使用ActiveMQ将每个城镇的天气更新指令当做一个消息,通过消息队列的形式不仅可以指定每个消费者的并发数同时还可以同时部署多个消费者形成消息队列集群

注:

  • 要想建立消息队列集群需要将我代码中的定时调度和消息队列这两个功能点分开到两个不同项目中才行
  • 关于ActiveMQ相关的基本用法可以参考我之前的这篇文章:www.zifangsky.cn/815.html

天气更新生产者代码:

package cn.zifangsky.activemq.producer;

import javax.annotation.Resource;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component("weatherUpdateSender")
public class WeatherUpdateSender {

    @Resource(name="jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    /**
     * 城镇天气更新发送者
     * 向接收者发送需要更新的城镇天气的stationCode
     * @param queueName 天气更新队列的名称
     * @param stationCode 城镇代码
     */
    public void updateWeather(String queueName,final String stationCode){
        jmsTemplate.convertAndSend(queueName, stationCode);
    }
}复制代码

天气更新消费者代码:

package cn.zifangsky.activemq.consumer;

import javax.annotation.Resource;

import org.springframework.stereotype.Component;

import cn.zifangsky.manager.CrawlManager;

@Component("weatherUpdateReceiver")
public class WeatherUpdateReceiver{
    @Resource(name="crawlManager")
    private CrawlManager crawlManager;

    /**
     * 接收消息并处理
     * @param stationCode
     */
    public void handle(String stationCode){
        //更新天气
        crawlManager.weatherCrawl(stationCode);
    }

}复制代码

注:对应的activeMQ的配置文件可以参考这里:github.com/zifangsky/W…

(4)使用Quartz控制全国城镇天气更新频率:

关键代码是:

package cn.zifangsky.job;

import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import javax.annotation.Resource;

import org.apache.log4j.Logger;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.quartz.QuartzJobBean;

import cn.zifangsky.activemq.producer.WeatherUpdateSender;
import cn.zifangsky.mapper.WeatherStationMapper;
import cn.zifangsky.model.WeatherStation;

/**
 * 天气定时更新任务
 * @author zifangsky
 *
 */
public class WeatherUpdateJob extends QuartzJobBean{
    private static Logger logger = Logger.getLogger(WeatherUpdateJob.class);

    @Value("${activemq.queue.weather}")
    private String weatherQueueName;

    @Resource(name="weatherUpdateSender")
    private WeatherUpdateSender weatherUpdateSender;

    @Autowired
    WeatherStationMapper weatherStationMapper;

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        Date current = new Date();
        Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("开始执行天气定时更新任务,Date:" + format.format(current));
        logger.debug("开始执行天气定时更新任务,Date: " + format.format(current));

        List<WeatherStation> list = weatherStationMapper.selectAll();
        if(list != null && list.size() > 0){
            for(WeatherStation station : list){
                weatherUpdateSender.updateWeather(weatherQueueName, station.getCode());
            }

        }
    }

}复制代码

注:关于Quartz的基本用法可以参考我之前的这篇文章:www.zifangsky.cn/846.html

(5)对外发布webservice接口:

关于Apache CXF实现webservice的基本用法可以参考我之前的这两篇文章:

这里的关键代码是:

package cn.zifangsky.webservice;

import java.util.List;

import javax.jws.WebMethod;
import javax.jws.WebParam;
import javax.jws.WebService;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.xml.ws.Holder;

import org.springframework.context.annotation.Scope;

import cn.zifangsky.common.PageInfo;
import cn.zifangsky.model.WeatherWeather;
import cn.zifangsky.model.bo.WeatherWeatherBO;

@Scope("prototype")
@WebService
public interface WeatherWeatherService {
    @WebMethod
    public int deleteByPrimaryKey(@WebParam(name="id") Long id);

    @WebMethod
    public int insert(@WebParam(name="weather") WeatherWeather weather);

    @WebMethod
    public int insertSelective(@WebParam(name="weather") WeatherWeather weather);

    @WebMethod
    public WeatherWeather selectByPrimaryKey(@WebParam(name="id") Long id);

    @WebMethod
    public int updateByPrimaryKeySelective(@WebParam(name="weather") WeatherWeather weather);

    @WebMethod
    public int updateByPrimaryKey(@WebParam(name="weather") WeatherWeather weather);
    /**
     * 查询数据总数
     * @return
     */
    @WebMethod
    public Long findAllCount(@WebParam(name="weather") WeatherWeather weather);

    /**
     * 分页查询
     * @param pageInfo
     * @param city
     * @return
     */
    @WebMethod
    public List<WeatherWeather> findAll(@WebParam(name="pageInfoHolder",mode=WebParam.Mode.INOUT) Holder<PageInfo> pageInfoHolder,@WebParam(name="weather") WeatherWeather weather);

    /**
     * 通过城镇Code查询天气
     * @param stationCode
     * @return
     */
    @WebMethod
    public WeatherWeatherBO selectByStationCode(@WebParam(name="stationCode") String stationCode);

    /**
     * 通过城镇名字查询天气(模糊查询)
     * @param stationName
     * @return
     */
    @WebMethod
    public List<WeatherWeatherBO> selectByStationName(@WebParam(name="stationName") String stationName);

    /**
     * 通过城镇Code查询天气,RESTful接口
     * @param stationCode
     * @return
     */
    @GET
    @Path("/getWeatherByStationCode")
    @Produces(MediaType.APPLICATION_JSON)
    public WeatherWeatherBO selectByStationCodeRest(@QueryParam("stationCode") String stationCode);

    /**
     * 通过城镇名字查询天气(模糊查询),RESTful接口
     * @param stationName
     * @return
     */
    @GET
    @Path("/getWeatherByStationName")
    @Produces(MediaType.APPLICATION_JSON)
    public List<WeatherWeatherBO> selectByStationNameRest(@QueryParam("stationName") String stationName);
}复制代码

它对应的实现类略,请自行参考源码


三 代理IP池API开发

关于免费代理IP的获取我选择了两个数据源,它们分别是:

当然,后面具体的代码实现过程其实是跟上面的天气API开发过程是差不多的,因此我这里就不多说了,需要自己尝试的同学可以根据上面的实现思路自行参考源码即可