找回密码
 立即注册
首页 业界区 安全 印度尼西亚股票数据API对接实现

印度尼西亚股票数据API对接实现

狭踝仇 昨天 11:34
环境准备

首先安装必要的依赖包:
  1. pip install requests websocket-client pandas numpy
复制代码
基础配置
  1. import requests
  2. import json
  3. import websocket
  4. import threading
  5. import time
  6. from datetime import datetime
  7. # API配置
  8. API_KEY = "YOUR_API_KEY"  # 替换为您的实际API密钥
  9. BASE_URL = "https://api.stocktv.top"
  10. WS_URL = "wss://ws-api.stocktv.top/connect"
  11. # 印尼股票代码映射(示例)
  12. IDX_SYMBOLS = {
  13.     "BBCA": "Bank Central Asia",
  14.     "BBRI": "Bank Rakyat Indonesia",
  15.     "TLKM": "Telkom Indonesia",
  16.     "ASII": "Astra International",
  17.     "UNVR": "Unilever Indonesia"
  18. }
复制代码
REST API实现

1. 获取印尼股票列表
  1. def get_indonesia_stocks(page=1, page_size=100):
  2.     """获取印尼交易所股票列表"""
  3.     url = f"{BASE_URL}/stock/stocks"
  4.     params = {
  5.         "countryId": 42,      # 印尼国家ID
  6.         "exchangeId": 62,     # IDX交易所ID
  7.         "pageSize": page_size,
  8.         "page": page,
  9.         "key": API_KEY
  10.     }
  11.    
  12.     try:
  13.         response = requests.get(url, params=params)
  14.         if response.status_code == 200:
  15.             data = response.json()
  16.             if data.get("code") == 200:
  17.                 return data["data"]["records"]
  18.             else:
  19.                 print(f"API Error: {data.get('message')}")
  20.         else:
  21.             print(f"Request failed with status: {response.status_code}")
  22.     except Exception as e:
  23.         print(f"Error fetching stock list: {str(e)}")
  24.    
  25.     return []
  26. # 示例:获取第一页股票列表
  27. stocks = get_indonesia_stocks()
  28. for stock in stocks:
  29.     print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")
复制代码
2. 查询特定股票详情
  1. def get_stock_detail(symbol_or_id):
  2.     """获取股票详细信息"""
  3.     url = f"{BASE_URL}/stock/queryStocks"
  4.    
  5.     # 判断是symbol还是id
  6.     if isinstance(symbol_or_id, str) and symbol_or_id.isdigit():
  7.         params = {"id": symbol_or_id, "key": API_KEY}
  8.     else:
  9.         params = {"symbol": symbol_or_id, "key": API_KEY}
  10.    
  11.     try:
  12.         response = requests.get(url, params=params)
  13.         if response.status_code == 200:
  14.             data = response.json()
  15.             if data.get("code") == 200 and data["data"]:
  16.                 return data["data"][0]
  17.             else:
  18.                 print(f"API Error: {data.get('message')}")
  19.         else:
  20.             print(f"Request failed with status: {response.status_code}")
  21.     except Exception as e:
  22.         print(f"Error fetching stock detail: {str(e)}")
  23.    
  24.     return None
  25. # 示例:获取BBCA股票详情
  26. bbca_detail = get_stock_detail("BBCA")
  27. if bbca_detail:
  28.     print(f"BBCA当前价格: {bbca_detail['last']}")
  29.     print(f"涨跌幅: {bbca_detail['chgPct']}%")
复制代码
3. 获取指数数据
  1. def get_indonesia_indices():
  2.     """获取印尼主要指数"""
  3.     url = f"{BASE_URL}/stock/indices"
  4.     params = {
  5.         "countryId": 42,  # 印尼国家ID
  6.         "key": API_KEY
  7.     }
  8.    
  9.     try:
  10.         response = requests.get(url, params=params)
  11.         if response.status_code == 200:
  12.             data = response.json()
  13.             if data.get("code") == 200:
  14.                 return data["data"]
  15.             else:
  16.                 print(f"API Error: {data.get('message')}")
  17.         else:
  18.             print(f"Request failed with status: {response.status_code}")
  19.     except Exception as e:
  20.         print(f"Error fetching indices: {str(e)}")
  21.    
  22.     return []
  23. # 示例:获取印尼指数
  24. indices = get_indonesia_indices()
  25. for index in indices:
  26.     print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")
复制代码
4. 获取K线数据
  1. def get_kline_data(pid, interval="PT15M"):
  2.     """获取股票K线数据"""
  3.     url = f"{BASE_URL}/stock/kline"
  4.     params = {
  5.         "pid": pid,
  6.         "interval": interval,
  7.         "key": API_KEY
  8.     }
  9.    
  10.     try:
  11.         response = requests.get(url, params=params)
  12.         if response.status_code == 200:
  13.             data = response.json()
  14.             if data.get("code") == 200:
  15.                 return data["data"]
  16.             else:
  17.                 print(f"API Error: {data.get('message')}")
  18.         else:
  19.             print(f"Request failed with status: {response.status_code}")
  20.     except Exception as e:
  21.         print(f"Error fetching kline data: {str(e)}")
  22.    
  23.     return []
  24. # 示例:获取BBCA的15分钟K线数据
  25. bbca_kline = get_kline_data(41602, "PT15M")
  26. for kline in bbca_kline[:5]:  # 显示前5条
  27.     dt = datetime.fromtimestamp(kline["time"] / 1000)
  28.     print(f"{dt}: O:{kline['open']} H:{kline['high']} L:{kline['low']} C:{kline['close']}")
复制代码
5. 获取涨跌排行榜
  1. def get_top_gainers():
  2.     """获取涨幅榜"""
  3.     url = f"{BASE_URL}/stock/updownList"
  4.     params = {
  5.         "countryId": 42,  # 印尼国家ID
  6.         "type": 1,         # 1=涨幅榜, 2=跌幅榜
  7.         "key": API_KEY
  8.     }
  9.    
  10.     try:
  11.         response = requests.get(url, params=params)
  12.         if response.status_code == 200:
  13.             data = response.json()
  14.             if data.get("code") == 200:
  15.                 return data["data"]
  16.             else:
  17.                 print(f"API Error: {data.get('message')}")
  18.         else:
  19.             print(f"Request failed with status: {response.status_code}")
  20.     except Exception as e:
  21.         print(f"Error fetching top gainers: {str(e)}")
  22.    
  23.     return []
  24. # 示例:获取印尼涨幅榜
  25. gainers = get_top_gainers()
  26. for stock in gainers[:10]:  # 显示前10名
  27.     print(f"{stock['symbol']}: {stock['last']} (+{stock['chgPct']}%)")
复制代码
WebSocket实时数据

WebSocket客户端实现
  1. class IDXWebSocketClient:
  2.     def __init__(self, api_key):
  3.         self.api_key = api_key
  4.         self.ws = None
  5.         self.connected = False
  6.         self.subscriptions = set()
  7.         
  8.         # 启动连接
  9.         self.connect()
  10.         
  11.         # 启动心跳线程
  12.         threading.Thread(target=self.heartbeat, daemon=True).start()
  13.    
  14.     def connect(self):
  15.         """连接WebSocket服务器"""
  16.         ws_url = f"{WS_URL}?key={self.api_key}"
  17.         
  18.         self.ws = websocket.WebSocketApp(
  19.             ws_url,
  20.             on_open=self.on_open,
  21.             on_message=self.on_message,
  22.             on_error=self.on_error,
  23.             on_close=self.on_close
  24.         )
  25.         
  26.         # 启动WebSocket线程
  27.         threading.Thread(target=self.ws.run_forever).start()
  28.         time.sleep(1)  # 等待连接建立
  29.    
  30.     def on_open(self, ws):
  31.         """连接建立回调"""
  32.         print("已连接到印尼股票实时数据服务")
  33.         self.connected = True
  34.         
  35.         # 重新订阅之前订阅的股票
  36.         if self.subscriptions:
  37.             self.subscribe(list(self.subscriptions))
  38.    
  39.     def on_message(self, ws, message):
  40.         """接收消息回调"""
  41.         try:
  42.             data = json.loads(message)
  43.             
  44.             # 处理实时行情数据
  45.             if "pid" in data:
  46.                 symbol = data.get("symbol", "Unknown")
  47.                 price = data.get("last_numeric", 0)
  48.                 change = data.get("pc", 0)
  49.                 change_pct = data.get("pcp", 0)
  50.                
  51.                 print(f"实时行情 [{symbol}]: {price} ({change} / {change_pct}%)")
  52.                
  53.             # 处理心跳响应
  54.             elif data.get("action") == "pong":
  55.                 pass
  56.                
  57.         except Exception as e:
  58.             print(f"处理实时数据时出错: {str(e)}")
  59.    
  60.     def on_error(self, ws, error):
  61.         """错误处理回调"""
  62.         print(f"WebSocket错误: {str(error)}")
  63.    
  64.     def on_close(self, ws, close_status_code, close_msg):
  65.         """连接关闭回调"""
  66.         print("WebSocket连接已关闭")
  67.         self.connected = False
  68.         
  69.         # 尝试重新连接
  70.         print("尝试重新连接...")
  71.         time.sleep(3)
  72.         self.connect()
  73.    
  74.     def subscribe(self, pids):
  75.         """订阅股票"""
  76.         if not self.connected:
  77.             print("未连接,无法订阅")
  78.             return False
  79.         
  80.         # 添加到订阅列表
  81.         self.subscriptions.update(pids)
  82.         
  83.         # 构造订阅消息
  84.         message = json.dumps({
  85.             "action": "subscribe",
  86.             "pids": list(pids)
  87.         })
  88.         
  89.         # 发送订阅请求
  90.         self.ws.send(message)
  91.         print(f"已订阅: {', '.join(map(str, pids))}")
  92.         return True
  93.    
  94.     def unsubscribe(self, pids):
  95.         """取消订阅股票"""
  96.         if not self.connected:
  97.             print("未连接,无法取消订阅")
  98.             return False
  99.         
  100.         # 从订阅列表中移除
  101.         for pid in pids:
  102.             self.subscriptions.discard(pid)
  103.         
  104.         # 构造取消订阅消息
  105.         message = json.dumps({
  106.             "action": "unsubscribe",
  107.             "pids": list(pids)
  108.         })
  109.         
  110.         # 发送取消订阅请求
  111.         self.ws.send(message)
  112.         print(f"已取消订阅: {', '.join(map(str, pids))}")
  113.         return True
  114.    
  115.     def heartbeat(self):
  116.         """心跳维护"""
  117.         while True:
  118.             if self.connected:
  119.                 try:
  120.                     # 每30秒发送一次心跳
  121.                     self.ws.send(json.dumps({"action": "ping"}))
  122.                 except Exception as e:
  123.                     print(f"发送心跳失败: {str(e)}")
  124.             time.sleep(30)
  125. # 使用示例
  126. if __name__ == "__main__":
  127.     # 创建WebSocket客户端
  128.     ws_client = IDXWebSocketClient(API_KEY)
  129.    
  130.     # 订阅股票(需要先获取股票ID)
  131.     time.sleep(2)  # 等待连接建立
  132.     ws_client.subscribe([41602, 41605])  # 订阅BBCA和BRIS
  133.    
  134.     # 保持主线程运行
  135.     try:
  136.         while True:
  137.             time.sleep(1)
  138.     except KeyboardInterrupt:
  139.         print("程序已终止")
复制代码
高级功能实现

1. 数据缓存策略
  1. from cachetools import TTLCache
  2. class IDXDataCache:
  3.     def __init__(self, maxsize=100, ttl=60):
  4.         """初始化数据缓存"""
  5.         self.cache = TTLCache(maxsize=maxsize, ttl=ttl)
  6.    
  7.     def get_stock_data(self, symbol_or_id):
  8.         """获取股票数据(带缓存)"""
  9.         # 检查缓存
  10.         if symbol_or_id in self.cache:
  11.             return self.cache[symbol_or_id]
  12.         
  13.         # 从API获取
  14.         data = get_stock_detail(symbol_or_id)
  15.         
  16.         # 更新缓存
  17.         if data:
  18.             self.cache[symbol_or_id] = data
  19.         
  20.         return data
  21. # 使用示例
  22. cache = IDXDataCache()
  23. bbca_data = cache.get_stock_data("BBCA")
复制代码
2. 实时数据处理器
  1. class RealTimeDataProcessor:
  2.     def __init__(self):
  3.         self.data_buffer = {}
  4.         self.batch_size = 10
  5.         self.last_process_time = time.time()
  6.    
  7.     def add_data(self, symbol, data):
  8.         """添加实时数据到缓冲区"""
  9.         if symbol not in self.data_buffer:
  10.             self.data_buffer[symbol] = []
  11.         
  12.         self.data_buffer[symbol].append(data)
  13.         
  14.         # 检查是否达到批处理条件
  15.         current_time = time.time()
  16.         if (len(self.data_buffer[symbol]) >= self.batch_size or
  17.             current_time - self.last_process_time >= 1.0):
  18.             self.process_data(symbol)
  19.             self.last_process_time = current_time
  20.    
  21.     def process_data(self, symbol):
  22.         """处理缓冲区的数据"""
  23.         if symbol not in self.data_buffer or not self.data_buffer[symbol]:
  24.             return
  25.         
  26.         data_points = self.data_buffer[symbol]
  27.         
  28.         # 计算统计指标
  29.         prices = [d["last_numeric"] for d in data_points]
  30.         volumes = [d.get("turnover_numeric", 0) for d in data_points]
  31.         
  32.         avg_price = sum(prices) / len(prices)
  33.         max_price = max(prices)
  34.         min_price = min(prices)
  35.         total_volume = sum(volumes)
  36.         
  37.         print(f"\n{symbol} 实时数据统计 (最近 {len(data_points)} 个更新):")
  38.         print(f"平均价格: {avg_price:.2f}, 最高: {max_price:.2f}, 最低: {min_price:.2f}")
  39.         print(f"总成交量: {total_volume}")
  40.         
  41.         # 清空缓冲区
  42.         self.data_buffer[symbol] = []
  43. # 在WebSocket客户端的on_message方法中使用
  44. processor = RealTimeDataProcessor()
  45. def on_message(self, ws, message):
  46.     try:
  47.         data = json.loads(message)
  48.         
  49.         if "pid" in data:
  50.             symbol = data.get("symbol", "Unknown")
  51.             processor.add_data(symbol, data)
  52.             
  53.     except Exception as e:
  54.         print(f"处理实时数据时出错: {str(e)}")
复制代码
3. 错误处理与重试机制
  1. def api_request_with_retry(url, params, max_retries=3):
  2.     """带重试机制的API请求"""
  3.     for attempt in range(max_retries):
  4.         try:
  5.             response = requests.get(url, params=params, timeout=10)
  6.             if response.status_code == 200:
  7.                 data = response.json()
  8.                 if data.get("code") == 200:
  9.                     return data
  10.                 elif data.get("code") == 429:  # 请求过多
  11.                     retry_after = int(data.get("retryAfter", 30))
  12.                     print(f"请求过于频繁,等待 {retry_after} 秒后重试...")
  13.                     time.sleep(retry_after)
  14.                 else:
  15.                     print(f"API返回错误: {data.get('message')}")
  16.             else:
  17.                 print(f"请求失败,状态码: {response.status_code}")
  18.         except Exception as e:
  19.             print(f"请求异常: {str(e)}")
  20.         
  21.         # 指数退避等待
  22.         wait_time = 2 ** attempt
  23.         print(f"等待 {wait_time} 秒后重试 (尝试 {attempt+1}/{max_retries})")
  24.         time.sleep(wait_time)
  25.    
  26.     print(f"请求失败,已达最大重试次数 {max_retries}")
  27.     return None
复制代码
完整示例应用
  1. class IDXStockMonitor:
  2.     def __init__(self, api_key):
  3.         self.api_key = api_key
  4.         self.ws_client = None
  5.         self.data_cache = IDXDataCache()
  6.         self.monitored_stocks = set()
  7.         
  8.     def start_monitoring(self, symbols):
  9.         """开始监控指定股票"""
  10.         print("开始监控印尼股票...")
  11.         
  12.         # 获取股票ID
  13.         stock_ids = []
  14.         for symbol in symbols:
  15.             stock_data = self.data_cache.get_stock_data(symbol)
  16.             if stock_data:
  17.                 stock_ids.append(stock_data["id"])
  18.                 self.monitored_stocks.add(symbol)
  19.                 print(f"已添加监控: {symbol} (ID: {stock_data['id']})")
  20.             else:
  21.                 print(f"无法获取股票信息: {symbol}")
  22.         
  23.         # 启动WebSocket连接
  24.         if stock_ids:
  25.             self.ws_client = IDXWebSocketClient(self.api_key)
  26.             time.sleep(2)  # 等待连接建立
  27.             self.ws_client.subscribe(stock_ids)
  28.         
  29.         # 启动定期数据更新
  30.         self.start_periodic_updates()
  31.    
  32.     def start_periodic_updates(self):
  33.         """启动定期数据更新"""
  34.         def update_loop():
  35.             while True:
  36.                 # 每5分钟更新一次指数数据
  37.                 self.update_indices()
  38.                
  39.                 # 每10分钟更新一次股票列表
  40.                 if len(self.monitored_stocks) < 10:  # 只更新少量股票
  41.                     self.update_stock_list()
  42.                
  43.                 time.sleep(300)  # 5分钟
  44.         
  45.         threading.Thread(target=update_loop, daemon=True).start()
  46.    
  47.     def update_indices(self):
  48.         """更新指数数据"""
  49.         print("\n更新印尼指数数据...")
  50.         indices = get_indonesia_indices()
  51.         for index in indices:
  52.             print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")
  53.    
  54.     def update_stock_list(self):
  55.         """更新股票列表"""
  56.         print("\n更新印尼股票列表...")
  57.         stocks = get_indonesia_stocks(page_size=50)
  58.         for stock in stocks[:10]:  # 只显示前10只
  59.             print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")
  60.    
  61.     def run(self):
  62.         """运行监控"""
  63.         try:
  64.             # 监控主要印尼股票
  65.             symbols_to_monitor = ["BBCA", "BBRI", "TLKM", "ASII", "UNVR"]
  66.             self.start_monitoring(symbols_to_monitor)
  67.             
  68.             # 保持主线程运行
  69.             while True:
  70.                 time.sleep(1)
  71.                
  72.         except KeyboardInterrupt:
  73.             print("\n监控已停止")
  74.         except Exception as e:
  75.             print(f"监控出错: {str(e)}")
  76. # 启动监控
  77. if __name__ == "__main__":
  78.     monitor = IDXStockMonitor(API_KEY)
  79.     monitor.run()
复制代码
部署建议


  • API密钥管理:不要将API密钥硬编码在代码中,使用环境变量或配置文件
  • 错误处理:增加更完善的错误处理和日志记录
  • 速率限制:遵守API的速率限制,避免频繁请求
  • 数据存储:考虑将重要数据存储到数据库中以供后续分析
  • 监控告警:设置价格波动告警机制
  1. # 从环境变量获取API密钥
  2. import os
  3. API_KEY = os.getenv("STOCKTV_API_KEY", "YOUR_API_KEY")
复制代码
以上是一个完整的印尼股票数据API对接实现方案。您可以根据实际需求进行调整和扩展。如果您需要更多特定功能或有任何问题,请随时告诉我。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册