yihong0618 9e5d5db767 feat: support yi
Signed-off-by: yihong0618 <zouzou0208@gmail.com>
2024-03-17 14:55:39 +08:00

165 lines
4.8 KiB
Python

import base64
from os import environ
from pathlib import Path
from openai import OpenAI
import requests
from telebot import TeleBot
from telebot.types import Message
import telegramify_markdown
from telegramify_markdown.customize import markdown_symbol
markdown_symbol.head_level_1 = "📌" # If you want, Customizing the head level 1 symbol
markdown_symbol.link = "🔗" # If you want, Customizing the link symbol
YI_BASE_URL = environ.get("YI_BASE_URL")
YI_API_KEY = environ.get("YI_API_KEY")
YI_MODEL = "yi-34b-chat-0205" # also support yi-34b-chat-200k
client = OpenAI(
# defaults to os.environ.get("OPENAI_API_KEY")
api_key=YI_API_KEY,
base_url=YI_BASE_URL,
)
print(client.base_url)
# Global history cache
yi_player_dict = {}
def yi_handler(message: Message, bot: TeleBot) -> None:
"""yi : /yi <question>"""
m = message.text.strip()
player_message = []
# restart will lose all TODO
if str(message.from_user.id) not in yi_player_dict:
yi_player_dict[str(message.from_user.id)] = (
player_message # for the imuutable list
)
else:
player_message = yi_player_dict[str(message.from_user.id)]
player_message.append({"role": "user", "content": m})
# keep the last 5, every has two ask and answer.
if len(player_message) > 10:
player_message = player_message[2:]
yi_reply_text = ""
try:
if len(player_message) > 2:
if player_message[-1]["role"] == player_message[-2]["role"]:
# tricky
player_message.pop()
r = client.chat.completions.create(messages=player_message, model=YI_MODEL)
content = r.choices[0].message.content.encode("utf8").decode()
print(content)
if not content:
yi_reply_text = "yi did not answer."
player_message.pop()
else:
yi_reply_text = content
player_message.append(
{
"role": "assistant",
"content": content,
}
)
except Exception as e:
print(e)
bot.reply_to(
message,
"yi answer:\n" + "yi answer timeout",
parse_mode="MarkdownV2",
)
# pop my user
player_message.pop()
return
try:
bot.reply_to(
message,
"yi answer:\n" + telegramify_markdown.convert(yi_reply_text),
parse_mode="MarkdownV2",
)
return
except:
print("wrong markdown format")
bot.reply_to(
message,
"yi answer:\n\n" + yi_reply_text,
)
return
def _image_to_data_uri(file_path):
with open(file_path, "rb") as image_file:
encoded_image = base64.b64encode(image_file.read()).decode("utf-8")
return f"data:image/png;base64,{encoded_image}"
def yi_photo_handler(message: Message, bot: TeleBot) -> None:
s = message.caption
bot.reply_to(
message,
"Generating yi vision answer please wait.",
)
prompt = s.strip()
# get the high quaility picture.
max_size_photo = max(message.photo, key=lambda p: p.file_size)
file_path = bot.get_file(max_size_photo.file_id).file_path
downloaded_file = bot.download_file(file_path)
with open("yi_temp.jpg", "wb") as temp_file:
temp_file.write(downloaded_file)
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {client.api_key}",
}
payload = {
"model": "yi-vl-plus",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {"url": _image_to_data_uri("yi_temp.jpg")},
},
],
}
],
"max_tokens": 2048,
}
response = requests.post(
f"https://api.lingyiwanwu.com/v1/chat/completions",
headers=headers,
json=payload,
).json()
print(response)
try:
text = response["choices"][0]["message"]["content"].encode("utf8").decode()
bot.reply_to(message, "yi vision answer:\n" + text)
except Exception as e:
print(e)
bot.reply_to(
message,
"yi vision answer:\n" + "yi vision answer wrong",
parse_mode="MarkdownV2",
)
def register(bot: TeleBot) -> None:
bot.register_message_handler(yi_handler, commands=["yi"], pass_bot=True)
bot.register_message_handler(yi_handler, regexp="^yi:", pass_bot=True)
bot.register_message_handler(
yi_photo_handler,
content_types=["photo"],
func=lambda m: m.caption and m.caption.startswith(("yi:", "/yi")),
pass_bot=True,
)