May 19, 2025

Validating MySQL MGR Dual-Datacenter Active-Active Architecture

Learn how to deploy MySQL Group Replication (MGR) in a dual-datacenter active-active setup for high availability and disaster recovery. Explore automated testing, fault tolerance, and best practices with code examples.

Validating MySQL MGR Dual-Datacenter Active-Active Architecture Feasibility

In production environments, many enterprises aim to deploy dual-datacenter active-active architectures to strengthen business high availability and disaster recovery. A recent client inquiry highlighted a common question: can MySQL Group Replication (MGR) be used to implement a dual-datacenter active-active architecture? While MGR supports multi-primary mode and asynchronous replication channel failover, its suitability for a true dual-active scenario remains debated. To answer this, we built an automated test script (written with Cursor) to validate MGR's feasibility in dual-datacenter setups; a brief sketch of the underlying asynchronous-channel mechanism follows.
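For reference, the cross-datacenter link that MGR-based dual-active designs rely on is an ordinary asynchronous replication channel; since MySQL 8.0.22/8.0.23 such a channel can be pointed at an entire group so it automatically follows whichever member is currently primary. The snippet below is a minimal sketch of that setup, not part of the test script: the hosts, ports, credentials, and group UUID are placeholder values.

#!/usr/bin/env python3
"""Sketch: attach an async channel to an MGR group with automatic source failover."""
import subprocess

# Placeholder connection details for the replica-side node (the "DC2" primary)
REPLICA = {"host": "127.0.0.1", "port": 5316, "user": "msandbox", "password": "msandbox"}

SQL = """
CHANGE REPLICATION SOURCE TO
    SOURCE_HOST='10.0.1.10', SOURCE_PORT=3306,
    SOURCE_USER='repl', SOURCE_PASSWORD='repl_pass',
    SOURCE_AUTO_POSITION=1,
    SOURCE_CONNECTION_AUTO_FAILOVER=1
    FOR CHANNEL 'dc1_to_dc2';
SELECT asynchronous_connection_failover_add_managed(
    'dc1_to_dc2', 'GroupReplication',
    'aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',  -- group_replication_group_name of the DC1 group
    '10.0.1.10', 3306, '', 80, 60);
START REPLICA FOR CHANNEL 'dc1_to_dc2';
"""

cmd = (
    f"mysql -h{REPLICA['host']} -P{REPLICA['port']} "
    f"-u{REPLICA['user']} -p{REPLICA['password']} -e \"{SQL}\""
)
subprocess.run(cmd, shell=True, check=True)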

Design Goals

  1. Validate whether MySQL MGR can achieve dual-active operation in a dual-datacenter environment, ensuring:
    • Simultaneous writes in both locations
    • Automatic data synchronization
    • Fault-triggered automatic failover
  2. Automate deployment, failover, fault simulation, and data consistency checks to reduce manual intervention and improve testing efficiency.

Key Features

  1. One-Click Deployment of Dual Clusters
    • Supports single-primary and multi-primary modes.
    • Uses dbdeployer to rapidly deploy two MGR clusters simulating a dual-datacenter environment (see the deployment sketch after this list).
  2. Flexible Replication Testing
    • Configurable unidirectional or bidirectional replication.
    • Automatically establishes asynchronous replication channels for dual-active writes.
  3. Automated Failover and Recovery
    • Simulates primary node failures and automatic election of new primaries.
    • Supports node restarts and reintegration into clusters.
    • Replicates real-world failure scenarios.
  4. Data Consistency and Health Checks
    • Automatically inserts test data and validates cross-datacenter synchronization.
    • Monitors replication channel health in real time.
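Feature 1 is essentially two dbdeployer group-replication sandboxes on different base ports. Below is a condensed sketch of the command the script's MGRCluster.deploy() method assembles; the version and port values mirror the script's defaults and may need adjusting for your environment.

#!/usr/bin/env python3
"""Sketch: deploy two MGR sandbox clusters that stand in for two datacenters."""
import subprocess

MYSQL_VERSION = "8.0.40"

def deploy_group_sandbox(name: str, base_port: int, nodes: int = 3,
                         single_primary: bool = True) -> None:
    # Mirrors MGRCluster.deploy(): group topology, replica threads not
    # auto-started (skip_replica_start=on), node ports derived from base_port.
    cmd = (
        "dbdeployer deploy replication --topology=group "
        "-c skip_replica_start=on "
        + ("--single-primary " if single_primary else "")
        + f"--nodes={nodes} --concurrent --port-as-server-id "
        f"--base-port={base_port} {MYSQL_VERSION} "
        f"--sandbox-directory={name}"
    )
    subprocess.run(cmd, shell=True, check=True)

# "Datacenter 1" on ports 4316-4318, "datacenter 2" on ports 5316-5318
deploy_group_sandbox("mgr1", base_port=4315)
deploy_group_sandbox("mgr2", base_port=5315)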

Testing Conclusions

Through automated testing, the following conclusions were drawn:

  • Single-Primary Mode: Avoid bidirectional replication. Instead, use MySQL Shell to deploy a ClusterSet architecture fronted by MySQL Router for dual-active setups.
  • Multi-Primary Mode: If bidirectional replication is enabled, make sure skip_replica_start is enabled. Otherwise, nodes may fail to rejoin the cluster after a restart due to GTID inconsistencies (see the check sketched after this list).
  • MySQL Shell Limitation: MySQL Shell currently lacks native support for bidirectional replication between clusters; rely on ClusterSet instead.
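To make the multi-primary caveat concrete, the sketch below (not part of the test script; it reuses the sandbox credentials and hypothetical ports 4316/4317) verifies that skip_replica_start is enabled and that a restarted node's executed GTID set is a subset of the current primary's before rejoining it to the group.

#!/usr/bin/env python3
"""Sketch: pre-flight checks before rejoining a restarted node to an MGR group."""
import subprocess

def query(port: int, sql: str) -> str:
    """Run a single statement against a sandbox node and return raw output."""
    cmd = f"mysql -h127.0.0.1 -P{port} -umsandbox -pmsandbox -NB -e \"{sql}\""
    return subprocess.run(cmd, shell=True, capture_output=True,
                          text=True, check=True).stdout.strip()

def safe_to_rejoin(node_port: int, primary_port: int) -> bool:
    # 1. skip_replica_start must be ON, so the async channel does not start
    #    pulling writes before the node has rejoined the group.
    if query(node_port, "SELECT @@skip_replica_start") != "1":
        print("skip_replica_start is OFF: do not auto-start replication on this node")
        return False
    # 2. The node's executed GTIDs must be a subset of the primary's; extra
    #    GTIDs mean it applied transactions the group never certified.
    node_gtids = query(node_port, "SELECT @@GLOBAL.gtid_executed").replace("\\n", "")
    diff = query(primary_port,
                 f"SELECT GTID_SUBTRACT('{node_gtids}', @@GLOBAL.gtid_executed)")
    if diff:
        print(f"Node has GTIDs missing on the primary: {diff}")
        return False
    return True

if safe_to_rejoin(4317, 4316):
    query(4317, "START GROUP_REPLICATION")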

Usage Examples

# Deploy single-master cluster
python mgr_test.py -s  

# Deploy multi-master cluster
python mgr_test.py -m  

# Run single-master unidirectional replication test
python mgr_test.py -ss  

# Run multi-master bidirectional replication test
python mgr_test.py -aa  

Source Code Highlights

The script leverages Python for automation, with key components including:

  • Cluster Deployment: Automates cluster initialization and topology configuration.
  • Replication Setup: Configures asynchronous channels and failover policies.
  • Fault Simulation: Tests node failures, restarts, and recovery workflows.
  • Data Validation: Ensures consistency across clusters via SQL queries (a quick consistency-check sketch follows this list).
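As a complement to verify_data_sync() in the full script below, a cross-cluster check can compare row counts and an order-insensitive checksum of mgr_test.events on both primaries. This is a minimal sketch using the same sandbox credentials; the primary ports are hypothetical and should be looked up with get_primary_node() in a real run.

#!/usr/bin/env python3
"""Sketch: compare row count and checksum of the test table across both clusters."""
import subprocess

def query(port: int, sql: str) -> str:
    cmd = f"mysql -h127.0.0.1 -P{port} -umsandbox -pmsandbox -NB -e \"{sql}\""
    return subprocess.run(cmd, shell=True, capture_output=True,
                          text=True, check=True).stdout.strip()

CHECK_SQL = (
    "SELECT COUNT(*), "
    "       IFNULL(SUM(CRC32(CONCAT_WS('|', event_time, cluster_name, "
    "                                  event_type, event_data))), 0) "
    "FROM mgr_test.events"
)

dc1 = query(4316, CHECK_SQL)  # primary of mgr1 (hypothetical port)
dc2 = query(5316, CHECK_SQL)  # primary of mgr2 (hypothetical port)
print(f"mgr1: {dc1}\nmgr2: {dc2}")
print("✅ consistent" if dc1 == dc2 else "❌ clusters differ")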

#!/usr/bin/env python3
from typing import List, Dict, Literal
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime
import argparse

MYSQL_VERSION = "8.0.40"
REPL_USER = "msandbox"
REPL_PASSWORD = "msandbox"

# MGR cluster port configuration
MGR1_PORTS = [4316, 4317, 4318]  # Ports for the first MGR cluster
MGR2_PORTS = [5316, 5317, 5318]  # Ports for the second MGR cluster
SANDBOX = "$HOME/workbench/sandboxs"

# Replication channel names
REPLICATION_CHANNELS = {
    "mgr1": "async_mgr2_to_mgr1",   # Channel for mgr1 cluster
    "mgr2": "async_mgr1_to_mgr2"    # Channel for mgr2 cluster
}

@dataclass
class MGRNode:
    port: int
    host: str = '127.0.0.1'
    user: str = 'msandbox'
    password: str = 'msandbox'

    def execute_sql(self, sql: str, vertical: bool = False) -> str:
        """Execute SQL command on the node"""
        cmd = f"mysql -h{self.host} -P{self.port} -u{self.user} -p{self.password}"
        if not vertical:
            cmd += " -NB"
        cmd += f" -e \"{sql}\""
        try:
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, check=True
            )
            return result.stdout.strip()
        except subprocess.CalledProcessError as e:
            print("\n📋 Latest error log:")
            self.show_error_log()
            raise Exception(e.stderr.strip()) from e

    def show_error_log(self, lines: int = 20):
        """Display recent error logs"""
        try:
            log_file = (
                f"{SANDBOX}/mgr{'1' if self.port < 5000 else '2'}/"
                f"node{(self.port % 10) - 5}/data/msandbox.err"
            )
            cmd = f"tail -n {lines} {log_file} | grep ERROR"
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, check=True
            )
            print(result.stdout.strip())
        except subprocess.CalledProcessError as e:
            print(f"⚠️ Failed to read error log: {e.stderr.strip()}")

    def shutdown(self, force: bool = False):
        """Shutdown the MySQL instance"""
        if force:
            # Kill only the mysqld process listening on this node's port
            cmd = (
                f"ps aux | grep 'mysql.*{self.port}' | grep -v grep | "
                "awk '{print $2}' | xargs -r kill -9"
            )
        else:
            cmd = (
                f"mysqladmin -u{self.user} -p{self.password} -h{self.host} -P{self.port} shutdown"
            )
        subprocess.run(cmd, shell=True, check=True)

class MGRCluster:
    def __init__(
        self,
        name: str,
        ports: List[int],
        mysql_version: str = MYSQL_VERSION
    ):
        self.name = name
        self.nodes = [MGRNode(port) for port in ports]
        self.primary_node = self.nodes[0]
        self.mysql_version = mysql_version
        self._primary_node_cache = None
        self._last_primary_check = 0
        self._cache_ttl = 10  # Cache TTL in seconds

    def deploy(
        self,
        topology_mode: Literal["single-primary", "multi-primary"] = "single-primary"
    ):
        """Deploy MGR cluster"""
        # Cleanup existing cluster
        print(f"=== Removing cluster {self.name} ===")
        # Kill stray sandbox processes first, then delete the old sandbox
        kill_cmd = (
            f"ps aux | grep '{self.name}' | grep -v grep | "
            "awk '{print $2}' | xargs -r kill -9"
        )
        subprocess.run(kill_cmd, shell=True, check=False)
        delete_cmd = f"dbdeployer delete {self.name} --concurrent 2>/dev/null || true"
        subprocess.run(delete_cmd, shell=True, check=False)

        # Deploy new cluster
        print(f"=== Deploying cluster {self.name} ===")
        cmd = (
            f"dbdeployer deploy replication --topology=group -c skip_replica_start=on"
        )
        if topology_mode == "single-primary":
            cmd += " --single-primary"
        cmd += (
            f" --nodes={len(self.nodes)} --concurrent --port-as-server-id "
            f"--base-port={self.nodes[0].port - 1} {self.mysql_version} "
            f"--sandbox-directory={self.name} > /dev/null"
        )
        print(f"{cmd}")
        subprocess.run(cmd, shell=True, check=True)

        # Clear cache
        self._primary_node_cache = None
        self._last_primary_check = 0

    def get_primary_node(self, force_check: bool = False) -> MGRNode:
        """Get current primary node"""
        current_time = time.time()
        if not force_check and self._primary_node_cache:
            if (current_time - self._last_primary_check) < self._cache_ttl:
                return self._primary_node_cache

        print(f"\n🔍 Searching for primary node in cluster {self.name}")
        for node in self.nodes:
            try:
                print(f"  ⚡️ Checking node {node.port}")
                time.sleep(3)
                group_status = node.execute_sql("""
                    SELECT COUNT(*) 
                    FROM performance_schema.replication_group_members 
                    WHERE MEMBER_STATE = 'ONLINE'
                """)
                print(f"  📊 Online nodes: {group_status}")

                if int(group_status) > 0:
                    members = node.execute_sql("""
                        SELECT CONCAT(
                            MEMBER_PORT, ' (', MEMBER_STATE, '/', MEMBER_ROLE, ')'
                        )
                        FROM performance_schema.replication_group_members
                    """)
                    # Build the joined string first: a backslash inside an f-string
                    # expression is a SyntaxError before Python 3.12
                    member_list = members.replace("\n", ", ")
                    print(f"  📋 Member statuses: {member_list}")

                    for check_node in self.nodes:
                        role = check_node.execute_sql("""
                            SELECT MEMBER_ROLE 
                            FROM performance_schema.replication_group_members 
                            WHERE MEMBER_ID = @@server_uuid
                        """)
                        if role == "PRIMARY":
                            self.primary_node = check_node
                            self._primary_node_cache = check_node
                            self._last_primary_check = current_time
                            print(f"  ✅ Found primary node: {check_node.port}")
                            return check_node
                    break  # No need to check further if online but no primary found
            except Exception:
                # execute_sql() re-raises a plain Exception, so catch broadly here
                print(f"  ❌ Node {node.port} check failed")
                continue

        # Clear cache on failure
        self._primary_node_cache = None
        self._last_primary_check = 0
        raise Exception(f"❗ Cluster {self.name} has no primary node (election in progress)")

    def init_test_schema(self):
        """Initialize test database/schema"""
        primary = self.get_primary_node()
        primary.execute_sql("""
            CREATE DATABASE IF NOT EXISTS mgr_test;
            CREATE TABLE IF NOT EXISTS mgr_test.events (
                id INT AUTO_INCREMENT PRIMARY KEY,
                event_time DATETIME,
                cluster_name VARCHAR(10),
                event_type VARCHAR(20),
                event_data VARCHAR(100)
            );
        """)

    def switch_mode(self, to_single_primary: bool = True):
        """Switch between single-primary/multi-primary mode"""
        primary = self.get_primary_node()
        mode = "single-primary" if to_single_primary else "multi-primary"
        print(f"\n🔄 Switching {self.name} to {mode} mode")

        try:
            mode_sql = "SINGLE_PRIMARY" if to_single_primary else "MULTI_PRIMARY"
            primary.execute_sql(f"""
                SELECT group_replication_switch_to_{mode_sql.lower()}_mode()
            """)
            # Clear cache after mode switch
            self._primary_node_cache = None
            self._last_primary_check = 0
            print(f"✅ {self.name} mode switched to {mode}")
        except Exception as e:
            print(f"❌ Switch failed: {str(e)}")
            raise

class ReplicationManager:
    def __init__(self, source_cluster: MGRCluster, target_cluster: MGRCluster):
        self.source = source_cluster
        self.target = target_cluster

    def setup_replication(
        self,
        source: MGRNode,
        target: MGRNode,
        channel: str = "",
        auto_failover: bool = True
    ):
        """Configure replication channel"""
        channel_clause = f" FOR CHANNEL '{channel}'" if channel else ""
        auto_failover_clause = (
            ",\n            SOURCE_CONNECTION_AUTO_FAILOVER=1"
            if auto_failover
            else ""
        )
        target.execute_sql(f"""
            CHANGE REPLICATION SOURCE TO
                SOURCE_HOST='{source.host}',
                SOURCE_PORT={source.port},
                SOURCE_USER='{REPL_USER}',
                SOURCE_PASSWORD='{REPL_PASSWORD}',
                SOURCE_CONNECT_RETRY=3,
                SOURCE_AUTO_POSITION=1{auto_failover_clause}
            {channel_clause};
            START REPLICA {channel_clause};
        """)

    def setup_async_replication(self):
        """Setup async replication between clusters"""
        source = self.source.get_primary_node()
        target = self.target.get_primary_node()
        self.setup_replication(
            source,
            target,
            REPLICATION_CHANNELS[self.target.name],
            auto_failover=False
        )

    def setup_active_active_replication(self):
        """Setup bi-directional async replication"""
        source_primary = self.source.get_primary_node()
        target_primary = self.target.get_primary_node()

        self.setup_replication(
            source_primary,
            target_primary,
            REPLICATION_CHANNELS[self.target.name],
            auto_failover=False
        )
        self.setup_replication(
            target_primary,
            source_primary,
            REPLICATION_CHANNELS[self.source.name],
            auto_failover=False
        )

# Deployment utilities
class ClusterDeployer:
    def __init__(self, mysql_version: str = MYSQL_VERSION):
        self.mysql_version = mysql_version

    def deploy_single_primary_clusters(
        self,
        cluster1_ports: List[int],
        cluster2_ports: List[int]
    ) -> tuple[MGRCluster, MGRCluster]:
        """Deploy two single-primary clusters"""
        cluster1 = MGRCluster("mgr1", cluster1_ports, self.mysql_version)
        cluster2 = MGRCluster("mgr2", cluster2_ports, self.mysql_version)

        print("\n🚀 Deploying test clusters")
        cluster1.deploy("single-primary")
        cluster2.deploy("single-primary")

        print("\n📝 Initializing test schema")
        cluster1.init_test_schema()
        cluster2.init_test_schema()

        return cluster1, cluster2

    def deploy_multi_primary_clusters(
        self,
        cluster1_ports: List[int],
        cluster2_ports: List[int]
    ) -> tuple[MGRCluster, MGRCluster]:
        """Deploy two multi-primary clusters"""
        cluster1 = MGRCluster("mgr1", cluster1_ports, self.mysql_version)
        cluster2 = MGRCluster("mgr2", cluster2_ports, self.mysql_version)

        print("\n🚀 Deploying multi-primary clusters")
        cluster1.deploy("multi-primary")
        cluster2.deploy("multi-primary")

        print("\n📝 Initializing test schema")
        cluster1.init_test_schema()
        cluster2.init_test_schema()

        return cluster1, cluster2

# Testing utilities
class ClusterTester:
    def __init__(
        self,
        cluster1: MGRCluster = None,
        cluster2: MGRCluster = None
    ):
        self.cluster1 = cluster1
        self.cluster2 = cluster2

    def _check_clusters(self):
        """Check if clusters are deployed"""
        try:
            cmd = "dbdeployer sandboxes --header | grep -E 'mgr[12]'"
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, check=True
            )
            if result.stdout.strip():
                self.cluster1 = MGRCluster("mgr1", MGR1_PORTS)
                self.cluster2 = MGRCluster("mgr2", MGR2_PORTS)
                return
        except subprocess.CalledProcessError:
            pass
        raise Exception("❌ No clusters found. Please deploy first.")

    @staticmethod
    def insert_test_data(
        node: MGRNode,
        cluster_name: str,
        event_type: str,
        data: str
    ):
        """Insert test data into events table"""
        node.execute_sql(f"""
            INSERT INTO mgr_test.events (
                event_time, cluster_name, event_type, event_data
            ) VALUES (
                NOW(), '{cluster_name}', '{event_type}', '{data}'
            )
        """)

    @staticmethod
    def verify_data_sync(node: MGRNode):
        """Verify data synchronization"""
        result = node.execute_sql("""
            SELECT CONCAT(
                'Time: ', event_time, ', Cluster: ', cluster_name, 
                ', Type: ', event_type, ', Data: ', event_data
            ) FROM mgr_test.events ORDER BY event_time
        """)
        print(f"\n📊 Data on node {node.port}:\n{result}")

    @staticmethod
    def verify_replication_status(node: MGRNode, channel: str = ""):
        """Check replication channel status"""
        print(f"\n📡 Checking replication status for channel '{channel}'")
        cmd = (
            f"mysql -h{node.host} -P{node.port} -u{node.user} -p{node.password} "
            f"-e \"SHOW REPLICA STATUS FOR CHANNEL '{channel}'\\G\" 2>/dev/null | "
            "grep -E '(Channel_Name|Source_Port|Replica_IO_Running|Replica_SQL_Running|Seconds_Behind_Source)'"
        )
        try:
            result = subprocess.run(
                cmd, shell=True, capture_output=True, text=True, check=True
            )
            print(result.stdout)
        except subprocess.CalledProcessError as e:
            print(f"❌ Replication check failed: {e.stderr or 'Channel not found'}")
            raise

    def setup_replication(self, mode: Literal["async", "active-active"]):
        """Configure replication between clusters"""
        self._check_clusters()
        repl_mgr = ReplicationManager(self.cluster1, self.cluster2)
        mode_name = "Async" if mode == "async" else "Bi-Directional"

        print(f"\n🔄 Setting up {mode_name} replication")
        if mode == "async":
            repl_mgr.setup_async_replication()
            self.verify_replication_status(
                self.cluster2.get_primary_node(),
                REPLICATION_CHANNELS[self.cluster2.name]
            )
        else:
            repl_mgr.setup_active_active_replication()
            for cluster in [self.cluster1, self.cluster2]:
                self.verify_replication_status(
                    cluster.get_primary_node(),
                    REPLICATION_CHANNELS[cluster.name]
                )

    def write_test_data(self, cluster: MGRCluster, event_type: str = ""):
        """Write test data to primary node"""
        primary = cluster.get_primary_node()
        print(f"\n📝 Writing test data to {cluster.name} primary {primary.port}")
        self.insert_test_data(
            primary,
            cluster.name,
            event_type,
            f"Data from node {primary.port}"
        )
        time.sleep(1)  # Add delay for visibility

    def simulate_node_failure(self, node: MGRNode, cluster: MGRCluster):
        """Simulate node failure"""
        print(f"\n💥 Simulating failure: Shutting down node {node.port}")
        node.shutdown(force=True)

        if node.port == cluster.primary_node.port:
            return self._wait_for_new_primary(cluster)

        return cluster.get_primary_node()

    def recover_node(self, node: MGRNode, cluster: MGRCluster):
        """Recover a failed node"""
        print(f"\n🔄 Restarting node {node.port} and rejoining cluster")
        node_index = cluster.nodes.index(node) + 1
        subprocess.run(
            f"{SANDBOX}/{cluster.name}/node{node_index}/start",
            shell=True,
            check=True
        )
        time.sleep(2)

        print(f"\n🔄 Rejoining cluster: START GROUP_REPLICATION")
        node.execute_sql("START GROUP_REPLICATION;")
        print(f"\n🔄 Enabling replication: START REPLICA")
        node.execute_sql("START REPLICA;")
        time.sleep(10)

    def verify_cluster_data(self, clusters: list[MGRCluster]):
        """Verify data consistency across clusters"""
        print("\n🔍 Verifying data synchronization")
        time.sleep(5)
        for cluster in clusters:
            primary = cluster.get_primary_node()
            self.verify_data_sync(primary)
            self.verify_replication_status(
                primary,
                REPLICATION_CHANNELS[cluster.name]
            )

    def test_async_failover(self):
        """Test async replication failover"""
        self.setup_replication("async")
        self.write_test_data(self.cluster1, "Initial data")
        time.sleep(2)

        primary1 = self.cluster1.get_primary_node()
        new_primary = self.simulate_node_failure(primary1, self.cluster1)
        self.write_test_data(self.cluster1, "After failover")
        time.sleep(2)

        self.recover_node(primary1, self.cluster1)
        self.verify_cluster_data([self.cluster1, self.cluster2])

    def test_active_active_replication(self):
        """Test bi-directional replication"""
        self.setup_replication("active-active")

        for cluster in [self.cluster1, self.cluster2]:
            self.write_test_data(cluster, "Initial data")

        primary1 = self.cluster1.get_primary_node()
        new_primary = self.simulate_node_failure(primary1, self.cluster1)
        self.write_test_data(self.cluster1, "During failover")
        self.write_test_data(self.cluster2, "During recovery")

        self.recover_node(primary1, self.cluster1)
        self.verify_cluster_data([self.cluster1, self.cluster2])

    def _wait_for_new_primary(
        self,
        cluster: MGRCluster,
        max_wait: int = 30,
        interval: int = 6
    ) -> MGRNode:
        """Wait for new primary election"""
        print("\n⏳ Waiting for new primary election")
        old_port = cluster.primary_node.port

        for attempt in range(max_wait // interval):
            print(f"\n🔄 Attempt {attempt + 1} to find new primary")
            try:
                new_primary = cluster.get_primary_node(force_check=True)
                if new_primary.port != old_port:
                    cluster.primary_node = new_primary
                    print(f"✅ New primary elected: {new_primary.port}")
                    return new_primary
            except Exception:
                pass
            time.sleep(interval)
        raise Exception(f"❌ Timed out after {max_wait}s waiting for new primary")

def deploy_clusters(topology: Literal["single-primary", "multi-primary"] = "multi-primary"):
    """Deploy MGR clusters"""
    deployer = ClusterDeployer()
    if topology == "single-primary":
        return deployer.deploy_single_primary_clusters(MGR1_PORTS, MGR2_PORTS)
    return deployer.deploy_multi_primary_clusters(MGR1_PORTS, MGR2_PORTS)

def run_tests(test_type: Literal["async", "active-active"] = "active-active"):
    """Run specified tests"""
    tester = ClusterTester()
    if test_type == "async":
        tester.test_async_failover()
    else:
        tester.test_active_active_replication()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MySQL MGR Deployment & Test Tool")
    parser.add_argument("-s", "--deploy-single", action="store_true", help="Deploy single-primary cluster")
    parser.add_argument("-m", "--deploy-multi", action="store_true", help="Deploy multi-primary cluster")
    parser.add_argument("-ss", "--single-async", action="store_true", help="Test async replication")
    parser.add_argument("-sa", "--single-active", action="store_true", help="Test active-active replication")
    parser.add_argument("-aa", "--multi-active", action="store_true", help="Test bi-directional replication")

    args = parser.parse_args()

    if not any(vars(args).values()):
        parser.print_help()
        exit(0)

    # Map CLI args to deployment/test modes
    topology = "single-primary" if args.deploy_single else "multi-primary"
    test_type = None
    if args.single_async:
        test_type = "async"
    elif args.single_active:
        test_type = "active-active"
    elif args.multi_active:
        test_type = "active-active"

    clusters = None
    if test_type:
        try:
            tester = ClusterTester()
            tester._check_clusters()  # Reuse existing sandboxes if they are present
            clusters = (tester.cluster1, tester.cluster2)
            print(f"\n=== Switching to {test_type} mode ===")
            for cluster in clusters:
                cluster.switch_mode(to_single_primary=(test_type == "async"))
        except Exception:
            print("\n⚠️ Failed to switch mode. Re-deploying clusters...")
            clusters = None

    if not clusters:
        print(f"\n=== Deploying {topology} clusters ===")
        clusters = deploy_clusters(topology)

    if test_type:
        print(f"\n=== Running {test_type} tests ===")
        run_tests(test_type)
