聊聊chronos的cancelMessage

290 阅读2分钟

本文主要研究一下chronos的cancelMessage

MqPullService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

public class MqPullService implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);

    private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();
    private static final Batcher BATCHER = Batcher.getInstance();
    private volatile boolean shouldStop = false;
    private CountDownLatch cdl;

    private final List<Long> succOffsets = new ArrayList<>();
    private final List<Long> failOffsets = new ArrayList<>();
    private SimpleCarreraConsumer carreraConsumer;
    private String mqPullServiceName;

    private final int INTERNAL_PAIR_COUNT = 5000;
    private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);

    //......

    private void cancelMessage(final InternalKey internalKey, final String topic, final int action) {
        InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();

        if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);

            BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(),
                    new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);

            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);

            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);

            LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(),
                    internalKey.genUniqDelayMsgId());
        }
    }

    //......
}
  • cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

public class InternalKey {
    private static final String SEPARATOR = "-";
    private static final int LEN_UUID = 36;
    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;

    private long timestamp;
    private int type;
    private long expire;
    private long times;
    private long timed;
    private long interval;
    private int innerTopicSeq;
    private String uuid;
    private int segmentNum;
    private int segmentIndex;

    //......

    public InternalKey cloneTombstoneInternalKey() {
        InternalKey tombstoneInternalKey = new InternalKey(this);
        tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue());
        return tombstoneInternalKey;
    }

    //......
}
  • cloneTombstoneInternalKey方法设置type为MsgTypes.TOMBSTONE.getValue()

CancelWrap

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java

public class CancelWrap {
    private String uniqDelayMsgId;
    private String topic;

    public CancelWrap() {
    }

    public CancelWrap(String uniqDelayMsgId, String topic) {
        this.uniqDelayMsgId = uniqDelayMsgId;
        this.topic = topic;
    }

    public String getUniqDelayMsgId() {
        return uniqDelayMsgId;
    }

    public void setUniqDelayMsgId(String uniqDelayMsgId) {
        this.uniqDelayMsgId = uniqDelayMsgId;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public String toJsonString() {
        return JsonUtils.toJsonString(this);
    }

    @Override
    public String toString() {
        return "CancelWrap{" +
                "uniqDelayMsgId='" + uniqDelayMsgId + '\'' +
                ", topic='" + topic + '\'' +
                '}';
    }
}
  • CancelWrap定义了uniqDelayMsgId及topic两个属性

Batcher

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

public class Batcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);

    private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();
    private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();

    private WriteBatch wb = new WriteBatch();
    private volatile int itemNum = 0;
    private static volatile Batcher instance = null;

    public static volatile ReentrantLock lock = new ReentrantLock();

    //......

    public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) {
        lock.lock();
        try {
            // 指数循环
            // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5

            // 普通循环
            // 1536811267-3-1536911267-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            while (!KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {
                internalKey = internalKey.nextUniqDelayMsgId();
            }

            tombstoneInternalKey.setTimestamp(internalKey.getTimestamp());
            tombstoneInternalKey.setTimes(internalKey.getTimed() + 2);
            tombstoneInternalKey.setTimed(internalKey.getTimed());

            if (!KeyUtils.isInvalidMsg(tombstoneInternalKey)) {
                putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(),
                        new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action);
            }
        } finally {
            lock.unlock();
        }
    }

    //......
}
  • putLoopTombstoneKey方法通过KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())寻找internalKey,然后通过putToDefaultCF添加一条CancelWrap记录

小结

cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

doc