一个HBase数据迁移到Mongodb需求,差点葬送了我的职业生涯

1,810 阅读11分钟

目录

只要还有一根头发,说明你还能更加努力🐶

业务背景

之前公司投票系统的统计用的是 HBase 进行存储,历史数据大概是四亿条,总监说现在需要将 HBase 数据迁移到mongodb,只保存最近两年的数据,其他的数据磁盘备份就行,要求有三:

  1. 不丢数据
  2. 平滑迁移
  3. 不停机

于是作为一个刚刚毕业充满激情和热血的有志青年,我开完早会第一时刻就去百度搜索方案:

数据迁移方案
数据迁移方案

点开看了前面几条之后,摸着键盘的手有点发抖,就差留下两行清澈的泪水,然后默默的我打开了:

人生就是这么戏剧
人生就是这么戏剧

方案确定

于是我老大就带着我快乐的分析需求,主要是迁移投票统计数据,有每日投票记录,每日活动投票数记录,每个用户投票记录等等。最终确定三亿条数据分布在两张表里面,那两张数据量大的表由他迁移,另外七张表总共数据量差不多快一个亿,由我来迁移。

因为不能影响到线上的客户,所有我这边迁移方案最终是新老double write + offline data sync and check,其实就是 线上双写+线下复核

我需要迁移的表,有五张表是数据量比较小的就十万条数据左右,于是我先拿着这些表出气,这些表由于数据量不大我是进行全表迁移的,就是从 HBase 查询到所有数据直接往 mongodb 里面倒。

这是我出的数据迁移方案:

数据平滑迁移方案

采用上线期间对新增的数据进行双写策略:

  1. 先上线一版对"增删改"数据写两份数据库的措施,在代码原来只操作hbase的情况下,加上mongodb库的操作逻辑;
  2. 编写数据迁移的 python 脚本和接口,将数据从 hbase 迁移到 mongodb;
  3. 将从 hbase 查询的逻辑旁边全部加上从 mongodb 库查询的逻辑;
  4. 迁移完毕,上线的时候,将hbase逻辑全部下掉,查询全部走 mongo;
  5. 考虑到数据迁移幂等性问题,迁移代码全部写成覆盖而不是增加票数;

迁移阶段

方案确定了,一切好像都在按照排期有条不紊的进行着:

  1. 建立 mongodb 数据表,建立好索引;
  2. 上线双写代码,投票写操作同时记录在 mongodb 库,那样迁移期间投票数据记录就不会丢失;
  3. 拿着迁移数据量小的表出气,全部查出来一股脑往 mongodb 倾倒;
// 查询 hbase
public List<VoteRecordable> listVoteRecord(VoteRecordable begin, VoteRecordable end) {
        byte[] startRow = begin.getRowKey();
        byte[] endRow = end.getRowKey();
        Scan scan = new Scan(startRow, endRow);
        return this.hbaseTemplate.find(begin.getTableName(), scan, this.getRowMapper(begin.getClass()));
    }
// mongodb 入库
public void batchInsertTotalActivityVoteRecordDoc(List<TotalActivityVoteRecordDoc> totalActivityVoteRecordDocs) {
        BulkOperations operations = mongoTemplate.bulkOps(BulkOperations.BulkMode.ORDERED, TotalActivityVoteRecordDoc.class);
        List<Pair<Query, Update>> upsertList = new ArrayList<>(totalActivityVoteRecordDocs.size());
        totalActivityVoteRecordDocs.forEach(data -> {
            Query query = new Query(Criteria.where("activityId").is(data.getActivityId()));
            Update update = new Update();
            update.set("voteCount", data.getVoteCount());
            Pair<Query, Update> upsertPair = Pair.of(query, update);
            upsertList.add(upsertPair);
        });

        operations.upsert(upsertList);
        operations.execute();
    }

一次 scan 查询会返回大量数据,因此客户端发起一次scan请求,实际并不会一次就将所有数据加载到本地,而是分成多次 RPC 请求进行加载,数据量小的话可以不计较得失愉快的scan,但是数据量太大会有两个问题躲不掉:

  1. 严重消耗网络带宽,从而影响其他业务;
  2. 本地客户端发生OOM;
  3. 请求太大太集中会把 HBase 打爆,因为我的请求是 scan 方式而不是 rowKey 等值查询(等值查询的话需要拼接详细的活动ID或者投票ID);

五张表我愉快的迁移了,但是迁移大表的时候报应来了,一个是查询慢,第二个是查询完插入过程中需要new 很多对象,愉快的 OOM 了,于是这种方式我也就是想着小表先给整了,大表还要另外寻出路。

迁移优化

优化思路:

  1. scan 危险的话,那就等值查询喽,拼接要的东西我又不是没有;
  2. 查询慢要提高速度,那就多线程走起;
  3. 本地迁移太耽误事情了,放到服务器里面迁移,内存大各种访问走内网还快;
  4. 迁移期间调用接口请求失败了需要重试;

于是我在Java代码里面写好查询数据和插入数据的逻辑:

@ResponseBody
    public void insertHourlyVoteRecordDoc2(@RequestParam(value = "voteItemId") String voteItemId,
                                           @RequestParam(value = "beginDate") String beginDate,
                                           @RequestParam(value = "endDate") String endDate){
        VoteRecordable begin = new HourlyVoteRecord();
        begin.setTraceId(voteItemId);
        begin.setDate(beginDate);

        VoteRecordable end = new HourlyVoteRecord();
        end.setTraceId(voteItemId);
        end.setDate(endDate);

        List<VoteRecordable> voteRecordables = voteItemStatService.listVoteRecords(begin, end);
        System.out.println(voteRecordables.size() + "voteRecordables大小是多少 insertHourlyVoteRecordDoc");
        if (CollectionUtils.isEmpty(voteRecordables)) {
            return;
        }
        List<HourlyVoteRecordDoc> hourlyVoteRecordDocs = new ArrayList<>(voteRecordables.size());
        for (VoteRecordable voteRecordable1 : voteRecordables) {
            HourlyVoteRecord hourlyVoteRecord = (HourlyVoteRecord) voteRecordable1;
            HourlyVoteRecordDoc hourlyVoteRecordDoc = new HourlyVoteRecordDoc();
            hourlyVoteRecordDoc.setVoteItemId(new ObjectId(hourlyVoteRecord.getVoteItemId()));
            hourlyVoteRecordDoc.setVoteCount(hourlyVoteRecord.getVoteCount());
            hourlyVoteRecordDoc.setHour(hourlyVoteRecord.getHour());

            hourlyVoteRecordDocs.add(hourlyVoteRecordDoc);
        }
        voteRecordService.batchInsertHourlyVoteRecordDoc(hourlyVoteRecordDocs);

    }

然后在python写脚本进行多线程并发请求迁移:

"""
迁移
DailyVoteRecordDoc
HourlyVoteRecord
HourlyActivtiyVoteRecord
"
""

import threading
import time
from datetime import datetime, timedelta

from core_service import vote_service
from scripts.biz.vote.migrate_user_vote_records import MigrateLimitException


class MyThread(threading.Thread):
    def __init__(self, thread_id, name, archive_activities):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.name = name
        self.archive_activities = archive_activities

    def run(self):
        print("Starting " + self.name)
        main(self.archive_activities)
        print("exiting " + self.name)


def main(archive_activities):
    """
    迁移投票记录
    :param archive_activities:
    :return:
    "
""

    final_date_end_timestamp = 1595433600

    for activity in archive_activities:
        date_start = time.strftime('%Y-%m-%d', time.localtime(int(activity['dateStart'] / 1000)))
        hour_start = time.strftime('%Y-%m-%d-%H', time.localtime(int(activity['dateStart'] / 1000)))

        date_end_timestamp = int(activity['dateEnd'] / 1000 + 24 * 60 * 60)

        if final_date_end_timestamp < date_end_timestamp:
            date_end_timestamp = final_date_end_timestamp

        date_end = time.strftime('%Y-%m-%d', time.localtime(date_end_timestamp))
        hour_end = time.strftime('%Y-%m-%d-%H', time.localtime(date_end_timestamp))
        ## 迁移 hourly_activity_vote_records
        vote_service.batch_insert_hourly_activity_vote_records(activity.get('_id'), hour_start, hour_end)
        print(activity)

        vote_items = vote_service.list_vote_items(str(activity['_id']))
        try:
            for vote_item in vote_items:
                # print("user_vote_record: activityId voteItemId score", activity.get('_id'), vote_item.get('_id'))
                # print(vote_item)
                if vote_item['score'] != 0:
                    ## 迁移 daily_vote_records
                    vote_service.batch_insert_daily_vote_records(vote_item.get('_id'), date_start, date_end)
                    ## 迁移 hourly_vote_records
                    print("开始结束时间时间", hour_start, hour_end)
                    print("开始结束日期", date_start, date_end)
                    vote_service.batch_insert_hourly_vote_records(vote_item.get('_id'), hour_start, hour_end)

        except MigrateLimitException:
            print("migrate limit error")



if __name__ == "__main__":
    // 调用Java代码接口,查询到所有的活动
    archive_activities_local = vote_service.archive_activities_v3()
    # 开启多线程
    i = 1
    available_activities = []
    threads = []

    for index, local_activity in enumerate(archive_activities_local):
        # 每循环700次开一个线程
        if index == i * 700:
            thread = MyThread(i, "Thread-" + str(i), available_activities)
            thread.start()
            threads.append(thread)
            i += 1
            available_activities = []
        available_activities.append(local_activity)

    if available_activities:
        thread = MyThread(i, "Thread-" + str(i), available_activities)
        thread.start()
        threads.append(thread)

    for t in threads:
        t.join()

请求如果失败了重试三次
@retry(3)
def batch_insert_hourly_activity_vote_records(activityId, beginDate, endDate):
    """
    批量插入活动小时投票记录
    :param activityId:
    :param beginDate:
    :param endDate:
    :return:
    "
""
    url = "http://xxx/vote/api/internal/dbMigration/insertHourlyActivityVoteRecordDoc?" \
          "activityId={activityId}&beginDate={beginDate}&endDate={endDate}".format(activityId=activityId, beginDate=beginDate, endDate=endDate)

    r = requests.session().get(url)

最终放到服务器里面跑了两个小时就跑完了。我老大的三亿条数据跑了一天跑完,我一开始以为我老大说的一天12小时(实际是24小时算的),那我想着我的数据不到他的三分之一,三小时就差不多,于是本地跑,结果跑了一整天。

这便是所有的实现了。

复盘

但是这个过程并没有我说的这么顺利和轻松,架构师给了我五天我延期了两天,我浪费时间的部分有以下几个方面:

  1. 私人原因,中间有一天看了下小破牙请了半天假,痛的灵魂出窍无心工作,但是我周末自己在家赶了很多双写代码的进度;
  2. 前期迁移还算顺利,但是由于缺少数据迁移的经验,后期大数据表迁移出现了很多意外;
  3. 大数据表迁移,而且没有处理好中间中断情况的log记录,导致我开始了不敢轻易中断,然后如果代码逻辑有bug,要从来就很要命;
  4. 预估不到数据迁移时间,其中有一天我写完逻辑在本地跑数据迁移,跑了六个小时,严重影响到了我的操作;
  5. 线下数据复核的时候,有七张表数据,对的我有点头昏眼花,然后有三张大表数据一直对不上;业务里面有礼物投票刷数据的情况一开始没有想到这个龟孙儿,导致我一直以为数据迁移有问题,拼了命的找原因又找不到逻辑漏洞;

说了这么多,菜是原罪,努力学习吧,只要还有一根头发就不放弃学习!!!

欢迎批评指正,公众号《阿甘的码路》欢迎和菜鸟号主一起成长,有收获的朋友点个在看或者分享鼓励一下吧,十分感谢~

关注我,一起成长
关注我,一起成长