狭踝仇 发表于 2025-9-18 11:34:28

印度尼西亚股票数据API对接实现

环境准备

首先安装必要的依赖包:
pip install requests websocket-client pandas numpy基础配置

import requests
import json
import websocket
import threading
import time
from datetime import datetime

# API配置
API_KEY = "YOUR_API_KEY"# 替换为您的实际API密钥
BASE_URL = "https://api.stocktv.top"
WS_URL = "wss://ws-api.stocktv.top/connect"

# 印尼股票代码映射(示例)
IDX_SYMBOLS = {
    "BBCA": "Bank Central Asia",
    "BBRI": "Bank Rakyat Indonesia",
    "TLKM": "Telkom Indonesia",
    "ASII": "Astra International",
    "UNVR": "Unilever Indonesia"
}REST API实现

1. 获取印尼股票列表

def get_indonesia_stocks(page=1, page_size=100):
    """获取印尼交易所股票列表"""
    url = f"{BASE_URL}/stock/stocks"
    params = {
      "countryId": 42,      # 印尼国家ID
      "exchangeId": 62,   # IDX交易所ID
      "pageSize": page_size,
      "page": page,
      "key": API_KEY
    }
   
    try:
      response = requests.get(url, params=params)
      if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]["records"]
            else:
                print(f"API Error: {data.get('message')}")
      else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
      print(f"Error fetching stock list: {str(e)}")
   
    return []

# 示例:获取第一页股票列表
stocks = get_indonesia_stocks()
for stock in stocks:
    print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")2. 查询特定股票详情

def get_stock_detail(symbol_or_id):
    """获取股票详细信息"""
    url = f"{BASE_URL}/stock/queryStocks"
   
    # 判断是symbol还是id
    if isinstance(symbol_or_id, str) and symbol_or_id.isdigit():
      params = {"id": symbol_or_id, "key": API_KEY}
    else:
      params = {"symbol": symbol_or_id, "key": API_KEY}
   
    try:
      response = requests.get(url, params=params)
      if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200 and data["data"]:
                return data["data"]
            else:
                print(f"API Error: {data.get('message')}")
      else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
      print(f"Error fetching stock detail: {str(e)}")
   
    return None

# 示例:获取BBCA股票详情
bbca_detail = get_stock_detail("BBCA")
if bbca_detail:
    print(f"BBCA当前价格: {bbca_detail['last']}")
    print(f"涨跌幅: {bbca_detail['chgPct']}%")3. 获取指数数据

def get_indonesia_indices():
    """获取印尼主要指数"""
    url = f"{BASE_URL}/stock/indices"
    params = {
      "countryId": 42,# 印尼国家ID
      "key": API_KEY
    }
   
    try:
      response = requests.get(url, params=params)
      if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]
            else:
                print(f"API Error: {data.get('message')}")
      else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
      print(f"Error fetching indices: {str(e)}")
   
    return []

# 示例:获取印尼指数
indices = get_indonesia_indices()
for index in indices:
    print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")4. 获取K线数据

def get_kline_data(pid, interval="PT15M"):
    """获取股票K线数据"""
    url = f"{BASE_URL}/stock/kline"
    params = {
      "pid": pid,
      "interval": interval,
      "key": API_KEY
    }
   
    try:
      response = requests.get(url, params=params)
      if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]
            else:
                print(f"API Error: {data.get('message')}")
      else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
      print(f"Error fetching kline data: {str(e)}")
   
    return []

# 示例:获取BBCA的15分钟K线数据
bbca_kline = get_kline_data(41602, "PT15M")
for kline in bbca_kline[:5]:# 显示前5条
    dt = datetime.fromtimestamp(kline["time"] / 1000)
    print(f"{dt}: O:{kline['open']} H:{kline['high']} L:{kline['low']} C:{kline['close']}")5. 获取涨跌排行榜

def get_top_gainers():
    """获取涨幅榜"""
    url = f"{BASE_URL}/stock/updownList"
    params = {
      "countryId": 42,# 印尼国家ID
      "type": 1,         # 1=涨幅榜, 2=跌幅榜
      "key": API_KEY
    }
   
    try:
      response = requests.get(url, params=params)
      if response.status_code == 200:
            data = response.json()
            if data.get("code") == 200:
                return data["data"]
            else:
                print(f"API Error: {data.get('message')}")
      else:
            print(f"Request failed with status: {response.status_code}")
    except Exception as e:
      print(f"Error fetching top gainers: {str(e)}")
   
    return []

# 示例:获取印尼涨幅榜
gainers = get_top_gainers()
for stock in gainers[:10]:# 显示前10名
    print(f"{stock['symbol']}: {stock['last']} (+{stock['chgPct']}%)")WebSocket实时数据

WebSocket客户端实现

class IDXWebSocketClient:
    def __init__(self, api_key):
      self.api_key = api_key
      self.ws = None
      self.connected = False
      self.subscriptions = set()
      
      # 启动连接
      self.connect()
      
      # 启动心跳线程
      threading.Thread(target=self.heartbeat, daemon=True).start()
   
    def connect(self):
      """连接WebSocket服务器"""
      ws_url = f"{WS_URL}?key={self.api_key}"
      
      self.ws = websocket.WebSocketApp(
            ws_url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
      )
      
      # 启动WebSocket线程
      threading.Thread(target=self.ws.run_forever).start()
      time.sleep(1)# 等待连接建立
   
    def on_open(self, ws):
      """连接建立回调"""
      print("已连接到印尼股票实时数据服务")
      self.connected = True
      
      # 重新订阅之前订阅的股票
      if self.subscriptions:
            self.subscribe(list(self.subscriptions))
   
    def on_message(self, ws, message):
      """接收消息回调"""
      try:
            data = json.loads(message)
            
            # 处理实时行情数据
            if "pid" in data:
                symbol = data.get("symbol", "Unknown")
                price = data.get("last_numeric", 0)
                change = data.get("pc", 0)
                change_pct = data.get("pcp", 0)
               
                print(f"实时行情 [{symbol}]: {price} ({change} / {change_pct}%)")
               
            # 处理心跳响应
            elif data.get("action") == "pong":
                pass
               
      except Exception as e:
            print(f"处理实时数据时出错: {str(e)}")
   
    def on_error(self, ws, error):
      """错误处理回调"""
      print(f"WebSocket错误: {str(error)}")
   
    def on_close(self, ws, close_status_code, close_msg):
      """连接关闭回调"""
      print("WebSocket连接已关闭")
      self.connected = False
      
      # 尝试重新连接
      print("尝试重新连接...")
      time.sleep(3)
      self.connect()
   
    def subscribe(self, pids):
      """订阅股票"""
      if not self.connected:
            print("未连接,无法订阅")
            return False
      
      # 添加到订阅列表
      self.subscriptions.update(pids)
      
      # 构造订阅消息
      message = json.dumps({
            "action": "subscribe",
            "pids": list(pids)
      })
      
      # 发送订阅请求
      self.ws.send(message)
      print(f"已订阅: {', '.join(map(str, pids))}")
      return True
   
    def unsubscribe(self, pids):
      """取消订阅股票"""
      if not self.connected:
            print("未连接,无法取消订阅")
            return False
      
      # 从订阅列表中移除
      for pid in pids:
            self.subscriptions.discard(pid)
      
      # 构造取消订阅消息
      message = json.dumps({
            "action": "unsubscribe",
            "pids": list(pids)
      })
      
      # 发送取消订阅请求
      self.ws.send(message)
      print(f"已取消订阅: {', '.join(map(str, pids))}")
      return True
   
    def heartbeat(self):
      """心跳维护"""
      while True:
            if self.connected:
                try:
                  # 每30秒发送一次心跳
                  self.ws.send(json.dumps({"action": "ping"}))
                except Exception as e:
                  print(f"发送心跳失败: {str(e)}")
            time.sleep(30)

# 使用示例
if __name__ == "__main__":
    # 创建WebSocket客户端
    ws_client = IDXWebSocketClient(API_KEY)
   
    # 订阅股票(需要先获取股票ID)
    time.sleep(2)# 等待连接建立
    ws_client.subscribe()# 订阅BBCA和BRIS
   
    # 保持主线程运行
    try:
      while True:
            time.sleep(1)
    except KeyboardInterrupt:
      print("程序已终止")高级功能实现

1. 数据缓存策略

from cachetools import TTLCache

class IDXDataCache:
    def __init__(self, maxsize=100, ttl=60):
      """初始化数据缓存"""
      self.cache = TTLCache(maxsize=maxsize, ttl=ttl)
   
    def get_stock_data(self, symbol_or_id):
      """获取股票数据(带缓存)"""
      # 检查缓存
      if symbol_or_id in self.cache:
            return self.cache
      
      # 从API获取
      data = get_stock_detail(symbol_or_id)
      
      # 更新缓存
      if data:
            self.cache = data
      
      return data

# 使用示例
cache = IDXDataCache()
bbca_data = cache.get_stock_data("BBCA")2. 实时数据处理器

class RealTimeDataProcessor:
    def __init__(self):
      self.data_buffer = {}
      self.batch_size = 10
      self.last_process_time = time.time()
   
    def add_data(self, symbol, data):
      """添加实时数据到缓冲区"""
      if symbol not in self.data_buffer:
            self.data_buffer = []
      
      self.data_buffer.append(data)
      
      # 检查是否达到批处理条件
      current_time = time.time()
      if (len(self.data_buffer) >= self.batch_size or
            current_time - self.last_process_time >= 1.0):
            self.process_data(symbol)
            self.last_process_time = current_time
   
    def process_data(self, symbol):
      """处理缓冲区的数据"""
      if symbol not in self.data_buffer or not self.data_buffer:
            return
      
      data_points = self.data_buffer
      
      # 计算统计指标
      prices = for d in data_points]
      volumes =
      
      avg_price = sum(prices) / len(prices)
      max_price = max(prices)
      min_price = min(prices)
      total_volume = sum(volumes)
      
      print(f"\n{symbol} 实时数据统计 (最近 {len(data_points)} 个更新):")
      print(f"平均价格: {avg_price:.2f}, 最高: {max_price:.2f}, 最低: {min_price:.2f}")
      print(f"总成交量: {total_volume}")
      
      # 清空缓冲区
      self.data_buffer = []

# 在WebSocket客户端的on_message方法中使用
processor = RealTimeDataProcessor()

def on_message(self, ws, message):
    try:
      data = json.loads(message)
      
      if "pid" in data:
            symbol = data.get("symbol", "Unknown")
            processor.add_data(symbol, data)
            
    except Exception as e:
      print(f"处理实时数据时出错: {str(e)}")3. 错误处理与重试机制

def api_request_with_retry(url, params, max_retries=3):
    """带重试机制的API请求"""
    for attempt in range(max_retries):
      try:
            response = requests.get(url, params=params, timeout=10)
            if response.status_code == 200:
                data = response.json()
                if data.get("code") == 200:
                  return data
                elif data.get("code") == 429:# 请求过多
                  retry_after = int(data.get("retryAfter", 30))
                  print(f"请求过于频繁,等待 {retry_after} 秒后重试...")
                  time.sleep(retry_after)
                else:
                  print(f"API返回错误: {data.get('message')}")
            else:
                print(f"请求失败,状态码: {response.status_code}")
      except Exception as e:
            print(f"请求异常: {str(e)}")
      
      # 指数退避等待
      wait_time = 2 ** attempt
      print(f"等待 {wait_time} 秒后重试 (尝试 {attempt+1}/{max_retries})")
      time.sleep(wait_time)
   
    print(f"请求失败,已达最大重试次数 {max_retries}")
    return None完整示例应用

class IDXStockMonitor:
    def __init__(self, api_key):
      self.api_key = api_key
      self.ws_client = None
      self.data_cache = IDXDataCache()
      self.monitored_stocks = set()
      
    def start_monitoring(self, symbols):
      """开始监控指定股票"""
      print("开始监控印尼股票...")
      
      # 获取股票ID
      stock_ids = []
      for symbol in symbols:
            stock_data = self.data_cache.get_stock_data(symbol)
            if stock_data:
                stock_ids.append(stock_data["id"])
                self.monitored_stocks.add(symbol)
                print(f"已添加监控: {symbol} (ID: {stock_data['id']})")
            else:
                print(f"无法获取股票信息: {symbol}")
      
      # 启动WebSocket连接
      if stock_ids:
            self.ws_client = IDXWebSocketClient(self.api_key)
            time.sleep(2)# 等待连接建立
            self.ws_client.subscribe(stock_ids)
      
      # 启动定期数据更新
      self.start_periodic_updates()
   
    def start_periodic_updates(self):
      """启动定期数据更新"""
      def update_loop():
            while True:
                # 每5分钟更新一次指数数据
                self.update_indices()
               
                # 每10分钟更新一次股票列表
                if len(self.monitored_stocks) < 10:# 只更新少量股票
                  self.update_stock_list()
               
                time.sleep(300)# 5分钟
      
      threading.Thread(target=update_loop, daemon=True).start()
   
    def update_indices(self):
      """更新指数数据"""
      print("\n更新印尼指数数据...")
      indices = get_indonesia_indices()
      for index in indices:
            print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")
   
    def update_stock_list(self):
      """更新股票列表"""
      print("\n更新印尼股票列表...")
      stocks = get_indonesia_stocks(page_size=50)
      for stock in stocks[:10]:# 只显示前10只
            print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")
   
    def run(self):
      """运行监控"""
      try:
            # 监控主要印尼股票
            symbols_to_monitor = ["BBCA", "BBRI", "TLKM", "ASII", "UNVR"]
            self.start_monitoring(symbols_to_monitor)
            
            # 保持主线程运行
            while True:
                time.sleep(1)
               
      except KeyboardInterrupt:
            print("\n监控已停止")
      except Exception as e:
            print(f"监控出错: {str(e)}")

# 启动监控
if __name__ == "__main__":
    monitor = IDXStockMonitor(API_KEY)
    monitor.run()部署建议


[*]API密钥管理:不要将API密钥硬编码在代码中,使用环境变量或配置文件
[*]错误处理:增加更完善的错误处理和日志记录
[*]速率限制:遵守API的速率限制,避免频繁请求
[*]数据存储:考虑将重要数据存储到数据库中以供后续分析
[*]监控告警:设置价格波动告警机制
# 从环境变量获取API密钥
import os
API_KEY = os.getenv("STOCKTV_API_KEY", "YOUR_API_KEY")以上是一个完整的印尼股票数据API对接实现方案。您可以根据实际需求进行调整和扩展。如果您需要更多特定功能或有任何问题,请随时告诉我。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 印度尼西亚股票数据API对接实现