Files
tg_bot_collections/handlers/summary/__init__.py
yihong0618 17d02a0312 fix: black
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2025-11-24 20:48:08 +08:00

376 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
import random
import zoneinfo
from datetime import datetime, timezone
from functools import partial
import shlex
import threading
import telegramify_markdown
from telebot import TeleBot
from telebot.types import Message
from wcwidth import wcswidth
from config import settings
from handlers._utils import non_llm_handler
from .messages import ChatMessage, MessageStore
from .utils import PROMPT, filter_message, parse_date, contains_non_ascii
from datetime import timedelta
from rich import print
logger = logging.getLogger("bot")
store = MessageStore("data/messages.db")
# 从环境变量获取提肛群组 ID
TIGONG_CHAT_ID = settings.tigong_chat_id
def get_display_width(text: str) -> int:
"""获取字符串的显示宽度,考虑中文字符"""
width = wcswidth(text)
return width if width is not None else len(text)
def pad_to_width(text: str, target_width: int) -> str:
"""根据显示宽度填充字符串到指定宽度"""
current_width = get_display_width(text)
padding = target_width - current_width
return text + " " * max(0, padding)
@non_llm_handler
def handle_message(message: Message, bot: TeleBot):
logger.debug(
"Received message: %s, chat_id=%d, from=%s",
message.text,
message.chat.id,
message.from_user.id,
)
# 检测中文消息并删除(仅在特定时间和群组)
# 只在提肛群组且每天北京时间 15:00-16:00 之间删除
if (
TIGONG_CHAT_ID
and message.chat.id == TIGONG_CHAT_ID
and message.text
and contains_non_ascii(message.text)
):
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
current_time = datetime.now(tz=beijing_tz)
current_hour = current_time.hour
# 检查是否在北京时间 15:00-16:00 之间
if 15 <= current_hour < 16:
try:
bot.delete_message(message.chat.id, message.message_id)
bot.send_message(
message.chat.id,
f"已删除 @{message.from_user.username or message.from_user.full_name} 的中文消息",
)
logger.info(
"Deleted Chinese message from user %s in chat %d at %s",
message.from_user.full_name,
message.chat.id,
current_time.strftime("%H:%M:%S"),
)
return
except Exception as e:
logger.error("Failed to delete message: %s", e)
store.add_message(
ChatMessage(
chat_id=message.chat.id,
message_id=message.id,
content=message.text or "",
user_id=message.from_user.id,
user_name=message.from_user.full_name,
timestamp=datetime.fromtimestamp(message.date, tz=timezone.utc),
)
)
# 检测100整数倍消息提醒
if TIGONG_CHAT_ID and message.chat.id == TIGONG_CHAT_ID:
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
today = datetime.now(tz=beijing_tz).strftime("%Y-%m-%d")
count = store.get_today_message_count(message.chat.id, today)
if count > 0 and count % 100 == 0:
bot.send_message(
message.chat.id,
f"🎉 今日第 {count} 条消息!提肛小助手提醒:该做提肛运动啦!",
)
@non_llm_handler
def summary_command(message: Message, bot: TeleBot):
"""生成消息摘要。示例:/summary today; /summary 2d"""
text_parts = message.text.split(maxsplit=1)
if len(text_parts) < 2:
date = "today"
else:
date = text_parts[1].strip()
since, now = parse_date(date, settings.timezone)
messages = store.get_messages_since(message.chat.id, since)
messages_text = "\n".join(
f"{msg.timestamp.isoformat()} - @{msg.user_name}: {msg.content}"
for msg in messages
)
if not messages_text:
bot.reply_to(message, "没有找到指定时间范围内的历史消息。")
return
new_message = bot.reply_to(message, "正在生成摘要,请稍候...")
response = settings.openai_client.chat.completions.create(
model=settings.openai_model,
messages=[
{"role": "user", "content": PROMPT.format(messages=messages_text)},
],
)
reply_text = f"""*👇 前情提要 👇 \\({since.strftime("%Y/%m/%d %H:%M")} \\- {now.strftime("%Y/%m/%d %H:%M")}\\)*
{telegramify_markdown.markdownify(response.choices[0].message.content)}
"""
logger.debug("Generated summary:\n%s", reply_text)
bot.edit_message_text(
chat_id=new_message.chat.id,
message_id=new_message.message_id,
text=reply_text,
parse_mode="MarkdownV2",
)
@non_llm_handler
def stats_command(message: Message, bot: TeleBot):
"""获取群组消息统计信息"""
stats = store.get_stats(message.chat.id)
if not stats:
bot.reply_to(message, "没有找到任何统计信息。")
return
# 计算数字部分的最大宽度
max_count_width = max(len(str(entry.message_count)) for entry in stats)
stats_text = "\n".join(
f"{entry.message_count:>{max_count_width}} messages - {entry.date}"
for entry in stats
)
text_args = shlex.split(message.text)
if len(text_args) > 1 and text_args[1].isdigit():
limit = int(text_args[1])
else:
limit = 30
user_stats = store.get_user_stats(message.chat.id, limit=limit)
if user_stats:
# 计算用户消息数量的最大宽度
max_user_count_width = max(
len(str(entry.message_count)) for entry in user_stats
)
user_text = "\n".join(
f"{entry.message_count:>{max_user_count_width}} messages - {entry.user_name}"
for entry in user_stats
)
else:
user_text = ""
return_message = f"📊 群组消息统计信息:\n```\n{stats_text}\n```\n👤 用户消息统计信息:\n```\n{user_text}\n```\\-\\-\\-\n"
bot.reply_to(
message,
return_message,
parse_mode="MarkdownV2",
)
@non_llm_handler
def search_command(message: Message, bot: TeleBot):
"""搜索群组消息(示例:/search 关键词 [N]"""
text_parts = shlex.split(message.text)
if len(text_parts) < 2:
bot.reply_to(message, "请提供要搜索的关键词。")
return
keyword = text_parts[1].strip()
if len(text_parts) > 2 and text_parts[2].isdigit():
limit = int(text_parts[2])
else:
limit = 10
messages = store.search_messages(message.chat.id, keyword, limit=limit)
if not messages:
bot.reply_to(message, "没有找到匹配的消息。")
return
chat_id = str(message.chat.id)
if chat_id.startswith("-100"):
chat_id = chat_id[4:]
items = []
for msg in messages:
link = f"https://t.me/c/{chat_id}/{msg.message_id}"
items.append(f"{link}\n```\n{msg.user_name}: {msg.content}\n```")
message_text = telegramify_markdown.markdownify("\n".join(items))
bot.reply_to(
message,
f"🔍 *搜索结果\\(只显示前 {limit}\\):*\n{message_text}",
parse_mode="MarkdownV2",
)
TIGONG_MESSAGES = [
"💪 提肛时间到!记得做提肛运动哦~",
"🏋️ 该做提肛运动了!坚持就是胜利!",
"⏰ 提肛小助手提醒:现在是提肛时间!",
"🎯 提肛运动打卡时间!加油!",
"💯 定时提醒:做做提肛运动,健康生活每一天!",
"🌟 提肛运动不能停!现在开始吧!",
"✨ 提肛小助手:该运动啦!",
]
@non_llm_handler
def alert_me_command(message: Message, bot: TeleBot):
"""加入提肛提醒队列"""
if TIGONG_CHAT_ID and message.chat.id == TIGONG_CHAT_ID:
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
today = datetime.now(tz=beijing_tz).strftime("%Y-%m-%d")
username = message.from_user.username or ""
store.add_tigong_alert_user(
message.chat.id,
message.from_user.id,
message.from_user.full_name,
username,
today,
)
bot.reply_to(
message,
"✅ 已加入今日提肛提醒队列!每次提醒都会 @ 你,记得 /confirm 打卡哦!",
)
else:
bot.reply_to(message, "此命令仅在指定群组中可用。")
@non_llm_handler
def confirm_command(message: Message, bot: TeleBot):
"""确认完成今日提肛"""
if TIGONG_CHAT_ID and message.chat.id == TIGONG_CHAT_ID:
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
today = datetime.now(tz=beijing_tz).strftime("%Y-%m-%d")
success = store.confirm_tigong_alert(
message.chat.id, message.from_user.id, today
)
if success:
bot.reply_to(message, "✅ 今日提肛已打卡!明天继续加油!")
else:
bot.reply_to(message, "你还没有加入提醒队列,请先使用 /alert_me 加入。")
else:
bot.reply_to(message, "此命令仅在指定群组中可用。")
@non_llm_handler
def standup_command(message: Message, bot: TeleBot):
"""手动发送提肛提醒消息"""
if TIGONG_CHAT_ID and message.chat.id == TIGONG_CHAT_ID:
try:
send_random_tigong_reminder(bot)
# 不需要reply因为send_random_tigong_reminder已经发送消息了
except Exception as e:
logger.error("Error in standup_command: %s", e)
bot.reply_to(message, "❌ 发送提醒失败,请稍后重试。")
else:
bot.reply_to(message, "此命令仅在指定群组中可用。")
def send_random_tigong_reminder(bot: TeleBot):
"""发送随机提肛提醒消息"""
try:
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
today = datetime.now(tz=beijing_tz).strftime("%Y-%m-%d")
# 获取未确认用户列表
unconfirmed_users = store.get_unconfirmed_users(TIGONG_CHAT_ID, today)
message = random.choice(TIGONG_MESSAGES)
# 如果有未确认用户,@他们
if unconfirmed_users:
message += "\n\n"
mentions = []
for user in unconfirmed_users:
# 使用 username 或者 text mention
username = user.get("username", "")
if username:
mentions.append(f"@{username}")
else:
# 如果没有 username使用名字但不能点击
mentions.append(user["user_name"])
message += " ".join(mentions) + " 记得打卡哦!"
# 发送消息
bot.send_message(TIGONG_CHAT_ID, message)
logger.info(
"Sent tigong reminder to chat %d with %d mentions",
TIGONG_CHAT_ID,
len(unconfirmed_users),
)
except Exception as e:
logger.error("Failed to send tigong reminder: %s", e, exc_info=True)
raise
def schedule_tigong_reminders(bot: TeleBot):
"""安排提肛提醒任务每天北京时间8:00-19:00每2小时发送一次"""
def run_scheduler():
import time
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
while True:
now = datetime.now(tz=beijing_tz)
current_hour = now.hour
# 检查是否在北京时间8:00-19:00之间
if 8 <= current_hour < 19:
# 检查是否在偶数小时的整点8, 10, 12, 14, 16, 18
if current_hour % 2 == 0 and now.minute == 0 and now.second < 30:
send_random_tigong_reminder(bot)
time.sleep(30) # 避免在同一分钟内重复发送
# 每30秒检查一次
time.sleep(30)
# 在后台线程中运行调度器
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
logger.info("Tigong reminder scheduler started")
load_priority = 5
if settings.openai_api_key:
def register(bot: TeleBot):
"""注册命令处理器"""
bot.register_message_handler(
summary_command, commands=["summary"], pass_bot=True
)
bot.register_message_handler(stats_command, commands=["stats"], pass_bot=True)
bot.register_message_handler(search_command, commands=["search"], pass_bot=True)
bot.register_message_handler(
standup_command, commands=["standup"], pass_bot=True
)
bot.register_message_handler(
alert_me_command, commands=["alert_me"], pass_bot=True
)
bot.register_message_handler(
confirm_command, commands=["confirm"], pass_bot=True
)
bot.register_message_handler(
handle_message,
func=partial(filter_message, bot=bot, check_chinese=True),
pass_bot=True,
)
# 启动提肛提醒定时任务
schedule_tigong_reminders(bot)