iTick 港美股实时行情API结合富途牛牛交易接口实现个人量化交易——高频股票API - iTick

在当今数字化金融时代,个人投资者通过 API 接口实现量化交易已成为可能。本文将详细介绍如何利用 iTick 提供的港美股实时行情 API,结合富途牛牛(Futu OpenAPI)的交易接口,构建一个完整的个人量化交易系统。我们将从系统架构设计开始,逐步深入到具体实现,最后提供完整的 Python 代码和回测策略。

一、系统架构设计

本系统采用模块化设计,各组件之间松耦合,便于单独扩展和维护。以下是完整的系统架构:

二、技术选型与准备

1. 行情数据源 - iTick API

iTick 提供稳定可靠的港美股实时行情数据,具有以下特点:

  • 低延迟:毫秒级行情推送
  • 全面覆盖:港股、美股、A 股(通过港股通)等全球市场
  • 丰富数据类型:tick 数据、分钟线、日线等
  • 稳定可靠:99.9%的可用性保证

2. 交易接口 - 富途牛牛 Futu OpenAPI

富途牛牛开放平台提供完善的交易 API:

  • 支持港股、美股、A 股(港股通)交易
  • 提供模拟交易环境
  • 完善的文档和社区支持
  • 相对较高的交易执行速度

3. 开发环境准备

      # 所需Python库
import requests  # 用于API调用
import pandas as pd  # 数据处理
import numpy as np  # 数值计算
from futu import *  # 富途OpenAPI
import matplotlib.pyplot as plt  # 可视化
import sqlite3  # 本地数据存储
import time  # 时间控制
import threading  # 多线程处理

    

三、系统核心模块实现

1. 行情数据获取模块

      class ITickDataFetcher:

     def __init__(self):
            self.headers = {
             "accept": "application/json",
             "token": ITICK_API_KEY
            }

    def get_realtime_data(self,symbol,region):
        """获取实时行情数据"""
       url = f"https://api.itick.org/stock/quote?symbol={symbol}&region={region}"
        try:

            response = requests.get(url, headers=self.headers)
            data = response.json()
            return self._parse_data(data)
        except Exception as e:
            print(f"获取实时数据失败: {str(e)}")
            return None

    def get_historical_data(self,symbol, region, freq='1'):
        """获取历史数据"""
            url = f"https://api.itick.org/stock/kline?symbol={symbol}&region={region}&kType={freq}"
            try:
                response = requests.get(url, headers=self.headers)
                data = response.json()
                df = self._parse_historical_data(data)
            except Exception as e:
                print(f"获取{symbol}历史数据失败: {str(e)}")
        return None

    def _parse_data(self, raw_data):
        """解析实时数据"""
        # 实现数据解析逻辑
        pass

    def _parse_historical_data(self, raw_data):
        """解析历史数据"""
        # 实现历史数据解析逻辑
        pass

    

2. 数据预处理模块

      class DataProcessor:
    def __init__(self):
        self.conn = sqlite3.connect('quant_trading.db')

    def clean_data(self, df):
        """数据清洗"""
        # 处理缺失值
        df = df.dropna()
        # 去除异常值
        df = df[(df['v'] > 0) & (df['c'] > 0)]
        return df

    def calculate_technical_indicators(self, df):
        """计算技术指标"""
        # 移动平均线
        df['ma5'] = df['c'].rolling(5).mean()
        df['ma20'] = df['c'].rolling(20).mean()

        # RSI
        delta = df['c'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        # MACD
        exp12 = df['c'].ewm(span=12, adjust=False).mean()
        exp26 = df['c'].ewm(span=26, adjust=False).mean()
        df['macd'] = exp12 - exp26
        df['signal'] = df['macd'].ewm(span=9, adjust=False).mean()

        return df

    def save_to_db(self, df, table_name):
        """保存数据到数据库"""
        df.to_sql(table_name, self.conn, if_exists='append', index=False)

    def load_from_db(self, table_name):
        """从数据库加载数据"""
        return pd.read_sql(f"SELECT * FROM {table_name}", self.conn)

    

3. 策略引擎

      class DualMovingAverageStrategy:
    def __init__(self, fast_window=5, slow_window=20):
        self.fast_window = fast_window
        self.slow_window = slow_window
        self.position = 0  # 0:无持仓, 1:多头, -1:空头
        self.signals = []

    def generate_signals(self, df):
        """生成交易信号"""
        signals = pd.DataFrame(index=df.index)
        signals['price'] = df['c']
        signals['fast_ma'] = df['c'].rolling(self.fast_window).mean()
        signals['slow_ma'] = df['c'].rolling(self.slow_window).mean()

        # 生成交易信号
        signals['signal'] = 0
        signals['signal'][self.fast_window:] = np.where(
            signals['fast_ma'][self.fast_window:] > signals['slow_ma'][self.fast_window:], 1, 0)

        # 计算实际的买卖信号
        signals['positions'] = signals['signal'].diff()

        return signals

    def on_tick(self, tick_data):
        """实时行情处理"""
        # 实现实时行情处理逻辑
        pass

    

4. 交易执行模块

      class FutuTrader:
    def __init__(self, host='127.0.0.1', port=11111):
        self.quote_ctx = OpenQuoteContext(host=host, port=port)
        self.trade_ctx = OpenTradeContext(host=host, port=port)

    def get_account_info(self):
        """获取账户信息"""
        ret, data = self.trade_ctx.accinfo_query()
        if ret == RET_OK:
            return data
        else:
            print(f"获取账户信息失败: {data}")
            return None

    def place_order(self, symbol, price, quantity, trd_side, order_type=OrderType.NORMAL):
        """下单"""
        ret, data = self.trade_ctx.place_order(
            price=price,
            qty=quantity,
            code=symbol,
            trd_side=trd_side,
            order_type=order_type,
            trd_env=TrdEnv.SIMULATE  # 模拟交易,实盘改为TrdEnv.REAL
        )
        if ret == RET_OK:
            print(f"下单成功: {data}")
            return data
        else:
            print(f"下单失败: {data}")
            return None

    def close_all_positions(self):
        """平掉所有仓位"""
        ret, data = self.trade_ctx.position_list_query()
        if ret == RET_OK:
            for _, row in data.iterrows():
                if row['qty'] > 0:
                    self.place_order(
                        row['code'],
                        row['cost_price'],
                        row['qty'],
                        TrdSide.SELL
                    )
        else:
            print(f"获取持仓失败: {data}")

    def subscribe(self, symbols, subtype_list=[SubType.QUOTE]):
        """订阅行情"""
        ret, data = self.quote_ctx.subscribe(symbols, subtype_list)
        if ret != RET_OK:
            print(f"订阅行情失败: {data}")

    def unsubscribe(self, symbols, subtype_list=[SubType.QUOTE]):
        """取消订阅"""
        ret, data = self.quote_ctx.unsubscribe(symbols, subtype_list)
        if ret != RET_OK:
            print(f"取消订阅失败: {data}")

    def __del__(self):
        self.quote_ctx.close()
        self.trade_ctx.close()

    

四、回测系统实现

      class BacktestEngine:
    def __init__(self, initial_capital=100000):
        self.initial_capital = initial_capital
        self.results = None

    def run_backtest(self, strategy, data):
        """运行回测"""
        signals = strategy.generate_signals(data)

        # 初始化投资组合
        portfolio = pd.DataFrame(index=signals.index)
        portfolio['signal'] = signals['positions']
        portfolio['price'] = signals['price']
        portfolio['cash'] = self.initial_capital
        portfolio['shares'] = 0
        portfolio['total'] = self.initial_capital

        # 执行交易
        for i in range(1, len(portfolio)):
            # 买入信号
            if portfolio['signal'][i] == 1:
                shares_to_buy = portfolio['cash'][i-1] // portfolio['price'][i]
                portfolio['shares'][i] = portfolio['shares'][i-1] + shares_to_buy
                portfolio['cash'][i] = portfolio['cash'][i-1] - shares_to_buy * portfolio['price'][i]

            # 卖出信号
            elif portfolio['signal'][i] == -1:
                portfolio['cash'][i] = portfolio['cash'][i-1] + portfolio['shares'][i-1] * portfolio['price'][i]
                portfolio['shares'][i] = 0

            # 无信号
            else:
                portfolio['shares'][i] = portfolio['shares'][i-1]
                portfolio['cash'][i] = portfolio['cash'][i-1]

            # 更新总资产
            portfolio['total'][i] = portfolio['cash'][i] + portfolio['shares'][i] * portfolio['price'][i]

        self.results = portfolio
        return portfolio

    def analyze_results(self):
        """分析回测结果"""
        if self.results is None:
            print("请先运行回测")
            return None

        returns = self.results['total'].pct_change()
        cumulative_returns = (returns + 1).cumprod()

        # 计算年化收益率
        total_days = len(self.results)
        annualized_return = (cumulative_returns.iloc[-1] ** (252/total_days)) - 1

        # 计算最大回撤
        max_drawdown = (self.results['total'].cummax() - self.results['total']).max()

        # 计算夏普比率
        sharpe_ratio = returns.mean() / returns.std() * np.sqrt(252)

        # 可视化
        plt.figure(figsize=(12, 6))
        plt.plot(cumulative_returns)
        plt.title("Cumulative Returns")
        plt.xlabel("Date")
        plt.ylabel("Returns")
        plt.grid(True)
        plt.show()

        return {
            'annualized_return': annualized_return,
            'max_drawdown': max_drawdown,
            'sharpe_ratio': sharpe_ratio,
            'final_value': self.results['total'].iloc[-1]
        }

    

五、总结

本文详细介绍了如何利用iTick港美股实时行情API和富途牛牛交易接口构建个人量化交易系统。系统包含以下核心组件:

  • 行情数据获取模块:从iTick API获取实时和历史数据
  • 数据预处理模块:清洗数据并计算技术指标
  • 策略引擎:实现双均线交易策略
  • 交易执行模块:通过富途OpenAPI执行交易
  • 回测系统:评估策略历史表现