申屠梓彤 发表于 2025-5-29 13:17:19

Lean ConnectQuant针对A股的量化数据库搭建【量化数据篇】

本文基于 SQLite 轻量级数据库与 QuantConnect Lean 框架,构建了一套​​支持高频数据回测、策略无缝实盘部署​​的本地化解决方案。
0x01获取所有股票

import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import datetime

def save_to_sqlite(df, db_path, table_name):
    """将DataFrame存储到SQLite数据库"""
    try:
      conn = sqlite3.connect(db_path)
      df.to_sql(table_name, conn, if_exists='replace', index=False)
      print(f"数据成功保存至 {db_path} 的 {table_name} 表")
    except Exception as e:
      print(f"数据库操作失败:{str(e)}")
    finally:
      if 'conn' in locals() and conn:
            conn.close()

def validate_stock_code(code):
    """规范股票代码格式为交易所.6位数字(增强创业板/科创板处理)(@ref)"""
    if not isinstance(code, str):
      code = str(code)
   
    code = code.strip().lower().replace(" ", "")
   
    if code.startswith(("sh.", "sz.", "bj.")):
      prefix, num = code.split(".")
      return f"{prefix}.{num.zfill(6)}"
   
    if code.isdigit():
      code = code.zfill(6)
      first_char = code
      if first_char in ['5','6','9','7'] or code.startswith("688"):
            return f"sh.{code}"
      elif first_char in ['0','1','2','3'] or code.startswith("30"):
            return f"sz.{code}"
      elif code.startswith("8"):
            return f"bj.{code}"
   
    raise ValueError(f"无法识别的股票代码格式: {code}")

def get_index_components(index_func):
    """通用指数成分股获取函数(支持自动重试)(@ref)"""
    max_retries = 3
    for attempt in range(max_retries):
      try:
            rs = index_func()
            if rs.error_code == '0':
                return {validate_stock_code(item) for item in rs.data}
      except Exception as e:
            print(f"指数查询第{attempt+1}次失败: {str(e)}")
            if attempt == max_retries - 1:
                return set()
    return set()

def get_all_stocks_with_industry():
    """获取全市场股票及指数成分标记"""
    try:
      # 登录系统
      if bs.login().error_code != '0':
            raise ConnectionError("Baostock登录失败")

      # 获取三大指数成分股(提前获取避免多次查询)(@ref)
      sz50_set = get_index_components(bs.query_sz50_stocks)
      hs300_set = get_index_components(bs.query_hs300_stocks)
      zz500_set = get_index_components(bs.query_zz500_stocks)
      print(f"指数成分股数量:SZ50={len(sz50_set)}, HS300={len(hs300_set)}, ZZ500={len(zz500_set)}")

      # 获取全市场股票(含最新交易状态)(@ref)
      query_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
      rs = bs.query_all_stock(day=query_date)
      if rs.error_code != '0':
            raise ValueError(f"股票查询失败:{rs.error_msg}")

      # 处理原始数据(增强状态过滤)(@ref)
      raw_df = pd.DataFrame(rs.data, columns=rs.fields)
      valid_df = raw_df == '1']# 过滤有效交易股票
      print(f"初始数据量:{len(raw_df)},有效交易股票:{len(valid_df)}")

      # 代码格式转换
      valid_df['valid_code'] = valid_df['code'].apply(lambda x: validate_stock_code(x))
      
      # 获取行业数据(批量查询优化)(@ref)
      industry_rs = bs.query_stock_industry()
      industry_data = []
      fields = industry_rs.fields
      while industry_rs.next():
            row = industry_rs.get_row_data()
            try:
                industry_data.append({
                  'valid_code': validate_stock_code(row),
                  'industry': row,
                  'industry_type': row
                })
            except Exception as e:
                print(f"跳过无效行业数据: {row}, 原因: {str(e)}")
      industry_df = pd.DataFrame(industry_data)

      # 合并数据
      merged_df = pd.merge(
            valid_df[['valid_code', 'code_name']].rename(columns={'valid_code':'code', 'code_name':'name'}),
            industry_df,
            left_on='code',
            right_on='valid_code',
            how='left'
      ).drop(columns=['valid_code'])

      # 标记指数成分(@ref)
      merged_df['issz50'] = merged_df['code'].isin(sz50_set).astype(int)
      merged_df['ishs300'] = merged_df['code'].isin(hs300_set).astype(int)
      merged_df['iszz500'] = merged_df['code'].isin(zz500_set).astype(int)

      # 拆分交易所代码
      merged_df['exchange'] = merged_df['code'].str.split('.').str
      merged_df['code'] = merged_df['code'].str.split('.').str
      
      # 填充缺失值
      merged_df.fillna({
            'industry': '未知',
            'industry_type': '未分类'
      }, inplace=True)

      return merged_df[[
            'code', 'name', 'industry', 'industry_type',
            'issz50', 'ishs300', 'iszz500', 'exchange'
      ]]

    except Exception as e:
      print(f"处理失败:{str(e)}")
      return None
    finally:
      bs.logout()


if __name__ == "__main__":
    df = get_all_stocks_with_industry()
    if df is not None:
      print("\n数据样例:")
      print(df != '未知'].head(5))
      save_to_sqlite(df, r"../project/data/AAshares/code.db", "all_stocks")0x02 获取日K线数据

import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import time
from datetime import datetime

def get_stock_codes(conn):
    """从数据库获取带交易所前缀的股票代码(结构不变)"""
    df = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    return .lower().strip()}.{str(row['code']).zfill(6)}"
            for _, row in df.iterrows()]

def download_daily_data(code, start_date, end_date, max_retries=3):
    """下载日K线数据(仅修改frequency和fields)"""
    for attempt in range(max_retries):
      rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="d",# 改为日线(@ref)
            adjustflag="2"   # 保持前复权
      )
      if rs.error_code == '0':
            return rs
      time.sleep(1)
    return rs

def process_daily_data(rs):
    """处理日K线数据(移除时间合并步骤)"""
    data_list = []
    while rs.next():
      data_list.append(rs.get_row_data())
    df = pd.DataFrame(data_list, columns=rs.fields)
   
    # 数据清洗(仅保留日期处理)
    df['code'] = df['code'].str.split('.').str
    numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'amount']
    df = df.apply(pd.to_numeric, errors='coerce')
    return df.dropna().reset_index(drop=True)

def save_to_db(df, conn):
    # 将DataFrame转换为字典列表
    records = df.to_dict('records')

    for record in records:
      try:
            # 单条插入语句
            conn.execute(
                """
                INSERT OR IGNORE INTO stock_day_k
                (code, date, open, close, high, low, volume, amount)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (record['code'], record['date'],
                record['open'], record['close'],
                record['high'], record['low'],
                record['volume'], record['amount'])
            )
      except IntegrityError:
            print(f"主键冲突已跳过:{record['code']} - {record['date']}")
            continue
    conn.commit()

def main():
    # 初始化数据库(修改表结构)(@ref)
    conn = sqlite3.connect(r"../project/data/AAshares/code.db")
    conn.executescript('''
      CREATE TABLE IF NOT EXISTS stock_day_k (
            date TEXT, code TEXT, open REAL, high REAL,
            low REAL, close REAL, volume INTEGER, amount REAL,
            PRIMARY KEY (date, code)
      );
      CREATE INDEX IF NOT EXISTS idx_day ON stock_day_k(code);
    ''')

    # 登录BaoStock(保持原登录逻辑)
    if bs.login().error_code != '0':
      print("登录失败")
      return

    try:
      codes = get_stock_codes(conn)
      start_date = '2025-01-01'# 保持时间范围不变
      end_date = '2025-04-01'
      
      for code in tqdm(codes, desc="下载进度"):
            try:
                rs = download_daily_data(code, start_date, end_date)
                if rs.error_code != '0': continue
               
                df = process_daily_data(rs)
                if not df.empty:
                  save_to_db(df, conn)
                  
                time.sleep(0.5)# 保持反爬策略不变(@ref)
               
            except Exception as e:
                print(f"{code} 下载失败: {str(e)}")
               
    finally:
      conn.close()
      bs.logout()

if __name__ == "__main__":
    main()0x03 获取5分钟K线数据

import baostock as bs
import pandas as pd
import sqlite3
from tqdm import tqdm
import time
from datetime import datetime

def get_stock_codes(conn):
    """从数据库获取带交易所前缀的股票代码(网页6最佳实践)"""
    df = pd.read_sql('SELECT exchange, code FROM all_stocks', conn)
    return .lower().strip()}.{str(row['code']).zfill(6)}"
            for _, row in df.iterrows()]

def download_5min_data(code, start_date, end_date, max_retries=3):
    """下载5分钟K线数据(含错误重试机制,网页8优化)"""
    for attempt in range(max_retries):
      rs = bs.query_history_k_data_plus(
            code=code,
            fields="date,time,code,open,high,low,close,volume,amount",
            start_date=start_date,
            end_date=end_date,
            frequency="5",
            adjustflag="2"# 前复权
      )
      if rs.error_code == '0':
            return rs
      time.sleep(1)
    return rs

def process_5min_data(rs):
    """处理5分钟数据格式(网页7时间处理优化)"""
    data_list = []
    while rs.next():
      data_list.append(rs.get_row_data())
    df = pd.DataFrame(data_list, columns=rs.fields)
   
    # 合并日期和时间字段
    df['datetime'] = df['time'].apply(
      lambda x: f"{x[:4]}-{x}-{x} {x}:{x}:00")
    df = df.drop(columns=['date', 'time'])
    df.rename(columns={'datetime': 'date'}, inplace=True)
    # 数据清洗
    df['code'] = df['code'].str.split('.').str
    numeric_cols = ['open', 'high', 'low', 'close', 'volume','amount']
    df = df.apply(pd.to_numeric, errors='coerce')
    return df.dropna().reset_index(drop=True)

def save_to_db(df, conn):
    # 将DataFrame转换为字典列表
    records = df.to_dict('records')

    for record in records:
      try:
            # 单条插入语句
            conn.execute(
                """
                INSERT OR IGNORE INTO stock_5min_k
                (code, date, open, close, high, low, volume, amount)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (record['code'], record['date'],
                record['open'], record['close'],
                record['high'], record['low'],
                record['volume'], record['amount'])
            )
      except IntegrityError:
            print(f"主键冲突已跳过:{record['code']} - {record['date']}")
            continue
    conn.commit()

def main():
    # 初始化数据库(网页6表结构)
    conn = sqlite3.connect(r"../project/data/AAshares/code.db")
    conn.executescript('''
      CREATE TABLE IF NOT EXISTS stock_5min_k (
            date TEXT, code TEXT, open REAL, high REAL,
            low REAL, close REAL, volume INTEGER,amount REAL,
            PRIMARY KEY (date, code)
      );
      CREATE INDEX IF NOT EXISTS idx_5min ON stock_5min_k(code);
    ''')

    # 登录BaoStock(网页7规范)
    if bs.login().error_code != '0':
      print("登录失败")
      return

    try:
      codes = get_stock_codes(conn)
      start_date = '2025-04-19'
      end_date = datetime.now().strftime('%Y-%m-%d')# 格式为 'YYYY-MM-DD'
      
      for code in tqdm(codes, desc="下载进度"):
            try:
                rs = download_5min_data(code, start_date, end_date)
                if rs.error_code != '0': continue
               
                df = process_5min_data(rs)
                if not df.empty:
                  save_to_db(df, conn)
                  
                time.sleep(0.5)# 反爬策略(网页8建议)
               
            except Exception as e:
                print(f"{code} 下载失败: {str(e)}")
               
    finally:
      conn.close()
      bs.logout()

if __name__ == "__main__":
    main()0x04 通过web将获取的数据读出来

import csv
from io import StringIO
from flask import Flask, jsonify, request, Response
import sqlite3
from datetime import datetime, timedelta, timezone
import pandas as pd
import requests

app = Flask(__name__)

def get_db_connection():
    conn = sqlite3.connect(r"../project/data/AAshares/code.db")
    conn.row_factory = sqlite3.Row
    return conn

def getdatasqlite(code,table_name):
    """统一处理数据请求"""
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(f"""
      SELECT
            date AS Date,         
            open AS Open,
            close AS Close,
            high AS High,
            low AS Low,
            volume AS Volume,
            amount AS Amount
      FROM {table_name}
      WHERE code = ?
      ORDER BY date
    """, (code,))
   
    data = cursor.fetchall()
    converted_data = []
    for row in data:
      # 按新的字段顺序解析
      date_str = row# 直接使用数据库返回的日期字符串(@ref)
      # 添加时区转换逻辑(假设数据库存储UTC时间)
      if(table_name=="stock_5min_k"):
            local_time = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")# 添加秒并解析为本地时间
      else:
            local_time = datetime.strptime(date_str, "%Y-%m-%d")# 添加秒并解析为本地时间      
      date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")# 转换为UTC时间

      converted = {
            "Date": date_time_str,# 统一时间格式(@ref)
            "Open": float(row),
            "Close": float(row),
            "High": float(row),
            "Low": float(row),
            "Volume": float(row),# 交易量
            "Amount": float(row)   # 修正字段索引(@ref)
      }
      converted_data.append(converted)
    return converted_data


def getdatahttp(code,table_name):
    # 请求第三方接口
    if code.startswith("6"):
      secid = "1." + code
    else:
      secid = "0." + code
    # end_date = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
    end_date = (datetime.now()).strftime('%Y%m%d')
    if(table_name=="stock_5min_k"):
      api_url = f"https://push2his.eastmoney.com/api/qt/stock/kline/get?secid={secid}&fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61&klt=5&fqt=1&end={end_date}&lmt=1488"# 替换为实际API地址
    else:
      api_url = f"https://push2his.eastmoney.com/api/qt/stock/kline/get?secid={secid}&fields1=f1%2Cf2%2Cf3%2Cf4%2Cf5%2Cf6&fields2=f51%2Cf52%2Cf53%2Cf54%2Cf55%2Cf56%2Cf57%2Cf58%2Cf59%2Cf60%2Cf61&klt=101&fqt=1&end={end_date}&lmt=1488"# 替换为实际API地址

    try:
      response = requests.get(api_url, timeout=10)
      response.raise_for_status()
    except requests.exceptions.RequestException as e:
      return {"error": f"API请求失败: {str(e)}"}, 500

    # 解析原始数据
    raw_data = response.json()
    klines = raw_data.get('data', {}).get('klines', [])
   
    # 数据清洗转换
    converted_data = []
    for item in klines:
      parts = item.split(',')
      if len(parts) < 11:
            continue# 跳过无效数据
            
      # 按字段位置解析数据
      # 按字段位置解析数据
      if(table_name=="stock_5min_k"):
            local_time = datetime.strptime(parts + ":00", "%Y-%m-%d %H:%M:%S")# 添加秒并解析为本地时间
      else:
            local_time = datetime.strptime(parts , "%Y-%m-%d")# 添加秒并解析为本地时间
      date_time_str = local_time.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")# 转换为UTC时间
      converted = {
            "Date": date_time_str,
            "Open": float(parts),
            "Close": float(parts),
            "High": float(parts),
            "Low": float(parts),
            "Volume": float(parts),# 交易量
            "Amount": float(parts),    # 交易额
      }
      converted_data.append(converted)
    return converted_data

def handle_request(code, table_name, format_type='json'):
    converted_data = getdatasqlite(code,table_name)
    # ...后续格式转换逻辑保持不变...
    # 返回数据
    if not converted_data:
      return {"error": "没有有效数据"}, 404

    # 根据要求格式化输出
    if format_type == 'csv':
      # 生成CSV
      si = StringIO()
      writer = csv.DictWriter(si, fieldnames=converted_data.keys())
      writer.writeheader()
      writer.writerows(converted_data)# 写入多行数据
      return si.getvalue(), 200, {'Content-Type': 'text/csv'}
      
    return jsonify(converted_data[-1]), 200

@app.route('/dayapi', methods=['GET'])
def get_daystock_data():
    code = request.args.get('code')
    format_type = request.args.get('format', default='json')
    if not code:
      return jsonify({"error": "缺少参数: code"}), 400
    return handle_request(code, 'stock_day_k', format_type)

@app.route('/api', methods=['GET'])
def get_stock_data():
    code = request.args.get('code')
    format_type = request.args.get('format', default='json')
    if not code:
      return jsonify({"error": "缺少参数: code"}), 400
    return handle_request(code, 'stock_5min_k', format_type)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=80, debug=True)访问 http://ip/dayapi?code={Value} 获取最新的一根K线
访问 http://ip/dayapi?code={Value}&format=csv 返回的是历史的K线csv
如果成功了,则完成了基础数据的搭建,这个web是为了给Lean QuantConnect调用的。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Lean ConnectQuant针对A股的量化数据库搭建【量化数据篇】