找回密码
 立即注册
首页 业界区 安全 PostgreSQL主从流复制状态监控和自动故障转移的轻量级实 ...

PostgreSQL主从流复制状态监控和自动故障转移的轻量级实现

髡芯 2025-6-10 14:42:58
对于PostgreSQL的高可用,之前研究过repmgr以及pg_auto_failover,起作用都是起到主节点故障时,实现“自动故障转移”的目的。
但是repmgr以及pg_auto_failover得缺点是对数据库侵入过多,需要在被监控的数据库内部进行一系列的配置操作,甚至需要专用的服务器作为仲裁方,同时需要启动第三方服务实现节点的可用性监控,这又引入了额外的不确定性因素。
如果是单纯地为了故障转移,repmgr以及pg_auto_failover都显得过于沉重,于是快速尝试了一个轻量级的实现方案
实现思路

主要思路如下:
  1. 1,自动故障转移:该方式在正常情况下,作为一个服务持续监控主从复制状态
  2. while 1:
  3.         轮训PostgreSQL流复制的状态:
  4.         if 如果主节点不可达:
  5.             if 如果从节点可达:
  6.             if 从节点状态是in_recovery:
  7.                             promote 从节点,从节点接管读写服务
  8.     sleep(n)
  9.             
  10.        
  11. 2,手动故障转移:如果是主从节点都正常的情况下,为了演练灾备切换,人工故障转移
  12. #首选关闭主节点,防止主节点非standby模式运行之后,pg_rewind无法修改wal日志时间线
  13. 1,关闭原始主库
  14. if 从节点状态是是in_recovery:
  15.         promote 从节点,从节点接管读写服务
  16. #把原始主节点作为从节点加入集群中运行
  17. 2,尝试修复原始主库时间线
  18. 3,尝试以standby的模式启动主库
复制代码
优点

快速尝试了一个轻量级的实现方案,优点如下:
1,不引入或者依赖于任何第三方中间件,可以同时实现多组流复制集群的监控和故障转移。
2,不对现有PostgreSQL的复制环境做任何修改和侵入,以“第三方”的视角来监控现有PostgreSQL流复制,可以自行实现PostgreSQL流复制状态监控和故障转移。
3,支持在主节点故障时实现自动故障转移,或者手动故障转移视线灾备切换演练,支持主从的单次或者多次反复切换。
4,可以自定义故障转移判断逻辑以及自定义日志,通知信息等 详细实现

详细实现如下,目前在两台EC2上测试环境上,一台腾讯云,一台阿里云上搭建PostgreSQL流复制。经测试:可以一键快速实现主从切换,或者连续故障转移(A节点切换到B节点,B节点切换到A节点,需要修改主从连接信息,已测试切换超过30次)
  1. import threading
  2. import psycopg2
  3. import time
  4. import paramiko
  5. # 连接测试
  6. def conn_to_postgres(db_config):
  7.     try:
  8.         conn = psycopg2.connect(**db_config)
  9.         conn.close()
  10.         return True
  11.     except Exception as e:
  12.         print(f"Error connecting to master: {e}")
  13.         return False
  14. # 判断节点身份
  15. def is_postgresql_recovery(db_config):
  16.     try:
  17.         with psycopg2.connect(**db_config) as conn:
  18.             with conn.cursor() as cur:
  19.                 cur.execute("SELECT pg_is_in_recovery();")
  20.                 in_recovery = cur.fetchone()[0]
  21.         return in_recovery
  22.     except Exception as e:
  23.         print(f"Error connecting to master: {e}")
  24.         return False
  25. # 探测节点是否可达
  26. def is_postgresql_reachable(db_config,retry_times):
  27.     # 这里仅仅判断主节点是否可连通
  28.     # 判断节点是否可达的时候,可以增加其他判断,比如当前节点是否能ping的通DNS服务器等,标明当前节点的判断是没有问题的
  29.     while retry_times > 0:
  30.         if not conn_to_postgres(db_config):
  31.             print('the postgres cannot reachable,retrying......')
  32.             time.sleep(10)
  33.             retry_times = retry_times -1
  34.         else:
  35.             return True
  36.     else:
  37.         return False
  38. def ssh_conn(host, username, password, port):
  39.     """
  40.     在远程Linux服务器上执行命令并返回命令的输出和错误。
  41.     :param host: 远程服务器的主机名或IP地址
  42.     :param port: SSH端口,默认为22
  43.     :param username: 用于SSH登录的用户名
  44.     :param password: 用于SSH登录的密码
  45.     :param command: 要在远程服务器上执行的命令
  46.     :return: 一个元组,包含(标准输出, 错误输出)
  47.     """
  48.     # 创建SSH客户端
  49.     ssh = paramiko.SSHClient()
  50.     # 自动添加策略,保存服务器的主机名和密钥信息
  51.     ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  52.     try:
  53.         # 连接到远程服务器
  54.         ssh.connect(hostname=host, port=port, username=username, password=password)
  55.         return ssh
  56.     except Exception as e:
  57.         # 如果发生异常,返回错误信息
  58.         return None, str(e)
  59. def ssh_exec_commend(node,commend_shell):
  60.     ssh = ssh_conn(host=node['host'], username=node['user_name'], port=node['port'], password=node['password'])
  61.     stdin, stdout, stderr = ssh.exec_command(commend_shell)
  62.     output = stdout.read().decode('utf-8')
  63.     error = stderr.read().decode('utf-8')
  64.     if stderr.channel.recv_exit_status() != 0:
  65.         print(error)
  66.         raise Exception(f"{0} executed failed".format(commend_shell))
  67.     return output,error
  68. def create_replication_slot_if_not_exists(db_config, slot_name):
  69.     try:
  70.         with psycopg2.connect(**db_config) as conn:
  71.             with conn.cursor() as cur:
  72.                 cur.execute("SELECT slot_name FROM pg_replication_slots WHERE slot_name = '{0}';".format(slot_name))
  73.                 var_slot_name = cur.fetchall()
  74.                 if not var_slot_name:
  75.                     cur.execute("SELECT * FROM pg_create_physical_replication_slot('{0}');".format(slot_name))
  76.                     print(f"Replication slot '{slot_name}' created.")
  77.                 else:
  78.                     print(f"Replication slot '{slot_name}' already exists.")
  79.     except Exception as e:
  80.         print(f"Error connecting to master: {e}")
  81.         return False
  82. def promote_standby_to_primary(db_config):
  83.     try:
  84.         # Connect to the standby server
  85.         with psycopg2.connect(**db_config) as conn:
  86.             conn.autocommit = True  # Ensure changes are committed immediately
  87.             with conn.cursor() as cur:
  88.                 cur.execute("SELECT pg_promote(true);")
  89.         print('SELECT pg_promote(true);')
  90.         print("{0} Standby promoted to primary successfully.".format(db_config['host']))
  91.     except Exception as e:
  92.         print("{0} Standby promoted to primary failed : {1}".format(db_config[0],e))
  93. # 时间线修复
  94. def run_pg_rewind(primary_node, standby_node):
  95.     # 新的主服务器的连接信息(pg_rewind需要这些信息)
  96.     primary_db_config = "host={0} port={1} user={2} password={3}".format(primary_node['db_config']['host'],
  97.                                                                          primary_node['db_config']['port'],
  98.                                                                          primary_node['db_config']['user'],
  99.                                                                          primary_node['db_config']['password'])
  100.     # 构建pg_rewind命令
  101.     pg_rewind_cmd = ("sudo -u postgres {0}/pg_rewind --target-pgdata={1} --source-server='host={2} port={3} user={4} dbname='postgres' password={5}'"
  102.                      .format(standby_node['pg_base_dir'],
  103.                              standby_node['pg_data_dir'],
  104.                              primary_node['db_config']['host'],
  105.                              primary_node['db_config']['port'],
  106.                              primary_node['db_config']['user'],
  107.                              primary_node['db_config']['password']
  108.                              )
  109.                      )
  110.     ssh = ssh_conn(host=standby_node['host'], username=standby_node['user_name'], port=standby_node['port'], password=standby_node['password'])
  111.     try:
  112.         # 停止standby数据库实例
  113.         ssh_exec_commend(node=standby_node, commend_shell="sudo systemctl stop postgresql9000")
  114.         print(pg_rewind_cmd)
  115.         stdin, stdout, stderr = ssh.exec_command(pg_rewind_cmd)
  116.         output = stdout.read().decode('utf-8')
  117.         error = stderr.read().decode('utf-8')
  118.         if stderr.channel.recv_exit_status() != 0:
  119.             print('******standby node pg_rewind failed')
  120.             print(output)
  121.             print(error)
  122.             raise Exception(f"{0} Failed to rewind commend".format(standby_node['db_config']['host']))
  123.         print(output)
  124.         print(error)
  125.         return True
  126.     except Exception as err:
  127.         print('pg_rewind failed ' + str(err))
  128.         return False
  129.     finally:
  130.         ssh.close()
  131. def startup_as_replica(primary_node, standby_node):
  132.     standby_auto_conf_path = '{0}/postgresql.auto.conf'.format(standby_node['pg_data_dir'])
  133.     standby_signal_path = '{0}/standby.signal'.format(standby_node['pg_data_dir'])
  134.     ssh = ssh_conn(host=standby_node['host'], username=standby_node['user_name'], port=standby_node['port'], password=standby_node['password'])
  135.     # 要写入postgresql.auto.conf的内容(示例)
  136.     auto_conf_content = """
  137.         primary_conninfo = 'user={2} password=''{3}'' channel_binding=prefer host={0} port={1} sslmode=prefer sslcompression=0 sslcertmode=allow sslsni=1 ssl_min_protocol_version=TLSv1.2 gssencmode=disable krbsrvname=postgres gssdelegation=0 target_session_attrs=any load_balance_hosts=disable'
  138.         primary_slot_name = '{4}'
  139.         """.format(primary_node['db_config']['host'],
  140.                    primary_node['db_config']['port'],
  141.                    primary_node['repl_user'],
  142.                    primary_node['repl_password'],
  143.                    primary_node['slot_name']).lstrip()
  144.     # 要创建的standby.signal文件(空文件即可,表示该节点为备用节点)
  145.     try:
  146.         # 防止主库自动启动,再次执行停止从库,这个从库实际上是原来的主库
  147.         ssh_exec_commend(node=standby_node,commend_shell="sudo systemctl stop postgresql9000")
  148.         try:
  149.             # 创建standby.signal文件(如果尚不存在)
  150.             print('###### {0} touch {1}'.format(standby_node['host'],standby_signal_path))
  151.             output, error = ssh_exec_commend(node=standby_node,commend_shell = 'sudo -u postgres touch {0}'.format(standby_signal_path))
  152.             print('###### {0} touch {1}'.format(standby_node['host'], standby_auto_conf_path))
  153.             output, error = ssh_exec_commend(node=standby_node, commend_shell = 'sudo -u postgres touch {0}'.format(standby_auto_conf_path))
  154.             output, error = ssh_exec_commend(node=standby_node, commend_shell = '''sudo  echo "{0}"  > {1}'''.format(auto_conf_content,standby_auto_conf_path))
  155.             # 启动实例,此时以standby模式运行
  156.             ssh_exec_commend(node=standby_node, commend_shell="sudo systemctl start postgresql9000")
  157.             return True
  158.         except Exception as err:
  159.             print(err)
  160.             return False
  161.     finally:
  162.         ssh.close()
  163. # 执行故障转移,因为主节点可达的情况,把主节点作为从节点运行,可以是灾备切换的预演,
  164. def execute_failover(primary_node,standby_node):
  165.     # 故障转移开始的第一步,应该在第一时间停止主库,直接stop,不用判断,否则后续无法pg_rewind同步时间线
  166.     print('### 1 stop 主节点PostgreSQL服务')
  167.     try:
  168.         # stop_postgresql服务
  169.         ssh_exec_commend(node=primary_node,commend_shell="sudo systemctl stop postgresql9000")
  170.     except Exception as err:
  171.         print('stop primary postgresql error')
  172.         print(str(err))
  173.     print('### 2 promote 从节点为主节点')
  174.     # 判断standby节点是否处于recovery模式
  175.     if is_postgresql_recovery(standby_node['db_config']):
  176.         # 创建复制槽,为从节点复制做准备
  177.         create_replication_slot_if_not_exists(standby_node['db_config'], standby_node['slot_name'])
  178.         # promote standby节点
  179.         promote_standby_to_primary(standby_node['db_config'])
  180.     else:
  181.         print('当前节点非recovery模式,不可执行promote')
  182.         exit(1)
  183.     # 将旧的主节点作为从节点加入新的主节点复制
  184.     print('###### 3 从节点开始pg_rewind')
  185.     # 执行pg_rewind修复原始主节点的时间线,请注意:此时从节点提升为主节点,主节点已经变为从节点,所以需要注意节点身份的变化和参数的变化
  186.     pg_rewind_result = run_pg_rewind(primary_node=standby_node, standby_node=primary_node)
  187.     if pg_rewind_result:
  188.         print('######从节点开始pg_rewind 成功')
  189.     if pg_rewind_result:
  190.         print('### 4 将原始主节点,重新以从节点身份加入复制集群')
  191.         # 注意此时主从节点身份已经变换,所以这里参数也变化,原来的从节点已经成为主节点,所以注意参数的交换
  192.         startup_as_standby = startup_as_replica(primary_node=standby_node, standby_node=primary_node)
  193.         if startup_as_standby:
  194.             print('### 原始主节点以从节点身份加入复制集群成功')
  195. # 启动自动检测和故障转移
  196. def auto_failover(primary_node,standby_node):
  197.     while True:
  198.         if not is_postgresql_reachable(primary_node['db_config'], retry_times=5):
  199.             # 告警通知,说明当前配置的主节点故障,即将开始主从切换
  200.             execute_failover(primary_node,standby_node)
  201.             # 执行完切换之后,进行邮件/短信告警通知等
  202.             # 因为主从身份发生了变化,不再对当前的主从配置进行监控,、
  203.             # 应该第一时间人工介入:1,确认原始主节点是否正常,2,原始主节点是否以从节点的身份加入集群
  204.         # 定期检查间隔
  205.         time.sleep(15)
  206. if __name__ == "__main__":
  207.     # 数据库连接配置
  208.     primary_node  = {
  209.         'host':'***.***.***.***',                                #主机名
  210.         'user_name':"root",                                      #系统用户名
  211.         'password': "******",                                    #系统密码
  212.         'port':22,                                               #ssh端口号
  213.         'pg_base_dir':'/usr/local/pgsql16/server/bin',           #PostgreSQL Server路径
  214.         'pg_data_dir':'/usr/local/pgsql16/pg9000/data',          #PostgreSQL 数据路径
  215.         'repl_user':'replica_user',                              #流复制用户名
  216.         'repl_password':'******',                                #流复制用户密码
  217.         'slot_name': 'pgstandby_slave01',                        #流复制slot名字
  218.         'db_config': {                                           #数据库配置
  219.             'host': '***.***.***.***',
  220.             'user': 'postgres',
  221.             'password': '******',
  222.             'dbname': 'postgres',
  223.             'port': '******'
  224.         },
  225.     }
  226.     standby_node1 = {
  227.         'host':'***.***.***.***',
  228.         'user_name':"root",
  229.         'password': "******",
  230.         'port':22,
  231.         'pg_base_dir':'/usr/local/pgsql16/server/bin',
  232.         'pg_data_dir':'/usr/local/pgsql16/pg9000/data',
  233.         'repl_user':'replica_user',
  234.         'repl_password':'******',
  235.         'slot_name': 'pgstandby_slave01',
  236.         'db_config': {
  237.             'host': '***.***.***.***',
  238.             'user': 'postgres',
  239.             'password': '******',
  240.             'dbname': 'postgres',
  241.             'port': '******'
  242.         },
  243.     }
  244.     postgres_cluster_list = [{'primary_node':primary_node,'standby_node1':standby_node1},] #这里可以配置多个集群,每个集群有一个PrimaryNode,一个或者多个standbyNode组成
  245.     # 调试模式:注意,手动故障转移,表明参数里的主从节点交换身份
  246.     for cluster_info in postgres_cluster_list:
  247.         print(cluster_info['primary_node'])
  248.         execute_failover(primary_node=cluster_info['primary_node'], standby_node=cluster_info['standby_node1'])
  249.     # 单独的拉起一个从节点,分两步:
  250.     # 1 单独的时间线修复操作
  251.     # pg_rewind_result = run_pg_rewind(primary_node=standby_node1, standby_node=primary_node)
  252.    
  253.     # 2,standby模式运行
  254.     # 注意此时主从节点身份已经变换,所以这里参数也变化,原来的从节点已经成为主节点,所以注意参数的交换
  255.     # startup_as_replica(primary_node=standby_node1, standby_node=primary_node)
  256.    
  257.    
  258.     # 如果有多组集群,每个集群一个独立线程进行监控
  259.     '''
  260.     thread_list = []
  261.     for cluster_info in postgres_cluster_list:
  262.         t = threading.Thread(target=auto_failover(primary_node=cluster_info['primary_node'], standby_node=cluster_info['standby_node1']))
  263.         thread_list.append(t)
  264.     for t in thread_list:
  265.         t.start()
  266.     for r in thread_list:
  267.         t.join()
  268.     '''
复制代码
 
待改进

1,用户名密码均已明文方式写在主机信息中
2,pg_rewind的时候需要重启服务器,这里把PostgreSQL的服务写死了,笔者的测试环境PostgreSQL服务名是postgresql9000
3,如果是自动切换模式,自动故障转移后,尚未修改主从配置信息,此时节点的身份信息还是切换前的
4,判断节点是否正常的逻辑中,仅仅通过是否可以连接来实现,如果监控服务本身无法与被监控节点通信,可能会出现误判的情况

 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册