MongoDB :ChangeStream 可以用来干什么

2,336 阅读7分钟

👈👈👈 欢迎点赞收藏关注哟

首先分享之前的所有文章 >>>> 😜😜😜
文章合集 : 🎁 juejin.cn/post/694164…
Github : 👉 github.com/black-ant
CASE 备份 : 👉 gitee.com/antblack/ca…

一. 前言

对于 MySQL 我们可以通过 Canal 实现数据的同步,而对于 MongoDB 有其对应的实现方案,即为 ChangeStream

一句话概念

  • ChangeStream 本身基于 MongoDB Oplog 实现
  • 支持副本集分片集群且单机会报错的
  • 支持 WiredTiger 引擎和 In-Memory 引擎
  • 支持以全DB,单DB,单个表
  • 支持所有的 DML (增删改查)和部分 DDL (dropCollection,dropDatabase,renameCollection)

适用场景

其实主要就是一个数据分发的作用,和 Canal 的思路一样 , 基于 log 实现的转发,这样的好处是不会阻塞正常的操作,相当于进行异步处理。

适用的场景也是类型的 : 数据同步,数据清洗,数据分析 ,消息推送。 基于这些大的类型可以实现更细致的功能,例如发消息,做图表,玩营销等,同步上下游等...

二. ChangeStreams 的使用方式

2.1 基础版本

@Autowired
private MongoTemplate mongoTemplate;

public void doChange() {
    MongoCollection<Document> collection = mongoTemplate.getCollection("user");

    // 获取 Change Streams 游标
    MongoCursor<ChangeStreamDocument<Document>> cursor = collection.watch().iterator();

    // 处理变更事件
    while (cursor.hasNext()) {
        ChangeStreamDocument<Document> change = cursor.next();
        Document fullDocument = change.getFullDocument();
        UpdateDescription updateDescription = change.getUpdateDescription();
        System.out.println("Change event: " + fullDocument);
        System.out.println("Update event: " + updateDescription);
    }

    // 关闭游标和连接
    cursor.close();
}

2.2 复杂版本

MongoCursor 进行 ChangeStream 是会一直挂起的,但是中断后对于已经处理的就需要跳过了 :

自定义位点和断点续传

常规的思路是通过手动设置位点来确保查询从指定的位置开始,这就涉及到 ChangeStreamIterable 中的几个方法 :

  • resumeAfter : 设置一个标记,标识重连后通过指定的 resumeToken 后面的数据进行监听
  • maxAwaitTime :设置在等待新变更事件时的最长等待时间。一旦超过指定时间,游标将返回空
  • batchSize每批次返回的变更数量
  • listen : 执行 Change Stream 监听,并返回一个游标来处理变更事件

那么流程就比较清楚了 :

  • S1 : 定时任务或者系统触发 ChangeStream
  • S2 : 从特定的位置获取保存的 resumeToken String (Redis 或者其他的存储介质中)
  • S3 : 通过 resumeAfter 方法指定 ResumeToken从该 Token 进行处理
  • S4 : 通过定时任务或者业务处理把当前处理的 最新 的 Token 存储到库中

方法大同小异,核心就是找个地方存当前处理的游标,然后通过 resumeAfter 进行处理。

// 设置断点续传点位
changeStreamIterable.resumeAfter(BsonDocument.parse(getLastResumeToken())).iterator();

不过,一般用上了 MongoDB 说明数据的量级已经不小了,去节省线程避免循环其实没啥意义,但是断点续传是很有必要的!!谁也不能保证服务器不宕机!!

2.3 扩展功能

// watch : 支持查询语句
List<Bson> search = singletonList(Aggregates.match(Filters.in("userName", "test")));
collection.watch(search).iterator();

三. 原理简述

3.1 源码解析

核心类和方法

C- MongoCursor : 用于迭代查询结果的接口,标识一个游标查询
    M- next() : 获取游标的下一个文档 , 每次游标会前进到下一个
    M- hasNext() : 检查是否还有下一个文档可供迭代 , 返回 false / true
    M- tryNext() : 尝试获取下一个文档 , 

操作主流程

流程一 : watch 做了什么

// S1 : 创建一个 ChangeStreamIterable 用于迭代查询
public <TResult> ChangeStreamIterable<TResult> watch(final List<? extends Bson> pipeline, final Class<TResult> resultClass) {
        return createChangeStreamIterable(null, pipeline, resultClass);
}

- 通过 iterator 获取到 MongoCursor 
- 而核心的流程就是 MongoCursor next 进行迭代


// S2 : hasNext 中判断是否存在更多数据
do {
    // 在 下面会通过 initFromCommandResult 方法往 nextBatch 中存入数据
    this.getMore();
    if (this.closed) {
        throw new IllegalStateException("Cursor has been closed");
    }
    // ↓ ↓ ↓ 如果 getMore 获取到了数据,此处就不是 null 了, 则执行业务方法了
} while(this.nextBatch == null);


// 补充 : getMore 写入 nextBatch 的流程
private void initFromQueryResult(QueryResult<T> queryResult) {
    this.serverCursor = queryResult.getCursor();
    this.nextBatch = queryResult.getResults().isEmpty() ? null : queryResult.getResults();
    this.count += queryResult.getResults().size();
}

流程二 : next 方法还会去查询吗

// 其实由于 hasNext 已经获取到数据了,所以 next 方法是直接从内存里面获取
public List<T> next() {
    if (this.closed) {
        throw new IllegalStateException("Iterator has been closed");
    } else if (!this.hasNext()) {
        throw new NoSuchElementException();
    } else {
        // 直接获取 hasNext 获取的对象,并且置空
        List<T> retVal = this.nextBatch;
        this.nextBatch = null;
        return retVal;
    }
}

3.2 数据结构

ChangeStreamDocument 结构

image.png

  • fullDocument : 变更的完整文档
    • 对于更新或者插入,这里包含整个新文档的内容
    • 对于删除或者替换,为空或者包含一部分数据
  • operationType : 变更的操作类型 (insert / update / replace / delete)
  • namespaceDocument : 变更的命名空间,包括集合名称和数据库名称
  • clusterTime : 变更文档的生成时间
  • resumeToken : 变更流标记,可以基于这个标记进行滚动监视
  • documentKey : 文档主键

补充一 : update 和 replace 有什么区别

// update 操作用于修改文档中的特定字段或添加新字段,而不必替换整个文档。
    - $set、$unset、$inc、$push 
db.collection.updateOne( 
    { _id: ObjectId("...") }, 
    { $set: { key: "new value" }
} 
    
// replace 操作用于完全替换集合中的一个文档
db.collection.replaceOne(
   { _id: ObjectId("...") },   
   { key: "new value", otherKey: "other value" } 
);

补充二 : 不同操作的数据结构

  • insert / replace : fullDocument 中包含文档的全量数据
  • update : updateDescription 描述了哪些被删除,哪些被添加
  • remove : fullDocument 为空,但是会提供 DocumentKey
// insert / replace :
{
   //.....
  "documentKey": {
    "_id": {
      "value": 1693056252946
    }
  },
  // 标识被更新的文档全量内容
  "fullDocument": {
    "_id": 1693056252946,
    "username": "湖北9",
    "age": 51,
    //.....
    "type": "四川24",
    "innerAddress": "湖北46",
    "nickName": "福建81",
    "_class": "com.gang.mongodb.demo.entity.User"
  },
  "operationType": "INSERT",
}


// update : 会多出 updateDescription 字段
{
  // 标识出操作类型  
  "operationType": "UPDATE",
  "updateDescription": {
    "removedFields": [],
    // 这里就会描述出修改的字段以及字段的详细信息
    "updatedFields": {
      "email": {
           //.......
      }
    }
  }
}


// remove : fullDocument 完全为空 ,会提供 operationType 为 DELETE
{
  // .....  
  "documentKey": {
    "_id": {
      "value": 1693051422039
    }
  },
  "operationType": "DELETE",
  }
}


------------------------
>>> 实际效果 :
Update event: UpdateDescription{removedFields=[], updatedFields={"email": "湖北641"}}
Change event: Document{{_id=1693056806716, username=四川86, age=10, address=湖南12, mobile=湖北29, email=湖北91, password=湖北61, status=28, idCard=55, sex=33, role=四川56, type=湖南93, innerAddress=福建7, nickName=湖南93, _class=com.gang.mongodb.demo.entity.User}}

四. 问题

4.1 ChangeStreams 怎么保证数据是事务完成的

首先我们知道, Oplog 记录了操作日志,用来更新到其他节点,保证了数据的一致性。

其次我们知道一个问题,MongoDB 分为主节点和从节点,Oplog 在写入主节点后产生,同时同步到从节点。

但是 : 如果从节点写入失败了,数据是会发生回滚的。

所以当数据写完主节点后并不会马上返回给客户端,而是会等从节点写成功后,才会把数据同步给客户端

4.2 数据中断后怎么处理的

很常见的问题,如果拉取数据的过程中发版了,项目重启后怎么判断拉取的新节点

changeStream 拉取基于时间戳,也可以使用 ResumeToken

4.3 ChangeStream 的因果性

  • 如果两条文档具有前后因果性,即文档2的数据是基于文档1产生的,那么文档1,文档2具有因果性,在 ChangeStream 中也会有严格的因果关系
  • 如果2条文档没有因果关系,那么对应的 ChangeStream 中不会顺序输出

4.4 关于 ChangeStream 的循环

在执行 hashNext 的时候,里面是会有几种情况进行处理 :

  • 如果 closed 后 ,再执行 hasNext , 会抛出异常
  • 如果 nextBatch ()不为 null ,则直接返回存在
  • 如果 超过 limit 限制,则返回不存在
  • 否则,会进行 do-while 循环,一直到上面有情况满足

所以,理论上在完全没配置的情况下,ChangeStream 会一直进行循环,监听整体的变化 (PS :看逻辑应该是不会有超时的情况)

4.5 游标怎么生效

也就是问 resumeToken 是哪里写入的 ?

  • resumeToken 只有在启动一个ChangeStream的时候才能设置,当 ChangeStream 持续监听的时候,是不能在设置 resumeToken 了
  • 当 ChangeStream 开始监听后, 会返回当前监听流程的 resumeToken ,每个监听过程都会返回一个新的 token

其实也能理解,每次都设置很容易出现并发问题和数据不一致,每次创建的时候从指定位点开始才比较合理。

总结

好东西,学了其乐无穷。

参考文档

  • 玩转MongoDB - ChangeSteams 使用原理