找回密码
 立即注册
首页 业界区 安全 Python如何调用港股行情接口

Python如何调用港股行情接口

伯绮梦 7 天前
1. 接口信息
  1. 接口类型:实时综合行情接口
  2. 支持品种:贵金属,商品期货,外汇,A股,港股,美股
  3. 查询方式:HTTP, WebSocket
  4. 申请密钥:https://infoway.io
  5. 官方对接文档:https://infoway.readme.io/reference/ws-subscription
复制代码
2. 获取股票清单

这个接口用来查询股票的名单,比如我可以获取美股清单:
  1. import requests
  2. url = "https://data.infoway.io/common/basic/symbols?type=STOCK_HK"
  3. headers = {"accept": "application/json"}
  4. response = requests.get(url, headers=headers)
  5. print(response.text)
复制代码
只需要在type中传入STOCK_US,或者STOCK_CN即可查询所有美股或A股的清单。
3. 批量获取股票K线

这个接口用来获取多只股票的K线,请求地址:
  1. https://data.infoway.io/stock/batch_kline/{klineType}/{klineNum}/{codes}
  2. # {klineType} 是K线的时间
  3. # 1 = 1分钟k线
  4. # 2        = 5分钟k线
  5. # 3 = 15分钟k线
  6. # 4        = 30分钟k线
  7. # 5        = 1小时k线
  8. # 6        = 2小时k线
  9. # 7        = 4小时k线
  10. # 8        = 1日k线
  11. # 9        = 1周k线
  12. # 10        = 1月k线
  13. # 11        = 1季k线
  14. # 12        = 1年k线
  15. # {klineNum}是需要返回的K线数量,最大支持一次性返回500根K线
复制代码
可以跨市场查询,比如同时获取A股,港股和美股的个股K线:
  1. import requests
  2. api_url = 'https://data.infoway.io/stock/batch_kline/1/10/002594.SZ%2C00285.HK%2CTSLA.US'
  3. # 申请免费密钥: https://infoway.io
  4. # 设置请求头
  5. headers = {
  6.     'User-Agent': 'Mozilla/5.0',
  7.     'Accept': 'application/json',
  8.     'apiKey': 'yourApikey'
  9. }
  10. # 发送GET请求
  11. response = requests.get(api_url, headers=headers)
  12. # 输出结果
  13. print(f"HTTP code: {response.status_code}")
  14. print(f"message: {response.text}")
复制代码
4. 查询股票盘口

这个接口用于查询股票的交易盘口,最大支持5档盘口查询:
  1. import requests
  2. url = "https://data.infoway.io/stock/batch_depth/002594.SZ%2C00285.HK%2CTSLA.US%2CAUDCAD%2CHK50%2CXNIUSD"
  3. # 申请免费密钥: https://infoway.io
  4. # 设置请求头
  5. headers = {
  6.     'User-Agent': 'Mozilla/5.0',
  7.     'Accept': 'application/json',
  8.     'apiKey': 'yourApikey'
  9. }
  10. response = requests.get(url, headers=headers)
  11. print(response.text)
复制代码
5. WebSocket订阅实时股票行情

以上介绍的是HTTP请求,这种方式存在一定的延时,在实盘交易中推荐使用WebSocket来获取实时的行情推送。WebSocket的优势是和服务器建立一个长连接,只要连接不断开都会收到来自服务器的数据推送,延时一般在120ms以内。而你只需要每隔一分钟向服务器发送一个心跳来保活即可。下面是WebSocket的代码示例,虽然是查询的贵金属,但原理是一样的:
  1. import json
  2. import time
  3. import schedule
  4. import threading
  5. import websocket
  6. from loguru import logger
  7. class WebsocketExample:
  8.     def __init__(self):
  9.         self.session = None
  10.         # 申请免费API Key: https://infoway.io
  11.         self.ws_url = "wss://data.infoway.io/ws?business=common&apikey=YourAPIKey"
  12.         self.reconnecting = False
  13.         self.last_ping_time = 0
  14.         self.max_ping_interval = 30  # 最大心跳包间隔时间
  15.         self.retry_attempts = 0  # 重试次数
  16.         self.max_retry_attempts = 5  # 最大重试次数
  17.     def connect_all(self):
  18.         """建立WebSocket连接并启动自动重连机制"""
  19.         try:
  20.             self.connect(self.ws_url)
  21.             self.start_reconnection(self.ws_url)
  22.         except Exception as e:
  23.             logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")
  24.     def start_reconnection(self, url):
  25.         """启动定时重连检查"""
  26.         def check_connection():
  27.             if not self.is_connected():
  28.                 logger.debug("Reconnection attempt...")
  29.                 self.retry_attempts += 1
  30.                 if self.retry_attempts <= self.max_retry_attempts:
  31.                     self.connect(url)
  32.                 else:
  33.                     logger.error("Exceeded max retry attempts.")
  34.         
  35.         # 使用线程定期检查连接状态
  36.         threading.Thread(target=lambda: schedule.every(10).seconds.do(check_connection), daemon=True).start()
  37.     def is_connected(self):
  38.         """检查WebSocket连接状态"""
  39.         return self.session and self.session.connected
  40.     def connect(self, url):
  41.         """建立WebSocket连接"""
  42.         try:
  43.             if self.is_connected():
  44.                 self.session.close()
  45.             
  46.             self.session = websocket.WebSocketApp(
  47.                 url,
  48.                 on_open=self.on_open,
  49.                 on_message=self.on_message,
  50.                 on_error=self.on_error,
  51.                 on_close=self.on_close
  52.             )
  53.             
  54.             # 启动WebSocket连接(非阻塞模式)
  55.             threading.Thread(target=self.session.run_forever, daemon=True).start()
  56.         except Exception as e:
  57.             logger.error(f"Failed to connect to the server: {str(e)}")
  58.     def on_open(self, ws):
  59.         """WebSocket连接建立成功后的回调"""
  60.         logger.info(f"Connection opened")
  61.         
  62.         try:
  63.             # 发送贵金属实时成交明细订阅请求
  64.             trade_send_obj = {
  65.                 "code": 10000,
  66.                 "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
  67.                 "data": {"codes": "XAUUSD"}  # XAUUSD 为贵金属黄金(黄金/美元)
  68.             }
  69.             self.send_message(trade_send_obj)
  70.             
  71.             # 不同请求之间间隔一段时间
  72.             time.sleep(5)
  73.             
  74.             # 发送贵金属实时盘口数据订阅请求
  75.             depth_send_obj = {
  76.                 "code": 10003,
  77.                 "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
  78.                 "data": {"codes": "XAUUSD"}  # XAUUSD为黄金的实时盘口数据
  79.             }
  80.             self.send_message(depth_send_obj)
  81.             
  82.             # 不同请求之间间隔一段时间
  83.             time.sleep(5)
  84.             
  85.             # 发送贵金属实时K线数据订阅请求
  86.             kline_data = {
  87.                 "arr": [
  88.                     {
  89.                         "type": 1,
  90.                         "codes": "XAUUSD"  # XAUUSD 为黄金K线数据
  91.                     }
  92.                 ]
  93.             }
  94.             kline_send_obj = {
  95.                 "code": 10006,
  96.                 "trace": "01213e9d-90a0-426e-a380-ebed633cba7a",
  97.                 "data": kline_data
  98.             }
  99.             self.send_message(kline_send_obj)
  100.             
  101.             # 启动定时心跳任务
  102.             threading.Thread(target=lambda: schedule.every(30).seconds.do(self.ping), daemon=True).start()
  103.             
  104.         except Exception as e:
  105.             logger.error(f"Error sending initial messages: {str(e)}")
  106.     def on_message(self, ws, message):
  107.         """接收消息的回调"""
  108.         try:
  109.             logger.info(f"Message received: {message}")
  110.         except Exception as e:
  111.             logger.error(f"Error processing message: {str(e)}")
  112.     def on_close(self, ws, close_status_code, close_msg):
  113.         """连接关闭的回调"""
  114.         logger.info(f"Connection closed: {close_status_code} - {close_msg}")
  115.     def on_error(self, ws, error):
  116.         """错误处理的回调"""
  117.         logger.error(f"WebSocket error: {str(error)}")
  118.     def send_message(self, message_obj):
  119.         """发送消息到WebSocket服务器"""
  120.         if self.is_connected():
  121.             try:
  122.                 self.session.send(json.dumps(message_obj))
  123.             except Exception as e:
  124.                 logger.error(f"Error sending message: {str(e)}")
  125.         else:
  126.             logger.warning("Cannot send message: Not connected")
  127.     def ping(self):
  128.         """发送心跳包"""
  129.         current_time = time.time()
  130.         if current_time - self.last_ping_time >= self.max_ping_interval:
  131.             ping_obj = {
  132.                 "code": 10010,
  133.                 "trace": "01213e9d-90a0-426e-a380-ebed633cba7a"
  134.             }
  135.             self.send_message(ping_obj)
  136.             self.last_ping_time = current_time
  137.         else:
  138.             logger.debug(f"Ping skipped: Time interval between pings is less than {self.max_ping_interval} seconds.")
  139. # 使用示例
  140. if __name__ == "__main__":
  141.     ws_client = WebsocketExample()
  142.     ws_client.connect_all()
  143.    
  144.     # 保持主线程运行
  145.     try:
  146.         while True:
  147.             schedule.run_pending()
  148.             time.sleep(1)
  149.     except KeyboardInterrupt:
  150.         logger.info("Exiting...")
  151.         if ws_client.is_connected():
  152.             ws_client.session.close()            
复制代码
6. 常见问题

HTTP和WebSocket有什么区别?

通过HTTP请求,返回的数据是基于请求和响应传输的。客户端向服务器发送请求,服务器返回响应。每次请求/响应都必须是完整的。而WebSocket的数据是全双工的,客户端和服务器可以随时向对方发送数据,也就是所谓的长连接。它更适用于实时通信,因为一旦连接建立,可以保持双向流量。
港股的延迟是多少?

这个和接口的设计有关,一般通过WebSocket获取的数据,延迟在100ms左右。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册