聊聊debezium的eventHandlers

606 阅读3分钟

本文主要研究一下debezium的eventHandlers

handleInsert

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

	//......

    /**
     * Handles a binlog event whose payload is a {@link WriteRowsEventData} by asking the table's
     * {@code RecordsForTable} to create records for each inserted row.
     * <p>
     * The event is ignored (with a debug message) when {@code skipEvent} or
     * {@code ignoreDmlEventByGtidSource} is set; those two early exits leave
     * {@code startingRowNumber} untouched. Otherwise rows with an index below
     * {@code startingRowNumber} are skipped and {@code startingRowNumber} is reset to {@code 0}
     * before returning.
     *
     * @param event the row event to process
     * @throws InterruptedException propagated from the calls this method makes
     */
    protected void handleInsert(Event event) throws InterruptedException {
        if (skipEvent) {
            // Processing has already advanced at least this far, so nothing needs to be emitted.
            logger.debug("Skipping previously processed row event: {}", event);
            return;
        }
        if (ignoreDmlEventByGtidSource) {
            logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
            return;
        }

        final WriteRowsEventData insertData = unwrapData(event);
        final long tableId = insertData.getTableId();
        final BitSet columns = insertData.getIncludedColumns();
        final RecordsForTable maker = recordMakers.forTable(tableId, columns, super::enqueueRecord);

        if (maker == null) {
            // No record maker is available for this table number.
            informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableId), "insert row");
        }
        else {
            final List<Serializable[]> insertedRows = insertData.getRows();
            final Instant timestamp = context.getClock().currentTimeAsInstant();
            final int totalRows = insertedRows.size();
            int recorded = 0;

            if (startingRowNumber >= totalRows) {
                // Every row of this event lies before the starting row.
                logger.debug("Skipping previously processed insert event: {}", event);
            }
            else {
                int rowIndex = startingRowNumber;
                while (rowIndex != totalRows) {
                    recorded += maker.create(insertedRows.get(rowIndex), timestamp, rowIndex, totalRows);
                    rowIndex++;
                }
                if (logger.isDebugEnabled()) {
                    if (startingRowNumber == 0) {
                        logger.debug("Recorded {} insert record(s) for event: {}", recorded, event);
                    }
                    else {
                        logger.debug("Recorded {} insert record(s) for last {} row(s) in event: {}",
                                recorded, totalRows - startingRowNumber, event);
                    }
                }
            }
        }

        // The row offset applies to a single event only.
        startingRowNumber = 0;
    }

    //......

}
  • handleInsert方法将event解析为WriteRowsEventData,然后通过recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord)获取recordMaker,执行recordMaker.create(rows.get(row), ts, row, numRows)

handleUpdate

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

	//......

    /**
     * Handles a binlog event whose payload is an {@link UpdateRowsEventData} by asking the table's
     * {@code RecordsForTable} to produce update records from each before/after row pair.
     * <p>
     * The event is ignored (with a debug message) when {@code skipEvent} or
     * {@code ignoreDmlEventByGtidSource} is set; those two early exits leave
     * {@code startingRowNumber} untouched. Otherwise rows with an index below
     * {@code startingRowNumber} are skipped and {@code startingRowNumber} is reset to {@code 0}
     * before returning.
     *
     * @param event the row event to process
     * @throws InterruptedException propagated from the calls this method makes
     */
    protected void handleUpdate(Event event) throws InterruptedException {
        if (skipEvent) {
            // Processing has already advanced at least this far, so nothing needs to be emitted.
            logger.debug("Skipping previously processed row event: {}", event);
            return;
        }
        if (ignoreDmlEventByGtidSource) {
            logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
            return;
        }

        final UpdateRowsEventData updateData = unwrapData(event);
        final long tableId = updateData.getTableId();
        // Only getIncludedColumns() is consulted here; getIncludedColumnsBeforeUpdate() is not used.
        final BitSet columns = updateData.getIncludedColumns();
        final RecordsForTable maker = recordMakers.forTable(tableId, columns, super::enqueueRecord);

        if (maker == null) {
            // No record maker is available for this table number.
            informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableId), "update row");
        }
        else {
            final List<Entry<Serializable[], Serializable[]>> changedRows = updateData.getRows();
            final Instant timestamp = context.getClock().currentTimeAsInstant();
            final int totalRows = changedRows.size();
            int recorded = 0;

            if (startingRowNumber >= totalRows) {
                // Every row of this event lies before the starting row.
                logger.debug("Skipping previously processed update event: {}", event);
            }
            else {
                int rowIndex = startingRowNumber;
                while (rowIndex != totalRows) {
                    // Entry key holds the row image before the update, the value the image after it.
                    final Map.Entry<Serializable[], Serializable[]> pair = changedRows.get(rowIndex);
                    final Serializable[] oldValues = pair.getKey();
                    final Serializable[] newValues = pair.getValue();
                    recorded += maker.update(oldValues, newValues, timestamp, rowIndex, totalRows);
                    rowIndex++;
                }
                if (logger.isDebugEnabled()) {
                    if (startingRowNumber == 0) {
                        logger.debug("Recorded {} update record(s) for event: {}", recorded, event);
                    }
                    else {
                        logger.debug("Recorded {} update record(s) for last {} row(s) in event: {}",
                                recorded, totalRows - startingRowNumber, event);
                    }
                }
            }
        }

        // The row offset applies to a single event only.
        startingRowNumber = 0;
    }

	//......

}
  • handleUpdate方法将event解析为UpdateRowsEventData,然后通过recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord)获取recordMaker,执行recordMaker.update(before, after, ts, row, numRows)

handleDelete

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

	//......

    /**
     * Handles a binlog event whose payload is a {@link DeleteRowsEventData} by asking the table's
     * {@code RecordsForTable} to produce delete records for each removed row.
     * <p>
     * The event is ignored (with a debug message) when {@code skipEvent} or
     * {@code ignoreDmlEventByGtidSource} is set; those two early exits leave
     * {@code startingRowNumber} untouched. Otherwise rows with an index below
     * {@code startingRowNumber} are skipped and {@code startingRowNumber} is reset to {@code 0}
     * before returning.
     *
     * @param event the row event to process
     * @throws InterruptedException propagated from the calls this method makes
     */
    protected void handleDelete(Event event) throws InterruptedException {
        if (skipEvent) {
            // Processing has already advanced at least this far, so nothing needs to be emitted.
            logger.debug("Skipping previously processed row event: {}", event);
            return;
        }
        if (ignoreDmlEventByGtidSource) {
            logger.debug("Skipping DML event because this GTID source is filtered: {}", event);
            return;
        }

        final DeleteRowsEventData deleteData = unwrapData(event);
        final long tableId = deleteData.getTableId();
        final BitSet columns = deleteData.getIncludedColumns();
        final RecordsForTable maker = recordMakers.forTable(tableId, columns, super::enqueueRecord);

        if (maker == null) {
            // No record maker is available for this table number.
            informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableId), "delete row");
        }
        else {
            final List<Serializable[]> removedRows = deleteData.getRows();
            final Instant timestamp = context.getClock().currentTimeAsInstant();
            final int totalRows = removedRows.size();
            int recorded = 0;

            if (startingRowNumber >= totalRows) {
                // Every row of this event lies before the starting row.
                logger.debug("Skipping previously processed delete event: {}", event);
            }
            else {
                int rowIndex = startingRowNumber;
                while (rowIndex != totalRows) {
                    recorded += maker.delete(removedRows.get(rowIndex), timestamp, rowIndex, totalRows);
                    rowIndex++;
                }
                if (logger.isDebugEnabled()) {
                    if (startingRowNumber == 0) {
                        logger.debug("Recorded {} delete record(s) for event: {}", recorded, event);
                    }
                    else {
                        logger.debug("Recorded {} delete record(s) for last {} row(s) in event: {}",
                                recorded, totalRows - startingRowNumber, event);
                    }
                }
            }
        }

        // The row offset applies to a single event only.
        startingRowNumber = 0;
    }

	//......

}
  • handleDelete方法将event解析为DeleteRowsEventData,然后通过recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord)获取recordMaker,执行recordMaker.delete(rows.get(row), ts, row, numRows)

handleQueryEvent

debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

public class BinlogReader extends AbstractReader {

	//......

    /**
     * Handles a binlog event whose payload is a {@link QueryEventData}, i.e. an SQL statement
     * recorded in the binlog.
     * <p>
     * The trimmed statement is dispatched as follows:
     * <ul>
     * <li>{@code BEGIN}: starts the next transaction on {@code source}, records the binlog thread id
     * and, when {@code initialEventsToSkip} is non-zero, sets {@code skipEvent};</li>
     * <li>{@code COMMIT}: delegates to {@code handleTransactionCompletion};</li>
     * <li>statements starting with {@code XA }: ignored;</li>
     * <li>statements accepted by {@code context.ddlFilter()}: ignored with a debug message;</li>
     * <li>statements starting with {@code INSERT }, {@code UPDATE } or {@code DELETE }: rejected
     * with a {@code ConnectException};</li>
     * <li>everything else (including {@code ROLLBACK}, after logging a warning): passed to
     * {@code context.dbSchema().applyDdl(...)}.</li>
     * </ul>
     *
     * @param event the query event to process
     * @throws InterruptedException propagated from the calls this method makes
     * @throws ConnectException if the statement is an INSERT, UPDATE or DELETE
     */
    protected void handleQueryEvent(Event event) throws InterruptedException {
        QueryEventData command = unwrapData(event);
        logger.debug("Received query command: {}", event);
        String sql = command.getSql().trim();
        if (sql.equalsIgnoreCase("BEGIN")) {
            // We are starting a new transaction ...
            source.startNextTransaction();
            source.setBinlogThread(command.getThreadId());
            if (initialEventsToSkip != 0) {
                logger.debug("Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event",
                        initialEventsToSkip, startingRowNumber);
                // We are restarting, so we need to skip the events in this transaction that we processed previously...
                skipEvent = true;
            }
            return;
        }
        if (sql.equalsIgnoreCase("COMMIT")) {
            handleTransactionCompletion(event);
            return;
        }

        // Upper-case with Locale.ROOT: with the JVM default locale, a Turkish/Azeri locale maps
        // 'i' to the dotted capital 'İ', so "insert " would never equal "INSERT " and the DML guard
        // below would be silently bypassed.
        String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase(java.util.Locale.ROOT);

        if (upperCasedStatementBegin.startsWith("XA ")) {
            // This is an XA transaction, and we currently ignore these and do nothing ...
            return;
        }
        if (context.ddlFilter().test(sql)) {
            logger.debug("DDL '{}' was filtered out of processing", sql);
            return;
        }
        if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ") || upperCasedStatementBegin.equals("DELETE ")) {
            throw new ConnectException(
                    "Received DML '" + sql + "' for processing, binlog probably contains events generated with statement or mixed based replication format");
        }
        if (sql.equalsIgnoreCase("ROLLBACK")) {
            // We have hit a ROLLBACK which is not supported; only a warning is logged here and the
            // statement still falls through to applyDdl below.
            logger.warn("Rollback statements cannot be handled without binlog buffering, the connector will fail. Please check '{}' to see how to enable buffering",
                    MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name());
        }
        // Note: the untrimmed statement from the event is what gets applied to the schema.
        context.dbSchema().applyDdl(context.source(), command.getDatabase(), command.getSql(), (dbName, tables, statements) -> {
            // Schema change records are only emitted when that option is enabled.
            if (recordSchemaChangesInSourceRecords && recordMakers.schemaChanges(dbName, tables, statements, super::enqueueRecord) > 0) {
                logger.debug("Recorded DDL statements for database '{}': {}", dbName, statements);
            }
        });
    }

	//......

}
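  • handleQueryEvent方法将event解析为QueryEventData:对于BEGIN则开启新事务,对于COMMIT则执行handleTransactionCompletion;XA语句以及被context.ddlFilter()过滤的语句会被直接忽略,遇到INSERT、UPDATE、DELETE这类DML语句则抛出ConnectException;其余语句通过context.dbSchema().applyDdl来应用ddl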

小结

BinlogReader注册了处理增、删、改以及query事件的eventHandlers,它们分别将event解析为对应的data;对于增删改则执行recordMaker的对应方法(create、update、delete),对于query事件则通过context.dbSchema().applyDdl应用对应的ddl语句

doc