聊聊maxwell的Recovery

770 阅读2分钟

本文主要研究一下maxwell的Recovery

Recovery

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/recovery/Recovery.java

/**
 * Searches the binary logs of the current replication server for a previously
 * recorded heartbeat, so that replication can resume after a master change.
 *
 * <p>The heartbeat value, old server id, old position and client id all come from
 * the {@link RecoveryInfo} supplied at construction time.
 */
public class Recovery {
	static final Logger LOGGER = LoggerFactory.getLogger(Recovery.class);

	private final ConnectionPool replicationConnectionPool;
	private final RecoveryInfo recoveryInfo;
	private final MaxwellMysqlConfig replicationConfig;
	private final String maxwellDatabaseName;
	private final RecoverySchemaStore schemaStore;


	/**
	 * @param replicationConfig connection settings handed to each scanning replicator
	 * @param maxwellDatabaseName name of the maxwell database; used for the schema store,
	 *                            the replicator and the {@link RecoveryFilter}
	 * @param replicationConnectionPool pool used to run {@code SHOW BINARY LOGS} and to
	 *                                  back the {@link RecoverySchemaStore}
	 * @param caseSensitivity passed through to the {@link RecoverySchemaStore}
	 * @param recoveryInfo the heartbeat / server id / position / client id to recover from
	 */
	public Recovery(MaxwellMysqlConfig replicationConfig,
					String maxwellDatabaseName,
					ConnectionPool replicationConnectionPool,
					CaseSensitivity caseSensitivity,
					RecoveryInfo recoveryInfo) {
		this.replicationConfig = replicationConfig;
		this.replicationConnectionPool = replicationConnectionPool;
		this.recoveryInfo = recoveryInfo;
		this.schemaStore = new RecoverySchemaStore(replicationConnectionPool, maxwellDatabaseName, caseSensitivity);
		this.maxwellDatabaseName = maxwellDatabaseName;
	}

	/**
	 * Scans the server's binary logs, one replicator per log file, looking for the
	 * heartbeat recorded in {@code recoveryInfo}.
	 *
	 * <p>Log files are visited in the reverse of the order in which
	 * {@code SHOW BINARY LOGS} lists them, and the scan stops at the first match.
	 *
	 * @return the matching heartbeat row, or {@code null} if no binary log contained it
	 * @throws Exception if listing the binary logs or reading from a replicator fails
	 */
	public HeartbeatRowMap recover() throws Exception {
		String recoveryMsg = String.format(
			"old-server-id: %d, position: %s",
			recoveryInfo.serverID,
			recoveryInfo.position
		);

		LOGGER.warn("attempting to recover from master-change: {}", recoveryMsg);
		List<BinlogPosition> list = getBinlogInfo();
		// iterate from the last listed binlog back to the first
		for ( int i = list.size() - 1; i >= 0 ; i-- ) {
			BinlogPosition binlogPosition = list.get(i);
			Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat());
			Metrics metrics = new NoOpMetrics();

			LOGGER.debug("scanning binlog: {}", binlogPosition);
			Replicator replicator = new BinlogConnectorReplicator(
					this.schemaStore,
					null,
					null,
					replicationConfig,
					0L, // server-id of 0 activates "mysqlbinlog" behavior where the server will stop after each binlog
					maxwellDatabaseName,
					metrics,
					position,
					true,
					recoveryInfo.clientID,
					new HeartbeatNotifier(),
					null,
					new RecoveryFilter(this.maxwellDatabaseName),
					new MaxwellOutputConfig(),
					0.25f // Default memory usage size, not used
			);

			HeartbeatRowMap h = findHeartbeat(replicator);
			if ( h != null ) {
				LOGGER.warn("recovered new master position: {}", h.getNextPosition());
				return h;
			}
		}

		LOGGER.error("Could not recover from master-change: {}", recoveryMsg);
		return null;
	}

	/**
	 * Starts the given replicator and reads rows from it until the wanted heartbeat
	 * is seen or the replicator returns no more rows.
	 *
	 * @param r the replicator to start and read from
	 * @return the {@link HeartbeatRowMap} whose position's last-heartbeat-read value equals
	 *         {@code recoveryInfo.getHeartbeat()}, or {@code null} if {@code getRow()}
	 *         returned {@code null} before such a row was seen
	 * @throws Exception if starting the replicator or fetching a row fails
	 */
	private HeartbeatRowMap findHeartbeat(Replicator r) throws Exception {
		r.startReplicator();
		for (RowMap row = r.getRow(); row != null ; row = r.getRow()) {
			if (!(row instanceof HeartbeatRowMap)) {
				continue;
			}
			HeartbeatRowMap heartbeatRow = (HeartbeatRowMap) row;
			// NOTE(review): '==' is only a value comparison if both getters return
			// primitive long -- confirm against Position and RecoveryInfo.
			if (heartbeatRow.getPosition().getLastHeartbeatRead() == recoveryInfo.getHeartbeat()) {
				return heartbeatRow;
			}
		}
		return null;
	}

	/**
	 * fetch a list of binlog positions representing the start of each binlog file
	 *
	 * <p>One entry is produced per row of {@code SHOW BINARY LOGS}, in result order,
	 * each at offset 4 of the file named by the {@code Log_name} column.
	 *
	 * @return a list of binlog positions to attempt recovery at
	 * @throws SQLException if a connection cannot be obtained or the query fails
	 */
	private List<BinlogPosition> getBinlogInfo() throws SQLException {
		List<BinlogPosition> list = new ArrayList<>();
		// Statement and ResultSet are declared as resources so they are closed
		// explicitly rather than relying on the connection's close() to release them.
		try ( Connection c = replicationConnectionPool.getConnection();
			  java.sql.Statement statement = c.createStatement();
			  ResultSet rs = statement.executeQuery("SHOW BINARY LOGS") ) {
			while ( rs.next() ) {
				list.add(BinlogPosition.at(4, rs.getString("Log_name")));
			}
		}
		return list;
	}
}
  • Recovery提供了recover方法,它先通过getBinlogInfo方法获取BinlogPosition列表,然后从后往前遍历该列表,为每个BinlogPosition构建BinlogConnectorReplicator,再通过findHeartbeat方法查找heartbeatRow.getPosition().getLastHeartbeatRead()等于recoveryInfo.getHeartbeat()的HeartbeatRowMap;若找到(不为null)则直接返回,若遍历完所有binlog仍未找到则返回null

Maxwell

maxwell-1.25.1/src/main/java/com/zendesk/maxwell/Maxwell.java

/**
 * Excerpt of the Maxwell entry class: holds the config, the context and the
 * replicator, and contains the master-recovery entry point shown below.
 */
public class Maxwell implements Runnable {
	static final Logger LOGGER = LoggerFactory.getLogger(Maxwell.class);

	protected MaxwellConfig config;
	protected MaxwellContext context;
	protected Replicator replicator;

	/**
	 * Builds a {@link MaxwellContext} from the given config and delegates to the
	 * context-based constructor.
	 */
	public Maxwell(MaxwellConfig config) throws SQLException, URISyntaxException {
		this(new MaxwellContext(config));
	}

	/**
	 * Stores the context and its config, then calls {@code probeConnections()} on the context.
	 */
	protected Maxwell(MaxwellContext context) throws SQLException, URISyntaxException {
		this.context = context;
		this.config = context.getConfig();
		this.context.probeConnections();
	}

	//......

	/**
	 * Tries to recover a replication position after a master change.
	 *
	 * <p>Asks the position store for recovery info; when present, runs a
	 * {@link Recovery} against the replication server. On success the schema stored
	 * for the old server id is cloned onto the current server id at the heartbeat's
	 * position before the position following the heartbeat is returned.
	 *
	 * @return the position after the recovered heartbeat, or {@code null} when there is
	 *         no recovery info or no heartbeat could be recovered
	 * @throws Exception if the recovery scan or the schema clone fails
	 */
	private Position attemptMasterRecovery() throws Exception {
		RecoveryInfo recoveryInfo = this.context.getPositionStore().getRecoveryInfo(config);
		if ( recoveryInfo == null ) {
			return null;
		}

		Recovery masterRecovery = new Recovery(
			config.replicationMysql,
			config.databaseName,
			this.context.getReplicationConnectionPool(),
			this.context.getCaseSensitivity(),
			recoveryInfo
		);

		HeartbeatRowMap heartbeat = masterRecovery.recover();
		if ( heartbeat == null ) {
			return null;
		}

		// load up the schema from the recovery position and chain it into the
		// new server_id
		MysqlSchemaStore oldServerSchemaStore = new MysqlSchemaStore(
			context.getMaxwellConnectionPool(),
			context.getReplicationConnectionPool(),
			context.getSchemaConnectionPool(),
			recoveryInfo.serverID,
			recoveryInfo.position,
			context.getCaseSensitivity(),
			config.filter,
			false
		);

		// Note we associate this schema to the start position of the heartbeat event, so that
		// we pick it up when resuming at the event after the heartbeat.
		oldServerSchemaStore.clone(context.getServerID(), heartbeat.getPosition());
		return heartbeat.getNextPosition();
	}

	//......

}
  • Maxwell的attemptMasterRecovery方法通过positionStore.getRecoveryInfo(config)获取recoveryInfo,若recoveryInfo不为null则创建masterRecovery,执行masterRecovery.recover()获取recoveredHeartbeat;若recoveredHeartbeat不为null,则先基于旧serverID及其position创建MysqlSchemaStore,并将其clone到当前serverID与recoveredHeartbeat.getPosition(),再返回recoveredHeartbeat.getNextPosition();其余情况返回null

小结

Recovery提供了recover方法,它先通过getBinlogInfo方法获取BinlogPosition列表,然后从后往前遍历该列表,为每个BinlogPosition构建BinlogConnectorReplicator,再通过findHeartbeat方法查找heartbeatRow.getPosition().getLastHeartbeatRead()等于recoveryInfo.getHeartbeat()的HeartbeatRowMap;若找到(不为null)则直接返回,若遍历完所有binlog仍未找到则返回null

doc