mirror of
				https://github.com/cdryzun/tg_bot_collections.git
				synced 2025-11-04 16:56:43 +08:00 
			
		
		
		
	@ -1,6 +1,7 @@
 | 
			
		||||
from telebot import TeleBot
 | 
			
		||||
from telebot.types import Message
 | 
			
		||||
import requests
 | 
			
		||||
from openai import OpenAI
 | 
			
		||||
from os import environ
 | 
			
		||||
 | 
			
		||||
from . import *
 | 
			
		||||
@ -8,6 +9,13 @@ from . import *
 | 
			
		||||
 | 
			
		||||
SD_API_KEY = environ.get("SD3_KEY")
 | 
			
		||||
 | 
			
		||||
# TODO refactor this shit to __init__
 | 
			
		||||
CHATGPT_API_KEY = environ.get("OPENAI_API_KEY")
 | 
			
		||||
CHATGPT_BASE_URL = environ.get("OPENAI_API_BASE") or "https://api.openai.com/v1"
 | 
			
		||||
CHATGPT_PRO_MODEL = "gpt-4o-2024-05-13"
 | 
			
		||||
 | 
			
		||||
client = OpenAI(api_key=CHATGPT_API_KEY, base_url=CHATGPT_BASE_URL, timeout=20)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_user_balance():
 | 
			
		||||
    api_host = "https://api.stability.ai"
 | 
			
		||||
@ -67,8 +75,46 @@ def sd_handler(message: Message, bot: TeleBot):
 | 
			
		||||
        bot.reply_to(message, "sd3 error")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if SD_API_KEY:
 | 
			
		||||
def sd_pro_handler(message: Message, bot: TeleBot):
 | 
			
		||||
    """pretty sd3_pro: /sd3_pro <address>"""
 | 
			
		||||
    credits = get_user_balance()
 | 
			
		||||
    m = message.text.strip()
 | 
			
		||||
    prompt = m.strip()
 | 
			
		||||
    rewrite_prompt = (
 | 
			
		||||
        f"revise `{prompt}` to a DALL-E prompt only return the prompt in English."
 | 
			
		||||
    )
 | 
			
		||||
    completion = client.chat.completions.create(
 | 
			
		||||
        messages=[{"role": "user", "content": rewrite_prompt}],
 | 
			
		||||
        max_tokens=2048,
 | 
			
		||||
        model=CHATGPT_PRO_MODEL,
 | 
			
		||||
    )
 | 
			
		||||
    sd_prompt = completion.choices[0].message.content.encode("utf8").decode()
 | 
			
		||||
    # drop all the Chinese characters
 | 
			
		||||
    sd_prompt = "".join([i for i in sd_prompt if ord(i) < 128])
 | 
			
		||||
    bot.reply_to(
 | 
			
		||||
        message,
 | 
			
		||||
        f"Generating pretty sd3-turbo image may take some time please left credits {credits} every try will cost 4 criedits wait:\n the real prompt is: {sd_prompt}",
 | 
			
		||||
    )
 | 
			
		||||
    try:
 | 
			
		||||
        r = generate_sd3_image(sd_prompt)
 | 
			
		||||
        if r:
 | 
			
		||||
            with open(f"sd3.jpeg", "rb") as photo:
 | 
			
		||||
                bot.send_photo(
 | 
			
		||||
                    message.chat.id, photo, reply_to_message_id=message.message_id
 | 
			
		||||
                )
 | 
			
		||||
        else:
 | 
			
		||||
            bot.reply_to(message, "prompt error")
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        print(e)
 | 
			
		||||
        bot.reply_to(message, "sd3 error")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if SD_API_KEY and CHATGPT_API_KEY:
 | 
			
		||||
 | 
			
		||||
    def register(bot: TeleBot) -> None:
 | 
			
		||||
        bot.register_message_handler(sd_handler, commands=["sd3"], pass_bot=True)
 | 
			
		||||
        bot.register_message_handler(sd_handler, regexp="^sd3:", pass_bot=True)
 | 
			
		||||
        bot.register_message_handler(
 | 
			
		||||
            sd_pro_handler, commands=["sd3_pro"], pass_bot=True
 | 
			
		||||
        )
 | 
			
		||||
        bot.register_message_handler(sd_pro_handler, regexp="^sd3_pro:", pass_bot=True)
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user