mirror of
				https://github.com/cdryzun/tg_bot_collections.git
				synced 2025-11-04 16:56:43 +08:00 
			
		
		
		
	feat: get user stats in the group
Signed-off-by: Keming <kemingy94@gmail.com>
This commit is contained in:
		@ -94,6 +94,23 @@ def stats_command(message: Message, bot: TeleBot):
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@non_llm_handler
 | 
			
		||||
def user_stats_command(message: Message, bot: TeleBot):
 | 
			
		||||
    """看看谁才是最能摸鱼的"""
 | 
			
		||||
    user_stats = store.get_user_stats(message.chat.id)
 | 
			
		||||
    if not user_stats:
 | 
			
		||||
        bot.reply_to(message, "没有找到任何用户统计信息。")
 | 
			
		||||
        return
 | 
			
		||||
    user_stats_text = "\n".join(
 | 
			
		||||
        f"{entry.user_name}: {entry.message_count}" for entry in user_stats
 | 
			
		||||
    )
 | 
			
		||||
    bot.reply_to(
 | 
			
		||||
        message,
 | 
			
		||||
        f"👤 用户消息统计信息:\n```\n{user_stats_text}\n```",
 | 
			
		||||
        parse_mode="MarkdownV2",
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@non_llm_handler
 | 
			
		||||
def search_command(message: Message, bot: TeleBot):
 | 
			
		||||
    """搜索群组消息(示例:/search 关键词 [N])"""
 | 
			
		||||
@ -134,6 +151,9 @@ if settings.openai_api_key:
 | 
			
		||||
            summary_command, commands=["summary"], pass_bot=True
 | 
			
		||||
        )
 | 
			
		||||
        bot.register_message_handler(stats_command, commands=["stats"], pass_bot=True)
 | 
			
		||||
        bot.register_message_handler(
 | 
			
		||||
            user_stats_command, commands=["userstats"], pass_bot=True
 | 
			
		||||
        )
 | 
			
		||||
        bot.register_message_handler(search_command, commands=["search"], pass_bot=True)
 | 
			
		||||
        bot.register_message_handler(
 | 
			
		||||
            handle_message, func=partial(filter_message, bot=bot)
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,13 @@ class StatsEntry:
 | 
			
		||||
    message_count: int
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass(frozen=True)
 | 
			
		||||
class UserStatsEntry:
 | 
			
		||||
    user_id: int
 | 
			
		||||
    user_name: str
 | 
			
		||||
    message_count: int
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MessageStore:
 | 
			
		||||
    def __init__(self, db_file: str):
 | 
			
		||||
        parent_folder = os.path.dirname(db_file)
 | 
			
		||||
@ -124,6 +131,25 @@ class MessageStore:
 | 
			
		||||
            rows = cursor.fetchall()
 | 
			
		||||
        return [StatsEntry(date=row[0], message_count=row[1]) for row in rows]
 | 
			
		||||
 | 
			
		||||
    def get_user_stats(self, chat_id: int, topk: int = 10) -> list[UserStatsEntry]:
 | 
			
		||||
        with self.connect() as conn:
 | 
			
		||||
            self._clean_old_messages(chat_id, conn)
 | 
			
		||||
            cursor = conn.cursor()
 | 
			
		||||
            cursor.execute(
 | 
			
		||||
                """
 | 
			
		||||
                SELECT user_id, 
 | 
			
		||||
                    (SELECT user_name FROM messages m0 WHERE m0.user_id = m1.user_id LIMIT 1) AS name,
 | 
			
		||||
                    COUNT(*) AS num
 | 
			
		||||
                FROM messages m1
 | 
			
		||||
                WHERE chat_id = ?
 | 
			
		||||
                GROUP BY user_id
 | 
			
		||||
                ORDER BY num DESC
 | 
			
		||||
                LIMIT ?;""",
 | 
			
		||||
                (chat_id, topk),
 | 
			
		||||
            )
 | 
			
		||||
            rows = cursor.fetchall()
 | 
			
		||||
            return [UserStatsEntry(*row) for row in rows]
 | 
			
		||||
 | 
			
		||||
    def search_messages(
 | 
			
		||||
        self, chat_id: int, keyword: str, limit: int = 10
 | 
			
		||||
    ) -> list[ChatMessage]:
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user