Python如何实时获取通达信数据?有哪些方法可以实现?
摘要:
核心原理:通达信的TCP/IP协议通达信客户端(如“通达信金融终端”、“通达信旗舰版”等)在启动时,会作为一个服务器,监听一个本地端口(通常是7709或7708),其他程序(包括我... 核心原理:通达信的TCP/IP协议
通达信客户端(如“通达信金融终端”、“通达信旗舰版”等)在启动时,会作为一个服务器,监听一个本地端口(通常是7709或7708),其他程序(包括我们的Python脚本)可以作为一个客户端,连接到这个端口,并按照特定的协议格式发送请求数据包,通达信服务器则会返回对应的数据包。
这个过程类似于网络爬虫,但协议是二进制的,而不是HTTP/HTTPS。
(图片来源网络,侵删)
使用成熟的第三方库(推荐)
对于大多数用户来说,使用现成的第三方库是最高效、最稳定的选择,这些库已经帮我们完成了复杂的协议解析工作。
推荐库:pytdx
pytdx 是一个非常流行的、专门用于获取通达信数据的Python库,它支持获取:
- 实时行情数据
- 历史K线数据
- 股票基本信息
- 板块信息等
安装 pytdx
pip install pytdx
准备工作:配置通达信
(图片来源网络,侵删)
- 确保通达信客户端已启动:打开你的通达信软件,并登录账号,软件必须在后台运行,Python脚本才能连接到它。
- 找到本地服务器端口:通常默认是
7709,如果连接失败,可以尝试7708或其他端口,有些版本可能需要手动配置允许外部连接。
代码示例:获取实时行情
下面是一个完整的示例,展示如何连接通达信并获取所有A股的实时行情数据。
from pytdx.hq import TdxHq_API
import pandas as pd
import time
# --- 配置 ---
#通达信客户端的IP和端口,通常是本地
HOST = '127.0.0.1'
PORT = 7709
def get_realtime_data():
"""
连接通达信并获取实时行情数据
"""
api = TdxHqApi()
data_list = []
try:
# 1. 建立连接
# with语句可以确保连接在使用完毕后自动关闭
with api.connect(HOST, PORT):
print("成功连接到通达信服务器...")
# 2. 获取所有股票的代码(市场代码 + 股票代码)
# ('sh', '600000') 表示上海市场的浦发银行
all_stocks = api.get_security_list('sh') + api.get_security_list('sz')
print(f"共获取到 {len(all_stocks)} 只股票")
# 3. 分批获取实时行情数据
# 通达信一次最多请求400只股票,所以需要分批
batch_size = 400
for i in range(0, len(all_stocks), batch_size):
batch = all_stocks[i:i + batch_size]
market_codes, stock_codes = zip(*batch)
# 获取这一批股票的实时行情数据
data = api.get_security_quotes(market_codes, stock_codes)
if data:
data_list.extend(data)
# 添加短暂延迟,避免请求过快
time.sleep(0.1)
except ConnectionRefusedError:
print("连接失败!请检查:")
print("1. 通达信客户端是否已启动并登录。")
print("2. 端口号(PORT)是否正确。")
print("3. 通达信是否允许外部连接。")
except Exception as e:
print(f"发生未知错误: {e}")
finally:
# 4. 关闭连接(在with语句块中会自动执行,但写上更清晰)
api.close()
return data_list
def format_to_dataframe(data_list):
"""
将获取到的数据转换为Pandas DataFrame,方便处理和分析
"""
if not data_list:
print("未获取到任何数据。")
return pd.DataFrame()
# 将数据列表转换为DataFrame
df = pd.DataFrame(data_list)
# 重命名列名为中文,更易读
df.columns = [
'代码', '名称', '当前价', '昨收', '今开', '最高', '最低',
'成交量', '成交额', '买一价', '买一量', '卖一价', '卖一量',
'日期', '时间', '涨跌额', '涨跌幅', '委比', '委差',
'昨成交量', '是否ST', '板块代码'
]
# 设置代码为索引,并补全市场前缀
df['代码'] = df['市场代码'].astype(str) + df['代码'].astype(str).str.zfill(6)
df = df.set_index('代码')
# 只保留我们关心的列,并调整顺序
df = df[['名称', '当前价', '涨跌幅', '成交量', '成交额', '最高', '最低']]
# 将数值列转换为float类型
numeric_cols = ['当前价', '涨跌幅', '成交量', '成交额', '最高', '最低']
df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')
return df
if __name__ == '__main__':
# 获取实时数据
realtime_data = get_realtime_data()
if realtime_data:
# 格式化数据
df_realtime = format_to_dataframe(realtime_data)
# 打印结果
print("\n--- 实时行情数据 (前10条) ---")
print(df_realtime.head(10))
# 可以将数据保存到CSV文件
# df_realtime.to_csv('realtime_stock_data.csv', encoding='gbk')
# print("\n数据已保存到 realtime_stock_data.csv")
手动解析数据包(进阶)
如果你对底层协议感兴趣,或者需要实现一些pytdx没有覆盖的功能,可以尝试手动解析数据包,这需要你理解通达信的二进制协议格式。
原理:
- 构造请求数据包:按照协议格式,构造一个二进制数据包,告诉通达信你需要什么数据(所有股票的行情)。
- 发送数据包:通过
socket将请求数据包发送到通达信的指定端口。 - 接收响应数据包:接收通达信返回的二进制数据流。
- 解析数据包:根据协议规范,解析这个二进制流,提取出有用的信息(如股票代码、价格等)。
代码示例:手动获取所有股票列表
这个例子比较基础,只获取股票列表,但足以展示手动解析的流程。
import socket
import struct
import pandas as pd
# --- 协议相关常量 ---
TDX_HOST = '127.0.0.1'
TDX_PORT = 7709
# 定义一些操作码
# 0x0a01 是获取股票列表的请求命令
CMD_GET_STOCK_LIST = 0x0a01
def create_request_package(cmd, market, page):
"""
构造请求数据包
"""
# 包头: 2字节(16进制)的命令 + 2字节(16进制)的市场代码 + 2字节(16进制)的页码
# 小端字节序
package = struct.pack('<HHH', cmd, market, page)
return package
def parse_stock_list_response(data):
"""
解析获取股票列表的响应数据包
"""
stocks = []
# 响应包的前6个字节是包头,从第7个字节开始才是真正的数据
# 每个股票信息长度为16字节
stock_data = data[6:]
for i in range(0, len(stock_data), 16):
if i + 16 > len(stock_data):
break
# 解包股票信息
# <: 小端字节序
# H: 2字节无符号整数 (市场代码)
# 6s: 6字节字符串 (股票代码)
# 32s: 32字节字符串 (股票名称)
market_code, stock_code, stock_name = struct.unpack('<H6s32s', stock_data[i:i+16])
# 去掉名称中的空字节
stock_name = stock_name.decode('gbk').strip('\x00')
stock_code = stock_code.decode('ascii').strip('\x00')
stocks.append((market_code, stock_code, stock_name))
return stocks
def manual_get_stocks():
"""
手动获取股票列表
"""
s = None
all_stocks = []
try:
# 1. 创建socket并连接
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((TDX_HOST, TDX_PORT))
print("成功连接到通达信服务器...")
# 2. 构造并发送请求数据包
# 获取上海市场(sh)的股票,第0页
package_sh = create_request_package(CMD_GET_STOCK_LIST, 1, 0)
s.send(package_sh)
# 接收响应数据
# 先接收包头,获取数据包长度
header = s.recv(6)
if not header:
raise ConnectionError("接收数据头失败")
# 解析包头,获取数据包长度(小端)
# <: 小端, H: 2字节无符号整数, I: 4字节无符号整数
# 包头格式: 2字节命令 + 4字节长度
_, data_len = struct.unpack('<HI', header)
# 根据长度接收完整的数据包
response_data = b''
while len(response_data) < data_len:
chunk = s.recv(4096)
if not chunk:
raise ConnectionError("连接中断,数据接收不完整")
response_data += chunk
# 解析数据
stocks_sh = parse_stock_list_response(response_data)
all_stocks.extend(stocks_sh)
print(f"获取到上海市场股票: {len(stocks_sh)} 只")
# 获取深圳市场(sz)的股票
package_sz = create_request_package(CMD_GET_STOCK_LIST, 2, 0)
s.send(package_sz)
header = s.recv(6)
_, data_len = struct.unpack('<HI', header)
response_data = b''
while len(response_data) < data_len:
response_data += s.recv(4096)
stocks_sz = parse_stock_list_response(response_data)
all_stocks.extend(stocks_sz)
print(f"获取到深圳市场股票: {len(stocks_sz)} 只")
except ConnectionRefusedError:
print("连接失败!请检查通达信是否已启动。")
except Exception as e:
print(f"发生错误: {e}")
finally:
if s:
s.close()
return all_stocks
if __name__ == '__main__':
stocks = manual_get_stocks()
if stocks:
print("\n--- 所有股票列表 (前10条) ---")
# 转换为DataFrame方便查看
df_stocks = pd.DataFrame(stocks, columns=['市场代码', '股票代码', '股票名称'])
print(df_stocks.head(10))
总结与对比
| 特性 | 使用 pytdx 库 |
手动解析数据包 |
|---|---|---|
| 易用性 | 非常高,几行代码即可实现功能。 | 非常低,需要深入理解二进制协议,代码复杂。 |
| 稳定性 | 高,由社区维护,处理了各种边界情况。 | 低,需要自己处理网络异常、数据包不完整等问题。 |
| 功能 | 丰富,已封装好获取行情、K线、板块等多种功能。 | 有限,每实现一个新功能都需要重新研究和编写解析代码。 |
| 学习价值 | 低,直接调用API,无法了解底层原理。 | 非常高,是学习网络编程、二进制协议解析的绝佳实践。 |
| 适用人群 | 绝大多数用户,特别是数据分析师、量化交易初学者。 | 开发者、研究人员,需要深度定制或进行协议研究。 |
强烈建议初学者和绝大多数用户直接使用 pytdx,它能让你专注于数据分析、策略回测等核心业务,而不是把时间花费在底层的网络通信和协议解析上,只有当你有特殊需求,或者想深入探索其原理时,才去考虑手动解析的方法。
文章版权及转载声明
作者:咔咔本文地址:https://www.jits.cn/content/28050.html发布于 今天
文章转载或复制请以超链接形式并注明出处杰思科技・AI 股讯


还没有评论,来说两句吧...