From 9b36e5285e0d30f6793b6deac56755974868bb1a Mon Sep 17 00:00:00 2001 From: 574895168 <574895168@qq.com> Date: Fri, 19 Jun 2026 13:12:32 +0800 Subject: [PATCH] feat: support DingTalk streaming AI cards --- astrbot/core/config/default.py | 12 +++ .../sources/dingtalk/dingtalk_adapter.py | 81 +++++++++++++++++++ .../sources/dingtalk/dingtalk_event.py | 61 +++++++++++++- 3 files changed, 152 insertions(+), 2 deletions(-) diff --git a/astrbot/core/config/default.py b/astrbot/core/config/default.py index 0c5f58d0ca..a1e42d5d79 100644 --- a/astrbot/core/config/default.py +++ b/astrbot/core/config/default.py @@ -438,6 +438,8 @@ "client_id": "", "client_secret": "", "card_template_id": "", + "card_content_key": "content", + "card_update_interval": 0.35, }, "Telegram": { "id": "telegram", @@ -750,6 +752,16 @@ "type": "string", "hint": "可选。钉钉互动卡片模板 ID。启用后将使用互动卡片进行流式回复。", }, + "card_content_key": { + "description": "卡片内容变量名", + "type": "string", + "hint": "可选。钉钉互动卡片模板中用于接收流式文本内容的变量名,默认 content。", + }, + "card_update_interval": { + "description": "卡片更新间隔", + "type": "float", + "hint": "可选。钉钉互动卡片流式更新的最小间隔,单位秒,默认 0.35。", + }, "telegram_command_register": { "description": "Telegram 命令注册", "type": "bool", diff --git a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py index 9a07608c3c..984ba0bad2 100644 --- a/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py +++ b/astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py @@ -99,6 +99,15 @@ async def process(self, message: dingtalk_stream.CallbackMessage): self.client_ = client # 用于 websockets 的 client self._shutdown_event = threading.Event() self._terminated_event = threading.Event() + self.card_template_id = str(platform_config.get("card_template_id", "") or "") + self.card_content_key = str( + platform_config.get("card_content_key", "content") or "content" + ) + self.card_update_interval = _safe_float( + platform_config.get("card_update_interval", 0.35), + 0.35, + ) + self._card_sessions: dict[str, tuple[object, str]] = {} def _id_to_sid(self, dingtalk_id: str | None) -> str: if not dingtalk_id: @@ -753,6 +762,71 @@ async def send_message_chain_with_incoming( # at_str=at_str, ) + async def create_message_card( + self, + message_id: str, + incoming_message: dingtalk_stream.ChatbotMessage, + ) -> str: + if not self.card_template_id: + return "" + + try: + card_replier = dingtalk_stream.AICardReplier( + self.client_, + incoming_message, + ) + card_instance_id = await card_replier.async_create_and_deliver_card( + self.card_template_id, + {self.card_content_key: ""}, + ) + except AttributeError as e: + logger.error( + "钉钉流式卡片创建失败: 当前 dingtalk_stream 不支持 AICardReplier: %s", + e, + ) + return "" + except Exception as e: + logger.error("钉钉流式卡片创建失败: %s", e) + return "" + + if not card_instance_id: + logger.error("钉钉流式卡片创建失败: card_instance_id 为空") + return "" + + card_token = message_id or card_instance_id + self._card_sessions[card_token] = (card_replier, card_instance_id) + return card_token + + async def send_card_message( + self, + card_token: str, + content: str, + is_final: bool, + ) -> None: + card_session = self._card_sessions.get(card_token) + if not card_session: + logger.warning("钉钉流式卡片更新跳过: 找不到卡片会话") + return + + card_replier, card_instance_id = card_session + try: + await card_replier.async_streaming( + card_instance_id, + content_key=self.card_content_key, + content_value=content, + append=False, + finished=is_final, + failed=False, + ) + except Exception as e: + logger.error("钉钉流式卡片更新失败: %s", e) + if is_final: + self._card_sessions.pop(card_token, None) + return + + if is_final: + self._card_sessions.pop(card_token, None) + def create_event(self, message: AstrBotMessage) -> DingtalkMessageEvent: """Creates a Dingtalk message event. @@ -860,3 +934,10 @@ def monkey_patch_close() -> NoReturn: def get_client(self): return self.client + + +def _safe_float(value: object, default: float) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default diff --git a/astrbot/core/platform/sources/dingtalk/dingtalk_event.py b/astrbot/core/platform/sources/dingtalk/dingtalk_event.py index 3331c51476..9740b0146b 100644 --- a/astrbot/core/platform/sources/dingtalk/dingtalk_event.py +++ b/astrbot/core/platform/sources/dingtalk/dingtalk_event.py @@ -1,7 +1,9 @@ +import time from typing import Any from astrbot import logger from astrbot.api.event import AstrMessageEvent, MessageChain +from astrbot.api.message_components import Plain class DingtalkMessageEvent(AstrMessageEvent): @@ -29,7 +31,62 @@ async def send(self, message: MessageChain) -> None: await super().send(message) async def send_streaming(self, generator, use_fallback: bool = False): - # 钉钉统一回退为缓冲发送:最终发送仍使用新的 HTTP 消息接口。 + if not self.adapter: + logger.error("钉钉流式消息发送失败: 缺少 adapter") + return await self._send_streaming_as_plain_text(generator) + + if not getattr(self.adapter, "card_template_id", ""): + return await self._send_streaming_as_plain_text(generator) + + incoming_message = getattr(self.message_obj, "raw_message", None) + if incoming_message is None: + logger.warning("钉钉流式卡片发送失败: 缺少原始消息,回退普通消息") + return await self._send_streaming_as_plain_text(generator) + + card_token = await self.adapter.create_message_card( + message_id=getattr(self.message_obj, "message_id", ""), + incoming_message=incoming_message, + ) + if not card_token: + return await self._send_streaming_as_plain_text(generator) + + full_content = "" + pending_chain = MessageChain() + last_update_at = 0.0 + update_interval = max( + 0.1, + getattr(self.adapter, "card_update_interval", 0.35), + ) + + try: + async for chain in generator: + for segment in chain.chain: + if isinstance(segment, Plain): + full_content += segment.text + else: + pending_chain.chain.append(segment) + + now = time.monotonic() + if full_content and now - last_update_at >= update_interval: + await self.adapter.send_card_message( + card_token=card_token, + content=full_content, + is_final=False, + ) + last_update_at = now + finally: + await self.adapter.send_card_message( + card_token=card_token, + content=full_content, + is_final=True, + ) + + if pending_chain.chain: + await self.send(pending_chain) + + return None + + async def _send_streaming_as_plain_text(self, generator): buffer = None async for chain in generator: if not buffer: @@ -40,4 +97,4 @@ async def send_streaming(self, generator, use_fallback: bool = False): return None buffer.squash_plain() await self.send(buffer) - return await super().send_streaming(generator, use_fallback) + return None