首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >我用迅投miniqmt写个A股涨跌监控,盘面异动秒级推钉钉

我用迅投miniqmt写个A股涨跌监控,盘面异动秒级推钉钉

作者头像
子晓聊技术
发布2026-05-19 18:52:49
发布2026-05-19 18:52:49
1540
举报
文章被收录于专栏:子晓AI量化子晓AI量化

昨天写了一篇刚割完就大涨?散户如何避免这种“左右挨耳光”的尴尬 文章, 这里用迅投miniqmt写个A股涨跌监控,盘面异动秒级推钉钉。

先说效果

每个交易日,程序每30秒扫一次全市场5000多只A股,实时统计红盘多少家、绿盘多少家、上证涨跌幅多少。一旦触发条件,钉钉秒推:

  • 红盘跌破500家 → 🔴极弱盘面警告
  • 红盘冲上4000家 → 🟠过热盘面警告
  • 上证涨超1% → 📈大涨提醒
  • 上证跌超1% → 📉大跌提醒

比如那天大盘突然跳水,你正在开会,手机震了一下:

代码语言:javascript
复制
【股票监控】2025-05-16 14:23:15
📉 上证大跌提醒
红盘 680 家 | 绿盘 4100 家
上证涨跌幅: -1.37%

不用打开行情软件,扫一眼就知道该不该操作。

怎么实现?

第一步:拿到全市场实时行情

用迅投MiniQMT的Python库xtquant,它有个get_full_tick接口,一次调用就能拿全市场的分笔数据,效率很高。

先获取全市场股票列表:

代码语言:javascript
复制
from xtquant import xtdata

stocks = xtdata.get_stock_list_in_sector("沪深京A股")
tick_data = xtdata.get_full_tick(stocks)

返回的tick_data是一个字典,每只股票里面都有lastPrice(最新价)和lastClose(昨收价)。有了这两个值,涨跌家数就很好算了。

第二步:统计红盘绿盘

逻辑很简单,遍历所有股票,用最新价和昨收价算涨跌幅:

  • 涨幅 > 0.01% → 红盘
  • 跌幅 > 0.01% → 绿盘
  • 中间那点 → 平盘
代码语言:javascript
复制
for code in stocks:
    tick = tick_data[code]
    last_close = tick.get("lastClose", 0)
    last_price = tick.get("lastPrice", 0)
    change_pct = (last_price - last_close) / last_close * 100
    
    if change_pct > 0.01:
        red += 1
    elif change_pct < -0.01:
        green += 1

第三步:定告警条件,别乱叫

这一步很关键。告警设得太灵敏,一天推你20条,你很快就把群免打扰了;设得太迟钝,真出事了你又收不到。

这里设置四个信号:

条件

含义

为什么设这个阈值

红盘 < 500

极弱

5000多只股票只有不到500只涨,说明市场恐慌

红盘 > 4000

过热

几乎全线飘红,该想想是不是该减仓了

上证涨 > 1%

大涨

短期情绪高潮,追涨需谨慎

上证跌 > 1%

大跌

可能是机会也可能是崩盘,得注意

还有一个细节:同一个条件触发了不要反复推。红盘一直在400家徘徊,你不能每30秒推一条吧?所以用了一个alert_active集合来记录当前已触发的告警,只有状态首次进入才推送,恢复正常后清除标记,下次再触发才会再推。

第四步:钉钉推送

钉钉的机器人Webhook很好用,创建群→添加机器人→选"自定义关键词"模式,拿到Webhook地址就行。

推送逻辑就几行代码:

代码语言:javascript
复制
payload = {
    "msgtype": "text",
    "text": {"content": "【股票监控】红盘 680 家 | 绿盘 4100 家\n上证涨跌幅: -1.37%"},
    "at": {"isAtAll": True},
}
urllib.request.urlopen(req, timeout=10)

注意一点:创建机器人时安全设置选"自定义关键词",消息里必须包含这个关键词才能发出去。我把关键词设成了"股票监控",每条消息开头都带上,保证能推送成功。

第五步:让它自己跑

主循环就这么几步:拿数据→算涨跌→判断告警→推送→等30秒→再来一轮。

代码语言:javascript
复制
while True:
    tick_data = xtdata.get_full_tick(all_codes)
    result = count_red_green(all_codes, tick_data)
    alerts = tracker.update(result["red"], result["green"], index_chg)
    for msg in alerts:
        send_dingtalk(msg)
    time.sleep(30)

正式跑的时候把is_trading_time()那个判断打开,非交易时段自动休眠,不浪费资源。

除了这还能怎么玩?

这个监控是个基础框架,往上加东西很方便:

  • 加个股监控:把持仓股票加进来,个股涨跌超过阈值也推送
  • 加板块监控:监控热点板块的涨跌家数,比看指数更直观
  • 加量能异动:突然放量的股票筛出来,可能是资金在动手
  • 加定时播报:比如每天10:30、13:30、14:50各播一次盘面概况,不管有没有告警

代码我已经整理好了,改一下钉钉Webhook地址和MiniQMT路径就能直接跑。

代码语言:javascript
复制
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import urllib.request
import json
import logging
import datetime
from typing import Set
from xtquant import xtdata
# ---- 钉钉机器人(关键字模式)----
# 创建机器人时安全设置选"自定义关键词",添加关键词"股票监控"
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxx"
DINGTALK_KEYWORD = "股票监控"  # 钉钉机器人设置的关键词,消息中必须包含此词才能发送
# ---- 告警阈值 ----
RED_LOW_THRESHOLD    = 500     # 红盘家数低于此值告警(极弱)
RED_HIGH_THRESHOLD   = 4000    # 红盘家数高于此值告警(极强)
INDEX_CHANGE_LIMIT   = 1.0     # 上证指数涨跌幅超过此百分比告警(正负双向)
# ---- 监控标的 ----
INDEX_CODE = "000001.SH"       # 上证指数
# ---- 扫描频率 ----
SCAN_INTERVAL = 30             # 扫描间隔(秒),交易时段内每N秒全推一次
# ---- 日志 ----
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
# ============================================================
#  钉钉推送(关键字模式)
# ============================================================
def send_dingtalk(message: str, at_all: bool = True) -> bool:
    """
    发送钉钉文本消息(关键字模式,消息中须包含设置的关键词才能发送)
    message: 文本内容
    at_all: 是否@所有人
    """
    if "YOUR_TOKEN" in DINGTALK_WEBHOOK:
        logger.warning("钉钉 webhook 未配置,跳过推送(消息内容已打印到日志)")
        return False
    url = DINGTALK_WEBHOOK
    payload = {
        "msgtype": "text",
        "text": {"content": message},
        "at": {"isAtAll": at_all},
    }
    try:
        req = urllib.request.Request(
            url,
            data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            result = json.loads(resp.read().decode("utf-8"))
            if result.get("errcode") == 0:
                logger.info("钉钉推送成功")
                return True
            else:
                logger.error(f"钉钉推送失败: {result}")
                return False
    except Exception as e:
        logger.error(f"钉钉推送异常: {e}")
        return False
# ============================================================
#  市场状态追踪(防止重复告警)
# ============================================================
class MarketStateTracker:
    """
    追踪市场状态,只在状态发生变化时才触发告警。
    """
    def __init__(self):
        self.alert_active: Set[str] = set()
    def update(self, red_count: int, green_count: int, index_change_pct: float) -> list:
        """
        根据最新数据判断是否需要告警
        返回需要推送的告警消息列表(空=无新告警)
        """
        alerts = []
        # ---- 红盘家数 < 500 ----
        condition_id = "red_low"
        if red_count < RED_LOW_THRESHOLD:
            if condition_id not in self.alert_active:
                self.alert_active.add(condition_id)
                alerts.append(
                    f"🔴 极弱盘面警告\n"
                    f"红盘 {red_count} 家 | 绿盘 {green_count} 家\n"
                    f"上证涨跌幅: {index_change_pct:+.2f}%"
                )
        else:
            self.alert_active.discard(condition_id)
        # ---- 红盘家数 > 4000 ----
        condition_id = "red_high"
        if red_count > RED_HIGH_THRESHOLD:
            if condition_id not in self.alert_active:
                self.alert_active.add(condition_id)
                alerts.append(
                    f"🟠 过热盘面警告\n"
                    f"红盘 {red_count} 家 | 绿盘 {green_count} 家\n"
                    f"上证涨跌幅: {index_change_pct:+.2f}%"
                )
        else:
            self.alert_active.discard(condition_id)
        # ---- 上证涨跌幅 > +1% ----
        condition_id = "index_up"
        if index_change_pct > INDEX_CHANGE_LIMIT:
            if condition_id not in self.alert_active:
                self.alert_active.add(condition_id)
                alerts.append(
                    f"📈 上证大涨提醒\n"
                    f"红盘 {red_count} 家 | 绿盘 {green_count} 家\n"
                    f"上证涨跌幅: {index_change_pct:+.2f}%"
                )
        else:
            self.alert_active.discard(condition_id)
        # ---- 上证涨跌幅 < -1% ----
        condition_id = "index_down"
        if index_change_pct < -INDEX_CHANGE_LIMIT:
            if condition_id not in self.alert_active:
                self.alert_active.add(condition_id)
                alerts.append(
                    f"📉 上证大跌提醒\n"
                    f"红盘 {red_count} 家 | 绿盘 {green_count} 家\n"
                    f"上证涨跌幅: {index_change_pct:+.2f}%"
                )
        else:
            self.alert_active.discard(condition_id)
        return alerts
# ============================================================
#  核心监控逻辑
# ============================================================
def get_all_a_stock_codes() -> list:
    """获取全市场 A 股代码列表(沪深京)"""
    stocks = xtdata.get_stock_list_in_sector("沪深京A股")
    return list(set(stocks))
def count_red_green(codes: list, tick_data: dict) -> dict:
    """
    统计涨跌家数
    tick_data: get_full_tick 返回的数据
    返回: {"red": 红盘数, "green": 绿盘数, "flat": 平盘数, "total": 总数}
    """
    red = 0
    green = 0
    flat = 0
    for code in codes:
        if code not in tick_data or not tick_data[code]:
            continue
        tick = tick_data[code]
        # get_full_tick 返回字段: lastPrice(最新价) / lastClose(昨收盘价)
        last_close = tick.get("lastClose", 0)    # 昨收盘价
        last_price = tick.get("lastPrice", 0)     # 最新价
        if last_close <= 0 or last_price <= 0:
            continue
        change_pct = (last_price - last_close) / last_close * 100
        if change_pct > 0.01:
            red += 1
        elif change_pct < -0.01:
            green += 1
        else:
            flat += 1
    return {"red": red, "green": green, "flat": flat, "total": red + green + flat}
def get_index_change_pct(tick_data: dict) -> float:
    """获取上证指数涨跌幅(%)"""
    tick = tick_data.get(INDEX_CODE)
    if not tick:
        return 0.0
    last_close = tick.get("lastClose", 0)    # 昨收盘价
    last_price = tick.get("lastPrice", 0)     # 最新价
    if last_close <= 0 or last_price <= 0:
        return 0.0
    return (last_price - last_close) / last_close * 100
def is_trading_time() -> bool:
    """判断当前是否为交易时段(周一至周五 9:25~15:00)"""
    now = datetime.datetime.now()
    if now.weekday() >= 5:
        return False
    morning_start = now.replace(hour=9, minute=25, second=0)
    afternoon_end = now.replace(hour=15, minute=0, second=0)
    return morning_start <= now <= afternoon_end
def run():
    """主监控循环"""
    # 连接行情
    xtdata.connect()
    logger.info("xtdata 行情连接成功")
    # 下载上证指数历史数据
    xtdata.download_history_data(INDEX_CODE, period="1d", start_time="", end_time="")
    # 获取全市场 A 股代码
    all_codes = get_all_a_stock_codes()
    logger.info(f"全市场 A 股数量: {len(all_codes)}")
    # 状态追踪器
    tracker = MarketStateTracker()
    # 上一次打印摘要的时间
    last_summary_time = 0
    while True:
        try:
            # 非交易时段等待
            # if not is_trading_time():
            #     now = datetime.datetime.now()
            #     next_check = now.replace(hour=9, minute=25, second=0)
            #     if now > next_check:
            #         next_check += datetime.timedelta(days=1)
            #     wait_sec = max(60, (next_check - now).total_seconds())
            #     logger.info(f"非交易时段,{wait_sec:.0f}秒后重试")
            #     time.sleep(min(wait_sec, 300))
            #     continue
            # 1. 获取全市场分笔行情
            tick_data = xtdata.get_full_tick(all_codes)
            # 2. 统计涨跌家数
            result = count_red_green(all_codes, tick_data)
            # 3. 获取上证指数涨跌幅
            index_tick = xtdata.get_full_tick([INDEX_CODE])
            index_chg = get_index_change_pct(index_tick)
            # 4. 打印摘要(每60秒打印一次)
            now_ts = time.time()
            if now_ts - last_summary_time >= 60:
                now_str = datetime.datetime.now().strftime("%H:%M:%S")
                logger.info(
                    f"[{now_str}] 红盘={result['red']} 绿盘={result['green']} "
                    f"平盘={result['flat']} 共{result['total']}家 | "
                    f"上证{INDEX_CODE} {index_chg:+.2f}%"
                )
                last_summary_time = now_ts
            # 5. 检查告警条件
            alerts = tracker.update(result["red"], result["green"], index_chg)
            for msg in alerts:
                now_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                full_msg = f"【股票监控】{now_str}\n{msg}"
                logger.info(f"触发告警:\n{full_msg}")
                send_dingtalk(full_msg)
            # 6. 等待下一次扫描
            time.sleep(SCAN_INTERVAL)
        except KeyboardInterrupt:
            logger.info("用户中断,监控退出")
            break
        except Exception as e:
            logger.error(f"监控循环异常: {e}", exc_info=True)
            time.sleep(30)
if __name__ == "__main__":
    run()

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2026-05-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 子晓聊技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 昨天写了一篇刚割完就大涨?散户如何避免这种“左右挨耳光”的尴尬 文章, 这里用迅投miniqmt写个A股涨跌监控,盘面异动秒级推钉钉。
    • 先说效果
    • 怎么实现?
      • 第一步:拿到全市场实时行情
      • 第二步:统计红盘绿盘
      • 第三步:定告警条件,别乱叫
      • 第四步:钉钉推送
      • 第五步:让它自己跑
    • 除了这还能怎么玩?
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档