期货市场API对接完全指南:实时行情获取与实战应用
本文详细介绍了如何通过API接口获取全球期货市场的实时行情数据,包含完整的代码示例、数据处理方法和实战应用场景。
一、期货API概述
期货市场是金融市场的重要组成部分,提供各种商品、金融指数和利率的标准化合约交易。通过期货API,开发者可以获取实时行情、历史数据、合约信息等关键数据,为量化交易、风险管理和市场分析提供支持。
主要期货API提供商对比
- Infoway API:提供全球主要期货市场的实时数据,支持RESTful和WebSocket接口
- Bloomberg:专业级金融数据服务,覆盖全面但成本较高
- Reuters:老牌金融信息提供商,数据准确性高
- Quandl:提供历史期货数据,适合回测和研究
- 各交易所官方API:如CME、ICE等交易所提供的直接数据接口
- StockTV:提供外汇、股票、加密货币等多类金融数据API,无限制接调用次数。提供免费API密钥
二、API接口详解
2.1 期货合约标识
期货合约有特定的命名规则,通常包含:
- 标的物代码(如CL代表原油)
- 到期月份代码(F=1月,G=2月,...,Z=12月)
- 到期年份(如2024年=4)
示例:CLZ4表示2024年12月到期的原油期货合约
2.2 核心API端点
- # 基础URL结构
- BASE_URL = "https://api.infoway.io/futures"
- # 主要端点
- ENDPOINTS = {
- "list": "/list", # 期货列表
- "quote": "/quote", # 实时行情
- "historical": "/historical", # 历史数据
- "kline": "/kline" # K线数据
- }
复制代码 三、Python实现期货数据获取
3.1 基础配置与认证
- import requests
- import pandas as pd
- import numpy as np
- import time
- from datetime import datetime, timedelta
- import json
- class FuturesAPI:
- def __init__(self, api_key, base_url="https://api.infoway.io/futures"):
- self.api_key = api_key
- self.base_url = base_url
- self.session = self._create_session()
-
- def _create_session(self):
- """创建带重试机制的会话"""
- session = requests.Session()
- retry_strategy = requests.packages.urllib3.util.retry.Retry(
- total=3,
- backoff_factor=0.3,
- status_forcelist=[429, 500, 502, 503, 504],
- )
- adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
- session.mount("http://", adapter)
- session.mount("https://", adapter)
- return session
-
- def _make_request(self, endpoint, params=None):
- """发起API请求"""
- url = f"{self.base_url}{endpoint}"
- headers = {
- "Authorization": f"Bearer {self.api_key}",
- "Content-Type": "application/json"
- }
-
- try:
- response = self.session.get(
- url,
- headers=headers,
- params=params,
- timeout=10
- )
- response.raise_for_status()
- return response.json()
- except requests.exceptions.RequestException as e:
- print(f"API请求失败: {e}")
- return None
复制代码 3.2 获取期货列表
- def get_futures_list(self, exchange=None, category=None):
- """
- 获取期货合约列表
-
- Args:
- exchange: 交易所代码(可选)
- category: 品种类别(可选)
- """
- params = {}
- if exchange:
- params["exchange"] = exchange
- if category:
- params["category"] = category
-
- data = self._make_request("/list", params)
- if data and data.get("code") == 200:
- return data.get("data", [])
- return []
- # 使用示例
- api = FuturesAPI("your_api_key")
- futures_list = api.get_futures_list(exchange="CME", category="energy")
- print(f"找到 {len(futures_list)} 个期货合约")
复制代码 3.3 获取实时行情
- def get_realtime_quotes(self, symbols):
- """
- 获取实时行情数据
-
- Args:
- symbols: 合约代码列表
- """
- if not symbols:
- return []
-
- if isinstance(symbols, str):
- symbols = [symbols]
-
- params = {"symbols": ",".join(symbols)}
- data = self._make_request("/quote", params)
-
- if data and data.get("code") == 200:
- return self._parse_quotes(data.get("data", []))
- return []
- def _parse_quotes(self, quotes_data):
- """解析行情数据"""
- parsed_data = []
- for item in quotes_data:
- parsed = {
- "symbol": item.get("symbol"),
- "name": item.get("name"),
- "last_price": float(item.get("last_price", 0)),
- "change": float(item.get("chg", 0)),
- "change_percent": float(item.get("chg_pct", 0)),
- "open": float(item.get("open_price", 0)),
- "high": float(item.get("high_price", 0)),
- "low": float(item.get("low_price", 0)),
- "prev_close": float(item.get("prev_price", 0)),
- "volume": int(item.get("volume", 0)),
- "timestamp": item.get("time"),
- "exchange": item.get("exchange")
- }
- parsed_data.append(parsed)
- return parsed_data
- # 使用示例
- quotes = api.get_realtime_quotes(["CLZ4", "GCZ4", "ESZ4"])
- for quote in quotes:
- print(f"{quote['symbol']}: {quote['last_price']} ({quote['change_percent']:.2f}%)")
复制代码 3.4 获取K线数据
- def get_kline_data(self, symbol, interval="1d", limit=100, start_time=None, end_time=None):
- """
- 获取K线数据
-
- Args:
- symbol: 合约代码
- interval: 时间间隔 (1m, 5m, 15m, 30m, 1h, 4h, 1d)
- limit: 数据条数
- start_time: 开始时间(时间戳)
- end_time: 结束时间(时间戳)
- """
- params = {
- "symbol": symbol,
- "interval": interval,
- "limit": limit
- }
-
- if start_time:
- params["startTime"] = start_time
- if end_time:
- params["endTime"] = end_time
-
- data = self._make_request("/kline", params)
-
- if data and data.get("code") == 200:
- return self._parse_kline(data.get("data", []))
- return []
- def _parse_kline(self, kline_data):
- """解析K线数据"""
- df_data = []
- for item in kline_data:
- df_data.append({
- "timestamp": item.get("timestamp"),
- "datetime": datetime.fromtimestamp(item.get("timestamp", 0)),
- "open": float(item.get("open", 0)),
- "high": float(item.get("high", 0)),
- "low": float(item.get("low", 0)),
- "close": float(item.get("close", 0)),
- "volume": float(item.get("volume", 0)),
- "turnover": float(item.get("turnover", 0))
- })
-
- return pd.DataFrame(df_data)
- # 使用示例
- kline_data = api.get_kline_data("CLZ4", interval="1h", limit=100)
- print(kline_data.head())
复制代码 四、WebSocket实时数据流
对于需要实时数据的应用,WebSocket是更好的选择:- import websockets
- import asyncio
- import json
- class FuturesWebSocketClient:
- def __init__(self, api_key):
- self.api_key = api_key
- self.ws_url = "wss://api.infoway.io/futures/ws"
- self.connected = False
- self.callbacks = []
-
- async def connect(self):
- """建立WebSocket连接"""
- try:
- self.connection = await websockets.connect(
- f"{self.ws_url}?apikey={self.api_key}"
- )
- self.connected = True
- print("WebSocket连接已建立")
-
- # 启动消息处理任务
- asyncio.create_task(self._message_handler())
-
- except Exception as e:
- print(f"连接失败: {e}")
-
- async def subscribe(self, symbols, data_type="quote"):
- """订阅期货数据"""
- if not self.connected:
- print("未建立连接")
- return False
-
- subscribe_msg = {
- "action": "subscribe",
- "symbols": symbols if isinstance(symbols, list) else [symbols],
- "type": data_type
- }
-
- try:
- await self.connection.send(json.dumps(subscribe_msg))
- print(f"已订阅: {symbols}")
- return True
- except Exception as e:
- print(f"订阅失败: {e}")
- return False
-
- async def _message_handler(self):
- """处理接收到的消息"""
- while self.connected:
- try:
- message = await self.connection.recv()
- data = json.loads(message)
- await self._process_message(data)
- except websockets.exceptions.ConnectionClosed:
- print("连接已关闭")
- break
- except Exception as e:
- print(f"处理消息错误: {e}")
-
- async def _process_message(self, data):
- """处理实时数据"""
- # 调用所有注册的回调函数
- for callback in self.callbacks:
- try:
- await callback(data)
- except Exception as e:
- print(f"回调函数执行错误: {e}")
-
- def add_callback(self, callback):
- """添加消息回调函数"""
- self.callbacks.append(callback)
-
- async def disconnect(self):
- """断开连接"""
- if self.connected:
- await self.connection.close()
- self.connected = False
- # 使用示例
- async def example_usage():
- client = FuturesWebSocketClient("your_api_key")
- await client.connect()
-
- # 添加数据处理回调
- async def handle_data(data):
- print(f"收到数据: {data}")
-
- client.add_callback(handle_data)
-
- # 订阅数据
- await client.subscribe(["CLZ4", "GCZ4"])
-
- # 保持连接
- try:
- await asyncio.Future() # 永久运行
- except KeyboardInterrupt:
- await client.disconnect()
- # 运行示例
- # asyncio.run(example_usage())
复制代码 五、数据处理与分析
5.1 数据清洗与转换
- class FuturesDataProcessor:
- @staticmethod
- def clean_data(df):
- """清洗期货数据"""
- # 去除空值
- df_clean = df.dropna()
-
- # 处理异常值
- for col in ['open', 'high', 'low', 'close']:
- q1 = df_clean[col].quantile(0.25)
- q3 = df_clean[col].quantile(0.75)
- iqr = q3 - q1
- lower_bound = q1 - 1.5 * iqr
- upper_bound = q3 + 1.5 * iqr
-
- df_clean = df_clean[
- (df_clean[col] >= lower_bound) &
- (df_clean[col] <= upper_bound)
- ]
-
- return df_clean
-
- @staticmethod
- def calculate_technical_indicators(df):
- """计算技术指标"""
- df = df.copy()
-
- # 移动平均线
- df['ma5'] = df['close'].rolling(window=5).mean()
- df['ma20'] = df['close'].rolling(window=20).mean()
-
- # 相对强弱指数(RSI)
- delta = df['close'].diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
- rs = gain / loss
- df['rsi'] = 100 - (100 / (1 + rs))
-
- # 布林带
- df['bb_middle'] = df['close'].rolling(window=20).mean()
- bb_std = df['close'].rolling(window=20).std()
- df['bb_upper'] = df['bb_middle'] + 2 * bb_std
- df['bb_lower'] = df['bb_middle'] - 2 * bb_std
-
- return df
复制代码 5.2 数据可视化
- import matplotlib.pyplot as plt
- import seaborn as sns
- class FuturesVisualizer:
- @staticmethod
- def plot_price_with_indicators(df, symbol):
- """绘制价格和技术指标"""
- fig, axes = plt.subplots(3, 1, figsize=(12, 10))
-
- # 价格和移动平均线
- axes[0].plot(df['datetime'], df['close'], label='Close Price')
- axes[0].plot(df['datetime'], df['ma5'], label='5MA', alpha=0.7)
- axes[0].plot(df['datetime'], df['ma20'], label='20MA', alpha=0.7)
- axes[0].set_title(f'{symbol} Price and Moving Averages')
- axes[0].legend()
- axes[0].grid(True, alpha=0.3)
-
- # RSI
- axes[1].plot(df['datetime'], df['rsi'], label='RSI', color='orange')
- axes[1].axhline(70, linestyle='--', alpha=0.3, color='red')
- axes[1].axhline(30, linestyle='--', alpha=0.3, color='green')
- axes[1].set_title('RSI Indicator')
- axes[1].set_ylim(0, 100)
- axes[1].legend()
- axes[1].grid(True, alpha=0.3)
-
- # 成交量
- axes[2].bar(df['datetime'], df['volume'], alpha=0.7, color='purple')
- axes[2].set_title('Volume')
- axes[2].grid(True, alpha=0.3)
-
- plt.tight_layout()
- plt.savefig(f'{symbol}_analysis.png', dpi=300, bbox_inches='tight')
- plt.show()
-
- @staticmethod
- def plot_correlation_matrix(symbols_data):
- """绘制相关性矩阵"""
- closes = pd.DataFrame()
- for symbol, df in symbols_data.items():
- closes[symbol] = df['close']
-
- correlation = closes.corr()
-
- plt.figure(figsize=(10, 8))
- sns.heatmap(correlation, annot=True, cmap='coolwarm', center=0)
- plt.title('Futures Correlation Matrix')
- plt.tight_layout()
- plt.savefig('futures_correlation.png', dpi=300, bbox_inches='tight')
- plt.show()
- # 使用示例
- processor = FuturesDataProcessor()
- visualizer = FuturesVisualizer()
- # 数据处理
- cleaned_data = processor.clean_data(kline_data)
- indicators_data = processor.calculate_technical_indicators(cleaned_data)
- # 可视化
- visualizer.plot_price_with_indicators(indicators_data, "CLZ4")
复制代码 六、实战应用场景
6.1 期货价格监控系统
- class FuturesMonitor:
- def __init__(self, api_client, alert_rules):
- self.api_client = api_client
- self.alert_rules = alert_rules
- self.price_history = {}
-
- async def start_monitoring(self, symbols, interval=60):
- """启动监控"""
- print("启动期货价格监控...")
-
- while True:
- try:
- quotes = self.api_client.get_realtime_quotes(symbols)
- for quote in quotes:
- await self._check_alerts(quote)
-
- # 记录历史价格
- for quote in quotes:
- symbol = quote['symbol']
- if symbol not in self.price_history:
- self.price_history[symbol] = []
- self.price_history[symbol].append({
- 'timestamp': datetime.now(),
- 'price': quote['last_price']
- })
-
- # 保持最近100条记录
- for symbol in self.price_history:
- if len(self.price_history[symbol]) > 100:
- self.price_history[symbol] = self.price_history[symbol][-100:]
-
- await asyncio.sleep(interval)
-
- except Exception as e:
- print(f"监控错误: {e}")
- await asyncio.sleep(5) # 错误后等待5秒再重试
-
- async def _check_alerts(self, quote):
- """检查警报条件"""
- symbol = quote['symbol']
-
- if symbol in self.alert_rules:
- rules = self.alert_rules[symbol]
- current_price = quote['last_price']
-
- # 检查价格突破
- if 'price_breakout' in rules:
- breakout_level = rules['price_breakout']
- if current_price >= breakout_level['upper']:
- await self._trigger_alert(
- symbol,
- f"价格突破上限: {current_price} >= {breakout_level['upper']}",
- "high"
- )
- elif current_price <= breakout_level['lower']:
- await self._trigger_alert(
- symbol,
- f"价格突破下限: {current_price} <= {breakout_level['lower']}",
- "low"
- )
-
- # 检查涨跌幅
- if 'change_alert' in rules:
- change_percent = abs(quote['change_percent'])
- if change_percent >= rules['change_alert']:
- await self._trigger_alert(
- symbol,
- f"大幅波动: {change_percent:.2f}%",
- "volatility"
- )
-
- async def _trigger_alert(self, symbol, message, alert_type):
- """触发警报"""
- timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- alert_msg = f"[{timestamp}] {symbol} {message}"
-
- print(f"ALERT: {alert_msg}")
-
- # 这里可以集成邮件、短信等通知方式
- # await self._send_email_alert(alert_msg)
- # await self._send_sms_alert(alert_msg)
- # 使用示例
- alert_rules = {
- "CLZ4": {
- "price_breakout": {
- "upper": 80.00,
- "lower": 75.00
- },
- "change_alert": 2.0 # 2%
- },
- "GCZ4": {
- "price_breakout": {
- "upper": 2000.00,
- "lower": 1950.00
- },
- "change_alert": 1.5 # 1.5%
- }
- }
- monitor = FuturesMonitor(api, alert_rules)
- # asyncio.run(monitor.start_monitoring(["CLZ4", "GCZ4"]))
复制代码 6.2 简单的趋势跟踪策略
- class TrendFollowingStrategy:
- def __init__(self, api_client, symbols):
- self.api_client = api_client
- self.symbols = symbols
- self.positions = {}
-
- async def run_strategy(self):
- """运行趋势跟踪策略"""
- print("启动趋势跟踪策略...")
-
- while True:
- try:
- for symbol in self.symbols:
- # 获取历史数据计算指标
- data = self.api_client.get_kline_data(symbol, "1h", 50)
- if len(data) < 20: # 确保有足够的数据
- continue
-
- # 计算技术指标
- data = FuturesDataProcessor.calculate_technical_indicators(data)
-
- # 生成交易信号
- signal = self._generate_signal(data, symbol)
-
- if signal != "hold":
- await self._execute_trade(symbol, signal, data.iloc[-1]['close'])
-
- await asyncio.sleep(3600) # 每小时检查一次
-
- except Exception as e:
- print(f"策略执行错误: {e}")
- await asyncio.sleep(300) # 错误后等待5分钟
-
- def _generate_signal(self, data, symbol):
- """生成交易信号"""
- current_close = data.iloc[-1]['close']
- ma20 = data.iloc[-1]['ma20']
- ma5 = data.iloc[-1]['ma5']
- rsi = data.iloc[-1]['rsi']
-
- # 简单的趋势跟踪逻辑
- if ma5 > ma20 and rsi < 70: # 上升趋势且不过热
- return "buy"
- elif ma5 < ma20 and rsi > 30: # 下降趋势且不超卖
- return "sell"
- else:
- return "hold"
-
- async def _execute_trade(self, symbol, signal, price):
- """执行交易"""
- # 这里只是示例,实际交易需要连接交易API
- print(f"{datetime.now()} - {signal.upper()} {symbol} @ {price:.2f}")
-
- # 更新持仓
- if signal == "buy":
- self.positions[symbol] = {
- "entry_price": price,
- "entry_time": datetime.now(),
- "direction": "long"
- }
- elif signal == "sell" and symbol in self.positions:
- position = self.positions[symbol]
- pnl = price - position["entry_price"] if position["direction"] == "long" else position["entry_price"] - price
- print(f"平仓盈亏: {pnl:.2f}")
- del self.positions[symbol]
- # 使用示例
- strategy = TrendFollowingStrategy(api, ["CLZ4", "GCZ4"])
- # asyncio.run(strategy.run_strategy())
复制代码 七、注意事项与最佳实践
7.1 错误处理与重试机制
- def robust_api_call(func):
- """API调用重试装饰器"""
- def wrapper(*args, **kwargs):
- max_retries = 3
- retry_delay = 1
-
- for attempt in range(max_retries):
- try:
- return func(*args, **kwargs)
- except requests.exceptions.ConnectionError as e:
- if attempt == max_retries - 1:
- raise e
- print(f"连接错误,{retry_delay}秒后重试...")
- time.sleep(retry_delay)
- retry_delay *= 2 # 指数退避
- except requests.exceptions.Timeout as e:
- if attempt == max_retries - 1:
- raise e
- print(f"请求超时,{retry_delay}秒后重试...")
- time.sleep(retry_delay)
- except Exception as e:
- print(f"API调用错误: {e}")
- raise e
- return wrapper
复制代码 7.2 数据缓存策略
- from functools import lru_cache
- from datetime import datetime, timedelta
- class DataCache:
- def __init__(self, ttl=300): # 默认5分钟缓存
- self.cache = {}
- self.ttl = ttl
-
- @lru_cache(maxsize=128)
- def get_cached_data(self, key, data_func, *args, **kwargs):
- """带缓存的数据获取"""
- current_time = datetime.now()
-
- if key in self.cache:
- data, timestamp = self.cache[key]
- if (current_time - timestamp).total_seconds() < self.ttl:
- return data
-
- # 缓存不存在或已过期
- new_data = data_func(*args, **kwargs)
- if new_data is not None:
- self.cache[key] = (new_data, current_time)
-
- return new_data
- # 使用示例
- cache = DataCache(ttl=300) # 5分钟缓存
- # 带缓存的API调用
- cached_data = cache.get_cached_data(
- "CLZ4_1h_100",
- api.get_kline_data,
- "CLZ4", "1h", 100
- )
复制代码 八、总结
本文详细介绍了期货市场API的对接方法,涵盖了从基础的数据获取到高级的应用场景。通过合理的错误处理、数据缓存和实时监控,可以构建稳定可靠的期货数据应用系统。
关键要点:
- 选择合适的API提供商:根据需求选择功能、成本和稳定性合适的API服务
- 实现健壮的错误处理:网络不稳定是常态,必须要有完善的重试机制
- 合理使用缓存:对不经常变化的数据实施缓存,减少API调用次数
- 实时监控与警报:对于交易应用,实时监控和及时警报至关重要
- 数据处理与分析:原始数据需要经过清洗和转换才能用于分析和决策
期货市场数据具有高度的实时性和复杂性,在实际应用中需要根据具体需求不断完善和优化系统架构。
提示:本文示例代码仅供参考,实际使用时请替换为有效的API密钥,并遵守API提供商的使用条款。期货交易有风险,请谨慎决策。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |