找回密码
 立即注册
首页 业界区 业界 【实用脚本】一键完成MySQL数据库健康巡检,并生成word ...

【实用脚本】一键完成MySQL数据库健康巡检,并生成word报告

狭踝仇 昨天 19:05
过程截图:
1.png

2.png

说明:赋予执行权限,执行即可。
源码文件:
[code]#!/usr/bin/env python3# -*- coding:utf-8 -*-from __future__ import unicode_literalsimport itertoolsimport mathimport sysimport datetimeimport argparseimport subprocessimport loggingimport logging.handlersimport socketimport reimport timefrom pathlib import Pathimport pymysqlimport datetimeimport sys, getopt, osimport docxfrom docx.shared import Cmfrom docxtpl import DocxTemplateimport configparserimport importlibimport subprocessimport jsonimport hashlibimport base64from datetime import datetime, timedeltaimport platformimportlib.reload(sys)# 获取资源路径函数 - 处理打包后的路径问题def get_resource_path(relative_path):    """获取资源的绝对路径,处理打包后的路径问题"""    try:        # PyInstaller创建临时文件夹,将路径存储在_MEIPASS中        base_path = sys._MEIPASS    except Exception:        base_path = os.path.abspath(".")        return os.path.join(base_path, relative_path)# 许可证管理类class SimpleCrypto:    """简单的加密解密类,兼容Python 3.6以上"""        def __init__(self, secret="ODB_SECRET_2024"):        self.secret = secret.encode('utf-8')        def _xor_encrypt_decrypt(self, data, key):        """使用XOR进行加密/解密"""        key_len = len(key)        result = bytearray()        for i, byte in enumerate(data):            result.append(byte ^ key[i % key_len])        return bytes(result)        def encrypt(self, text):        """加密文本"""        data = text.encode('utf-8')        # 生成密钥        key = hashlib.sha256(self.secret).digest()[:32]        # XOR加密        encrypted = self._xor_encrypt_decrypt(data, key)        # Base64编码        return base64.b64encode(encrypted).decode('utf-8')        def decrypt(self, token):        """解密文本"""        try:            # Base64解码            encrypted = base64.b64decode(token.encode('utf-8'))            # 生成密钥            key = hashlib.sha256(self.secret).digest()[:32]            # XOR解密            decrypted = self._xor_encrypt_decrypt(encrypted, key)            return decrypted.decode('utf-8')        except Exception as e:            raise ValueError(f"解密失败: {e}")class LicenseValidator:    """严格的许可证验证类 - 防止删除文件重置试用期"""    def __init__(self):        self.license_file = "mysql_inspector.lic"        self.trial_days = 366        self.crypto = SimpleCrypto()        self._init_license_system()    def _parse_datetime(self, date_string):        """解析日期字符串,兼容Python 3.6"""        try:            # 尝试Python 3.7+的fromisoformat            return datetime.fromisoformat(date_string)        except AttributeError:            # Python 3.6兼容:手动解析ISO格式            try:                # 格式: 2024-01-01T10:30:00                if 'T' in date_string:                    date_part, time_part = date_string.split('T')                    year, month, day = map(int, date_part.split('-'))                    time_parts = time_part.split(':')                    hour, minute = int(time_parts[0]), int(time_parts[1])                    second = int(time_parts[2].split('.')[0]) if len(time_parts) > 2 else 0                    return datetime(year, month, day, hour, minute, second)                else:                    # 格式: 2024-01-01 10:30:00                    date_part, time_part = date_string.split(' ')                    year, month, day = map(int, date_part.split('-'))                    hour, minute, second = map(int, time_part.split(':'))                    return datetime(year, month, day, hour, minute, second)            except Exception as e:                print(f"日期解析错误: {e}")                return datetime.now()    def _format_datetime(self, dt):        """格式化日期为字符串,兼容Python 3.6"""        return dt.strftime('%Y-%m-%dT%H:%M:%S')    def _init_license_system(self):        """初始化许可证系统"""        # 创建必要的目录和文件        if not os.path.exists(self.license_file):            self._create_trial_license()    def _create_trial_license(self):        """创建试用许可证"""        create_time = datetime.now()        expire_time = create_time + timedelta(days=self.trial_days)                license_data = {            "type": "TRIAL",            "create_time": self._format_datetime(create_time),            "expire_time": self._format_datetime(expire_time),            "machine_id": self._get_machine_id(),            "signature": self._generate_signature("TRIAL")        }        encrypted_data = self.crypto.encrypt(json.dumps(license_data))        with open(self.license_file, 'w') as f:            f.write(encrypted_data)    def _get_machine_id(self):        """获取机器标识(简化版)"""        try:            machine_info = f"{platform.node()}-{platform.system()}-{platform.release()}"            return hashlib.md5(machine_info.encode()).hexdigest()[:16]        except:            return "unknown_machine"    def _generate_signature(self, license_type):        """生成许可证签名"""        key = "ODB2024TRL"  # 试用版密钥        signature_data = f"{license_type}-{datetime.now().strftime('%Y%m%d')}-{key}"        return hashlib.sha256(signature_data.encode()).hexdigest()    def _verify_signature(self, license_data):        """验证许可证签名"""        try:            expected_signature = self._generate_signature(license_data["type"])            return license_data["signature"] == expected_signature        except:            return False    def validate_license(self):        """验证许可证有效性"""        try:            # 检查许可证文件            if not os.path.exists(self.license_file):                return False, "许可证文件不存在", 0            # 读取并解密许可证            with open(self.license_file, 'r') as f:                encrypted_data = f.read().strip()                        decrypted_data = self.crypto.decrypt(encrypted_data)            license_data = json.loads(decrypted_data)            # 验证签名            if not self._verify_signature(license_data):                return False, "许可证签名无效", 0            # 检查过期时间            expire_time = self._parse_datetime(license_data["expire_time"])            remaining_days = (expire_time - datetime.now()).days            if remaining_days < 0:                return False, "许可证已过期", 0            license_type = license_data.get("type", "TRIAL")                        if license_type == "TRIAL":                return True, f"试用版许可证有效,剩余 {remaining_days} 天", remaining_days            else:                return True, f"{license_type}版许可证有效", remaining_days        except Exception as e:            return False, f"许可证验证失败: {str(e)}", 0# 临时替代自定义loggerdef getlogger():    logger = logging.getLogger('mysql_check')    logger.setLevel(logging.INFO)    if not logger.handlers:        handler = logging.StreamHandler()        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')        handler.setFormatter(formatter)        logger.addHandler(handler)    return loggerlogger = getlogger()# 辅助类:传递脚本输入的参数class passArgu(object):    """辅助类,脚本传参    查看帮助:python3 main.py -h"""    def get_argus(self):        """ all_info: 接收所有传入的信息 """        all_info = argparse.ArgumentParser(            description="--example: python3 mysql_autoDOC.py -C templates/sqltemplates.ini -L '标签名称'")        all_info.add_argument('-C', '--sqltemplates', required=False, default='templates/sqltemplates.ini',                              help='SQL sqltemplates.')        all_info.add_argument('-L', '--label', required=False, help='Label used when health check single database.')        all_info.add_argument('-B', '--batch', action='store_true', help='Batch mode (use interactive input for multiple DBs)')        all_para = all_info.parse_args()        " 返回值默认string, 不区分前后顺序 "        return all_para# 新增:交互式输入数据库连接信息def input_db_info():    """交互式输入数据库连接信息,支持默认值"""    print("\n请输入数据库连接信息:")        # 主机地址,默认localhost    host = input("主机地址 [localhost]: ").strip()    if not host:        host = "localhost"        # 端口,默认3306    port_input = input("端口 [3306]: ").strip()    if not port_input:        port = 3306    else:        try:            port = int(port_input)        except ValueError:            print("⚠️  端口输入无效,使用默认值3306")            port = 3306        # 用户名,默认root    user = input("用户名 [root]: ").strip()    if not user:        user = "root"        # 密码,无默认值    import getpass    password = getpass.getpass("密码: ").strip()        # 数据库名称/标签    db_name = input("数据库名称(用于报告标识) [MySQL_Server]: ").strip()    if not db_name:        db_name = "MySQL_Server"        # 验证连接    print(f"\n
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册