找回密码
 立即注册
首页 业界区 业界 1分钟搭建 Redis三主三从集群!附完整自动化脚本(直接 ...

1分钟搭建 Redis三主三从集群!附完整自动化脚本(直接复制可用)

巫雪艷 2026-2-4 18:00:09
[code]#!/usr/bin/env python3# -*- coding: utf-8 -*-"""Redis Cluster三主三从自动化部署脚本适配系统:CentOS 7.x 64位(最小化安装即可)Redis版本:6.2.6(稳定版,生产环境推荐)部署模式:单机三主三从(可扩展多机,脚本内有标注)作者:Redis自动化部署工具版本:2.0(优化版)注意:需以root权限运行,单机部署需保证服务器内存≥4G"""import osimport sysimport subprocessimport shutilimport argparseimport socketimport timefrom pathlib import Path# 彩色输出配置(提升可读性,兼容CentOS7终端)class Color:    RED = '\033[31m'    GREEN = '\033[32m'    YELLOW = '\033[33m'    BLUE = '\033[34m'    RESET = '\033[0m'# 全局常量(便于后续修改配置)REDIS_VERSION = "6.2.6"REDIS_URL = f"https://download.redis.io/releases/redis-{REDIS_VERSION}.tar.gz"REDIS_USER = "redis"DEFAULT_PASSWORD = "RedisPass@123"INSTALL_DIR = "/opt/redis"MASTER_PORTS = [7000, 7001, 7002]  # 主节点端口SLAVE_PORTS = [7003, 7004, 7005]   # 从节点端口ALL_PORTS = MASTER_PORTS + SLAVE_PORTSBUS_PORTS = [p + 10000 for p in ALL_PORTS]  # 集群总线端口(固定+10000)class RedisClusterDeployer:    def __init__(self, args):        self.args = args        self.redis_password = args.password or DEFAULT_PASSWORD        # 节点IP:指定则用指定的,未指定则获取本机非回环IP        self.node_ips = self.args.nodes.split(',') if self.args.nodes else self.get_local_ips()        # 本机IP(集群公告用,避免使用127.0.0.1)        self.local_ip = self.get_local_ips()[0]    def echo(self, msg, color=Color.BLUE):        """彩色打印输出"""        print(f"{color}{msg}{Color.RESET}")    def success(self, msg):        """成功信息打印"""        self.echo(f"✓ {msg}", Color.GREEN)    def error(self, msg):        """错误信息打印"""        self.echo(f"✗ {msg}", Color.RED)    def warn(self, msg):        """警告信息打印"""        self.echo(f"⚠ {msg}", Color.YELLOW)    def get_local_ips(self):        """获取本机非回环、非docker的IPv4地址"""        try:            ip_list = []            for iface in socket.if_nameindex():                if iface[1] in ['lo', 'docker0']:                    continue                sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)                try:                    sock.bind((iface[1], 0))                    ip = sock.getsockname()[0]                    ip_list.append(ip)                except:                    continue                finally:                    sock.close()            # 若未获取到,用公网检测方式兜底            if not ip_list:                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)                s.connect(('8.8.8.8', 80))                ip_list = [s.getsockname()[0]]                s.close()            return ip_list        except Exception as e:            self.error(f"获取本机IP失败:{e}")            return ['127.0.0.1']    def check_port_used(self, port):        """检查端口是否被占用"""        try:            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:                s.settimeout(1)                return s.connect_ex(('0.0.0.0', port)) == 0        except:            return True    def run_command(self, cmd, shell=True, check=True, capture_output=False):        """执行Shell命令,带异常捕获和结果返回"""        self.echo(f"执行命令:{cmd}")        try:            if capture_output:                result = subprocess.run(                    cmd, shell=shell, check=check,                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,                    universal_newlines=True, encoding='utf-8'                )                return result.stdout.strip(), result.stderr.strip()            else:                subprocess.run(cmd, shell=shell, check=check)            return True, ""        except subprocess.CalledProcessError as e:            err_msg = f"命令执行失败:{e.stderr.strip() if e.stderr else str(e)}"            self.error(err_msg)            if not self.args.force:                self.error("非强制模式,退出部署")                sys.exit(1)            self.warn("强制模式,忽略错误继续执行")            return False, err_msg        except Exception as e:            err_msg = f"命令执行异常:{str(e)}"            self.error(err_msg)            if not self.args.force:                sys.exit(1)            self.warn("强制模式,忽略错误继续执行")            return False, err_msg    def pre_check(self):        """部署前前置校验(关键,避免后续失败)"""        self.echo("="*60, Color.BLUE)        self.echo("开始部署前前置校验...", Color.BLUE)        # 1. 检查是否为root        if os.geteuid() != 0:            self.error("必须以root/管理员权限运行脚本!")            self.echo("请执行:sudo python3 redis_cluster_3m3s.py", Color.YELLOW)            sys.exit(1)        # 2. 检查系统是否为CentOS7        if not os.path.exists('/etc/redhat-release'):            self.error("当前系统非CentOS/RHEL系列,不支持部署!")            sys.exit(1)        with open('/etc/redhat-release', 'r') as f:            os_ver = f.read().strip()        if '7.' not in os_ver:            self.error(f"当前系统为{os_ver},仅支持CentOS 7.x!")            sys.exit(1)        # 3. 检查端口是否被占用        for port in ALL_PORTS + BUS_PORTS:            if self.check_port_used(port):                self.error(f"端口{port}已被占用,部署失败!")                self.echo(f"请执行:netstat -tulnp | grep {port} 查看占用进程并关闭", Color.YELLOW)                sys.exit(1)        # 4. 检查内存是否≥4G(单机三主三从最小要求)        mem_total = os.popen('free -g | grep Mem | awk \'{print $2}\'').read().strip()        if int(mem_total) < 4:            self.warn(f"当前服务器内存为{mem_total}G,建议≥4G,可能影响Redis运行!")            confirm = input("是否继续部署?(yes/no):")            if confirm.lower() != 'yes':                self.echo("用户取消部署", Color.BLUE)                sys.exit(0)        self.success("所有前置校验通过!")    def install_dependencies(self):        """安装系统编译和运行依赖"""        self.echo("="*60, Color.BLUE)        self.echo("开始安装系统依赖...", Color.BLUE)        deps = ['gcc', 'tcl', 'make', 'wget', 'tar', 'systemd-devel', 'net-tools', 'lsof']        if self.args.offline:            self.warn("离线模式:请确保依赖包已上传至 /tmp/rpms/ 目录!")            if not os.path.exists('/tmp/rpms/') or len(os.listdir('/tmp/rpms/')) == 0:                self.error("/tmp/rpms/ 目录为空,离线安装失败!")                sys.exit(1)            self.run_command('yum localinstall -y /tmp/rpms/*.rpm --nogpgcheck')        else:            self.run_command('yum install -y epel-release')            self.run_command(f'yum install -y {" ".join(deps)}')        # 创建redis系统用户(无登录权限,更安全)        self.run_command(f'id {REDIS_USER} &>/dev/null || useradd -r -s /sbin/nologin -M {REDIS_USER}')        self.success("系统依赖安装完成!")    def download_redis(self):        """下载并编译安装Redis"""        self.echo("="*60, Color.BLUE)        self.echo(f"开始下载并编译Redis {REDIS_VERSION}...", Color.BLUE)        redis_tar = f"/tmp/redis-{REDIS_VERSION}.tar.gz"        redis_src = f"/tmp/redis-{REDIS_VERSION}"        # 下载Redis安装包        if not os.path.exists(redis_tar):            if self.args.offline:                self.warn(f"离线模式:请确保Redis包已上传至 {redis_tar}!")                if not os.path.exists(redis_tar):                    self.error(f"Redis安装包{redis_tar}不存在!")                    sys.exit(1)            else:                self.run_command(f'wget {REDIS_URL} -O {redis_tar} --no-check-certificate')        # 解压源码        if os.path.exists(redis_src):            shutil.rmtree(redis_src)        self.run_command(f'tar -xzf {redis_tar} -C /tmp/')        # 编译安装(指定CC=gcc,避免编译错误)        os.chdir(redis_src)        self.run_command('make clean')        self.run_command('make CC=gcc -j $(nproc)')  # 多核编译,提升速度        self.run_command('make install PREFIX=/usr/local/redis')        # 创建安装目录和软链接        self.run_command(f'mkdir -p {INSTALL_DIR}')        if os.path.exists(f'{INSTALL_DIR}/redis-{REDIS_VERSION}'):            shutil.rmtree(f'{INSTALL_DIR}/redis-{REDIS_VERSION}')        shutil.copytree(redis_src, f'{INSTALL_DIR}/redis-{REDIS_VERSION}')        self.run_command('ln -sf /usr/local/redis/bin/* /usr/local/bin/')        self.success(f"Redis {REDIS_VERSION} 编译安装完成!")    def setup_environment(self):        """设置Redis环境变量,确保全局可用"""        self.echo("="*60, Color.BLUE)        self.echo("开始设置Redis环境变量...", Color.BLUE)        env_file = '/etc/profile.d/redis.sh'        env_content = f"""# Redis Cluster 环境变量配置export REDIS_VERSION={REDIS_VERSION}export REDIS_HOME={INSTALL_DIR}/redis-{REDIS_VERSION}export PATH=$REDIS_HOME/srcPATH:/usr/local/redis/bin"""        with open(env_file, 'w', encoding='utf-8') as f:            f.write(env_content)        # 使环境变量立即生效        os.system('source /etc/profile.d/redis.sh')        # 验证Redis是否安装成功        redis_cli, err = self.run_command('redis-cli --version', capture_output=True)        if redis_cli:            self.success(f"Redis版本验证成功:{redis_cli}")        else:            self.error(f"Redis版本验证失败:{err}")            self.warn("请手动执行 source /etc/profile 使环境变量生效")        self.success("Redis环境变量设置完成!")    def create_cluster_configs(self):        """为每个节点创建独立的Redis配置文件"""        self.echo("="*60, Color.BLUE)        self.echo("开始创建Redis集群节点配置...", Color.BLUE)        for port in ALL_PORTS:            # 定义各目录(每个端口独立目录,避免冲突)            conf_dir = f"/etc/redis/{port}"            data_dir = f"/data/redis/{port}"            log_dir = f"/var/log/redis"            pid_dir = f"/var/run/redis"            # 创建目录并设置权限            for dir_path in [conf_dir, data_dir, log_dir, pid_dir]:                self.run_command(f'mkdir -p {dir_path}')                self.run_command(f'chown -R {REDIS_USER}:{REDIS_USER} {dir_path}')                self.run_command(f'chmod 750 {dir_path}')            # 节点类型            node_type = "主节点" if port in MASTER_PORTS else "从节点"            # Redis核心配置(生产环境优化版)            redis_conf = f"""# Redis Cluster {node_type} 配置文件 - 端口{port}# 基础网络配置bind 0.0.0.0port {port}protected-mode no  # 关闭保护模式,集群模式必须关闭tcp-backlog 511timeout 0tcp-keepalive 300daemonize yes  # 后台运行pidfile {pid_dir}/redis_{port}.pidlogfile {log_dir}/redis_{port}.logloglevel notice# 数据存储配置dir {data_dir}dbfilename dump_{port}.rdbrdbcompression yesrdbchecksum yesstop-writes-on-bgsave-error yes# AOF持久化(集群推荐开启)appendonly yesappendfilename "appendonly_{port}.aof"appendfsync everysecno-appendfsync-on-rewrite noauto-aof-rewrite-percentage 100auto-aof-rewrite-min-size 64mb# 集群核心配置cluster-enabled yescluster-config-file {conf_dir}/nodes_{port}.confcluster-node-timeout 15000cluster-announce-ip {self.local_ip}  # 公告本机真实IP,集群通信必备cluster-announce-port {port}cluster-announce-bus-port {port+10000}# 安全认证配置requirepass {self.redis_password}  # 访问密码masterauth {self.redis_password}  # 从节点连接主节点的密码# 内存限制(根据服务器内存调整,单机3主3从各分配2G)maxmemory 2gbmaxmemory-policy allkeys-lru  # 内存满时淘汰策略maxmemory-samples 5# 复制配置replica-serve-stale-data yesreplica-read-only yesrepl-diskless-sync norepl-diskless-sync-delay 5repl-ping-replica-period 10repl-timeout 60# 性能优化hz 10dynamic-hz yesaof-rewrite-incremental-fsync yes"""            # 写入配置文件            conf_file = f"{conf_dir}/redis_{port}.conf"            with open(conf_file, 'w', encoding='utf-8') as f:                f.write(redis_conf)            # 设置配置文件权限            self.run_command(f'chown {REDIS_USER}:{REDIS_USER} {conf_file}')            self.run_command(f'chmod 640 {conf_file}')            self.success(f"端口{port} {node_type} 配置文件创建完成")        # 创建systemd服务文件        self.create_systemd_service()        self.success("所有Redis集群节点配置创建完成!")    def create_systemd_service(self):        """创建systemd服务文件,便于系统管理Redis节点"""        self.echo("开始创建Redis systemd服务文件...", Color.BLUE)        # 服务模板(使用@参数化,适配不同端口)        service_tpl = f"""[Unit]Description=Redis Cluster Node %iAfter=network.target network-online.targetWants=network-online.target[Service]Type=forkingUser={REDIS_USER}Group={REDIS_USER}# 解决文件句柄不足问题(CentOS7常见坑)LimitNOFILE=65535LimitNPROC=65535LimitCORE=infinity# 启动/停止命令ExecStart=/usr/local/bin/redis-server /etc/redis/%%i/redis_%%i.confExecStop=/usr/local/bin/redis-cli -p %%i -a {self.redis_password} shutdownExecReload=/usr/local/bin/redis-cli -p %%i -a {self.redis_password} config rewrite# 重启策略Restart=on-failureRestartSec=3TimeoutSec=300[Install]WantedBy=multi-user.target"""        # 为每个端口创建服务文件        for port in ALL_PORTS:            service_file = f"/etc/systemd/system/redis-cluster@{port}.service"            with open(service_file, 'w', encoding='utf-8') as f:                f.write(service_tpl)            self.run_command(f'chmod 644 {service_file}')        # 创建集群批量管理脚本(一键启停/查看状态)        self.create_manage_scripts()        self.success("Redis systemd服务文件创建完成!")    def create_manage_scripts(self):        """创建集群批量管理脚本,提升运维效率"""        # 一键启动        start_script = """#!/bin/bash# Redis Cluster 批量启动脚本for port in 7000 7001 7002 7003 7004 7005; do    systemctl start redis-cluster@$port    echo "启动Redis节点 $port ..."    sleep 0.5doneecho -e "\\033[32m所有Redis集群节点启动完成!\\033[0m""""        # 一键停止        stop_script = """#!/bin/bash# Redis Cluster 批量停止脚本for port in 7000 7001 7002 7003 7004 7005; do    systemctl stop redis-cluster@$port    echo "停止Redis节点 $port ..."doneecho -e "\\033[32m所有Redis集群节点停止完成!\\033[0m""""        # 一键查看状态        status_script = """#!/bin/bash# Redis Cluster 批量状态查看脚本for port in 7000 7001 7002 7003 7004 7005; do    echo -e "\\033[34m========== Redis节点 $port 状态 ==========\\033[0m"    systemctl status redis-cluster@$port --no-pager -l    echo ""done"""        # 一键重启        restart_script = """#!/bin/bash# Redis Cluster 批量重启脚本for port in 7000 7001 7002 7003 7004 7005; do    systemctl restart redis-cluster@$port    echo "重启Redis节点 $port ..."    sleep 0.5doneecho -e "\\033[32m所有Redis集群节点重启完成!\\033[0m""""        # 脚本路径和内容映射        scripts = {            '/usr/local/bin/redis-cluster-start': start_script,            '/usr/local/bin/redis-cluster-stop': stop_script,            '/usr/local/bin/redis-cluster-status': status_script,            '/usr/local/bin/redis-cluster-restart': restart_script        }        # 写入并赋予执行权限        for path, content in scripts.items():            with open(path, 'w', encoding='utf-8') as f:                f.write(content)            self.run_command(f'chmod +x {path}')        self.success("Redis集群批量管理脚本创建完成!")    def configure_firewall(self):        """配置firewalld防火墙,开放Redis数据端口和集群总线端口"""        self.echo("="*60, Color.BLUE)        self.echo("开始配置防火墙规则...", Color.BLUE)        # 检查firewalld是否运行        is_active, err = self.run_command('systemctl is-active firewalld', capture_output=True)        if is_active not in ['active', 'running']:            self.warn("firewalld服务未运行,跳过防火墙配置!")            self.warn("若开启防火墙,请手动开放端口7000-7005、17000-17005/tcp")            return        # 开放端口(批量开放,减少命令执行次数)        self.run_command('firewall-cmd --permanent --add-port=7000-7005/tcp')        self.run_command('firewall-cmd --permanent --add-port=17000-17005/tcp')        self.run_command('firewall-cmd --reload')        # 验证端口是否开放        check, err = self.run_command('firewall-cmd --list-ports', capture_output=True)        self.echo(f"当前开放的端口:{check}", Color.BLUE)        self.success("防火墙规则配置完成!")    def start_redis_nodes(self):        """启动所有Redis节点并验证运行状态"""        self.echo("="*60, Color.BLUE)        self.echo("开始启动Redis集群节点...", Color.BLUE)        # 重新加载systemd配置        self.run_command('systemctl daemon-reload')        # 启动并设置开机自启        for port in ALL_PORTS:            self.run_command(f'systemctl enable --now redis-cluster@{port}')            time.sleep(0.5)        # 等待节点完全启动(至少5秒)        self.echo("等待Redis节点初始化...(5秒)", Color.BLUE)        time.sleep(5)        # 验证每个节点的运行状态        self.echo("开始验证Redis节点状态...", Color.BLUE)        fail_ports = []        for port in ALL_PORTS:            ping, err = self.run_command(f'redis-cli -p {port} -a {self.redis_password} ping', capture_output=True)            if ping == 'PONG':                self.success(f"节点{port}:运行正常")            else:                self.error(f"节点{port}:启动失败,错误:{err}")                fail_ports.append(port)        # 若有节点启动失败,给出解决提示        if fail_ports:            self.error(f"以下节点启动失败:{fail_ports}")            self.echo("解决方法:1. 查看日志:tail -f /var/log/redis/redis_{port}.log", Color.YELLOW)            self.echo("2. 手动重启:systemctl restart redis-cluster@{port}", Color.YELLOW)            if not self.args.force:                sys.exit(1)        self.success("Redis集群节点启动完成!")    def create_cluster(self):        """创建Redis Cluster三主三从集群(--cluster-replicas 1 表示1主1从)"""        self.echo("="*60, Color.BLUE)        self.echo("开始创建Redis三主三从集群...", Color.BLUE)        # 单机部署(核心场景):拼接所有节点地址        if len(self.node_ips) == 1:            node_list = [f"{self.local_ip}:{port}" for port in ALL_PORTS]            cluster_cmd = (                f'redis-cli --cluster create {" ".join(node_list)} '                f'--cluster-replicas 1 -a {self.redis_password} --cluster-yes'            )            # 执行集群创建命令            result, err = self.run_command(cluster_cmd, capture_output=True)            if 'OK' in result or 'cluster created correctly' in result:                self.success("Redis三主三从集群创建成功!")            else:                self.error(f"集群创建失败,错误:{err}")                self.echo("手动执行集群创建命令:\n" + cluster_cmd, Color.YELLOW)                if not self.args.force:                    sys.exit(1)        # 多机部署(扩展场景):给出手动配置提示(需先在各节点部署单节点,再拼接)        else:            self.warn("检测到多节点IP,脚本暂不支持自动多机部署!")            self.echo("多机部署步骤:", Color.BLUE)            self.echo("1. 在每个节点执行本脚本,添加--skip-cluster 参数(仅部署节点,不创建集群)", Color.YELLOW)            self.echo("2. 在任意节点执行:redis-cli --cluster create 节点1:7000 节点1:7001 ... 节点3:7005 --cluster-replicas 1 -a 密码 --cluster-yes", Color.YELLOW)        time.sleep(3)  # 等待集群状态稳定    def verify_cluster(self):        """验证集群健康状态和读写功能"""        self.echo("="*60, Color.BLUE)        self.echo("开始验证Redis集群健康状态...", Color.BLUE)        # 1. 检查集群整体状态        check_cmd = f'redis-cli -p 7000 -a {self.redis_password} --cluster check {self.local_ip}:7000'        cluster_status, err = self.run_command(check_cmd, capture_output=True)        self.echo("集群状态检查结果:", Color.BLUE)        print(cluster_status)        # 2. 检查集群信息        info_cmd = f'redis-cli -p 7000 -a {self.redis_password} cluster info'        info, err = self.run_command(info_cmd, capture_output=True)        self.echo("\n集群核心信息:", Color.BLUE)        print(info)        # 3. 测试集群读写(-c 表示集群模式,自动重定向)        self.echo("\n开始测试集群读写功能...", Color.BLUE)        set_cmd = f'redis-cli -p 7000 -a {self.redis_password} -c set cluster_test "Hello Redis Cluster 3m3s"'        self.run_command(set_cmd)        get_cmd = f'redis-cli -p 7000 -a {self.redis_password} -c get cluster_test'        get_result, err = self.run_command(get_cmd, capture_output=True)        if get_result == 'Hello Redis Cluster 3m3s':            self.success("集群读写测试成功!")        else:            self.error(f"集群读写测试失败,结果:{get_result},错误:{err}")        self.success("Redis集群状态验证完成!")    def cleanup(self):        """清理部署过程中的临时文件,释放磁盘空间"""        self.echo("="*60, Color.BLUE)        self.echo("开始清理部署临时文件...", Color.BLUE)        # 清理Redis源码和压缩包        redis_tar = f"/tmp/redis-{REDIS_VERSION}.tar.gz"        redis_src = f"/tmp/redis-{REDIS_VERSION}"        for f in [redis_tar, redis_src]:            if os.path.exists(f):                if os.path.isfile(f):                    os.remove(f)                else:                    shutil.rmtree(f)                self.echo(f"清理临时文件:{f}")        self.success("临时文件清理完成!")    def print_deploy_summary(self):        """打印部署完成摘要,包含所有运维信息"""        self.echo("\n" + "="*80, Color.GREEN)        self.echo("
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

2026-2-10 05:05:12

举报

2026-2-10 19:21:40

举报

12 小时前

举报

您需要登录后才可以回帖 登录 | 立即注册