行情示例

获取行情示例

# 用前须知

## xtdata提供和MiniQmt的交互接口,本质是和MiniQmt建立连接,由MiniQmt处理行情数据请求,再把结果回传返回到python层。使用的行情服务器以及能获取到的行情数据和MiniQmt是一致的,要检查数据或者切换连接时直接操作MiniQmt即可。

## 对于数据获取接口,使用时需要先确保MiniQmt已有所需要的数据,如果不足可以通过补充数据接口补充,再调用数据获取接口获取。

## 对于订阅接口,直接设置数据回调,数据到来时会由回调返回。订阅接收到的数据一般会保存下来,同种数据不需要再单独补充。

# 代码讲解

# 从本地python导入xtquant库,如果出现报错则说明安装失败
from xtquant import xtdata
import time

# 设定一个标的列表
code_list = ["000001.SZ"]
# 设定获取数据的周期
period = "1d"

# 下载标的行情数据
if 1:
    ## 为了方便用户进行数据管理,xtquant的大部分历史数据都是以压缩形式存储在本地的
    ## 比如行情数据,需要通过download_history_data下载,财务数据需要通过
    ## 所以在取历史数据之前,我们需要调用数据下载接口,将数据下载到本地
    for i in code_list:
        xtdata.download_history_data(i,period=period,incrementally=True) # 增量下载行情数据(开高低收,等等)到本地
    
    xtdata.download_financial_data(code_list) # 下载财务数据到本地
    xtdata.download_sector_data() # 下载板块数据到本地
    # 更多数据的下载方式可以通过数据字典查询

# 读取本地历史行情数据
history_data = xtdata.get_market_data_ex([],code_list,period=period,count=-1)
print(history_data)
print("=" * 20)

# 如果需要盘中的实时行情,需要向服务器进行订阅后才能获取
# 订阅后,get_market_data函数于get_market_data_ex函数将会自动拼接本地历史行情与服务器实时行情

# 向服务器订阅数据
for i in code_list:
    xtdata.subscribe_quote(i,period=period,count=-1) # 设置count = -1来取到当天所有实时行情

# 等待订阅完成
time.sleep(1)

# 获取订阅后的行情
kline_data = xtdata.get_market_data_ex([],code_list,period=period)
print(kline_data)

# 获取订阅后的行情,并以固定间隔进行刷新,预期会循环打印10次
for i in range(10):
    # 这边做演示,就用for来循环了,实际使用中可以用while True
    kline_data = xtdata.get_market_data_ex([],code_list,period=period)
    print(kline_data)
    time.sleep(3) # 三秒后再次获取行情

# 如果不想用固定间隔触发,可以以用订阅后的回调来执行
# 这种模式下当订阅的callback回调函数将会异步的执行,每当订阅的标的tick发生变化更新,callback回调函数就会被调用一次
# 本地已有的数据不会触发callback
    
# 定义的回测函数
    ## 回调函数中,data是本次触发回调的数据,只有一条
def f(data):
    # print(data)
    
    code_list = list(data.keys())    # 获取到本次触发的标的代码

    kline_in_callabck = xtdata.get_market_data_ex([],code_list,period = period)    # 在回调中获取klines数据
    print(kline_in_callabck)

for i in code_list:
    xtdata.subscribe_quote(i,period=period,count=-1,callback=f) # 订阅时设定回调函数

# 使用回调时,必须要同时使用xtdata.run()来阻塞程序,否则程序运行到最后一行就直接结束退出了。
xtdata.run()



链接VIP服务器

# 导入 xtdatacenter 模块
import sys

print("Python 版本:", sys.version)


import time
import pandas as pd
from xtquant import xtdatacenter as xtdc
from xtquant import xtdata
'''  
设置用于登录行情服务的token,此接口应该先于 init_quote 调用
'''
xtdc.set_token('')

'''
设置连接池,使服务器只在连接池内优选

建议将VIP服务器设为连接池
'''
addr_list = [
    '115.231.218.73:55310', 
    '115.231.218.79:55310', 
    '218.16.123.11:55310', 
    '218.16.123.27:55310'
    ]
xtdc.set_allow_optmize_address(addr_list)

xtdc.set_kline_mirror_enabled(True) # 开启K线全推功能(vip),以获取全市场实时K线数据


"""
初始化
"""
xtdc.init()
## 监听端口
port = xtdc.listen(port = 58621) # 指定固定端口进行连接
# port = xtdc.listen(port = (58620, 58630)) 通过指定port范围,可以让xtdc在范围内自动寻找可用端口

xtdata.connect(port=port)

print('-----连接上了------')
print(xtdata.data_dir)



servers = xtdata.get_quote_server_status()
# print(servers)
for k, v in servers.items():
    print(k, v)

xtdata.run()

连接指定服务器


import time
from xtquant import xtdata

#用token方式连接,不需要账号密码
#其他连接方式,需要账号密码
info = {"ip": "218.16.123.122", "port": 55300, "username": '', "pwd": ''}

connect_success = 0
def func(d):
    ip = d.get('ip', '')
    port = d.get('port')
    status = d.get('status', 'disconnected')

    global connect_success
    if ip == info['ip'] and port == info['port']:
        if status == 'connected':
            connect_success = 1
        else:
            connect_success = 2

# 注册连接回调信息
xtdata.watch_quote_server_status(func)

# 行情连接
qs = xtdata.QuoteServer(info)
qs.connect()

# 等待连接状态
while connect_success == 0:
    time.sleep(0.3)

if connect_success == 2:
    print("连接失败")

订阅全推数据/下载历史数据


# coding:utf-8
import time

from xtquant import xtdata

code = '600000.SH'

#取全推数据
full_tick = xtdata.get_full_tick([code])
print('全推数据 日线最新值', full_tick)

#下载历史数据 下载接口本身不返回数据
xtdata.download_history_data(code, period='1m', start_time='20230701')

#订阅最新行情
def callback_func(data):
    print('回调触发', data)

xtdata.subscribe_quote(code, period='1m', count=-1, callback= callback_func)
data = xtdata.get_market_data(['close'], [code], period='1m', start_time='20230701')
print('一次性取数据', data)

#死循环 阻塞主线程退出
xtdata.run()

获取对手价

# 以卖出为例

import pandas as pd
import numpy as np
from xtquant import xtdata

to_do_trade_list = ["000001.SZ"]
tick = xtdata.get_full_tick(to_do_trade_list)


# 取买一价为对手价,若买一价为0,说明已经跌停,则取最新价
for i in tick:
    fix_price = tick[i]["bidPrice"][0] if tick[i]["bidPrice"][0] != 0 else tick[i]["lastPrice"]
    print(fix_price)

获取5档盘口行情

提示

  1. 该数据为VIP数据在新窗口打开
from xtquant import xtdata
import time

symbol_list = ["rb2405.SF","ec2404.INE"] # 五档行情支持上期所,上期能源

period = "l2quote" # 获取5档盘口tick

for symbol in symbol_list:
    xtdata.subscribe_quote(symbol,period = period,count=-1)
time.sleep(1)

data = xtdata.get_market_data_ex(["askPrice","bidPrice"],symbol_list,period = period,count=-1)

print(data)

复权计算方式

#coding:utf-8

import numpy as np
import pandas as pd

from xtquant import xtdata

#def gen_divid_ratio(quote_datas, divid_datas):
#    drl = []
#    for qi in range(len(quote_datas)):
#        q = quote_datas.iloc[qi]
#        dr = 1.0
#        for di in range(len(divid_datas)):
#            d = divid_datas.iloc[di]
#            if d.name <= q.name:
#                dr *= d['dr']
#        drl.append(dr)
#    return pd.DataFrame(drl, index = quote_datas.index, columns = quote_datas.columns)

def gen_divid_ratio(quote_datas, divid_datas):
    drl = []
    dr = 1.0
    qi = 0
    qdl = len(quote_datas)
    di = 0
    ddl = len(divid_datas)
    while qi < qdl and di < ddl:
        qd = quote_datas.iloc[qi]
        dd = divid_datas.iloc[di]
        if qd.name >= dd.name:
            dr *= dd['dr']
            di += 1
        if qd.name <= dd.name:
            drl.append(dr)
            qi += 1
    while qi < qdl:
        drl.append(dr)
        qi += 1
    return pd.DataFrame(drl, index = quote_datas.index, columns = quote_datas.columns)

def process_forward_ratio(quote_datas, divid_datas):
    drl = gen_divid_ratio(quote_datas, divid_datas)
    drlf = drl / drl.iloc[-1]
    result = (quote_datas * drlf).apply(lambda x: round(x, 2))
    return result

def process_backward_ratio(quote_datas, divid_datas):
    drl = gen_divid_ratio(quote_datas, divid_datas)
    result = (quote_datas * drl).apply(lambda x: round(x, 2))
    return result

def process_forward(quote_datas1, divid_datas):
    quote_datas = quote_datas1.copy()
    def calc_front(v, d):
        return round(
            (v - d['interest'] + d['allotPrice'] * d['allotNum'])
            / (1 + d['allotNum'] + d['stockBonus'] + d['stockGift'])
            , 2
        )
    for qi in range(len(quote_datas)):
        q = quote_datas.iloc[qi]
        for di in range(len(divid_datas)):
            d = divid_datas.iloc[di]
            if d.name <= q.name:
                continue
            q.iloc[0] = calc_front(q.iloc[0], d)
    return quote_datas

def process_backward(quote_datas1, divid_datas):
    quote_datas = quote_datas1.copy()
    def calc_front(v, d):
        return round(
            (v * (1 + d['stockGift'] + d['stockBonus'] + d['allotNum'])
            + d['interest'] - d['allotNum'] * d['allotPrice'])
            , 2
        )
    for qi in range(len(quote_datas)):
        q = quote_datas.iloc[qi]
        for di in range(len(divid_datas)):
            d = divid_datas.iloc[di]
            if d.name > q.name:
                continue
            q.iloc[0] = calc_front(q.iloc[0], d)
    return quote_datas


#--------------------------------

s = '002594.SZ'

#xtdata.download_history_data(s, '1d', '20100101', '')

dd = xtdata.get_divid_factors(s)
print(dd)

#复权计算用于处理价格字段
field_list = ['open', 'high', 'low', 'close']
datas_ori = xtdata.get_market_data(field_list, [s], '1d', dividend_type = 'none')['close'].T
#print(datas_ori)

#等比前复权
datas_forward_ratio = process_forward_ratio(datas_ori, dd)
print('datas_forward_ratio', datas_forward_ratio)

#等比后复权
datas_backward_ratio = process_backward_ratio(datas_ori, dd)
print('datas_backward_ratio', datas_backward_ratio)

#前复权
datas_forward = process_forward(datas_ori, dd)
print('datas_forward', datas_forward)

#后复权
datas_backward = process_backward(datas_ori, dd)
print('datas_backward', datas_backward)



根据商品期货期权代码获取对应的商品期货合约代码

from xtquant import xtdata

def get_option_underline_code(code:str) -> str:
    """
    注意:该函数不适用于股指期货期权与ETF期权
    Todo: 根据商品期权代码获取对应的具体商品期货合约
    Args:
        code:str 期权代码
    Return:
        对应的期货合约代码
    """
    Exchange_dict = {
        "SHFE":"SF",
        "CZCE":"ZF",
        "DCE":"DF",
        "INE":"INE",
        "GFEX":"GF"
    }
    
    if code.split(".")[-1] not in [v for k,v in Exchange_dict.items()]:
        raise KeyError("此函数不支持该交易所合约")
    info = xtdata.get_option_detail_data(code)
    underline_code = info["OptUndlCode"] + "." + Exchange_dict[info["OptUndlMarket"]]

    return underline_code

if __name__ == "__main__":

    symbol_code = get_option_underline_code('sc2403C465.INE') # 获取期权合约'sc2403C465.INE'对应的期货合约代码
    print(symbol_code)

根据指数代码,返回对应的期货合约


from xtquant import xtdata
import re

def get_financial_futures_code_from_index(index_code:str) -> list:
    """
    ToDo:传入指数代码,返回对应的期货合约(当前)
    Args:
        index_code:指数代码,如"000300.SH","000905.SH"
    Retuen:
        list: 对应期货合约列表
    """
    financial_futures = xtdata.get_stock_list_in_sector("中金所")
    future_list = []
    pattern = r'^[a-zA-Z]{1,2}\d{3,4}\.[A-Z]{2}$'
    for i in financial_futures:
        
        if re.match(pattern,i):
            future_list.append(i)
    ls = []
    for i in future_list:
        _info = xtdata._get_instrument_detail(i)
        _index_code = _info["ExtendInfo"]['OptUndlCode'] + "." + _info["ExtendInfo"]['OptUndlMarket']
        if _index_code == index_code:
            ls.append(i)
    return ls

if __name__ == "__main__":
    ls = get_financial_futures_code_from_index("000905.SH")
    print(ls)

交易示例

简单买卖各一笔示例

需要调整的参数:

  • 98行的path变量需要改为本地客户端路径,券商端指定到 f"{安装目录}\userdata_mini",投研端指定到f"{安装目录}\userdata"
  • 107行的资金账号需要调整为自身资金账号
# coding:utf-8
import time, datetime, traceback, sys
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant


# 定义一个类 创建类的实例 作为状态的容器
class _a():
    pass


A = _a()
A.bought_list = []
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')


def interact():
    """执行后进入repl模式"""
    import code
    code.InteractiveConsole(locals=globals()).interact()


xtdata.download_sector_data()



class MyXtQuantTraderCallback(XtQuantTraderCallback):
    def on_disconnected(self):
        """
        连接断开
        :return:
        """
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order):
        """
        委托回报推送
        :param order: XtOrder对象
        :return:
        """
        print(datetime.datetime.now(), '委托回调 投资备注', order.order_remark)

    def on_stock_trade(self, trade):
        """
        成交变动推送
        :param trade: XtTrade对象
        :return:
        """
        print(datetime.datetime.now(), '成交回调', trade.order_remark, f"委托方向(48买 49卖) {trade.offset_flag} 成交价格 {trade.traded_price} 成交数量 {trade.traded_volume}")

    def on_order_error(self, order_error):
        """
        委托失败推送
        :param order_error:XtOrderError 对象
        :return:
        """
        # print("on order_error callback")
        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        撤单失败推送
        :param cancel_error: XtCancelError 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_order_stock_async_response(self, response):
        """
        异步下单回报推送
        :param response: XtOrderResponse 对象
        :return:
        """
        print(f"异步委托回调 投资备注: {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        :param response: XtCancelOrderResponse 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_account_status(self, status):
        """
        :param response: XtAccountStatus 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)


if __name__ == '__main__':
    print("start")
    # 指定客户端所在路径, 券商端指定到 userdata_mini文件夹
    # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata"
    path = r'D:\qmt\投\迅投极速交易终端睿智融科版\userdata'
    # 生成session id 整数类型 同时运行的策略不能重复
    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定
    # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程
    # xt_trader.set_relaxed_response_order_enabled(True)

    # 创建资金账号为 800068 的证券账号对象 股票账号为STOCK 信用CREDIT 期货FUTURE
    acc = StockAccount('2000128', 'STOCK')
    # 创建交易回调类对象,并声明接收回调
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # 启动交易线程
    xt_trader.start()
    # 建立交易连接,返回0表示连接成功
    connect_result = xt_trader.connect()
    print('建立交易连接,返回0表示连接成功', connect_result)
    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
    subscribe_result = xt_trader.subscribe(acc)
    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)
    #取账号信息
    account_info = xt_trader.query_stock_asset(acc)
    #取可用资金
    available_cash = account_info.m_dCash

    print(acc.account_id, '可用资金', available_cash)
    #查账号持仓
    positions = xt_trader.query_stock_positions(acc)
    #取各品种 总持仓 可用持仓
    position_total_dict = {i.stock_code : i.m_nVolume for i in positions}
    position_available_dict = {i.stock_code : i.m_nCanUseVolume for i in positions}
    print(acc.account_id, '持仓字典', position_total_dict)
    print(acc.account_id, '可用持仓字典', position_available_dict)

    #买入 浦发银行 最新价 两万元
    stock = '600000.SH'
    target_amount = 20000
    full_tick = xtdata.get_full_tick([stock])
    print(f"{stock} 全推行情: {full_tick}")
    current_price = full_tick[stock]['lastPrice']
    #买入金额 取目标金额 与 可用金额中较小的
    buy_amount = min(target_amount, available_cash)
    #买入数量 取整为100的整数倍
    buy_vol = int(buy_amount / current_price / 100) * 100
    print(f"当前可用资金 {available_cash} 目标买入金额 {target_amount} 买入股数 {buy_vol}股")
    async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, buy_vol, xtconstant.FIX_PRICE, current_price,
                                            'strategy_name', stock)

    #卖出 500股
    stock = '513130.SH'
    #目标数量
    target_vol = 500
    #可用数量
    available_vol = position_available_dict[stock] if stock in position_available_dict else 0
    #卖出量取目标量与可用量中较小的
    sell_vol = min(target_vol, available_vol)
    print(f"{stock} 目标卖出量 {target_vol} 可用数量 {available_vol} 卖出 {sell_vol}股")
    if sell_vol > 0:
        async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_SELL, sell_vol, xtconstant.LATEST_PRICE,
                                                -1,
                                                'strategy_name', stock)
    print(f"下单完成 等待回调")
    # 阻塞主线程退出
    xt_trader.run_forever()
    # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里)
    interact()


单股订阅实盘示例

需要调整的参数:

  • 113行的path变量需要改为本地客户端路径,券商端指定到 f"{安装目录}\userdata_mini",投研端指定到f"{安装目录}\userdata"
  • 122行的资金账号需要调整为自身资金账号
# coding:utf-8
import time, datetime, traceback, sys
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant


# 定义一个类 创建类的实例 作为状态的容器
class _a():
    pass


A = _a()
A.bought_list = []
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')


def interact():
    """执行后进入repl模式"""
    import code
    code.InteractiveConsole(locals=globals()).interact()


xtdata.download_sector_data()


def f(data):
    print(data)
    now = datetime.datetime.now()
    for stock in data:
        if stock not in A.hsa:
            continue
        cuurent_price = data[stock]['close']
        pre_price = data[stock]['preClose']
        ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0
        if ratio > 0.09 and stock not in A.bought_list:
            print(f"{now} 最新价 买入 {stock} 100股")
            async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 100, xtconstant.LATEST_PRICE, -1,
                                                    'strategy_name', stock)
            A.bought_list.append(stock)


class MyXtQuantTraderCallback(XtQuantTraderCallback):
    def on_disconnected(self):
        """
        连接断开
        :return:
        """
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order):
        """
        委托回报推送
        :param order: XtOrder对象
        :return:
        """
        print(datetime.datetime.now(), '委托回调', order.order_remark)

    def on_stock_trade(self, trade):
        """
        成交变动推送
        :param trade: XtTrade对象
        :return:
        """
        print(datetime.datetime.now(), '成交回调', trade.order_remark)

    def on_order_error(self, order_error):
        """
        委托失败推送
        :param order_error:XtOrderError 对象
        :return:
        """
        # print("on order_error callback")
        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        撤单失败推送
        :param cancel_error: XtCancelError 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_order_stock_async_response(self, response):
        """
        异步下单回报推送
        :param response: XtOrderResponse 对象
        :return:
        """
        print(f"异步委托回调 {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        :param response: XtCancelOrderResponse 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_account_status(self, status):
        """
        :param response: XtAccountStatus 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)


if __name__ == '__main__':
    print("start")
    # 指定客户端所在路径, 券商端指定到 userdata_mini文件夹
    # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata"
    path = r'D:\qmt\投\迅投极速交易终端睿智融科版\userdata'
    # 生成session id 整数类型 同时运行的策略不能重复
    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定
    # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程
    # xt_trader.set_relaxed_response_order_enabled(True)

    # 创建资金账号为 800068 的证券账号对象 股票账号为STOCK 信用CREDIT 期货FUTURE
    acc = StockAccount('2000128', 'STOCK')
    # 创建交易回调类对象,并声明接收回调
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # 启动交易线程
    xt_trader.start()
    # 建立交易连接,返回0表示连接成功
    connect_result = xt_trader.connect()
    print('建立交易连接,返回0表示连接成功', connect_result)
    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
    subscribe_result = xt_trader.subscribe(acc)
    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)

    #订阅的品种列表
    code_list = ['600000.SH', '000001.SZ']

    for code in code_list:
        xtdata.subscribe_quote(code, '1d', callback = f)

    # 阻塞主线程退出
    xt_trader.run_forever()
    # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里)
    interact()


全推订阅实盘示例

本示例用于展示如何订阅上海及深圳市场全推,对于沪深A股品种策略进行判断当前涨幅超过 9 个点的买入 200 股

需要调整的参数:

  • 111行的path变量需要改为本地客户端路径
  • 116行的资金账号需要调整为自身资金账号

注意

本策略只用于提供策略写法及参考,若您直接进行实盘下单,造成损失本网站不负担责任。

#coding:utf-8
import time, datetime, traceback, sys
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant

#定义一个类 创建类的实例 作为状态的容器
class _a():
    pass
A = _a()
A.bought_list = []
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')

def interact():
    """执行后进入repl模式"""
    import code
    code.InteractiveConsole(locals=globals()).interact()
xtdata.download_sector_data()

def f(data):
    now = datetime.datetime.now()
    for stock in data:
        if stock not in A.hsa:
            continue
        cuurent_price = data[stock]['lastPrice']
        pre_price = data[stock]['lastClose']
        ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0
        if ratio > 0.09 and stock not in A.bought_list:
            print(f"{now} 最新价 买入 {stock} 200股")
            async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 200, xtconstant.LATEST_PRICE, -1, 'strategy_name', stock)
            A.bought_list.append(stock)
    
class MyXtQuantTraderCallback(XtQuantTraderCallback):
    def on_disconnected(self):
        """
        连接断开
        :return:
        """
        print(datetime.datetime.now(),'连接断开回调')

    def on_stock_order(self, order):
        """
        委托回报推送
        :param order: XtOrder对象
        :return:
        """
        print(datetime.datetime.now(), '委托回调', order.order_remark)


    def on_stock_trade(self, trade):
        """
        成交变动推送
        :param trade: XtTrade对象
        :return:
        """
        print(datetime.datetime.now(), '成交回调', trade.order_remark)


    def on_order_error(self, order_error):
        """
        委托失败推送
        :param order_error:XtOrderError 对象
        :return:
        """
        # print("on order_error callback")
        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        撤单失败推送
        :param cancel_error: XtCancelError 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_order_stock_async_response(self, response):
        """
        异步下单回报推送
        :param response: XtOrderResponse 对象
        :return:
        """
        print(f"异步委托回调 {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        :param response: XtCancelOrderResponse 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_account_status(self, status):
        """
        :param response: XtAccountStatus 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)


if __name__ == '__main__':
    print("start")
    #指定客户端所在路径,
    # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata"
    path = r'D:\qmt\sp3\迅投极速交易终端 睿智融科版\userdata_mini'
    # 生成session id 整数类型 同时运行的策略不能重复
    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定
    # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程
    # xt_trader.set_relaxed_response_order_enabled(True)

    # 创建资金账号为 800068 的证券账号对象
    acc = StockAccount('800068', 'STOCK')
    # 创建交易回调类对象,并声明接收回调
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # 启动交易线程
    xt_trader.start()
    # 建立交易连接,返回0表示连接成功
    connect_result = xt_trader.connect()
    print('建立交易连接,返回0表示连接成功', connect_result)
    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
    subscribe_result = xt_trader.subscribe(acc)
    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)

    #这一行是注册全推回调函数 包括下单判断 安全起见处于注释状态 确认理解效果后再放开
    # xtdata.subscribe_whole_quote(["SH", "SZ"], callback=f)
    # 阻塞主线程退出
    xt_trader.run_forever()
    # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里)
    interact()

定时判断实盘示例

# coding:utf-8
import time, datetime, traceback, sys
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant


# 定义一个类 创建类的实例 作为状态的容器
class _a():
    pass


A = _a()
A.bought_list = []
A.hsa = xtdata.get_stock_list_in_sector('沪深A股')


def interact():
    """执行后进入repl模式"""
    import code
    code.InteractiveConsole(locals=globals()).interact()


xtdata.download_sector_data()


def f(data):
    now = datetime.datetime.now()
    # print(data)
    for stock in data:
        if stock not in A.hsa:
            continue
        cuurent_price = data[stock].iloc[-1, 0]
        pre_price = data[stock].iloc[-2, 0]
        ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0
        if ratio > 0.09 and stock not in A.bought_list:
            print(f"{now} 最新价 买入 {stock} 100股")
            async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 100, xtconstant.LATEST_PRICE, -1,
                                                    'strategy_name', stock)
            A.bought_list.append(stock)


class MyXtQuantTraderCallback(XtQuantTraderCallback):
    def on_disconnected(self):
        """
        连接断开
        :return:
        """
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order):
        """
        委托回报推送
        :param order: XtOrder对象
        :return:
        """
        print(datetime.datetime.now(), '委托回调', order.order_remark)

    def on_stock_trade(self, trade):
        """
        成交变动推送
        :param trade: XtTrade对象
        :return:
        """
        print(datetime.datetime.now(), '成交回调', trade.order_remark)

    def on_order_error(self, order_error):
        """
        委托失败推送
        :param order_error:XtOrderError 对象
        :return:
        """
        # print("on order_error callback")
        # print(order_error.order_id, order_error.error_id, order_error.error_msg)
        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        撤单失败推送
        :param cancel_error: XtCancelError 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_order_stock_async_response(self, response):
        """
        异步下单回报推送
        :param response: XtOrderResponse 对象
        :return:
        """
        print(f"异步委托回调 {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        :param response: XtCancelOrderResponse 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_account_status(self, status):
        """
        :param response: XtAccountStatus 对象
        :return:
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)


if __name__ == '__main__':
    print("start")
    # 指定客户端所在路径, 券商端指定到 userdata_mini文件夹
    # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata"
    path = r'D:\qmt\投\迅投极速交易终端睿智融科版\userdata'
    # 生成session id 整数类型 同时运行的策略不能重复
    session_id = int(time.time())
    xt_trader = XtQuantTrader(path, session_id)
    # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定
    # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程
    # xt_trader.set_relaxed_response_order_enabled(True)

    # 创建资金账号为 800068 的证券账号对象 股票账号为STOCK 信用CREDIT 期货FUTURE
    acc = StockAccount('2000128', 'STOCK')
    # 创建交易回调类对象,并声明接收回调
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # 启动交易线程
    xt_trader.start()
    # 建立交易连接,返回0表示连接成功
    connect_result = xt_trader.connect()
    print('建立交易连接,返回0表示连接成功', connect_result)
    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
    subscribe_result = xt_trader.subscribe(acc)
    print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result)

    #订阅的品种列表
    code_list = ['600000.SH', '000001.SZ']
    #遍历品种 下载历史k线 订阅当日行情
    for code in code_list:
        xtdata.download_history_data(code, period='1d', start_time='20200101')
        xtdata.subscribe_quote(code, '1d', callback = None)

    while True:
        now = datetime.datetime.now()
        now_time = now.strftime('%H%M%S')
        if not '093000' <= now_time < '150000':
            print(f"{now} 非交易时间 循环退出")
            break
        #取k线数据
        data = xtdata.get_market_data_ex(['close'], code_list, period= '1d', start_time= '20240101')
        #判断交易
        f(data)
        #每次循环 睡眠三秒后继续
        time.sleep(3)


    # 阻塞主线程退出
    xt_trader.run_forever()
    # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里)
    interact()

交易接口重连

该示例演示交易链接断开时重连的代码处理。

提示

  1. 该示例不是线程安全的,仅演示断开链接时应该怎么处理重连代码,实际使用时请注意避免潜在的问题
  2. 本策略只用于提供策略写法及参考,若您直接进行实盘下单,造成损失本网站不负担责任。
import time
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant

#指定客户端所在路径
path = 'E:\qmt\\userdata_mini'
# 指定session id 整数类型,任意整数即可,同时运行的策略不能重复
session_id = 123456

xt_trader = None


class MyXtQuantTraderCallback(XtQuantTraderCallback):
    def on_disconnected(self):
        """
        连接断开
        :return:
        """
        print("connection lost, 交易接口断开,即将重连")

        global xt_trader
        xt_trader = None


def connect():
    global session_id
    
    # 重连时需要更换session_id
    session_id += 1
    xt_trader = XtQuantTrader(path, session_id)
    # 创建资金账号为1000000365的证券账号对象
    acc = StockAccount('1000000365')
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # 启动交易线程
    xt_trader.start()
    # 建立交易连接,返回0表示连接成功
    connect_result = xt_trader.connect()
    if connect_result == 0:
        return xt_trader, True
    else:
        return None, False


if __name__ == "__main__":
    xt_trader, success = connect()
    print(xt_trader, success)
    while 1:
        if xt_trader is None:
            print('开始重连交易接口')
            xt_trader, success = connect()
            if success:
                print('交易接口重连成功')
        time.sleep(3)
        

指定session id范围连接交易

该示例演示指定session重试连接次数的代码处理。


from xtquant.xttrader import XtQuantTrader


def connect(path, session):
    xttrader = XtQuantTrader(path, session)
    xttrader.start()
    connect_result = xttrader.connect()
    return xttrader, connect_result


def try_connect() -> XtQuantTrader: 
    # 用固定session取值范围建立XtQuantTrader 以防止占用大量硬盘空间
    session_list = [i for i in range(1,10)]
    path = r'd:\qmt\userdata_mini'
    for session in session_list:
        xt_trader, connect_result = connect(path, session)
        if connect_result == 0:
            return xt_trader
        else:
            continue
    # 下面可以添加一些通知代码,比如发邮件、发送钉钉等

    raise Exception('XtQuantTrader 链接失败,请重试')


if __name__ == '__main__':
    print(try_connect())


信用账号执行还款

本示例用于展示如何使用xtquant库对信用账号执行还款的操作

提示

本策略只用于提供策略写法及参考,若您直接进行实盘下单,造成损失本网站不负担责任。

#coding=utf-8
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant

# 修改参数
# path为mini qmt客户端安装目录下userdata_mini路径
path = 'E:\\qmt\\userdata_mini'
# session_id为会话编号,策略使用方对于不同的Python策略需要使用不同的会话编号
session_id = 1234567
repay_money = 1000.51  # 元,需要执行还款的金额

class MyXtQuantTraderCallback(XtQuantTraderCallback):
    def on_disconnected(self):
        """
        连接断开
        :return:
        """
        print("connection lost")
    def on_stock_order(self, order):
        """
        委托回报推送
        :param order: XtOrder对象
        :return:
        """
        print("on order callback:")
        print(order.stock_code, order.order_status, order.order_sysid)
    def on_stock_asset(self, asset):
        """
        资金变动推送
        :param asset: XtAsset对象
        :return:
        """
        print("on asset callback")
        print(asset.account_id, asset.cash, asset.total_asset)
    def on_stock_trade(self, trade):
        """
        成交变动推送
        :param trade: XtTrade对象
        :return:
        """
        print("on trade callback")
        print(trade.account_id, trade.stock_code, trade.order_id)
    def on_order_error(self, order_error):
        """
        委托失败推送
        :param order_error:XtOrderError 对象
        :return:
        """
        print("on order_error callback")
        print(order_error.order_id, order_error.error_id, order_error.error_msg)
    def on_cancel_error(self, cancel_error):
        """
        撤单失败推送
        :param cancel_error: XtCancelError 对象
        :return:
        """
        print("on cancel_error callback")
        print(cancel_error.order_id, cancel_error.error_id, cancel_error.error_msg)
    def on_order_stock_async_response(self, response):
        """
        异步下单回报推送
        :param response: XtOrderResponse 对象
        :return:
        """
        print("on_order_stock_async_response")
        print(response.account_id, response.order_id, response.seq)
    def on_account_status(self, status):
        """
        :param response: XtAccountStatus 对象
        :return:
        """
        print("on_account_status")
        print(status.account_id, status.account_type, status.status)


if __name__ == "__main__":
    print("demo test")


    xt_trader = XtQuantTrader(path, session_id)
    # 创建资金账号为1000000365的证券账号对象
    acc = StockAccount('200035', 'CREDIT')
    # StockAccount可以用第二个参数指定账号类型,如沪港通传'HUGANGTONG',深港通传'SHENGANGTONG'
    # acc = StockAccount('1000000365','STOCK')
    # 创建交易回调类对象,并声明接收回调
    callback = MyXtQuantTraderCallback()
    xt_trader.register_callback(callback)
    # 启动交易线程
    xt_trader.start()
    # 建立交易连接,返回0表示连接成功
    connect_result = xt_trader.connect()
    if connect_result != 0:
        import sys
        sys.exit('链接失败,程序即将退出 %d'%connect_result)
    # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功
    subscribe_result = xt_trader.subscribe(acc)
    if subscribe_result != 0:
        print('账号订阅失败 %d'%subscribe_result)
    print(subscribe_result)
    stock_code = '600000.SH'  # 参数占位用,任意股票代码都可以
    volume = 200  # 参数占位用,任意数量
    # 使用指定价下单,接口返回订单编号,后续可以用于撤单操作以及查询委托状态
    fix_result_order_id = xt_trader.order_stock(acc, stock_code, xtconstant.CREDIT_DIRECT_CASH_REPAY, repay_money, xtconstant.FIX_PRICE, -1, 'strategy_name', 'remark')

    # 阻塞线程,接收交易推送
    xt_trader.run_forever()
上次更新:
邀请注册送VIP优惠券
分享下方的内容给好友、QQ群、微信群,好友注册您即可获得VIP优惠券
玩转qmt,上迅投qmt知识库