环境准备
首先安装必要的依赖包:- pip install requests websocket-client pandas numpy
复制代码 基础配置
- import requests
- import json
- import websocket
- import threading
- import time
- from datetime import datetime
- # API配置
- API_KEY = "YOUR_API_KEY" # 替换为您的实际API密钥
- BASE_URL = "https://api.stocktv.top"
- WS_URL = "wss://ws-api.stocktv.top/connect"
- # 印尼股票代码映射(示例)
- IDX_SYMBOLS = {
- "BBCA": "Bank Central Asia",
- "BBRI": "Bank Rakyat Indonesia",
- "TLKM": "Telkom Indonesia",
- "ASII": "Astra International",
- "UNVR": "Unilever Indonesia"
- }
复制代码 REST API实现
1. 获取印尼股票列表
- def get_indonesia_stocks(page=1, page_size=100):
- """获取印尼交易所股票列表"""
- url = f"{BASE_URL}/stock/stocks"
- params = {
- "countryId": 42, # 印尼国家ID
- "exchangeId": 62, # IDX交易所ID
- "pageSize": page_size,
- "page": page,
- "key": API_KEY
- }
-
- try:
- response = requests.get(url, params=params)
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200:
- return data["data"]["records"]
- else:
- print(f"API Error: {data.get('message')}")
- else:
- print(f"Request failed with status: {response.status_code}")
- except Exception as e:
- print(f"Error fetching stock list: {str(e)}")
-
- return []
- # 示例:获取第一页股票列表
- stocks = get_indonesia_stocks()
- for stock in stocks:
- print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")
复制代码 2. 查询特定股票详情
- def get_stock_detail(symbol_or_id):
- """获取股票详细信息"""
- url = f"{BASE_URL}/stock/queryStocks"
-
- # 判断是symbol还是id
- if isinstance(symbol_or_id, str) and symbol_or_id.isdigit():
- params = {"id": symbol_or_id, "key": API_KEY}
- else:
- params = {"symbol": symbol_or_id, "key": API_KEY}
-
- try:
- response = requests.get(url, params=params)
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200 and data["data"]:
- return data["data"][0]
- else:
- print(f"API Error: {data.get('message')}")
- else:
- print(f"Request failed with status: {response.status_code}")
- except Exception as e:
- print(f"Error fetching stock detail: {str(e)}")
-
- return None
- # 示例:获取BBCA股票详情
- bbca_detail = get_stock_detail("BBCA")
- if bbca_detail:
- print(f"BBCA当前价格: {bbca_detail['last']}")
- print(f"涨跌幅: {bbca_detail['chgPct']}%")
复制代码 3. 获取指数数据
- def get_indonesia_indices():
- """获取印尼主要指数"""
- url = f"{BASE_URL}/stock/indices"
- params = {
- "countryId": 42, # 印尼国家ID
- "key": API_KEY
- }
-
- try:
- response = requests.get(url, params=params)
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200:
- return data["data"]
- else:
- print(f"API Error: {data.get('message')}")
- else:
- print(f"Request failed with status: {response.status_code}")
- except Exception as e:
- print(f"Error fetching indices: {str(e)}")
-
- return []
- # 示例:获取印尼指数
- indices = get_indonesia_indices()
- for index in indices:
- print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")
复制代码 4. 获取K线数据
- def get_kline_data(pid, interval="PT15M"):
- """获取股票K线数据"""
- url = f"{BASE_URL}/stock/kline"
- params = {
- "pid": pid,
- "interval": interval,
- "key": API_KEY
- }
-
- try:
- response = requests.get(url, params=params)
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200:
- return data["data"]
- else:
- print(f"API Error: {data.get('message')}")
- else:
- print(f"Request failed with status: {response.status_code}")
- except Exception as e:
- print(f"Error fetching kline data: {str(e)}")
-
- return []
- # 示例:获取BBCA的15分钟K线数据
- bbca_kline = get_kline_data(41602, "PT15M")
- for kline in bbca_kline[:5]: # 显示前5条
- dt = datetime.fromtimestamp(kline["time"] / 1000)
- print(f"{dt}: O:{kline['open']} H:{kline['high']} L:{kline['low']} C:{kline['close']}")
复制代码 5. 获取涨跌排行榜
- def get_top_gainers():
- """获取涨幅榜"""
- url = f"{BASE_URL}/stock/updownList"
- params = {
- "countryId": 42, # 印尼国家ID
- "type": 1, # 1=涨幅榜, 2=跌幅榜
- "key": API_KEY
- }
-
- try:
- response = requests.get(url, params=params)
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200:
- return data["data"]
- else:
- print(f"API Error: {data.get('message')}")
- else:
- print(f"Request failed with status: {response.status_code}")
- except Exception as e:
- print(f"Error fetching top gainers: {str(e)}")
-
- return []
- # 示例:获取印尼涨幅榜
- gainers = get_top_gainers()
- for stock in gainers[:10]: # 显示前10名
- print(f"{stock['symbol']}: {stock['last']} (+{stock['chgPct']}%)")
复制代码 WebSocket实时数据
WebSocket客户端实现
- class IDXWebSocketClient:
- def __init__(self, api_key):
- self.api_key = api_key
- self.ws = None
- self.connected = False
- self.subscriptions = set()
-
- # 启动连接
- self.connect()
-
- # 启动心跳线程
- threading.Thread(target=self.heartbeat, daemon=True).start()
-
- def connect(self):
- """连接WebSocket服务器"""
- ws_url = f"{WS_URL}?key={self.api_key}"
-
- self.ws = websocket.WebSocketApp(
- ws_url,
- on_open=self.on_open,
- on_message=self.on_message,
- on_error=self.on_error,
- on_close=self.on_close
- )
-
- # 启动WebSocket线程
- threading.Thread(target=self.ws.run_forever).start()
- time.sleep(1) # 等待连接建立
-
- def on_open(self, ws):
- """连接建立回调"""
- print("已连接到印尼股票实时数据服务")
- self.connected = True
-
- # 重新订阅之前订阅的股票
- if self.subscriptions:
- self.subscribe(list(self.subscriptions))
-
- def on_message(self, ws, message):
- """接收消息回调"""
- try:
- data = json.loads(message)
-
- # 处理实时行情数据
- if "pid" in data:
- symbol = data.get("symbol", "Unknown")
- price = data.get("last_numeric", 0)
- change = data.get("pc", 0)
- change_pct = data.get("pcp", 0)
-
- print(f"实时行情 [{symbol}]: {price} ({change} / {change_pct}%)")
-
- # 处理心跳响应
- elif data.get("action") == "pong":
- pass
-
- except Exception as e:
- print(f"处理实时数据时出错: {str(e)}")
-
- def on_error(self, ws, error):
- """错误处理回调"""
- print(f"WebSocket错误: {str(error)}")
-
- def on_close(self, ws, close_status_code, close_msg):
- """连接关闭回调"""
- print("WebSocket连接已关闭")
- self.connected = False
-
- # 尝试重新连接
- print("尝试重新连接...")
- time.sleep(3)
- self.connect()
-
- def subscribe(self, pids):
- """订阅股票"""
- if not self.connected:
- print("未连接,无法订阅")
- return False
-
- # 添加到订阅列表
- self.subscriptions.update(pids)
-
- # 构造订阅消息
- message = json.dumps({
- "action": "subscribe",
- "pids": list(pids)
- })
-
- # 发送订阅请求
- self.ws.send(message)
- print(f"已订阅: {', '.join(map(str, pids))}")
- return True
-
- def unsubscribe(self, pids):
- """取消订阅股票"""
- if not self.connected:
- print("未连接,无法取消订阅")
- return False
-
- # 从订阅列表中移除
- for pid in pids:
- self.subscriptions.discard(pid)
-
- # 构造取消订阅消息
- message = json.dumps({
- "action": "unsubscribe",
- "pids": list(pids)
- })
-
- # 发送取消订阅请求
- self.ws.send(message)
- print(f"已取消订阅: {', '.join(map(str, pids))}")
- return True
-
- def heartbeat(self):
- """心跳维护"""
- while True:
- if self.connected:
- try:
- # 每30秒发送一次心跳
- self.ws.send(json.dumps({"action": "ping"}))
- except Exception as e:
- print(f"发送心跳失败: {str(e)}")
- time.sleep(30)
- # 使用示例
- if __name__ == "__main__":
- # 创建WebSocket客户端
- ws_client = IDXWebSocketClient(API_KEY)
-
- # 订阅股票(需要先获取股票ID)
- time.sleep(2) # 等待连接建立
- ws_client.subscribe([41602, 41605]) # 订阅BBCA和BRIS
-
- # 保持主线程运行
- try:
- while True:
- time.sleep(1)
- except KeyboardInterrupt:
- print("程序已终止")
复制代码 高级功能实现
1. 数据缓存策略
- from cachetools import TTLCache
- class IDXDataCache:
- def __init__(self, maxsize=100, ttl=60):
- """初始化数据缓存"""
- self.cache = TTLCache(maxsize=maxsize, ttl=ttl)
-
- def get_stock_data(self, symbol_or_id):
- """获取股票数据(带缓存)"""
- # 检查缓存
- if symbol_or_id in self.cache:
- return self.cache[symbol_or_id]
-
- # 从API获取
- data = get_stock_detail(symbol_or_id)
-
- # 更新缓存
- if data:
- self.cache[symbol_or_id] = data
-
- return data
- # 使用示例
- cache = IDXDataCache()
- bbca_data = cache.get_stock_data("BBCA")
复制代码 2. 实时数据处理器
- class RealTimeDataProcessor:
- def __init__(self):
- self.data_buffer = {}
- self.batch_size = 10
- self.last_process_time = time.time()
-
- def add_data(self, symbol, data):
- """添加实时数据到缓冲区"""
- if symbol not in self.data_buffer:
- self.data_buffer[symbol] = []
-
- self.data_buffer[symbol].append(data)
-
- # 检查是否达到批处理条件
- current_time = time.time()
- if (len(self.data_buffer[symbol]) >= self.batch_size or
- current_time - self.last_process_time >= 1.0):
- self.process_data(symbol)
- self.last_process_time = current_time
-
- def process_data(self, symbol):
- """处理缓冲区的数据"""
- if symbol not in self.data_buffer or not self.data_buffer[symbol]:
- return
-
- data_points = self.data_buffer[symbol]
-
- # 计算统计指标
- prices = [d["last_numeric"] for d in data_points]
- volumes = [d.get("turnover_numeric", 0) for d in data_points]
-
- avg_price = sum(prices) / len(prices)
- max_price = max(prices)
- min_price = min(prices)
- total_volume = sum(volumes)
-
- print(f"\n{symbol} 实时数据统计 (最近 {len(data_points)} 个更新):")
- print(f"平均价格: {avg_price:.2f}, 最高: {max_price:.2f}, 最低: {min_price:.2f}")
- print(f"总成交量: {total_volume}")
-
- # 清空缓冲区
- self.data_buffer[symbol] = []
- # 在WebSocket客户端的on_message方法中使用
- processor = RealTimeDataProcessor()
- def on_message(self, ws, message):
- try:
- data = json.loads(message)
-
- if "pid" in data:
- symbol = data.get("symbol", "Unknown")
- processor.add_data(symbol, data)
-
- except Exception as e:
- print(f"处理实时数据时出错: {str(e)}")
复制代码 3. 错误处理与重试机制
- def api_request_with_retry(url, params, max_retries=3):
- """带重试机制的API请求"""
- for attempt in range(max_retries):
- try:
- response = requests.get(url, params=params, timeout=10)
- if response.status_code == 200:
- data = response.json()
- if data.get("code") == 200:
- return data
- elif data.get("code") == 429: # 请求过多
- retry_after = int(data.get("retryAfter", 30))
- print(f"请求过于频繁,等待 {retry_after} 秒后重试...")
- time.sleep(retry_after)
- else:
- print(f"API返回错误: {data.get('message')}")
- else:
- print(f"请求失败,状态码: {response.status_code}")
- except Exception as e:
- print(f"请求异常: {str(e)}")
-
- # 指数退避等待
- wait_time = 2 ** attempt
- print(f"等待 {wait_time} 秒后重试 (尝试 {attempt+1}/{max_retries})")
- time.sleep(wait_time)
-
- print(f"请求失败,已达最大重试次数 {max_retries}")
- return None
复制代码 完整示例应用
- class IDXStockMonitor:
- def __init__(self, api_key):
- self.api_key = api_key
- self.ws_client = None
- self.data_cache = IDXDataCache()
- self.monitored_stocks = set()
-
- def start_monitoring(self, symbols):
- """开始监控指定股票"""
- print("开始监控印尼股票...")
-
- # 获取股票ID
- stock_ids = []
- for symbol in symbols:
- stock_data = self.data_cache.get_stock_data(symbol)
- if stock_data:
- stock_ids.append(stock_data["id"])
- self.monitored_stocks.add(symbol)
- print(f"已添加监控: {symbol} (ID: {stock_data['id']})")
- else:
- print(f"无法获取股票信息: {symbol}")
-
- # 启动WebSocket连接
- if stock_ids:
- self.ws_client = IDXWebSocketClient(self.api_key)
- time.sleep(2) # 等待连接建立
- self.ws_client.subscribe(stock_ids)
-
- # 启动定期数据更新
- self.start_periodic_updates()
-
- def start_periodic_updates(self):
- """启动定期数据更新"""
- def update_loop():
- while True:
- # 每5分钟更新一次指数数据
- self.update_indices()
-
- # 每10分钟更新一次股票列表
- if len(self.monitored_stocks) < 10: # 只更新少量股票
- self.update_stock_list()
-
- time.sleep(300) # 5分钟
-
- threading.Thread(target=update_loop, daemon=True).start()
-
- def update_indices(self):
- """更新指数数据"""
- print("\n更新印尼指数数据...")
- indices = get_indonesia_indices()
- for index in indices:
- print(f"{index['symbol']}: {index['last']} ({index['chgPct']}%)")
-
- def update_stock_list(self):
- """更新股票列表"""
- print("\n更新印尼股票列表...")
- stocks = get_indonesia_stocks(page_size=50)
- for stock in stocks[:10]: # 只显示前10只
- print(f"{stock['symbol']}: {stock['name']} - {stock['last']}")
-
- def run(self):
- """运行监控"""
- try:
- # 监控主要印尼股票
- symbols_to_monitor = ["BBCA", "BBRI", "TLKM", "ASII", "UNVR"]
- self.start_monitoring(symbols_to_monitor)
-
- # 保持主线程运行
- while True:
- time.sleep(1)
-
- except KeyboardInterrupt:
- print("\n监控已停止")
- except Exception as e:
- print(f"监控出错: {str(e)}")
- # 启动监控
- if __name__ == "__main__":
- monitor = IDXStockMonitor(API_KEY)
- monitor.run()
复制代码 部署建议
- API密钥管理:不要将API密钥硬编码在代码中,使用环境变量或配置文件
- 错误处理:增加更完善的错误处理和日志记录
- 速率限制:遵守API的速率限制,避免频繁请求
- 数据存储:考虑将重要数据存储到数据库中以供后续分析
- 监控告警:设置价格波动告警机制
- # 从环境变量获取API密钥
- import os
- API_KEY = os.getenv("STOCKTV_API_KEY", "YOUR_API_KEY")
复制代码 以上是一个完整的印尼股票数据API对接实现方案。您可以根据实际需求进行调整和扩展。如果您需要更多特定功能或有任何问题,请随时告诉我。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |