mirror of
https://github.com/cdryzun/tg_bot_collections.git
synced 2026-01-13 21:04:24 +08:00
@@ -29,6 +29,13 @@ store = MessageStore("data/messages.db")
|
|||||||
TIGONG_CHAT_ID = settings.tigong_chat_id
|
TIGONG_CHAT_ID = settings.tigong_chat_id
|
||||||
|
|
||||||
|
|
||||||
|
def is_chinese_ban_time() -> bool:
|
||||||
|
"""检查当前是否在禁止中文的时间段(北京时间 15:00-16:00)"""
|
||||||
|
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
|
||||||
|
current_hour = datetime.now(tz=beijing_tz).hour
|
||||||
|
return 15 <= current_hour < 16
|
||||||
|
|
||||||
|
|
||||||
def get_display_width(text: str) -> int:
|
def get_display_width(text: str) -> int:
|
||||||
"""获取字符串的显示宽度,考虑中文字符"""
|
"""获取字符串的显示宽度,考虑中文字符"""
|
||||||
width = wcswidth(text)
|
width = wcswidth(text)
|
||||||
@@ -42,17 +49,156 @@ def pad_to_width(text: str, target_width: int) -> str:
|
|||||||
return text + " " * max(0, padding)
|
return text + " " * max(0, padding)
|
||||||
|
|
||||||
|
|
||||||
@non_llm_handler
|
def check_poll_for_chinese(message: Message) -> bool:
|
||||||
def handle_message(message: Message, bot: TeleBot):
|
"""检查投票消息是否包含中文"""
|
||||||
logger.debug(
|
if message.poll is None:
|
||||||
"Received message: %s, chat_id=%d, from=%s",
|
return False
|
||||||
message.text,
|
# 检查投票问题
|
||||||
message.chat.id,
|
if contains_non_ascii(message.poll.question):
|
||||||
message.from_user.id,
|
return True
|
||||||
)
|
# 检查投票选项
|
||||||
|
for option in message.poll.options:
|
||||||
|
if contains_non_ascii(option.text):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
# 检测中文消息并删除(仅在特定时间和群组)
|
|
||||||
# 只在提肛群组且每天北京时间 15:00-16:00 之间删除
|
def check_caption_for_chinese(message: Message) -> bool:
|
||||||
|
"""检查媒体消息的 caption 是否包含中文"""
|
||||||
|
if hasattr(message, 'caption') and message.caption:
|
||||||
|
return contains_non_ascii(message.caption)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def check_link_preview_for_chinese(message: Message) -> bool:
|
||||||
|
"""检查消息链接预览是否包含中文"""
|
||||||
|
# 检查 link_preview_options(如果存在)
|
||||||
|
if hasattr(message, 'link_preview_options') and message.link_preview_options:
|
||||||
|
lpo = message.link_preview_options
|
||||||
|
if hasattr(lpo, 'url') and lpo.url and contains_non_ascii(lpo.url):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# 检查 web_page(链接预览的详细信息)
|
||||||
|
if hasattr(message, 'web_page') and message.web_page:
|
||||||
|
wp = message.web_page
|
||||||
|
# 检查标题
|
||||||
|
if hasattr(wp, 'title') and wp.title and contains_non_ascii(wp.title):
|
||||||
|
return True
|
||||||
|
# 检查描述
|
||||||
|
if hasattr(wp, 'description') and wp.description and contains_non_ascii(wp.description):
|
||||||
|
return True
|
||||||
|
# 检查站点名称
|
||||||
|
if hasattr(wp, 'site_name') and wp.site_name and contains_non_ascii(wp.site_name):
|
||||||
|
return True
|
||||||
|
# 检查 URL
|
||||||
|
if hasattr(wp, 'url') and wp.url and contains_non_ascii(wp.url):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def message_has_url(message: Message) -> bool:
|
||||||
|
"""检查消息是否包含 URL"""
|
||||||
|
# 检查 entities 中是否有 URL 类型
|
||||||
|
if hasattr(message, 'entities') and message.entities:
|
||||||
|
for entity in message.entities:
|
||||||
|
if entity.type in ('url', 'text_link'):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@non_llm_handler
|
||||||
|
def check_and_delete_message_with_url(message: Message, bot: TeleBot):
|
||||||
|
"""检测并删除包含 URL 的消息(因为链接预览可能包含中文)"""
|
||||||
|
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
|
||||||
|
current_time = datetime.now(tz=beijing_tz)
|
||||||
|
|
||||||
|
try:
|
||||||
|
bot.delete_message(message.chat.id, message.message_id)
|
||||||
|
bot.send_message(
|
||||||
|
message.chat.id,
|
||||||
|
f"已删除 @{message.from_user.username or message.from_user.full_name} 的消息:禁止中文时段不允许发送链接(预览可能包含中文)",
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Deleted message with URL from user %s in chat %d at %s",
|
||||||
|
message.from_user.full_name,
|
||||||
|
message.chat.id,
|
||||||
|
current_time.strftime("%H:%M:%S"),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to delete message with URL: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
@non_llm_handler
|
||||||
|
def check_and_delete_chinese_link_preview(message: Message, bot: TeleBot):
|
||||||
|
"""检测并删除链接预览包含中文的消息(仅在特定时间和群组)"""
|
||||||
|
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
|
||||||
|
current_time = datetime.now(tz=beijing_tz)
|
||||||
|
|
||||||
|
try:
|
||||||
|
bot.delete_message(message.chat.id, message.message_id)
|
||||||
|
bot.send_message(
|
||||||
|
message.chat.id,
|
||||||
|
f"已删除 @{message.from_user.username or message.from_user.full_name} 的消息:链接预览包含中文",
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Deleted message with Chinese link preview from user %s in chat %d at %s",
|
||||||
|
message.from_user.full_name,
|
||||||
|
message.chat.id,
|
||||||
|
current_time.strftime("%H:%M:%S"),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to delete message with Chinese link preview: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
@non_llm_handler
|
||||||
|
def check_and_delete_chinese_poll(message: Message, bot: TeleBot):
|
||||||
|
"""检测并删除包含中文的投票消息(仅在特定时间和群组)"""
|
||||||
|
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
|
||||||
|
current_time = datetime.now(tz=beijing_tz)
|
||||||
|
|
||||||
|
try:
|
||||||
|
bot.delete_message(message.chat.id, message.message_id)
|
||||||
|
bot.send_message(
|
||||||
|
message.chat.id,
|
||||||
|
f"已删除 @{message.from_user.username or message.from_user.full_name} 的投票:投票内容不能包含中文",
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Deleted Chinese poll from user %s in chat %d at %s",
|
||||||
|
message.from_user.full_name,
|
||||||
|
message.chat.id,
|
||||||
|
current_time.strftime("%H:%M:%S"),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to delete poll message: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
@non_llm_handler
|
||||||
|
def check_and_delete_chinese_caption(message: Message, bot: TeleBot):
|
||||||
|
"""检测并删除 caption 包含中文的媒体消息(仅在特定时间和群组)"""
|
||||||
|
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
|
||||||
|
current_time = datetime.now(tz=beijing_tz)
|
||||||
|
|
||||||
|
try:
|
||||||
|
bot.delete_message(message.chat.id, message.message_id)
|
||||||
|
bot.send_message(
|
||||||
|
message.chat.id,
|
||||||
|
f"已删除 @{message.from_user.username or message.from_user.full_name} 的消息:图片/文件说明不能包含中文",
|
||||||
|
)
|
||||||
|
logger.info(
|
||||||
|
"Deleted message with Chinese caption from user %s in chat %d at %s",
|
||||||
|
message.from_user.full_name,
|
||||||
|
message.chat.id,
|
||||||
|
current_time.strftime("%H:%M:%S"),
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to delete message with Chinese caption: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
@non_llm_handler
|
||||||
|
def check_and_delete_chinese(message: Message, bot: TeleBot):
|
||||||
|
"""检测并删除中文消息(仅在特定时间和群组)"""
|
||||||
|
# 只在提肛群组且每天北京时间 15:00-16:00 之间删除所有含中文的消息(包括命令及其参数)
|
||||||
if (
|
if (
|
||||||
TIGONG_CHAT_ID
|
TIGONG_CHAT_ID
|
||||||
and message.chat.id == TIGONG_CHAT_ID
|
and message.chat.id == TIGONG_CHAT_ID
|
||||||
@@ -61,25 +207,41 @@ def handle_message(message: Message, bot: TeleBot):
|
|||||||
):
|
):
|
||||||
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
|
beijing_tz = zoneinfo.ZoneInfo("Asia/Shanghai")
|
||||||
current_time = datetime.now(tz=beijing_tz)
|
current_time = datetime.now(tz=beijing_tz)
|
||||||
current_hour = current_time.hour
|
is_command = message.text.startswith("/")
|
||||||
|
|
||||||
# 检查是否在北京时间 15:00-16:00 之间
|
try:
|
||||||
if 15 <= current_hour < 16:
|
bot.delete_message(message.chat.id, message.message_id)
|
||||||
try:
|
|
||||||
bot.delete_message(message.chat.id, message.message_id)
|
if is_command:
|
||||||
|
bot.send_message(
|
||||||
|
message.chat.id,
|
||||||
|
f"已删除 @{message.from_user.username or message.from_user.full_name} 的消息:命令参数不能包含中文",
|
||||||
|
)
|
||||||
|
else:
|
||||||
bot.send_message(
|
bot.send_message(
|
||||||
message.chat.id,
|
message.chat.id,
|
||||||
f"已删除 @{message.from_user.username or message.from_user.full_name} 的中文消息",
|
f"已删除 @{message.from_user.username or message.from_user.full_name} 的中文消息",
|
||||||
)
|
)
|
||||||
logger.info(
|
|
||||||
"Deleted Chinese message from user %s in chat %d at %s",
|
logger.info(
|
||||||
message.from_user.full_name,
|
"Deleted Chinese message from user %s in chat %d at %s (is_command: %s)",
|
||||||
message.chat.id,
|
message.from_user.full_name,
|
||||||
current_time.strftime("%H:%M:%S"),
|
message.chat.id,
|
||||||
)
|
current_time.strftime("%H:%M:%S"),
|
||||||
return
|
is_command,
|
||||||
except Exception as e:
|
)
|
||||||
logger.error("Failed to delete message: %s", e)
|
except Exception as e:
|
||||||
|
logger.error("Failed to delete message: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
@non_llm_handler
|
||||||
|
def handle_message(message: Message, bot: TeleBot):
|
||||||
|
logger.debug(
|
||||||
|
"Received message: %s, chat_id=%d, from=%s",
|
||||||
|
message.text,
|
||||||
|
message.chat.id,
|
||||||
|
message.from_user.id,
|
||||||
|
)
|
||||||
|
|
||||||
store.add_message(
|
store.add_message(
|
||||||
ChatMessage(
|
ChatMessage(
|
||||||
@@ -349,11 +511,66 @@ def schedule_tigong_reminders(bot: TeleBot):
|
|||||||
logger.info("Tigong reminder scheduler started")
|
logger.info("Tigong reminder scheduler started")
|
||||||
|
|
||||||
|
|
||||||
load_priority = 5
|
load_priority = 1 # 设置最高优先级,让中文检测先注册,但其他处理器仍然会执行
|
||||||
if settings.openai_api_key:
|
if settings.openai_api_key:
|
||||||
|
|
||||||
def register(bot: TeleBot):
|
def register(bot: TeleBot):
|
||||||
"""注册命令处理器"""
|
"""注册命令处理器"""
|
||||||
|
# 首先注册中文检测处理器(最高优先级)
|
||||||
|
# 只在特定时间段(15:00-16:00)和提肛群组中过滤中文消息
|
||||||
|
if TIGONG_CHAT_ID:
|
||||||
|
chinese_filter = lambda msg: (
|
||||||
|
msg.text is not None
|
||||||
|
and msg.chat.id == TIGONG_CHAT_ID
|
||||||
|
and is_chinese_ban_time() # 先判断时间
|
||||||
|
and contains_non_ascii(msg.text)
|
||||||
|
)
|
||||||
|
# 处理新消息
|
||||||
|
bot.register_message_handler(
|
||||||
|
check_and_delete_chinese,
|
||||||
|
func=chinese_filter,
|
||||||
|
pass_bot=True,
|
||||||
|
)
|
||||||
|
# 处理编辑后的消息
|
||||||
|
bot.register_edited_message_handler(
|
||||||
|
check_and_delete_chinese,
|
||||||
|
func=chinese_filter,
|
||||||
|
pass_bot=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 处理包含中文的投票
|
||||||
|
poll_filter = lambda msg: (
|
||||||
|
hasattr(msg, 'poll') and msg.poll is not None
|
||||||
|
and msg.chat.id == TIGONG_CHAT_ID
|
||||||
|
and is_chinese_ban_time()
|
||||||
|
and check_poll_for_chinese(msg)
|
||||||
|
)
|
||||||
|
bot.register_message_handler(
|
||||||
|
check_and_delete_chinese_poll,
|
||||||
|
func=poll_filter,
|
||||||
|
pass_bot=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 处理 caption 包含中文的媒体消息(图片、视频、文档等)
|
||||||
|
caption_filter = lambda msg: (
|
||||||
|
msg.chat.id == TIGONG_CHAT_ID
|
||||||
|
and is_chinese_ban_time()
|
||||||
|
and check_caption_for_chinese(msg)
|
||||||
|
)
|
||||||
|
bot.register_message_handler(
|
||||||
|
check_and_delete_chinese_caption,
|
||||||
|
func=caption_filter,
|
||||||
|
content_types=['photo', 'video', 'document', 'audio', 'voice', 'video_note', 'animation'],
|
||||||
|
pass_bot=True,
|
||||||
|
)
|
||||||
|
bot.register_edited_message_handler(
|
||||||
|
check_and_delete_chinese_caption,
|
||||||
|
func=caption_filter,
|
||||||
|
content_types=['photo', 'video', 'document', 'audio', 'voice', 'video_note', 'animation'],
|
||||||
|
pass_bot=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 然后注册命令处理器
|
||||||
bot.register_message_handler(
|
bot.register_message_handler(
|
||||||
summary_command, commands=["summary"], pass_bot=True
|
summary_command, commands=["summary"], pass_bot=True
|
||||||
)
|
)
|
||||||
@@ -368,10 +585,9 @@ if settings.openai_api_key:
|
|||||||
bot.register_message_handler(
|
bot.register_message_handler(
|
||||||
confirm_command, commands=["confirm"], pass_bot=True
|
confirm_command, commands=["confirm"], pass_bot=True
|
||||||
)
|
)
|
||||||
|
# 最后注册普通消息处理器(只处理非命令消息)
|
||||||
bot.register_message_handler(
|
bot.register_message_handler(
|
||||||
handle_message,
|
handle_message, func=partial(filter_message, bot=bot), pass_bot=True
|
||||||
func=partial(filter_message, bot=bot, check_chinese=True),
|
|
||||||
pass_bot=True,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# 启动提肛提醒定时任务
|
# 启动提肛提醒定时任务
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import re
|
import re
|
||||||
import zoneinfo
|
import zoneinfo
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
from urllib.parse import unquote
|
||||||
|
|
||||||
from telebot import TeleBot
|
from telebot import TeleBot
|
||||||
from telebot.types import Message
|
from telebot.types import Message
|
||||||
@@ -15,29 +16,30 @@ PROMPT = """\
|
|||||||
|
|
||||||
|
|
||||||
def contains_non_ascii(text: str) -> bool:
|
def contains_non_ascii(text: str) -> bool:
|
||||||
return not text.isascii()
|
# 首先检查原始文本
|
||||||
|
if not text.isascii():
|
||||||
|
return True
|
||||||
|
# 然后检查 URL decode 后的文本(处理 %XX 编码的中文)
|
||||||
|
try:
|
||||||
|
decoded = unquote(text)
|
||||||
|
if not decoded.isascii():
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def filter_message(message: Message, bot: TeleBot, check_chinese: bool = False) -> bool:
|
def filter_message(message: Message, bot: TeleBot) -> bool:
|
||||||
"""过滤消息,排除非文本消息和命令消息
|
"""过滤消息,排除非文本消息和命令消息"""
|
||||||
|
|
||||||
Args:
|
|
||||||
message: 消息对象
|
|
||||||
bot: Bot 实例
|
|
||||||
check_chinese: 是否允许检查中文消息(即不过滤命令)
|
|
||||||
"""
|
|
||||||
if not message.text:
|
if not message.text:
|
||||||
return False
|
return False
|
||||||
if not message.from_user:
|
if not message.from_user:
|
||||||
return False
|
return False
|
||||||
if message.from_user.id == bot.get_me().id:
|
if message.from_user.id == bot.get_me().id:
|
||||||
return False
|
return False
|
||||||
# 如果需要检查中文,则不过滤命令消息(让 handle_message 处理)
|
if message.text.startswith("/"):
|
||||||
if not check_chinese and message.text.startswith("/"):
|
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
date_regex = re.compile(r"^(\d+)([dhm])$")
|
date_regex = re.compile(r"^(\d+)([dhm])$")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user