Validating MySQL MGR Dual-Datacenter Active-Active Architecture Feasibility
In production environments, many enterprises aim to achieve dual-datacenter active-active (双活) deployments to enhance business high availability and disaster recovery capabilities. A recent client inquiry highlighted a common challenge: how can MySQL Group Replication (MGR) be used to implement a dual-datacenter active-active architecture? While MGR supports multi-primary modes and asynchronous replication channel failover, its suitability for true dual-active scenarios remains debated. To address this, we developed an automated testing script (written with the Cursor AI editor) to validate MGR's feasibility in dual-datacenter setups.
Design Goals
- Validate whether MySQL MGR can achieve dual-active operations in a dual-datacenter environment, ensuring:
- Simultaneous writes in both locations
- Automatic data synchronization
- Fault-triggered automatic failover
- Automate deployment, failover, fault simulation, and data consistency checks to reduce manual intervention and improve testing efficiency.
Key Features
- One-Click Deployment of Dual Clusters
- Supports single-master and multi-master modes.
- Uses `dbdeployer` to rapidly deploy two MGR clusters, simulating a dual-datacenter environment.
- Flexible Replication Testing
- Configurable unidirectional or bidirectional replication.
- Automatically establishes asynchronous replication channels for dual-active data writes.
- Automated Failover and Recovery
- Simulates primary node failures and automatic election of new primaries.
- Supports node restarts and reintegration into clusters.
- Replicates real-world failure scenarios.
- Data Consistency and Health Checks
- Automatically inserts test data and validates cross-datacenter synchronization.
- Monitors replication channel health in real time.
Testing Conclusions
Through automated testing, the following conclusions were drawn:
- Single-Master Mode: Avoid bidirectional replication. Instead, use MySQL Shell to deploy a ClusterSet architecture with MySQL Router for dual-active setups.
- Multi-Master Mode: If bidirectional replication is enabled, ensure `skip_replica_start` is activated. Otherwise, nodes may fail to rejoin the cluster due to GTID inconsistencies during restarts.
- MySQL Shell Limitation: MySQL Shell currently lacks native support for bidirectional replication; rely on ClusterSet instead.
Usage Examples
# Deploy single-master cluster
python mgr_test.py -s
# Deploy multi-master cluster
python mgr_test.py -m
# Run single-master unidirectional replication test
python mgr_test.py -ss
# Run multi-master bidirectional replication test
python mgr_test.py -aa
Source Code Highlights
The script leverages Python for automation, with key components including:
- Cluster Deployment: Automates cluster initialization and topology configuration.
- Replication Setup: Configures asynchronous channels and failover policies.
- Fault Simulation: Tests node failures, restarts, and recovery workflows.
- Data Validation: Ensures consistency across clusters via SQL queries.
#!/usr/bin/env python3
from typing import List, Dict, Literal
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime
import argparse
MYSQL_VERSION = "8.0.40"
REPL_USER = "msandbox"
REPL_PASSWORD = "msandbox"
# MGR cluster port configuration
MGR1_PORTS = [4316, 4317, 4318] # Ports for the first MGR cluster
MGR2_PORTS = [5316, 5317, 5318] # Ports for the second MGR cluster
SANDBOX = "$HOME/workbench/sandboxs"
# Replication channel names
REPLICATION_CHANNELS = {
"mgr1": "async_mgr2_to_mgr1", # Channel for mgr1 cluster
"mgr2": "async_mgr1_to_mgr2" # Channel for mgr2 cluster
}
@dataclass
class MGRNode:
port: int
host: str = '127.0.0.1'
user: str = 'msandbox'
password: str = 'msandbox'
def execute_sql(self, sql: str, vertical: bool = False) -> str:
"""Execute SQL command on the node"""
cmd = f"mysql -h{self.host} -P{self.port} -u{self.user} -p{self.password}"
if not vertical:
cmd += " -NB"
cmd += f" -e \"{sql}\""
try:
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, check=True
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
error_msg = e.stderr.strip()
print("\n📋 Latest error log:")
self.show_error_log()
raise Exception(e.stderr)
def show_error_log(self, lines: int = 20):
"""Display recent error logs"""
try:
log_file = (
f"{SANDBOX}/mgr{'1' if self.port < 5000 else '2'}/"
f"node{(self.port % 10) - 5}/data/msandbox.err"
)
cmd = f"tail -n {lines} {log_file} | grep ERROR"
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, check=True
)
print(result.stdout.strip())
except subprocess.CalledProcessError as e:
print(f"⚠️ Failed to read error log: {e.stderr.strip()}")
def shutdown(self, force: bool = False):
"""Shutdown the MySQL instance"""
if force:
# Kill only MySQL processes
cmd = (
f"ps aux | grep mysql.*{self.port} | grep -v grep | awk '{{print $2}}' | "
"xargs -r kill -9"
)
else:
cmd = (
f"mysqladmin -u{self.user} -p{self.password} -h{self.host} -P{self.port} shutdown"
)
subprocess.run(cmd, shell=True, check=True)
class MGRCluster:
def __init__(
self,
name: str,
ports: List[int],
mysql_version: str = MYSQL_VERSION
):
self.name = name
self.nodes = [MGRNode(port) for port in ports]
self.primary_node = self.nodes[0]
self.mysql_version = mysql_version
self._primary_node_cache = None
self._last_primary_check = 0
self._cache_ttl = 10 # Cache TTL in seconds
def deploy(
self,
topology_mode: Literal["single-primary", "multi-primary"] = "single-primary"
):
"""Deploy MGR cluster"""
# Cleanup existing cluster
try:
print(f"=== Removing cluster {self.name} ===")
cmd = (
f"ps aux | grep {self.name} | grep -v grep | awk '{{print $2}}' | "
"xargs -r kill -9"
)
cmd = f"dbdeployer delete {self.name} --concurrent 2>/dev/null || true"
subprocess.run(cmd, shell=True, check=False)
except:
pass # Ignore cleanup errors
# Deploy new cluster
print(f"=== Deploying cluster {self.name} ===")
cmd = (
f"dbdeployer deploy replication --topology=group -c skip_replica_start=on"
)
if topology_mode == "single-primary":
cmd += " --single-primary"
cmd += (
f" --nodes={len(self.nodes)} --concurrent --port-as-server-id "
f"--base-port={self.nodes[0].port - 1} {self.mysql_version} "
f"--sandbox-directory={self.name} > /dev/null"
)
print(f"{cmd}")
subprocess.run(cmd, shell=True, check=True)
# Clear cache
self._primary_node_cache = None
self._last_primary_check = 0
def get_primary_node(self, force_check: bool = False) -> MGRNode:
"""Get current primary node"""
current_time = time.time()
if not force_check and self._primary_node_cache:
if (current_time - self._last_primary_check) < self._cache_ttl:
return self._primary_node_cache
print(f"\n🔍 Searching for primary node in cluster {self.name}")
for node in self.nodes:
try:
print(f" ⚡️ Checking node {node.port}")
time.sleep(3)
group_status = node.execute_sql("""
SELECT COUNT(*)
FROM performance_schema.replication_group_members
WHERE MEMBER_STATE = 'ONLINE'
""")
print(f" 📊 Online nodes: {group_status}")
if int(group_status) > 0:
members = node.execute_sql("""
SELECT CONCAT(
MEMBER_PORT, ' (', MEMBER_STATE, '/', MEMBER_ROLE, ')'
)
FROM performance_schema.replication_group_members
""")
print(f" 📋 Member statuses: {members.replace('\n', ', ')}")
for check_node in self.nodes:
role = check_node.execute_sql("""
SELECT MEMBER_ROLE
FROM performance_schema.replication_group_members
WHERE MEMBER_ID = @@server_uuid
""")
if role == "PRIMARY":
self.primary_node = check_node
self._primary_node_cache = check_node
self._last_primary_check = current_time
print(f" ✅ Found primary node: {check_node.port}")
return check_node
break # No need to check further if online but no primary found
except subprocess.CalledProcessError as e:
print(f" ❌ Node {node.port} check failed")
continue
# Clear cache on failure
self._primary_node_cache = None
self._last_primary_check = 0
raise Exception(f"❗ Cluster {self.name} has no primary node (election in progress)")
def init_test_schema(self):
"""Initialize test database/schema"""
primary = self.get_primary_node()
primary.execute_sql("""
CREATE DATABASE IF NOT EXISTS mgr_test;
CREATE TABLE IF NOT EXISTS mgr_test.events (
id INT AUTO_INCREMENT PRIMARY KEY,
event_time DATETIME,
cluster_name VARCHAR(10),
event_type VARCHAR(20),
event_data VARCHAR(100)
);
""")
def switch_mode(self, to_single_primary: bool = True):
"""Switch between single-primary/multi-primary mode"""
primary = self.get_primary_node()
mode = "single-primary" if to_single_primary else "multi-primary"
print(f"\n🔄 Switching {self.name} to {mode} mode")
try:
mode_sql = "SINGLE_PRIMARY" if to_single_primary else "MULTI_PRIMARY"
primary.execute_sql(f"""
SELECT group_replication_switch_to_{mode_sql.lower()}_mode()
""")
# Clear cache after mode switch
self._primary_node_cache = None
self._last_primary_check = 0
print(f"✅ {self.name} mode switched to {mode}")
except Exception as e:
print(f"❌ Switch failed: {str(e)}")
raise
class ReplicationManager:
def __init__(self, source_cluster: MGRCluster, target_cluster: MGRCluster):
self.source = source_cluster
self.target = target_cluster
def setup_replication(
self,
source: MGRNode,
target: MGRNode,
channel: str = "",
auto_failover: bool = True
):
"""Configure replication channel"""
channel_clause = f" FOR CHANNEL '{channel}'" if channel else ""
auto_failover_clause = (
",\n SOURCE_CONNECTION_AUTO_FAILOVER=1"
if auto_failover
else ""
)
target.execute_sql(f"""
CHANGE REPLICATION SOURCE TO
SOURCE_HOST='{source.host}',
SOURCE_PORT={source.port},
SOURCE_USER='{REPL_USER}',
SOURCE_PASSWORD='{REPL_PASSWORD}',
SOURCE_CONNECT_RETRY=3,
SOURCE_AUTO_POSITION=1{auto_failover_clause}
{channel_clause};
START REPLICA {channel_clause};
""")
def setup_async_replication(self):
"""Setup async replication between clusters"""
source = self.source.get_primary_node()
target = self.target.get_primary_node()
self.setup_replication(
source,
target,
REPLICATION_CHANNELS[self.target.name],
auto_failover=False
)
def setup_active_active_replication(self):
"""Setup bi-directional async replication"""
source_primary = self.source.get_primary_node()
target_primary = self.target.get_primary_node()
self.setup_replication(
source_primary,
target_primary,
REPLICATION_CHANNELS[self.target.name],
auto_failover=False
)
self.setup_replication(
target_primary,
source_primary,
REPLICATION_CHANNELS[self.source.name],
auto_failover=False
)
# Deployment utilities
class ClusterDeployer:
def __init__(self, mysql_version: str = MYSQL_VERSION):
self.mysql_version = mysql_version
def deploy_single_primary_clusters(
self,
cluster1_ports: List[int],
cluster2_ports: List[int]
) -> tuple[MGRCluster, MGRCluster]:
"""Deploy two single-primary clusters"""
cluster1 = MGRCluster("mgr1", cluster1_ports, self.mysql_version)
cluster2 = MGRCluster("mgr2", cluster2_ports, self.mysql_version)
print("\n🚀 Deploying test clusters")
cluster1.deploy("single-primary")
cluster2.deploy("single-primary")
print("\n📝 Initializing test schema")
cluster1.init_test_schema()
cluster2.init_test_schema()
return cluster1, cluster2
def deploy_multi_primary_clusters(
self,
cluster1_ports: List[int],
cluster2_ports: List[int]
) -> tuple[MGRCluster, MGRCluster]:
"""Deploy two multi-primary clusters"""
cluster1 = MGRCluster("mgr1", cluster1_ports, self.mysql_version)
cluster2 = MGRCluster("mgr2", cluster2_ports, self.mysql_version)
print("\n🚀 Deploying multi-primary clusters")
cluster1.deploy("multi-primary")
cluster2.deploy("multi-primary")
print("\n📝 Initializing test schema")
cluster1.init_test_schema()
cluster2.init_test_schema()
return cluster1, cluster2
# Testing utilities
class ClusterTester:
def __init__(
self,
cluster1: MGRCluster = None,
cluster2: MGRCluster = None
):
self.cluster1 = cluster1
self.cluster2 = cluster2
def _check_clusters(self):
"""Check if clusters are deployed"""
try:
cmd = "dbdeployer sandboxes --header | grep -E 'mgr[12]'"
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, check=True
)
if result.stdout.strip():
self.cluster1 = MGRCluster("mgr1", MGR1_PORTS)
self.cluster2 = MGRCluster("mgr2", MGR2_PORTS)
return
except subprocess.CalledProcessError:
pass
raise Exception("❌ No clusters found. Please deploy first.")
@staticmethod
def insert_test_data(
node: MGRNode,
cluster_name: str,
event_type: str,
data: str
):
"""Insert test data into events table"""
node.execute_sql(f"""
INSERT INTO mgr_test.events (
event_time, cluster_name, event_type, event_data
) VALUES (
NOW(), '{cluster_name}', '{event_type}', '{data}'
)
""")
@staticmethod
def verify_data_sync(node: MGRNode):
"""Verify data synchronization"""
result = node.execute_sql("""
SELECT CONCAT(
'Time: ', event_time, ', Cluster: ', cluster_name,
', Type: ', event_type, ', Data: ', event_data
) FROM mgr_test.events ORDER BY event_time
""")
print(f"\n📊 Data on node {node.port}:\n{result}")
@staticmethod
def verify_replication_status(node: MGRNode, channel: str = ""):
"""Check replication channel status"""
print(f"\n📡 Checking replication status for channel '{channel}'")
cmd = (
f"mysql -h{node.host} -P{node.port} -u{node.user} -p{node.password} "
f"-e \"SHOW REPLICA STATUS FOR CHANNEL '{channel}'\\G\" 2>/dev/null | "
"grep -E '(Source_Port|Replica_IO_Running|Replica_SQL_Running|Seconds_Behind_Source|Chna)'"
)
try:
result = subprocess.run(
cmd, shell=True, capture_output=True, text=True, check=True
)
print(result.stdout)
except subprocess.CalledProcessError as e:
print(f"❌ Replication check failed: {e.stderr or 'Channel not found'}")
raise
def setup_replication(self, mode: Literal["async", "active-active"]):
"""Configure replication between clusters"""
self._check_clusters()
repl_mgr = ReplicationManager(self.cluster1, self.cluster2)
mode_name = "Async" if mode == "async" else "Bi-Directional"
print(f"\n🔄 Setting up {mode_name} replication")
if mode == "async":
repl_mgr.setup_async_replication()
self.verify_replication_status(
self.cluster2.get_primary_node(),
REPLICATION_CHANNELS[self.cluster2.name]
)
else:
repl_mgr.setup_active_active_replication()
for cluster in [self.cluster1, self.cluster2]:
self.verify_replication_status(
cluster.get_primary_node(),
REPLICATION_CHANNELS[cluster.name]
)
def write_test_data(self, cluster: MGRCluster, event_type: str = ""):
"""Write test data to primary node"""
primary = cluster.get_primary_node()
print(f"\n📝 Writing test data to {cluster.name} primary {primary.port}")
self.insert_test_data(
primary,
cluster.name,
event_type,
f"Data from node {primary.port}"
)
time.sleep(1) # Add delay for visibility
def simulate_node_failure(self, node: MGRNode, cluster: MGRCluster):
"""Simulate node failure"""
print(f"\n💥 Simulating failure: Shutting down node {node.port}")
node.shutdown(force=True)
if node.port == cluster.primary_node.port:
return self._wait_for_new_primary(cluster)
return cluster.get_primary_node()
def recover_node(self, node: MGRNode, cluster: MGRCluster):
"""Recover a failed node"""
print(f"\n🔄 Restarting node {node.port} and rejoining cluster")
node_index = cluster.nodes.index(node) + 1
subprocess.run(
f"{SANDBOX}/{cluster.name}/node{node_index}/start",
shell=True,
check=True
)
time.sleep(2)
print(f"\n🔄 Rejoining cluster: START GROUP_REPLICATION")
node.execute_sql("START GROUP_REPLICATION;")
print(f"\n🔄 Enabling replication: START REPLICA")
node.execute_sql("START REPLICA;")
time.sleep(10)
def verify_cluster_data(self, clusters: list[MGRCluster]):
"""Verify data consistency across clusters"""
print("\n🔍 Verifying data synchronization")
time.sleep(5)
for cluster in clusters:
primary = cluster.get_primary_node()
self.verify_data_sync(primary)
self.verify_replication_status(
primary,
REPLICATION_CHANNELS[cluster.name]
)
def test_async_failover(self):
"""Test async replication failover"""
self.setup_replication("async")
self.write_test_data(self.cluster1, "Initial data")
time.sleep(2)
primary1 = self.cluster1.get_primary_node()
new_primary = self.simulate_node_failure(primary1, self.cluster1)
self.write_test_data(self.cluster1, "After failover")
time.sleep(2)
self.recover_node(primary1, self.cluster1)
self.verify_cluster_data([self.cluster1, self.cluster2])
def test_active_active_replication(self):
"""Test bi-directional replication"""
self.setup_replication("active-active")
for cluster in [self.cluster1, self.cluster2]:
self.write_test_data(cluster, "Initial data")
primary1 = self.cluster1.get_primary_node()
new_primary = self.simulate_node_failure(primary1, self.cluster1)
self.write_test_data(self.cluster1, "During failover")
self.write_test_data(self.cluster2, "During recovery")
self.recover_node(primary1, self.cluster1)
self.verify_cluster_data([self.cluster1, self.cluster2])
def _wait_for_new_primary(
self,
cluster: MGRCluster,
max_wait: int = 30,
interval: int = 6
) -> MGRNode:
"""Wait for new primary election"""
print("\n⏳ Waiting for new primary election")
new_primary = None
old_port = cluster.primary_node.port
for _ in range(max_wait // interval):
print(f"\n🔄 Attempt {_+1} to find new primary")
try:
new_primary = cluster.get_primary_node(force_check=True)
if new_primary.port != old_port:
cluster.primary_node = new_primary
print(f"✅ New primary elected: {new_primary.port}")
return new_primary
except Exception as e:
time.sleep(interval)
raise Exception(f"❌ Timed out after {max_wait}s waiting for new primary")
def deploy_clusters(topology: Literal["single-primary", "multi-primary"] = "multi-primary"):
"""Deploy MGR clusters"""
deployer = ClusterDeployer()
if topology == "single-primary":
return deployer.deploy_single_primary_clusters(MGR1_PORTS, MGR2_PORTS)
return deployer.deploy_multi_primary_clusters(MGR1_PORTS, MGR2_PORTS)
def run_tests(test_type: Literal["async", "active-active"] = "active-active"):
"""Run specified tests"""
tester = ClusterTester()
if test_type == "async":
tester.test_async_failover()
else:
tester.test_active_active_replication()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MySQL MGR Deployment & Test Tool")
parser.add_argument("-s", "--deploy-single", action="store_true", help="Deploy single-primary cluster")
parser.add_argument("-m", "--deploy-multi", action="store_true", help="Deploy multi-primary cluster")
parser.add_argument("-ss", "--single-async", action="store_true", help="Test async replication")
parser.add_argument("-sa", "--single-active", action="store_true", help="Test active-active replication")
parser.add_argument("-aa", "--multi-active", action="store_true", help="Test bi-directional replication")
args = parser.parse_args()
if not any(vars(args).values()):
parser.print_help()
exit(0)
# Map CLI args to deployment/test modes
topology = "single-primary" if args.deploy_single else "multi-primary"
test_type = None
if args.single_async:
test_type = "async"
elif args.single_active:
test_type = "active-active"
elif args.multi_active:
test_type = "active-active"
clusters = None
if test_type:
try:
tester = ClusterTester()
clusters = (tester.cluster1, tester.cluster2)
print(f"\n=== Switching to {test_type} mode ===")
for cluster in clusters:
cluster.switch_mode(to_single_primary=(test_type == "async"))
except Exception as e:
print(f"\n⚠️ Failed to switch mode. Re-deploying clusters...")
clusters = None
if not clusters:
print(f"\n=== Deploying {topology} clusters ===")
clusters = deploy_clusters(topology)
if test_type:
print(f"\n=== Running {test_type} tests ===")
run_tests(test_type)