diff --git a/handlers/chatgpt.py b/handlers/chatgpt.py new file mode 100644 index 0000000..927fb7a --- /dev/null +++ b/handlers/chatgpt.py @@ -0,0 +1,246 @@ +import base64 +from os import environ +from pathlib import Path +import time + +from openai import OpenAI +from telebot import TeleBot +from telebot.types import Message + +from . import * + +from telegramify_markdown import convert +from telegramify_markdown.customize import markdown_symbol + +markdown_symbol.head_level_1 = "📌" # If you want, Customizing the head level 1 symbol +markdown_symbol.link = "🔗" # If you want, Customizing the link symbol + +CHATGPT_API_KEY = environ.get("OPENAI_API_KEY") +CHATGPT_BASE_URL = environ.get("OPENAI_API_BASE") or "https://api.openai.com/v1" +CHATGPT_MODEL = "gpt-3.5-turbo" +CHATGPT_PRO_MODEL = "gpt-4-turbo" + + +client = OpenAI(api_key=CHATGPT_API_KEY, base_url=CHATGPT_BASE_URL, timeout=20) + + +# Global history cache +chatgpt_player_dict = {} +chatgpt_pro_player_dict = {} + + +def chatgpt_handler(message: Message, bot: TeleBot) -> None: + """chatgpt : /chatgpt """ + m = message.text.strip() + + player_message = [] + # restart will lose all TODO + if str(message.from_user.id) not in chatgpt_player_dict: + chatgpt_player_dict[str(message.from_user.id)] = ( + player_message # for the imuutable list + ) + else: + player_message = chatgpt_player_dict[str(message.from_user.id)] + if m.strip() == "clear": + bot.reply_to( + message, + "just clear your chatgpt messages history", + ) + player_message.clear() + return + + # show something, make it more responsible + reply_id = bot_reply_first(message, "ChatGPT", bot) + + player_message.append({"role": "user", "content": m}) + # keep the last 5, every has two ask and answer. + if len(player_message) > 10: + player_message = player_message[2:] + + chatgpt_reply_text = "" + try: + r = client.chat.completions.create( + messages=player_message, max_tokens=1024, model=CHATGPT_MODEL + ) + content = r.choices[0].message.content.encode("utf8").decode() + if not content: + chatgpt_reply_text = "chatgpt did not answer." + player_message.pop() + else: + chatgpt_reply_text = content + player_message.append( + { + "role": "assistant", + "content": chatgpt_reply_text, + } + ) + + except Exception as e: + print(e) + bot.reply_to( + message, + "ChatGPT answer:\n" + "chatgpt answer timeout", + parse_mode="MarkdownV2", + ) + # pop my user + player_message.pop() + return + + # reply back as Markdown and fallback to plain text if failed. + bot_reply_markdown(reply_id, "ChatGPT", chatgpt_reply_text, bot) + + +def chatgpt_pro_handler(message: Message, bot: TeleBot) -> None: + """chatgpt_pro : /chatgpt_pro """ + m = message.text.strip() + + player_message = [] + # restart will lose all TODO + if str(message.from_user.id) not in chatgpt_player_dict: + chatgpt_player_dict[str(message.from_user.id)] = ( + player_message # for the imuutable list + ) + else: + player_message = chatgpt_player_dict[str(message.from_user.id)] + if m.strip() == "clear": + bot.reply_to( + message, + "just clear your chatgpt messages history", + ) + player_message.clear() + return + + reply_id = bot_reply_first(message, "ChatGPT", bot) + + player_message.append({"role": "user", "content": m}) + # keep the last 5, every has two ask and answer. + if len(player_message) > 10: + player_message = player_message[2:] + + try: + r = client.chat.completions.create( + messages=player_message, + max_tokens=2048, + model=CHATGPT_PRO_MODEL, + stream=True, + ) + s = "" + start = time.time() + for chunk in r: + if chunk.choices[0].delta.content is None: + break + s += chunk.choices[0].delta.content + + if time.time() - start > 1.7: + start = time.time() + try: + bot.edit_message_text( + message_id=reply_id.message_id, + chat_id=reply_id.chat.id, + text=convert(s), + parse_mode="MarkdownV2", + ) + except Exception as e: + print(str(e)) + try: + # maybe not complete + # maybe the same message + bot.edit_message_text( + message_id=reply_id.message_id, + chat_id=reply_id.chat.id, + text=convert(s), + parse_mode="MarkdownV2", + ) + except Exception as e: + player_message.clear() + print(str(e)) + return + + player_message.append( + { + "role": "assistant", + "content": convert(s), + } + ) + + except Exception as e: + bot.reply_to( + message, + "chatgpt answer:\n" + "chatgpt answer timeout", + parse_mode="MarkdownV2", + ) + # pop my user + player_message.clear() + return + + +def _image_to_data_uri(file_path): + with open(file_path, "rb") as image_file: + encoded_image = base64.b64encode(image_file.read()).decode("utf-8") + return f"data:image/png;base64,{encoded_image}" + + +def chatgpt_photo_handler(message: Message, bot: TeleBot) -> None: + s = message.caption + reply_message = bot.reply_to( + message, + "Generating chatgpt vision answer please wait.", + ) + prompt = s.strip() + # get the high quaility picture. + max_size_photo = max(message.photo, key=lambda p: p.file_size) + file_path = bot.get_file(max_size_photo.file_id).file_path + downloaded_file = bot.download_file(file_path) + with open("chatgpt_temp.jpg", "wb") as temp_file: + temp_file.write(downloaded_file) + + try: + r = client.chat.completions.create( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": _image_to_data_uri("chatgpt_temp.jpg") + }, + }, + ], + } + ], + model=CHATGPT_PRO_MODEL, + ) + bot.reply_to( + message, + "ChatGPT vision answer:\n" + + r.choices[0].message.content.encode("utf8").decode(), + ) + except Exception as e: + print(e) + bot.reply_to( + message, + "ChatGPT vision answer:\n" + "chatgpt vision answer wrong", + parse_mode="MarkdownV2", + ) + finally: + bot.delete_message(reply_message.chat.id, reply_message.message_id) + + +def register(bot: TeleBot) -> None: + bot.register_message_handler(chatgpt_handler, commands=["chatgpt"], pass_bot=True) + bot.register_message_handler(chatgpt_handler, regexp="^chatgpt:", pass_bot=True) + bot.register_message_handler( + chatgpt_pro_handler, commands=["chatgpt_pro"], pass_bot=True + ) + bot.register_message_handler( + chatgpt_pro_handler, regexp="^chatgpt_pro:", pass_bot=True + ) + bot.register_message_handler( + chatgpt_photo_handler, + content_types=["photo"], + func=lambda m: m.caption and m.caption.startswith(("chatgpt:", "/chatgpt")), + pass_bot=True, + )