大厂技术内幕:万万没想到,“字节跳动”居然是这么做数据迁移的

386 阅读5分钟

1.jpg

目标

  • 描述项目数据迁移的方案
  • 了解hbas能够e的特点
  • 能够熟悉数据迁移中的数据包装和转换
  • 能够完成文章数据的全量和增量迁移
  • 能够完成热点文章数据的迁移

1 为什么需要自动同步

因为MySQL保存着我们爬取的以及自建的数据,对于爬取的数据,数据量比较大,使用mysql 存储会影响mysql的性能,并且我们需要对数据进行流式计算,对数据进行各种统计,mysq满足不了我们的需求,我们就将mysql中的全量数据同步到HBASE中,由HBASE保存海量数据,mysql中的全量数据会定期进行删除。

HBASE中保存着海量数据,我们需要计算出热点数据,并将数据同步到mysql以及MONGODB中,mysql中保存主体关系数据,MONGODB保存着具体数据信息。

因为热点数据也会失效,今天是热数据,明天就不是了,也需要定期对热点数据进行删除,我们定时删除一个月之前的热点数据,保持本月的热数据。

2 迁移方案

2.1 需求分析

2.1.1 功能需求

有了大量数据集基础后,实时计算后的热点数据需要保存起来,因为mysql保存大量文章数据会影响mysql的性能,所以采用mysql+mongoDB的方式进行存储。

2.1.1 全量数据迁移方案

通过定时任务将mysql中爬取或者自建的文章同步到HBASE中,并将同步过的数据状态改为已同步,下次同步的时候就不会再次同步这些数据了。

2.1.2 热数据迁移方案

HBASE中有全量数据,大数据端计算出热点数据,需要将这些热点数据同步到MYSQL和MONGDB中,用于页面显示

2.jpg

2.2 设计思路

将mysql数据库中的全量数据定时读取出来,将多个对象打包成一个对象,保存到HBASE中,保存成功后更新数据库中的状态改为已同步,下一次就不会同步该条数据了。

使用KAFKA监听热点数据计算结果,接收到热点数据信息后,从HBASE得到打包的数据,并将数据进行拆分,将关系数据保存到mysql中,将具体数据保存到mongodb中。

因为热点数据会失效,定期清除mysql和mongodb中的过期数据

2.3 数据同步注意的问题

HBASE数据主要靠rowKey进行查询的,rowKey设计就用mysql中的主键ID作为rowKey,查询的时候直接根据Rowkey获取数据

因为需要同步到HBASE的数据是多个数据表的数据,一条数据由多个对象组成,存储的时候使用列族区分不同的对象,里面存储不同的字段。

3 项目中集成hbase与Mongodb

在leadnews-common 集成:

  • hbase.properties

3.jpg

  • mongo.properties

4.jpg

  • pom.xml
<!--mongoDB-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!--HBase-->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.5</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>

host文件配置: 在服务器host文件中配置域名,根据自己的服务器地址更改

172.16.1.52 javaedge

4 常用组件介绍

4.1 Hbase相关操作

Hbase 操作工具类用于将数据存储到Hbase中,其中有些方法用于存储或删除。

4.1.1 项目导入

导入资料文件夹中的项目leadnews-migration

4.1.2 公共存储类

StorageData

公共存储数据表,由多个StorageEntity组成

StorageData 是最重要的一个存储对象,它是保存一个bean信息的类,负责存储bean信息以及转换和反向转换bean 。

该类用到一个重要的工具类ReflectUtils 反射工具类和DataConvertUtils数据类型转换工具类主要用于日期类型的转换

主要方法

  • 添加StorageEntry方法
public void addStorageEntry(StorageEntry entry)

该方法有几个重载方法,用于向StorageEntry列表中添加StorageEntry对象的

  • 获取该对象对应的Object对象
public Object getObjectValue() 

该方法用于将存储的实体数据转换为Bean的实体,用了ReflectUtils反射工具类进行操作

  • 将Bean 转换为StorageData的存储结构
public static StorageData getStorageData(Object bean) 

该方法用于将不同的bean转换为同一种存储结构进行存储

StorageEntity

公共代码存储的实体

StorageEntry

公共存储对象的一个key-value的字段

4.1.3 Hbase操作相关工具类

(1)HBaseConstants 类

配置类Hbase存储的的表名称

public class HBaseConstants {
    public static final String APARTICLE_QUANTITY_TABLE_NAME = "APARTICLE_QUANTITY_TABLE_NAME";
}

(2)HBaseInvok

hbase的的回调操作类
/**
 * Hbase 的回调类
 * 用于我们操作的时候就行回调
 */
public interface HBaseInvok {
    /**
     * 回调方法
     */
    public void invok();
}

(3)HBaseStorage

hbase 的存储对象 继承自StorageEntity

(4)HBaseClent

hbase client操作的工具类

(5)HBaseConfig

用于将HbaseClient对象的相关配置

(6)HBaseStorageClient

Hbase 存储客户端工具类 是对HbaseClient工具类的封装

这个类是自己封装的存储客户端

该类位于heima-leadnews-common 包下的 com.heima.hbase.HBaseStorageClient

其中用到了HBaseClent 客户端工具,它是一个操作工具类,不需要我们具体地写拿过来用就可以

4.1.4 测试代码

@SpringBootTest
@RunWith(SpringRunner.class)
public class HbaseTest {

    @Autowired
    private HBaseClent hBaseClent;

    @Test
    public void testCreateTable(){

        List<String> columnFamily = new ArrayList<>();
        columnFamily.add("test_cloumn_family1");
        columnFamily.add("test_cloumn_family2");
        boolean ret = hBaseClent.creatTable("hbase_test_table_name", columnFamily);
    }

    @Test
    public void testDelTable(){
        hBaseClent.deleteTable("hbase_test_table_name");
    }

    @Test
    public void testSaveData(){
        String []columns ={"name","age"};
        String [] values = {"zhangsan","28"};
        hBaseClent.putData("hbase_test_table_name","test_row_key_001","test_cloumn_family1",columns,values);
    }

    @Test
    public void testFindByRowKey(){
        Result hbaseResult = hBaseClent.getHbaseResult("hbase_test_table_name", "test_row_key_001");
        System.out.println(hbaseResult);
    }
}

4.2 MongoDB操作工具类

mongoDB是一个文档型数据库,也需要存储多个不同的对象,我们也用到了HBASE中用到的StorageEntity 存储结构,我们下面会讲

我们用到了Spring MongoTemplate 来操作数据库

介绍一下我们的实体

(1)MongoConstant

mongoDB操作的常量定义了操作mongodb的表名称

代码位置: com.heima.common.mongo.constants.MongoConstant

public class MongoConstant {
    public static final String APARTICLE_MIGRATION_TABLE = "APARTICLE_MIGRATION_TABLE";
}

(2)MongoStorageEntity

MongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的

mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型

代码位置: com.heima.common.mongo.entity.MongoStorageEntity

(2)MongoStorageEntity

MongoStorageEntity 是我们存储MongoDB数据的存储结构主要是基于StorageEntity 结构来的

mongoDB操作的实体类继承了StorageEntity 制定了 表明以及实体类型

代码位置:com.heima.common.mongo.entity.MongoStorageEntity

(3)MongoDBconfigure

对mongdb操作的配置类

代码位置: com.heima.common.mongo.MongoDBconfigure

@Configuration
@PropertySource("classpath:mongo.properties")
public class MongoDBconfigure {
    @Value("${mongo.host}")
    private String host;
    @Value("${mongo.port}")
    private int port;
    @Value("${mongo.dbname}")
    private String dbName;
    @Bean
    public MongoTemplate getMongoTemplate() {
        return new MongoTemplate(getSimpleMongoDbFactory());
    }
    public SimpleMongoDbFactory getSimpleMongoDbFactory() {
        return new SimpleMongoDbFactory(new MongoClient(host, port), dbName);
    }
}

(4)测试代码

@SpringBootTest(classes = MigrationApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class MongoTest {

    @Autowired
    private MongoTemplate mongotemplate;

    @Autowired
    private HBaseStorageClient hBaseStorageClient;

    @Test
    public void test() {
        Class<?>[] classes = new Class<?>[]{ApArticle.class, ApArticleContent.class, ApAuthor.class};
        //List<Object> entityList = hBaseStorageClient.getHbaseDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", Arrays.asList(classes));
        List<String> strList = Arrays.asList(classes).stream().map(x -> x.getName()).collect(Collectors.toList());
        List<StorageData> storageDataList = hBaseStorageClient.gethBaseClent().getStorageDataList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, "1", strList);
        MongoStorageEntity mongoStorageEntity = new MongoStorageEntity();
        mongoStorageEntity.setDataList(storageDataList);
        mongoStorageEntity.setRowKey("1");
        MongoStorageEntity tmp = mongotemplate.findById("1", MongoStorageEntity.class);
        if (null != tmp) {
            mongotemplate.remove(tmp);
        }
        MongoStorageEntity tq = mongotemplate.insert(mongoStorageEntity);
        System.out.println(tq);

    }

    @Test
    public void test1() {
        MongoStorageEntity mongoStorageEntity = mongotemplate.findById("1", MongoStorageEntity.class);
        if (null != mongoStorageEntity && null != mongoStorageEntity.getDataList()) {
            mongoStorageEntity.getDataList().forEach(x -> {
                System.out.println(x.getObjectValue());
            });
        }
    }
}

5 业务层代码

5.1 Habse操作实体类

(1)ArticleCallBack Hbase相关回调操作的工具类

5.jpg

(2)ArticleHBaseInvok Hbase 对回调对象的封装,以及对回调的invoke执行对象

6.jpg

(3)ArticleQuantity 对整个需要存储的对象的封装。

5.2 文章配置接口

5.2.1 mapper

ApArticleConfigMapper中新增方法

7.jpg

ApArticleConfigMapper.xml

8.jpg

5.2.2 service

对文章配置操作的service

9.jpg

ApArticleConfigServiceImpl是对ApArticleConfig的操作

10.jpg

5.3 文章内容接口

5.3.1 mapper定义

ApArticleContentMapper新增方法

 List<ApArticleContent> selectByArticleIds(List<String> articleIds);

5.3.2 service

对文章内容操作的Service

public interface ApArticleContenService {
    List<ApArticleContent> queryByArticleIds(List<String> ids);
    ApArticleContent getByArticleIds(Integer id);
}

ApArticleContenServiceImpl

对ApArticleConten相关的操作

代码位置: com.heima.migration.service.impl.ApArticleContenServiceImpl

@Service
public class ApArticleContenServiceImpl implements ApArticleContenService {

    @Autowired
    private ApArticleContentMapper apArticleContentMapper;

    @Override
    public List<ApArticleContent> queryByArticleIds(List<String> ids) {
        return apArticleContentMapper.selectByArticleIds(ids);
    }

    @Override
    public ApArticleContent getByArticleIds(Integer id) {
        return apArticleContentMapper.selectByArticleId(id);
    }
}

7.5 文章作者接口

7.5.1 mapper定义

ApAuthorMapper

 List<ApAuthor> selectByIds(List<Integer> ids);

ApAuthorMapper.xml

<select id="selectByIds" resultMap="BaseResultMap">
    select * from ap_author
    where id in
    <foreach item="item" index="index" collection="list" open="(" separator="," close=")">
        #{item}
    </foreach>
</select>

7.5.2 service

对ApAuthor操作的Service

接口位置 :com.heima.migration.service.ApAuthorService

public interface ApAuthorService {
    List<ApAuthor> queryByIds(List<Integer> ids);
    ApAuthor getById(Long id);
}

ApAuthorServiceImpl

对ApAuthor相关的操作

代码位置 :com.heima.migration.service.impl.ApAuthorServiceImpl

@Service
public class ApAuthorServiceImpl implements ApAuthorService {
    @Autowired
    private ApAuthorMapper apAuthorMapper;
    @Override
    public List<ApAuthor> queryByIds(List<Integer> ids) {
        return apAuthorMapper.selectByIds(ids);
    }
    @Override
    public ApAuthor getById(Long id) {
        if (null != id) {
            return apAuthorMapper.selectById(id.intValue());
        }
        return null;
    }
}

7.6 综合迁移接口

ArticleQuantityService

操作ArticleQuantity对象的Service ArticleQuantity对象封装了文章相关的数据

接口位置: com.heima.migration.service.ArticleQuantityService

public interface ArticleQuantityService {
    /**
     * 获取ArticleQuantity列表
     * @return
     */
    public List<ArticleQuantity> getArticleQuantityList();
    /**
     * 根据ArticleId获取ArticleQuantity
     * @param id
     * @return
     */
    public ArticleQuantity getArticleQuantityByArticleId(Long id);
    /**
     * 根据ByArticleId从Hbase中获取ArticleQuantity
     * @param id
     * @return
     */
    public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id);
    /**
     * 数据库到Hbase的同步
     */
    public void dbToHbase();
    /**
     * 根据articleId 将数据库的数据同步到Hbase
     * @param articleId
     */
    public void dbToHbase(Integer articleId);
}

ArticleQuantityServiceImpl

对ArticleQuantity的相关操作

代码位置: com.heima.migration.service.impl.ArticleQuantityServiceImpl

/**
 * 查询未同步的数据,并封装成ArticleQuantity 对象
 */
@Service
@Log4j2
public class ArticleQuantityServiceImpl implements ArticleQuantityService {

    @Autowired
    private ApArticleContenService apArticleContenService;
    @Autowired
    private ApArticleConfigService apArticleConfigService;
    @Autowired
    private ApAuthorService apAuthorService;

    @Autowired
    private HBaseStorageClient hBaseStorageClient;

    @Autowired
    private ApArticleService apArticleService;

    /**
     * 查询位同步数据的列表
     *
     * @return
     */
    public List<ArticleQuantity> getArticleQuantityList() {
        log.info("生成ArticleQuantity列表");
        //查询未同步的庶数据
        List<ApArticle> apArticleList = apArticleService.getUnsyncApArticleList();
        if (apArticleList.isEmpty()) {
            return null;
        }
        //获取ArticleId 的list
        List<String> apArticleIdList = apArticleList.stream().map(apArticle -> String.valueOf(apArticle.getId())).collect(Collectors.toList());
        //获取AuthorId 的 list
        List<Integer> apAuthorIdList = apArticleList.stream().map(apAuthor -> apAuthor.getAuthorId() == null ? null : apAuthor.getAuthorId().intValue()).filter(x -> x != null).collect(Collectors.toList());
        //根据apArticleIdList 批量查询出内容列表
        List<ApArticleContent> apArticleContentList = apArticleContenService.queryByArticleIds(apArticleIdList);
        //根据apArticleIdList 批量查询出配置列表
        List<ApArticleConfig> apArticleConfigList = apArticleConfigService.queryByArticleIds(apArticleIdList);
        //根据apAuthorIdList 批量查询出作者列
        List<ApAuthor> apAuthorList = apAuthorService.queryByIds(apAuthorIdList);

        //将不同的对象转换为 ArticleQuantity 对象
        List<ArticleQuantity> articleQuantityList = apArticleList.stream().map(apArticle -> {
            return new ArticleQuantity() {{
                //设置apArticle 对象
                setApArticle(apArticle);
                // 根据apArticle.getId() 过滤出符合要求的 ApArticleContent 对象
                List<ApArticleContent> apArticleContents = apArticleContentList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());
                if (null != apArticleContents && !apArticleContents.isEmpty()) {
                    setApArticleContent(apArticleContents.get(0));
                }
                // 根据 apArticle.getId 过滤出 ApArticleConfig 对象
                List<ApArticleConfig> apArticleConfigs = apArticleConfigList.stream().filter(x -> x.getArticleId().equals(apArticle.getId())).collect(Collectors.toList());
                if (null != apArticleConfigs && !apArticleConfigs.isEmpty()) {
                    setApArticleConfig(apArticleConfigs.get(0));
                }
                // 根据 apArticle.getAuthorId().intValue() 过滤出 ApAuthor 对象
                List<ApAuthor> apAuthors = apAuthorList.stream().filter(x -> x.getId().equals(apArticle.getAuthorId().intValue())).collect(Collectors.toList());
                if (null != apAuthors && !apAuthors.isEmpty()) {
                    setApAuthor(apAuthors.get(0));
                }
                //设置回调方法 用户方法的回调 用于修改同步状态 插入Hbase 成功后同步状态改为已同步
                setHBaseInvok(new ArticleHBaseInvok(apArticle, (x) -> apArticleService.updateSyncStatus(x)));
            }};
        }).collect(Collectors.toList());
        if (null != articleQuantityList && !articleQuantityList.isEmpty()) {
            log.info("生成ArticleQuantity列表完成,size:{}", articleQuantityList.size());
        } else {
            log.info("生成ArticleQuantity列表完成,size:{}", 0);
        }

        return articleQuantityList;
    }

    public ArticleQuantity getArticleQuantityByArticleId(Long id) {
        if (null == id) {
            return null;
        }
        ArticleQuantity articleQuantity = null;
        ApArticle apArticle = apArticleService.getById(id);
        if (null != apArticle) {
            articleQuantity = new ArticleQuantity();
            articleQuantity.setApArticle(apArticle);
            ApArticleContent apArticleContent = apArticleContenService.getByArticleIds(id.intValue());
            articleQuantity.setApArticleContent(apArticleContent);
            ApArticleConfig apArticleConfig = apArticleConfigService.getByArticleId(id.intValue());
            articleQuantity.setApArticleConfig(apArticleConfig);
            ApAuthor apAuthor = apAuthorService.getById(apArticle.getAuthorId());
            articleQuantity.setApAuthor(apAuthor);
        }
        return articleQuantity;
    }

    public ArticleQuantity getArticleQuantityByArticleIdForHbase(Long id) {
        if (null == id) {
            return null;
        }
        ArticleQuantity articleQuantity = null;
        List<Class> typeList = Arrays.asList(ApArticle.class, ApArticleContent.class, ApArticleConfig.class, ApAuthor.class);
        List<Object> objectList = hBaseStorageClient.getStorageDataEntityList(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, DataConvertUtils.toString(id), typeList);
        if (null != objectList && !objectList.isEmpty()) {
            articleQuantity = new ArticleQuantity();
            for (Object value : objectList) {
                if (value instanceof ApArticle) {
                    articleQuantity.setApArticle((ApArticle) value);
                } else if (value instanceof ApArticleContent) {
                    articleQuantity.setApArticleContent((ApArticleContent) value);
                } else if (value instanceof ApArticleConfig) {
                    articleQuantity.setApArticleConfig((ApArticleConfig) value);
                } else if (value instanceof ApAuthor) {
                    articleQuantity.setApAuthor((ApAuthor) value);
                }
            }
        }
        return articleQuantity;
    }

    /**
     * 数据库到Hbase同步
     */
    public void dbToHbase() {
        long cutrrentTime = System.currentTimeMillis();
        List<ArticleQuantity> articleQuantitList = getArticleQuantityList();
        if (null != articleQuantitList && !articleQuantitList.isEmpty()) {
            log.info("开始进行定时数据库到HBASE同步,筛选出未同步数据量:{}", articleQuantitList.size());
            if (null != articleQuantitList && !articleQuantitList.isEmpty()) {
                List<HBaseStorage> hbaseStorageList = articleQuantitList.stream().map(ArticleQuantity::getHbaseStorage).collect(Collectors.toList());
                hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hbaseStorageList);
            }
        } else {
            log.info("定时数据库到HBASE同步为筛选出数据");
        }

        log.info("定时数据库到HBASE同步结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);
    }

    @Override
    public void dbToHbase(Integer articleId) {
        long cutrrentTime = System.currentTimeMillis();
        log.info("开始进行异步数据库到HBASE同步,articleId:{}", articleId);
        if (null != articleId) {
            ArticleQuantity articleQuantity = getArticleQuantityByArticleId(articleId.longValue());
            if (null != articleQuantity) {
                HBaseStorage hBaseStorage = articleQuantity.getHbaseStorage();
                hBaseStorageClient.addHBaseStorage(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, hBaseStorage);
            }
        }
        log.info("异步数据库到HBASE同步结束,articleId:{},耗时:{}", articleId, System.currentTimeMillis() - cutrrentTime);
    }
}

7.7 热点文章接口

ApHotArticleService

对ApHotArticle操作Service

接口位置: com.heima.migration.service.ApHotArticleService

public interface ApHotArticleService {
    List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery);
    void insert(ApHotArticles apHotArticles);
    /**
     * 热数据 Hbase 同步
     *
     * @param apArticleId
     */
    public void hotApArticleSync(Integer apArticleId);
    void deleteById(Integer id);
    /**
     * 查询过期的数据
     *
     * @return
     */
    public List<ApHotArticles> selectExpireMonth();
    void deleteHotData(ApHotArticles apHotArticle);
}

ApHotArticleServiceImpl

对ApHotArticle的相关操作

代码位置: com.heima.migration.service.impl.ApHotArticleServiceImpl

/**
 * 热点数据操作Service 类
 */
@Service
@Log4j2
public class ApHotArticleServiceImpl implements ApHotArticleService {

    @Autowired
    private ApHotArticlesMapper apHotArticlesMapper;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private ArticleQuantityService articleQuantityService;

    @Autowired
    private HBaseStorageClient hBaseStorageClient;

    @Override
    public List<ApHotArticles> selectList(ApHotArticles apHotArticlesQuery) {
        return apHotArticlesMapper.selectList(apHotArticlesQuery);
    }

    /**
     * 根据ID删除
     *
     * @param id
     */
    @Override
    public void deleteById(Integer id) {
        log.info("删除热数据,apArticleId:{}", id);
        apHotArticlesMapper.deleteById(id);
    }

    /**
     * 查询一个月之前的数据
     *
     * @return
     */
    @Override
    public List<ApHotArticles> selectExpireMonth() {
        return apHotArticlesMapper.selectExpireMonth();
    }

    /**
     * 删除过去的热数据
     *
     * @param apHotArticle
     */
    @Override
    public void deleteHotData(ApHotArticles apHotArticle) {
        deleteById(apHotArticle.getId());
        String rowKey = DataConvertUtils.toString(apHotArticle.getId());
        hBaseStorageClient.gethBaseClent().deleteRow(HBaseConstants.APARTICLE_QUANTITY_TABLE_NAME, rowKey);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKey, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            mongoTemplate.remove(mongoStorageEntity);
        }
    }

    /**
     * 插入操作
     *
     * @param apHotArticles
     */
    @Override
    public void insert(ApHotArticles apHotArticles) {
        apHotArticlesMapper.insert(apHotArticles);
    }

    /**
     * 热点数据同步方法
     *
     * @param apArticleId
     */
    @Override
    public void hotApArticleSync(Integer apArticleId) {
        log.info("开始将热数据同步,apArticleId:{}", apArticleId);
        ArticleQuantity articleQuantity = getHotArticleQuantity(apArticleId);
        if (null != articleQuantity) {
            //热点数据同步到DB中
            hotApArticleToDBSync(articleQuantity);
            //热点数据同步到MONGO
            hotApArticleMongoSync(articleQuantity);
            log.info("热数据同步完成,apArticleId:{}", apArticleId);
        } else {
            log.error("找不到对应的热数据,apArticleId:{}", apArticleId);
        }
    }

    /**
     * 获取热数据的ArticleQuantity 对象
     *
     * @param apArticleId
     * @return
     */
    private ArticleQuantity getHotArticleQuantity(Integer apArticleId) {
        Long id = Long.valueOf(apArticleId);
        ArticleQuantity articleQuantity = articleQuantityService.getArticleQuantityByArticleId(id);
        if (null == articleQuantity) {
            articleQuantity = articleQuantityService.getArticleQuantityByArticleIdForHbase(id);
        }
        return articleQuantity;
    }

    /**
     * 热数据 到数据库Mysql的同步
     *
     * @param articleQuantity
     */
    public void hotApArticleToDBSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从Hbase同步到mysql,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId不存在无法进行同步");
            return;
        }
        ApHotArticles apHotArticlesQuery = new ApHotArticles() {{
            setArticleId(apArticleId);
        }};
        List<ApHotArticles> apHotArticlesList = apHotArticlesMapper.selectList(apHotArticlesQuery);
        if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
            log.info("Mysql数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            ApHotArticles apHotArticles = articleQuantity.getApHotArticles();
            apHotArticlesMapper.insert(apHotArticles);
        }
        log.info("将热数据从Hbase同步到mysql完成,apArticleId:{}", apArticleId);
    }

    /**
     * 热数据向从Hbase到Mongodb同步
     *
     * @param articleQuantity
     */
    public void hotApArticleMongoSync(ArticleQuantity articleQuantity) {
        Integer apArticleId = articleQuantity.getApArticleId();
        log.info("开始将热数据从Hbase同步到MongoDB,apArticleId:{}", apArticleId);
        if (null == apArticleId) {
            log.error("apArticleId不存在无法进行同步");
            return;
        }
        String rowKeyId = DataConvertUtils.toString(apArticleId);
        MongoStorageEntity mongoStorageEntity = mongoTemplate.findById(rowKeyId, MongoStorageEntity.class);
        if (null != mongoStorageEntity) {
            log.info("MongoDB数据已同步过不需要再次同步,apArticleId:{}", apArticleId);
        } else {
            List<StorageData> storageDataList = articleQuantity.getStorageDataList();
            if (null != storageDataList && !storageDataList.isEmpty()) {
                mongoStorageEntity = new MongoStorageEntity();
                mongoStorageEntity.setDataList(storageDataList);
                mongoStorageEntity.setRowKey(rowKeyId);
                mongoTemplate.insert(mongoStorageEntity);
            }

        }
        log.info("将热数据从Hbase同步到MongoDB完成,apArticleId:{}", apArticleId);
    }
}

8 定时同步数据

8.1 全量数据从mysql同步到HBase

@Component
@DisallowConcurrentExecution
@Log4j2
/**
 * 全量数据从mysql 同步到HBase
 */
public class MigrationDbToHBaseQuartz extends AbstractJob {

    @Autowired
    private ArticleQuantityService articleQuantityService;

    @Override
    public String[] triggerCron() {
        /**
         * 2019/8/9 10:15:00
         * 2019/8/9 10:20:00
         * 2019/8/9 10:25:00
         * 2019/8/9 10:30:00
         * 2019/8/9 10:35:00
         */
        return new String[]{"0 0/5 * * * ?"};
    }

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        log.info("开始进行数据库到HBASE同步任务");
        articleQuantityService.dbToHbase();
        log.info("数据库到HBASE同步任务完成");
    }

}

8.2 定期删除过期的数据

/**
 * 定期删除过期的数据
 */
@Component
@Log4j2
public class MigrationDeleteHotDataQuartz extends AbstractJob {

    @Autowired
    private ApHotArticleService apHotArticleService;

    @Override
    public String[] triggerCron() {
        /**
         * 2019/8/9 22:30:00
         * 2019/8/10 22:30:00
         * 2019/8/11 22:30:00
         * 2019/8/12 22:30:00
         * 2019/8/13 22:30:00
         */
        return new String[]{"0 30 22 * * ?"};
    }

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        long cutrrentTime = System.currentTimeMillis();
        log.info("开始删除数据库过期数据");
        deleteExpireHotData();
        log.info("删除数据库过期数据结束,耗时:{}", System.currentTimeMillis() - cutrrentTime);
    }

    /**
     * 删除过期的热数据
     */
    public void deleteExpireHotData() {
        List<ApHotArticles> apHotArticlesList = apHotArticleService.selectExpireMonth();
        if (null != apHotArticlesList && !apHotArticlesList.isEmpty()) {
            for (ApHotArticles apHotArticle : apHotArticlesList) {
                apHotArticleService.deleteHotData(apHotArticle);
            }
        }
    }

}

9 消息接收同步数据

9.1 文章审核成功同步

9.1.1 消息发送

(1)消息名称定义及消息发送方法声明

maven_test.properties

kafka.topic.article-audit-success=kafka.topic.article.audit.success.sigle.test

kafka.properties

kafka.topic.article-audit-success=${kafka.topic.article-audit-success}

com.heima.common.kafka.KafkaTopicConfig新增属性

/**
     * 审核成功
     */
String articleAuditSuccess;

com.heima.common.kafka.KafkaSender


/**
     * 发送审核成功消息
     */
public void sendArticleAuditSuccessMessage(ArticleAuditSuccess message) {
    ArticleAuditSuccessMessage temp = new ArticleAuditSuccessMessage();
    temp.setData(message);
    this.sendMesssage(kafkaTopicConfig.getArticleAuditSuccess(), UUID.randomUUID().toString(), temp);
}

(2)修改自动审核代码,爬虫和自媒体都要修改

在审核成功后,发送消息

爬虫

//文章审核成功
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.CRAWLER);
articleAuditSuccess.setChannelId(apArticle.getChannelId());

kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

自媒体

//文章审核成功
ArticleAuditSuccess articleAuditSuccess = new ArticleAuditSuccess();
articleAuditSuccess.setArticleId(apArticle.getId());
articleAuditSuccess.setType(ArticleAuditSuccess.ArticleType.MEDIA);
articleAuditSuccess.setChannelId(apArticle.getChannelId());

kafkaSender.sendArticleAuditSuccessMessage(articleAuditSuccess);

9.1.2消息接收

/**
 * 热点文章监听类
 */
@Component
@Log4j2
public class MigrationAuditSucessArticleListener implements KafkaListener<String, String> {
    /**
     * 通用转换mapper
     */
    @Autowired
    ObjectMapper mapper;
    /**
     * kafka 主题 配置
     */
    @Autowired
    KafkaTopicConfig kafkaTopicConfig;

    @Autowired
    private ArticleQuantityService articleQuantityService;

    @Override
    public String topic() {
        return kafkaTopicConfig.getArticleAuditSuccess();
    }

    /**
     * 监听消息
     *
     * @param data
     * @param consumer
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
        log.info("kafka接收到审核通过消息:{}", data);
        String value = (String) data.value();
        if (null != value) {
            ArticleAuditSuccessMessage message = null;
            try {
                message = mapper.readValue(value, ArticleAuditSuccessMessage.class);
            } catch (IOException e) {
                e.printStackTrace();
            }
            ArticleAuditSuccess auto = message.getData();
            if (null != auto) {
                //调用方法 将HBAESE中的热数据进行同步
                Integer articleId = auto.getArticleId();
                if (null != articleId) {
                    articleQuantityService.dbToHbase(articleId);
                }
            }
        }

    }
}

9.2 热点文章同步

创建监听类: com.heima.migration.kafka.listener.MigrationHotArticleListener

/**
 * 热点文章监听类
 */
@Component
@Log4j2
public class MigrationHotArticleListener implements KafkaListener<String, String> {
    /**
     * 通用转换mapper
     */
    @Autowired
    ObjectMapper mapper;
    /**
     * kafka 主题 配置
     */
    @Autowired
    KafkaTopicConfig kafkaTopicConfig;
    /**
     * 热点文章service注入
     */
    @Autowired
    private ApHotArticleService apHotArticleService;

    @Override
    public String topic() {
        return kafkaTopicConfig.getHotArticle();
    }

    /**
     * 监听消息
     *
     * @param data
     * @param consumer
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> data, Consumer<?, ?> consumer) {
        log.info("kafka接收到热数据同步消息:{}", data);
        String value = (String) data.value();
        if (null != value) {
            ApHotArticleMessage message = null;
            try {
                message = mapper.readValue(value, ApHotArticleMessage.class);
            } catch (IOException e) {
                e.printStackTrace();
            }
            Integer articleId = message.getData().getArticleId();
            if (null != articleId) {
                //调用方法 将HBAESE中的热数据进行同步
                apHotArticleService.hotApArticleSync(articleId);
            }
        }
    }
}