Python Script for MySQL Checksum Validation
Operational disaster recovery drills require deterministic, repeatable validation of backup integrity before failover execution. Traditional reliance on mysqlcheck or ad-hoc CHECKSUM TABLE invocations fails to scale across multi-terabyte InnoDB deployments and introduces unpredictable lock contention on standby replicas. A production-grade implementation must decouple validation from blocking DDL, enforce strict manifest reconciliation, and integrate seamlessly into automated orchestration workflows. When deployed within Checksum Validation Pipelines, the script operates as a stateless validation engine that queries live replicas, compares results against cryptographically signed backup manifests, and surfaces divergence before promotion.
Architecture & Execution Model
flowchart TD
A["Start validator"] --> B{"Two JSON args present"}
B -->|"no"| X2["exit code 2 abort pipeline"]
B -->|"yes"| C["Load db config and manifest"]
C --> D["ThreadPoolExecutor over tables"]
D --> E["CHECKSUM TABLE per table"]
E --> F{"Live equals expected"}
F -->|"MISMATCH or ERROR"| X1["exit code 1 halt failover"]
F -->|"all VALID"| X0["exit code 0 proceed promotion"]
Figure. Decision flow of the MySQL checksum validator mapping argument parsing and per table CHECKSUM TABLE comparison to POSIX exit codes 0, 1, and 2.
The validation engine prioritizes non-blocking execution, explicit timeout enforcement, and deterministic error propagation. It uses connection pooling to prevent connection storms during concurrent validation, generator-based table iteration to bound memory consumption, and session-level lock timeouts to prevent replication lag spikes. The design isolates validation logic from orchestration layers, ensuring the script returns strict POSIX exit codes and structured JSON-compatible logs for downstream runbook consumption.
Production Implementation
The following implementation leverages mysql-connector-python with explicit connection pooling, context-managed sessions, and concurrent.futures.ThreadPoolExecutor for bounded parallelism. It reads database credentials and a pre-generated manifest, executes checksum queries against a target replica, and exits with a deterministic status code for CI/CD or DR automation hooks.
import mysql.connector
import json
import logging
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from typing import Dict, List, Tuple
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
logger = logging.getLogger("mysql_checksum_validator")
class MySQLChecksumValidator:
def __init__(self, db_config: dict, manifest: dict, max_workers: int = 8, lock_wait_timeout: int = 5):
self.db_config = db_config
self.manifest = manifest
self.max_workers = max_workers
self.lock_wait_timeout = lock_wait_timeout
self.pool = mysql.connector.pooling.MySQLConnectionPool(
pool_name="checksum_pool",
pool_size=self.max_workers,
**db_config
)
@contextmanager
def _get_connection(self):
conn = self.pool.get_connection()
try:
cursor = conn.cursor(dictionary=True)
cursor.execute(f"SET SESSION lock_wait_timeout = {self.lock_wait_timeout}")
cursor.execute(f"SET SESSION innodb_lock_wait_timeout = {self.lock_wait_timeout}")
yield cursor
finally:
conn.close()
def _validate_single_table(self, schema: str, table: str) -> dict:
with self._get_connection() as cursor:
try:
cursor.execute(f"CHECKSUM TABLE `{schema}`.`{table}`")
result = cursor.fetchone()
# A nonexistent table returns one row with a NULL Checksum, not no rows.
if result is None or result["Checksum"] is None:
return {"schema": schema, "table": table, "status": "MISSING", "checksum": None}
live_checksum = str(result["Checksum"])
expected_checksum = self.manifest.get(f"{schema}.{table}")
status = "VALID" if live_checksum == expected_checksum else "MISMATCH"
return {"schema": schema, "table": table, "status": status, "checksum": live_checksum}
except mysql.connector.Error as e:
logger.error(f"Validation failed for {schema}.{table}: {e}")
return {"schema": schema, "table": table, "status": "ERROR", "checksum": None, "error": str(e)}
def run_validation(self) -> Tuple[bool, List[dict]]:
results = []
mismatches = []
errors = []
table_list = [(k.split(".")[0], k.split(".")[1]) for k in self.manifest.keys()]
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_table = {
executor.submit(self._validate_single_table, schema, table): f"{schema}.{table}"
for schema, table in table_list
}
for future in as_completed(future_to_table):
res = future.result()
results.append(res)
if res["status"] == "MISMATCH":
mismatches.append(res)
elif res["status"] == "ERROR":
errors.append(res)
return len(mismatches) == 0 and len(errors) == 0, results
def load_json(path: str) -> dict:
with open(path, "r") as f:
return json.load(f)
def main():
if len(sys.argv) < 3:
logger.error("Usage: python mysql_checksum_validator.py <db_config.json> <manifest.json>")
sys.exit(2)
db_config = load_json(sys.argv[1])
manifest = load_json(sys.argv[2])
max_workers = int(db_config.get("max_workers", 8))
validator = MySQLChecksumValidator(db_config["db"], manifest, max_workers)
success, results = validator.run_validation()
for r in results:
log_level = logging.WARNING if r["status"] != "VALID" else logging.INFO
logger.log(log_level, f"{r['schema']}.{r['table']} | Status: {r['status']} | Checksum: {r['checksum']}")
if not success:
logger.critical("Validation failed. Aborting DR promotion.")
sys.exit(1)
logger.info("All checksums validated successfully. Proceeding with failover.")
sys.exit(0)
if __name__ == "__main__":
main()
Orchestration & Runbook Integration
The script is designed for headless execution within automated disaster recovery workflows. It consumes two JSON inputs: a database configuration file and a backup manifest. The configuration must contain connection parameters under the db key, matching the signature expected by mysql.connector.pooling.MySQLConnectionPool.
{
"db": {
"host": "dr-replica-01.internal",
"port": 3306,
"user": "dr_validator",
"password": "${VAULT_DR_PASS}",
"database": "information_schema"
},
"max_workers": 12
}
Exit codes map directly to orchestration logic:
0: All tables match manifest. Proceed to promotion.1: Checksum mismatch or query error detected. Halt failover.2: Missing arguments or malformed configuration. Abort pipeline.
Integrate the validator into your runbook using a wrapper that captures stdout, parses JSON logs, and triggers PagerDuty or Slack webhooks on non-zero exits. Reference the broader Automated Backup Integrity Check Implementation framework for environment variable injection, secret rotation, and audit trail generation.
Manifest Reconciliation & Security
The manifest must be generated during the backup phase, not at validation time. It should map fully qualified table names to their CHECKSUM TABLE output. For cryptographic assurance, sign the manifest using an asymmetric key pair (e.g., Ed25519 or RSA-SHA256) and verify the signature before executing the validation script. This prevents tampering during transit or storage.
{
"app_production.users": "3847291056",
"app_production.orders": "9281736450",
"app_production.sessions": "1029384756"
}
The validation script compares live replica output against this static mapping. Any deviation triggers an immediate MISMATCH status. For tables undergoing active writes on the primary, ensure the manifest captures a consistent snapshot timestamp and that validation targets a replica lagging within acceptable SLO thresholds.
Operational Limits & Hardening
CHECKSUM TABLE performs a full table scan and computes a CRC32 value. While non-blocking for InnoDB under default settings, large partitioned tables or tables with heavy fragmentation can still consume I/O bandwidth. Mitigate impact by:
- Setting
lock_wait_timeoutandinnodb_lock_wait_timeoutto5seconds (as implemented) to prevent thread starvation. - Capping
max_workersto match the replica’sinnodb_thread_concurrencyor available vCPU count. - Excluding volatile staging tables or temporary tables from the manifest.
- Monitoring replica
Seconds_Behind_Masterduring execution. Abort validation if lag exceeds the defined RPO.
The Python concurrent.futures module handles thread scheduling efficiently, but connection pool exhaustion will raise mysql.connector.errors.PoolError. Implement exponential backoff in the orchestration layer if pool limits are reached. For comprehensive reference on MySQL checksum behavior, consult the official MySQL Reference Manual: CHECKSUM TABLE. For thread pool configuration and executor lifecycle management, see the Python Documentation: concurrent.futures.