mirror of
				https://github.com/cdryzun/tg_bot_collections.git
				synced 2025-11-04 08:46:44 +08:00 
			
		
		
		
	Merge pull request #31 from F4ria/use-del-remove-player-history-for-gemini
use 'del' to remove the player's history for gemini
This commit is contained in:
		@ -3,6 +3,7 @@ import re
 | 
				
			|||||||
import time
 | 
					import time
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import google.generativeai as genai
 | 
					import google.generativeai as genai
 | 
				
			||||||
 | 
					from google.generativeai import ChatSession
 | 
				
			||||||
from google.generativeai.types.generation_types import StopCandidateException
 | 
					from google.generativeai.types.generation_types import StopCandidateException
 | 
				
			||||||
from telebot import TeleBot
 | 
					from telebot import TeleBot
 | 
				
			||||||
from telebot.types import Message
 | 
					from telebot.types import Message
 | 
				
			||||||
@ -38,7 +39,7 @@ gemini_pro_player_dict = {}
 | 
				
			|||||||
gemini_file_player_dict = {}
 | 
					gemini_file_player_dict = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def make_new_gemini_convo(is_pro=False):
 | 
					def make_new_gemini_convo(is_pro=False) -> ChatSession:
 | 
				
			||||||
    model_name = "models/gemini-1.0-pro-latest"
 | 
					    model_name = "models/gemini-1.0-pro-latest"
 | 
				
			||||||
    if is_pro:
 | 
					    if is_pro:
 | 
				
			||||||
        model_name = "models/gemini-1.5-pro-latest"
 | 
					        model_name = "models/gemini-1.5-pro-latest"
 | 
				
			||||||
@ -52,26 +53,46 @@ def make_new_gemini_convo(is_pro=False):
 | 
				
			|||||||
    return convo
 | 
					    return convo
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def remove_gemini_player(player_id: str, is_pro: bool) -> None:
 | 
				
			||||||
 | 
					    if is_pro:
 | 
				
			||||||
 | 
					        if player_id in gemini_pro_player_dict:
 | 
				
			||||||
 | 
					            del gemini_pro_player_dict[player_id]
 | 
				
			||||||
 | 
					        if player_id in gemini_file_player_dict:
 | 
				
			||||||
 | 
					            del gemini_file_player_dict[player_id]
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        if player_id in gemini_player_dict:
 | 
				
			||||||
 | 
					            del gemini_player_dict[player_id]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def get_gemini_player(player_id: str, is_pro: bool) -> ChatSession:
 | 
				
			||||||
 | 
					    player = None
 | 
				
			||||||
 | 
					    if is_pro:
 | 
				
			||||||
 | 
					        if player_id not in gemini_pro_player_dict:
 | 
				
			||||||
 | 
					            gemini_pro_player_dict[player_id] = make_new_gemini_convo(is_pro)
 | 
				
			||||||
 | 
					        player = gemini_pro_player_dict[player_id]
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        if player_id not in gemini_player_dict:
 | 
				
			||||||
 | 
					            gemini_player_dict[player_id] = make_new_gemini_convo()
 | 
				
			||||||
 | 
					        player = gemini_player_dict[player_id]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return player
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def gemini_handler(message: Message, bot: TeleBot) -> None:
 | 
					def gemini_handler(message: Message, bot: TeleBot) -> None:
 | 
				
			||||||
    """Gemini : /gemini <question>"""
 | 
					    """Gemini : /gemini <question>"""
 | 
				
			||||||
    m = message.text.strip()
 | 
					    m = message.text.strip()
 | 
				
			||||||
    player = None
 | 
					    player_id = str(message.from_user.id)
 | 
				
			||||||
    # restart will lose all TODO
 | 
					    is_pro = False
 | 
				
			||||||
    if str(message.from_user.id) not in gemini_player_dict:
 | 
					 | 
				
			||||||
        player = make_new_gemini_convo()
 | 
					 | 
				
			||||||
        gemini_player_dict[str(message.from_user.id)] = player
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        player = gemini_player_dict[str(message.from_user.id)]
 | 
					 | 
				
			||||||
    if m.strip() == "clear":
 | 
					    if m.strip() == "clear":
 | 
				
			||||||
        bot.reply_to(
 | 
					        bot.reply_to(message, "just clear you gemini messages history")
 | 
				
			||||||
            message,
 | 
					        remove_gemini_player(player_id, is_pro)
 | 
				
			||||||
            "just clear you gemini messages history",
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        player.history.clear()
 | 
					 | 
				
			||||||
        return
 | 
					        return
 | 
				
			||||||
    if m[:4].lower() == "new ":
 | 
					    if m[:4].lower() == "new ":
 | 
				
			||||||
        m = m[4:].strip()
 | 
					        m = m[4:].strip()
 | 
				
			||||||
        player.history.clear()
 | 
					        remove_gemini_player(player_id, is_pro)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # restart will lose all TODO
 | 
				
			||||||
 | 
					    player = get_gemini_player(player_id, is_pro)
 | 
				
			||||||
    m = enrich_text_with_urls(m)
 | 
					    m = enrich_text_with_urls(m)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    who = "Gemini"
 | 
					    who = "Gemini"
 | 
				
			||||||
@ -105,28 +126,18 @@ def gemini_handler(message: Message, bot: TeleBot) -> None:
 | 
				
			|||||||
def gemini_pro_handler(message: Message, bot: TeleBot) -> None:
 | 
					def gemini_pro_handler(message: Message, bot: TeleBot) -> None:
 | 
				
			||||||
    """Gemini : /gemini_pro <question>"""
 | 
					    """Gemini : /gemini_pro <question>"""
 | 
				
			||||||
    m = message.text.strip()
 | 
					    m = message.text.strip()
 | 
				
			||||||
    player = None
 | 
					    player_id = str(message.from_user.id)
 | 
				
			||||||
    # restart will lose all TODO
 | 
					    is_pro = True
 | 
				
			||||||
    if str(message.from_user.id) not in gemini_pro_player_dict:
 | 
					 | 
				
			||||||
        player = make_new_gemini_convo(is_pro=True)
 | 
					 | 
				
			||||||
        gemini_pro_player_dict[str(message.from_user.id)] = player
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        player = gemini_pro_player_dict[str(message.from_user.id)]
 | 
					 | 
				
			||||||
    if m.strip() == "clear":
 | 
					    if m.strip() == "clear":
 | 
				
			||||||
        bot.reply_to(
 | 
					        bot.reply_to(message, "just clear you gemini messages history")
 | 
				
			||||||
            message,
 | 
					        remove_gemini_player(player_id, is_pro)
 | 
				
			||||||
            "just clear you gemini messages history",
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        player.history.clear()
 | 
					 | 
				
			||||||
        # also need to clear the data file
 | 
					 | 
				
			||||||
        if gemini_file_player_dict.get(str(message.from_user.id)):
 | 
					 | 
				
			||||||
            del gemini_file_player_dict[str(message.from_user.id)]
 | 
					 | 
				
			||||||
        return
 | 
					        return
 | 
				
			||||||
    if m[:4].lower() == "new ":
 | 
					    if m[:4].lower() == "new ":
 | 
				
			||||||
        m = m[4:].strip()
 | 
					        m = m[4:].strip()
 | 
				
			||||||
        player.history.clear()
 | 
					        remove_gemini_player(player_id, is_pro)
 | 
				
			||||||
        if gemini_file_player_dict.get(str(message.from_user.id)):
 | 
					
 | 
				
			||||||
            del gemini_file_player_dict[str(message.from_user.id)]
 | 
					    # restart will lose all TODO
 | 
				
			||||||
 | 
					    player = get_gemini_player(player_id, is_pro)
 | 
				
			||||||
    m = enrich_text_with_urls(m)
 | 
					    m = enrich_text_with_urls(m)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    who = "Gemini Pro"
 | 
					    who = "Gemini Pro"
 | 
				
			||||||
@ -138,7 +149,7 @@ def gemini_pro_handler(message: Message, bot: TeleBot) -> None:
 | 
				
			|||||||
        player.history = player.history[2:]
 | 
					        player.history = player.history[2:]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        if path := gemini_file_player_dict.get(str(message.from_user.id)):
 | 
					        if path := gemini_file_player_dict.get(player_id):
 | 
				
			||||||
            m = [m, path]
 | 
					            m = [m, path]
 | 
				
			||||||
        r = player.send_message(m, stream=True)
 | 
					        r = player.send_message(m, stream=True)
 | 
				
			||||||
        s = ""
 | 
					        s = ""
 | 
				
			||||||
@ -200,26 +211,21 @@ def gemini_audio_handler(message: Message, bot: TeleBot) -> None:
 | 
				
			|||||||
    s = message.caption
 | 
					    s = message.caption
 | 
				
			||||||
    prompt = s.strip()
 | 
					    prompt = s.strip()
 | 
				
			||||||
    who = "Gemini File Audio"
 | 
					    who = "Gemini File Audio"
 | 
				
			||||||
    player = None
 | 
					    player_id = str(message.from_user.id)
 | 
				
			||||||
    # restart will lose all TODO
 | 
					    # restart will lose all TODO
 | 
				
			||||||
    if str(message.from_user.id) not in gemini_pro_player_dict:
 | 
					    player = get_gemini_player(player_id, is_pro=True)
 | 
				
			||||||
        player = make_new_gemini_convo(is_pro=True)
 | 
					 | 
				
			||||||
        gemini_pro_player_dict[str(message.from_user.id)] = player
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        player = gemini_pro_player_dict[str(message.from_user.id)]
 | 
					 | 
				
			||||||
    file_path = None
 | 
					    file_path = None
 | 
				
			||||||
    # restart will lose all TODO
 | 
					 | 
				
			||||||
    # for file handler like {user_id: [player, file_path], user_id2: [player, file_path]}
 | 
					    # for file handler like {user_id: [player, file_path], user_id2: [player, file_path]}
 | 
				
			||||||
    reply_id = bot_reply_first(message, who, bot)
 | 
					    reply_id = bot_reply_first(message, who, bot)
 | 
				
			||||||
    file_path = bot.get_file(message.audio.file_id).file_path
 | 
					    file_path = bot.get_file(message.audio.file_id).file_path
 | 
				
			||||||
    downloaded_file = bot.download_file(file_path)
 | 
					    downloaded_file = bot.download_file(file_path)
 | 
				
			||||||
    path = f"{str(message.from_user.id)}_gemini.mp3"
 | 
					    path = f"{player_id}_gemini.mp3"
 | 
				
			||||||
    with open(path, "wb") as temp_file:
 | 
					    with open(path, "wb") as temp_file:
 | 
				
			||||||
        temp_file.write(downloaded_file)
 | 
					        temp_file.write(downloaded_file)
 | 
				
			||||||
    gemini_mp3_file = genai.upload_file(path=path)
 | 
					    gemini_mp3_file = genai.upload_file(path=path)
 | 
				
			||||||
    r = player.send_message([prompt, gemini_mp3_file], stream=True)
 | 
					    r = player.send_message([prompt, gemini_mp3_file], stream=True)
 | 
				
			||||||
    # need set it for the conversation
 | 
					    # need set it for the conversation
 | 
				
			||||||
    gemini_file_player_dict[str(message.from_user.id)] = gemini_mp3_file
 | 
					    gemini_file_player_dict[player_id] = gemini_mp3_file
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        s = ""
 | 
					        s = ""
 | 
				
			||||||
        start = time.time()
 | 
					        start = time.time()
 | 
				
			||||||
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user