Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@
"client_id": "",
"client_secret": "",
"card_template_id": "",
"card_content_key": "content",
"card_update_interval": 0.35,
},
"Telegram": {
"id": "telegram",
Expand Down Expand Up @@ -750,6 +752,16 @@
"type": "string",
"hint": "可选。钉钉互动卡片模板 ID。启用后将使用互动卡片进行流式回复。",
},
"card_content_key": {
"description": "卡片内容变量名",
"type": "string",
"hint": "可选。钉钉互动卡片模板中用于接收流式文本内容的变量名,默认 content。",
},
"card_update_interval": {
"description": "卡片更新间隔",
"type": "float",
"hint": "可选。钉钉互动卡片流式更新的最小间隔,单位秒,默认 0.35。",
},
"telegram_command_register": {
"description": "Telegram 命令注册",
"type": "bool",
Expand Down
81 changes: 81 additions & 0 deletions astrbot/core/platform/sources/dingtalk/dingtalk_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ async def process(self, message: dingtalk_stream.CallbackMessage):
self.client_ = client # 用于 websockets 的 client
self._shutdown_event = threading.Event()
self._terminated_event = threading.Event()
self.card_template_id = str(platform_config.get("card_template_id", "") or "")
self.card_content_key = str(
platform_config.get("card_content_key", "content") or "content"
)
self.card_update_interval = _safe_float(
platform_config.get("card_update_interval", 0.35),
0.35,
)
self._card_sessions: dict[str, tuple[object, str]] = {}

def _id_to_sid(self, dingtalk_id: str | None) -> str:
if not dingtalk_id:
Expand Down Expand Up @@ -753,6 +762,71 @@ async def send_message_chain_with_incoming(
# at_str=at_str,
)

async def create_message_card(
self,
message_id: str,
incoming_message: dingtalk_stream.ChatbotMessage,
) -> str:
if not self.card_template_id:
return ""

try:
card_replier = dingtalk_stream.AICardReplier(
self.client_,
incoming_message,
)
card_instance_id = await card_replier.async_create_and_deliver_card(
self.card_template_id,
{self.card_content_key: ""},
)
except AttributeError as e:
logger.error(
"钉钉流式卡片创建失败: 当前 dingtalk_stream 不支持 AICardReplier: %s",
e,
)
return ""
except Exception as e:
logger.error("钉钉流式卡片创建失败: %s", e)
return ""

if not card_instance_id:
logger.error("钉钉流式卡片创建失败: card_instance_id 为空")
return ""

card_token = message_id or card_instance_id
self._card_sessions[card_token] = (card_replier, card_instance_id)
return card_token

async def send_card_message(
self,
card_token: str,
content: str,
is_final: bool,
) -> None:
card_session = self._card_sessions.get(card_token)
if not card_session:
logger.warning("钉钉流式卡片更新跳过: 找不到卡片会话")
return

card_replier, card_instance_id = card_session
try:
await card_replier.async_streaming(
card_instance_id,
content_key=self.card_content_key,
content_value=content,
append=False,
finished=is_final,
failed=False,
)
except Exception as e:
logger.error("钉钉流式卡片更新失败: %s", e)
if is_final:
self._card_sessions.pop(card_token, None)
return

if is_final:
self._card_sessions.pop(card_token, None)

def create_event(self, message: AstrBotMessage) -> DingtalkMessageEvent:
"""Creates a Dingtalk message event.

Expand Down Expand Up @@ -860,3 +934,10 @@ def monkey_patch_close() -> NoReturn:

def get_client(self):
return self.client


def _safe_float(value: object, default: float) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
61 changes: 59 additions & 2 deletions astrbot/core/platform/sources/dingtalk/dingtalk_event.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import time
from typing import Any

from astrbot import logger
from astrbot.api.event import AstrMessageEvent, MessageChain
from astrbot.api.message_components import Plain


class DingtalkMessageEvent(AstrMessageEvent):
Expand Down Expand Up @@ -29,7 +31,62 @@ async def send(self, message: MessageChain) -> None:
await super().send(message)

async def send_streaming(self, generator, use_fallback: bool = False):
# 钉钉统一回退为缓冲发送:最终发送仍使用新的 HTTP 消息接口。
if not self.adapter:
logger.error("钉钉流式消息发送失败: 缺少 adapter")
return await self._send_streaming_as_plain_text(generator)

if not getattr(self.adapter, "card_template_id", ""):
return await self._send_streaming_as_plain_text(generator)

incoming_message = getattr(self.message_obj, "raw_message", None)
if incoming_message is None:
logger.warning("钉钉流式卡片发送失败: 缺少原始消息,回退普通消息")
return await self._send_streaming_as_plain_text(generator)

card_token = await self.adapter.create_message_card(
message_id=getattr(self.message_obj, "message_id", ""),
incoming_message=incoming_message,
)
if not card_token:
return await self._send_streaming_as_plain_text(generator)

full_content = ""
pending_chain = MessageChain()
last_update_at = 0.0
update_interval = max(
0.1,
getattr(self.adapter, "card_update_interval", 0.35),
)

try:
async for chain in generator:
for segment in chain.chain:
if isinstance(segment, Plain):
full_content += segment.text
else:
pending_chain.chain.append(segment)

now = time.monotonic()
if full_content and now - last_update_at >= update_interval:
await self.adapter.send_card_message(
card_token=card_token,
content=full_content,
is_final=False,
)
last_update_at = now
finally:
await self.adapter.send_card_message(
card_token=card_token,
content=full_content,
is_final=True,
)

if pending_chain.chain:
await self.send(pending_chain)

return None
Comment thread
Accenemy marked this conversation as resolved.

async def _send_streaming_as_plain_text(self, generator):
buffer = None
async for chain in generator:
if not buffer:
Expand All @@ -40,4 +97,4 @@ async def send_streaming(self, generator, use_fallback: bool = False):
return None
buffer.squash_plain()
await self.send(buffer)
return await super().send_streaming(generator, use_fallback)
return None