
A look at puma's Parser

This article takes a look at puma's Parser.

Parser

puma/puma/src/main/java/com/dianping/puma/parser/Parser.java

public interface Parser extends LifeCycle {
    BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException;
}
  • Parser extends the LifeCycle interface and declares a single parse method, which turns a ByteBuffer into a BinlogEvent

DefaultBinlogParser

puma/puma/src/main/java/com/dianping/puma/parser/DefaultBinlogParser.java

@ThreadSafe
public class DefaultBinlogParser implements Parser {
    private final Logger logger = LoggerFactory.getLogger(DefaultBinlogParser.class);
    private static Map<Byte, Class<? extends BinlogEvent>> eventMaps = new ConcurrentHashMap<Byte, Class<? extends BinlogEvent>>();

    @Override
    public BinlogEvent parse(ByteBuffer buf, PumaContext context) throws IOException {

        logger.debug("\n\n\n");
        logger.debug("****************************** binlog parse begin ******************************");

        BinlogHeader header = new BinlogHeader();
        header.parse(buf, context);

        logger.debug("binlog event header:\n");
        logger.debug("{}", header);

        BinlogEvent event = null;
        Class<? extends BinlogEvent> eventClass = eventMaps.get(header.getEventType());
        if (eventClass != null) {
            try {
                event = eventClass.newInstance();
            } catch (Exception e) {
                logger.error("Init event class failed. eventType: " + header.getEventType(), e);
                event = null;
            }
        }

        if (event == null) {
            event = new PumaIgnoreEvent();
        }

        logger.debug("binlog event type:\n");
        logger.debug("{}", event.getClass());

        event.parse(buf, context, header);

        logger.debug("binlog event:\n");
        logger.debug("{}", event);
        logger.debug("****************************** binlog parse end ******************************");
        logger.debug("\n\n\n");

        return event;
    }

    /*
     * (non-Javadoc)
     *
     * @see com.dianping.puma.common.LifeCycle#start()
     */
    @Override
    public void start() {
        eventMaps.put(BinlogConstants.UNKNOWN_EVENT, UnknownEvent.class);
        eventMaps.put(BinlogConstants.QUERY_EVENT, QueryEvent.class);
        eventMaps.put(BinlogConstants.STOP_EVENT, StopEvent.class);
        eventMaps.put(BinlogConstants.ROTATE_EVENT, RotateEvent.class);
        eventMaps.put(BinlogConstants.INTVAR_EVENT, IntVarEvent.class);
        eventMaps.put(BinlogConstants.RAND_EVENT, RandEvent.class);
        eventMaps.put(BinlogConstants.USER_VAR_EVENT, UserVarEvent.class);
        eventMaps.put(BinlogConstants.FORMAT_DESCRIPTION_EVENT, FormatDescriptionEvent.class);
        eventMaps.put(BinlogConstants.XID_EVENT, XIDEvent.class);
        eventMaps.put(BinlogConstants.TABLE_MAP_EVENT, TableMapEvent.class);
        eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT_V1, WriteRowsEvent.class);
        eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT_V1, UpdateRowsEvent.class);
        eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT_V1, DeleteRowsEvent.class);
        eventMaps.put(BinlogConstants.INCIDENT_EVENT, IncidentEvent.class);
        //mysql --5.6
        eventMaps.put(BinlogConstants.WRITE_ROWS_EVENT, WriteRowsEvent.class);
        eventMaps.put(BinlogConstants.UPDATE_ROWS_EVENT, UpdateRowsEvent.class);
        eventMaps.put(BinlogConstants.DELETE_ROWS_EVENT, DeleteRowsEvent.class);
        eventMaps.put(BinlogConstants.HEARTBEAT_LOG_EVENT, HeartbeatEvent.class);
        eventMaps.put(BinlogConstants.IGNORABLE_LOG_EVENT, IgnorableEvent.class);
        eventMaps.put(BinlogConstants.ROWS_QUERY_LOG_EVENT, RowsQueryEvent.class);
        eventMaps.put(BinlogConstants.GTID_LOG_EVENT, GtidEvent.class);
        eventMaps.put(BinlogConstants.ANONYMOUS_GTID_LOG_EVENT, AnonymousGtidEvent.class);
        eventMaps.put(BinlogConstants.PREVIOUS_GTIDS_LOG_EVENT, PreviousGtidsEvent.class);
    }

    @Override
    public void stop() {

    }

}
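  • DefaultBinlogParser implements the Parser interface. Its parse method first parses a BinlogHeader from the buffer, then uses header.getEventType() to look up the matching BinlogEvent class in eventMaps and instantiate it, and finally calls event.parse(buf, context, header) to do the parsing. If no class is registered for the event type, or instantiation fails, it falls back to PumaIgnoreEvent. The eventMaps table itself is only populated in start()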
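
The lookup-then-fallback dispatch described above can be sketched in isolation. This is a minimal illustration, not puma's code: Event, QueryLikeEvent, IgnoreEvent and the QUERY_LIKE type code are all hypothetical stand-ins, and the registry stores Supplier factories rather than Class objects, since Class.newInstance() as used in the original is deprecated on newer JDKs.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class EventDispatchSketch {
    /** Hypothetical stand-in for puma's BinlogEvent. */
    interface Event {
        void parse(ByteBuffer buf);
    }

    /** Hypothetical event that records the first byte of its body. */
    static class QueryLikeEvent implements Event {
        int firstByte = -1;

        @Override
        public void parse(ByteBuffer buf) {
            if (buf.hasRemaining()) {
                firstByte = buf.get() & 0xFF;
            }
        }
    }

    /** Fallback for unregistered types, playing the role of PumaIgnoreEvent. */
    static class IgnoreEvent implements Event {
        @Override
        public void parse(ByteBuffer buf) {
            // Intentionally ignores the body.
        }
    }

    // Hypothetical type code, not a value taken from BinlogConstants.
    static final byte QUERY_LIKE = 2;

    // Type code -> factory. A Supplier replaces the reflective newInstance() call.
    private static final Map<Byte, Supplier<? extends Event>> REGISTRY = new ConcurrentHashMap<>();

    static {
        REGISTRY.put(QUERY_LIKE, QueryLikeEvent::new);
    }

    /** Looks up the factory for the type, falls back to IgnoreEvent, then parses. */
    static Event parse(byte eventType, ByteBuffer body) {
        Supplier<? extends Event> factory = REGISTRY.get(eventType);
        Event event = (factory != null) ? factory.get() : new IgnoreEvent();
        event.parse(body);
        return event;
    }

    public static void main(String[] args) {
        Event known = parse(QUERY_LIKE, ByteBuffer.wrap(new byte[] {42}));
        Event unknown = parse((byte) 99, ByteBuffer.wrap(new byte[0]));
        System.out.println(known.getClass().getSimpleName());
        System.out.println(unknown.getClass().getSimpleName());
    }
}
```

Falling back to an ignore event, instead of throwing, lets the caller keep consuming the stream when it meets an event type it does not know about.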

BinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/BinlogEvent.java

public interface BinlogEvent extends Serializable {
	BinlogHeader getHeader();
	
	void setHeader(BinlogHeader header);

	void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException;
}
  • The BinlogEvent interface defines the getHeader, setHeader and parse methods

AbstractBinlogEvent

puma/puma/src/main/java/com/dianping/puma/parser/mysql/event/AbstractBinlogEvent.java

public abstract class AbstractBinlogEvent implements BinlogEvent {
	private static final long serialVersionUID = -8136236885229956889L;
	private BinlogHeader header;
	private int checksumAlg = BinlogConstants.CHECKSUM_ALG_OFF;
	private long crc;

	@Override
	public void parse(ByteBuffer buf, PumaContext context, BinlogHeader header) throws IOException {
		this.header = header;
		doParse(buf, context);
		if (!(this.header.getEventType() == BinlogConstants.ROTATE_EVENT)) {
			checksumAlg = context.getChecksumAlg(); // fetch checksum alg
			parseCheckSum(buf);
		}
	}

	@Override
	public BinlogHeader getHeader() {
		return header;
	}
	
	@Override
	public void setHeader(BinlogHeader header) {
		this.header = header;
	}

	public abstract void doParse(ByteBuffer buf, PumaContext context) throws IOException;

	private void parseCheckSum(ByteBuffer buf) {
		if (checksumAlg != BinlogConstants.CHECKSUM_ALG_OFF && checksumAlg != BinlogConstants.CHECKSUM_ALG_UNDEF) {
			buf.position((int) (this.header.getEventLength() - 4));
			setCrc(PacketUtils.readLong(buf, 4));
		}
	}

	@Override public String toString() {
		return new ToStringBuilder(this)
				.append("header", header)
				.append("checksumAlg", checksumAlg)
				.append("crc", crc)
				.toString();
	}

	public void setChecksumAlg(int checksumAlg) {
		this.checksumAlg = checksumAlg;
	}

	public int getChecksumAlg() {
		return checksumAlg;
	}

	public long getCrc() {
		return crc;
	}

	public void setCrc(long crc) {
		this.crc = crc;
	}

	public boolean isRemaining(ByteBuffer buf, PumaContext context) {
		return context.isCheckSum() ? buf.remaining() - 4 > 0 : buf.hasRemaining();
	}

	public int lenRemaining(ByteBuffer buf, PumaContext context) {
		return context.isCheckSum() ? buf.remaining() - 4 : buf.remaining();
	}
}
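  • AbstractBinlogEvent implements the BinlogEvent interface. Its parse method stores the header and calls the abstract doParse method; then, for every event type except ROTATE_EVENT, it fetches the checksum algorithm from the context and runs parseCheckSum, which reads the last 4 bytes of the event as the CRC unless the algorithm is CHECKSUM_ALG_OFF or CHECKSUM_ALG_UNDEF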
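
The checksum handling above comes down to two small operations: reading the trailing 4 bytes as an unsigned value, and subtracting those 4 bytes when asking how much payload is left. Here is a rough sketch of both using only java.nio. It assumes the CRC is stored little-endian, like MySQL's other protocol integers. The class and method names are hypothetical, and the demo buffer contains only made-up payload bytes, not a real binlog event layout.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32;

public class ChecksumTailSketch {
    static final int CHECKSUM_LEN = 4;

    /**
     * Reads the last 4 bytes before the buffer's limit as an unsigned
     * little-endian integer, without moving the caller's position.
     */
    static long readTrailingCrc(ByteBuffer event) {
        ByteBuffer view = event.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        return view.getInt(view.limit() - CHECKSUM_LEN) & 0xFFFFFFFFL;
    }

    /** Same idea as lenRemaining: exclude the checksum tail when it is enabled. */
    static int payloadRemaining(ByteBuffer buf, boolean checksumEnabled) {
        return checksumEnabled ? buf.remaining() - CHECKSUM_LEN : buf.remaining();
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3, 4, 5}; // demo bytes only
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);

        // Build "payload + 4-byte CRC" the way a checksummed event ends.
        ByteBuffer event = ByteBuffer.allocate(payload.length + CHECKSUM_LEN)
                .order(ByteOrder.LITTLE_ENDIAN);
        event.put(payload).putInt((int) crc.getValue());
        event.flip();

        System.out.println(readTrailingCrc(event) == crc.getValue());
        System.out.println(payloadRemaining(event, true));
    }
}
```

Masking with 0xFFFFFFFFL is what keeps the value non-negative in a long, which is presumably why the original stores crc as a long rather than an int.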

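Summary

Parser extends the LifeCycle interface and declares a parse method that turns a ByteBuffer into a BinlogEvent. DefaultBinlogParser implements Parser: its parse method uses header.getEventType() to instantiate the matching BinlogEvent, then calls event.parse(buf, context, header) to do the parsing.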

