diff --git a/README.md b/README.md index e682b09..790eb95 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,22 @@ Note, if you are using third party service, you need to `export OPENAI_API_BASE= Note, currently its support dify Chatbot with instructions(System prompt) and different MODEL with its parameters. +## Bot -> Cohere + +1. visit https://dashboard.cohere.com/api-keys get the key +2. export COHERE_API_KEY=${the_key} +3. use `cohere: ${message}` to ask + +## Bot -> `Telegra.ph` + +1. https://t.me/telegraph Create or login Telegraph account +2. `Log in as ${Account} on this device` +3. On Browser at https://telegra.ph/, press F12 or right click and inspect +4. Go to Application -> Storage -> Cookies -> https://telegra.ph/ +5. The token at `tph_token` is the token for telegra.ph API + +Do not share the token with others, it's like a password. + ## HOW TO Install and Run ### Manually install diff --git a/handlers/__init__.py b/handlers/__init__.py index 9a960ea..58074e3 100644 --- a/handlers/__init__.py +++ b/handlers/__init__.py @@ -200,10 +200,261 @@ def image_to_data_uri(file_path): return f"data:image/png;base64,{encoded_image}" +import requests +import json +import markdown # pip install Markdown +from bs4 import BeautifulSoup # pip install beautifulsoup4 + + +class TelegraphAPI: + def __init__(self, access_token): + self.access_token = access_token + self.base_url = "https://api.telegra.ph" + + # Get account info on initialization + account_info = self.get_account_info() + self.short_name = account_info.get("short_name") + self.author_name = account_info.get("author_name") + self.author_url = account_info.get("author_url") + + def create_page( + self, title, content, author_name=None, author_url=None, return_content=False + ): + """ + Creates a new Telegraph page. + + Args: + title (str): Page title (1-256 characters). + content (list): Content of the page as a list of Node dictionaries. + author_name (str, optional): Author name (0-128 characters). Defaults to account's author_name. + author_url (str, optional): Profile link (0-512 characters). Defaults to account's author_url. + return_content (bool, optional): If True, return the content field in the response. + + Returns: + str: URL of the created page. + + Raises: + requests.exceptions.RequestException: If the request fails. + + + """ + url = f"{self.base_url}/createPage" + data = { + "access_token": self.access_token, + "title": title, + "content": json.dumps(content), + "return_content": return_content, + # Use provided author info or fall back to account info + "author_name": author_name if author_name else self.author_name, + "author_url": author_url if author_url else self.author_url, + } + + response = requests.post(url, data=data) + response.raise_for_status() + response = response.json() + page_url = response["result"]["url"] + return page_url + + def get_account_info(self): + """ + Gets information about the Telegraph account. + + Returns: + dict: Account information including short_name, author_name, and author_url. + Returns None if there's an error. + """ + url = f"{self.base_url}/getAccountInfo?access_token={self.access_token}" # &fields=[\"author_name\",\"author_url\"] for specific fields + response = requests.get(url) + + if response.status_code == 200: + return response.json()["result"] + else: + print(f"Fail getting telegra.ph token info: {response.status_code}") + return None + + def edit_page( + self, + path, + title, + content, + author_name=None, + author_url=None, + return_content=False, + ): + """ + Edits an existing Telegraph page. + + Args: + path (str): Path of the page to edit. + title (str): New page title (1-256 characters). + content (list): New content of the page as a list of Node dictionaries. + author_name (str, optional): Author name (0-128 characters). Defaults to account's author_name. + author_url (str, optional): Profile link (0-512 characters). Defaults to account's author_url. + return_content (bool, optional): If True, return the content field in the response. + + Returns: + str: URL of the edited page. + + Raises: + requests.exceptions.RequestException: If the request fails. + """ + url = f"{self.base_url}/editPage" + data = { + "access_token": self.access_token, + "path": path, + "title": title, + "content": json.dumps(content), + "return_content": return_content, + # Use provided author info or fall back to account info + "author_name": author_name if author_name else self.author_name, + "author_url": author_url if author_url else self.author_url, + } + + response = requests.post(url, data=data) + response.raise_for_status() + response = response.json() + + page_url = response["result"]["url"] + return page_url + + def get_page(self, path): + """ + Gets information about a Telegraph page. + + Args: + path (str): Path of the page to get. + + Returns: + dict: Information about the page. + """ + url = f"{self.base_url}/getPage/{path}?return_content=true" + response = requests.get(url) + response.raise_for_status() + return response.json()["result"] + + def create_page_md( + self, + title, + markdown_text, + author_name=None, + author_url=None, + return_content=False, + ): + """ + Creates a new Telegraph page from markdown text. + + Args: + title (str): Page title (1-256 characters). + markdown_text (str): Markdown text to convert to HTML. + author_name (str, optional): Author name (0-128 characters). Defaults to account's author_name. + author_url (str, optional): Profile link (0-512 characters). Defaults to account's author_url. + return_content (bool, optional): If True, return the content field in the response. + + Returns: + str: URL of the created page. + + Raises: + requests.exceptions.RequestException: If the request fails. + """ + content = md_to_dom(markdown_text) + return self.create_page(title, content, author_name, author_url, return_content) + + def edit_page_md( + self, + path, + title, + markdown_text, + author_name=None, + author_url=None, + return_content=False, + ): + content = md_to_dom(markdown_text) + return self.edit_page( + path, title, content, author_name, author_url, return_content + ) + + +def md_to_dom(markdown_text): + """Converts markdown text to a Python dictionary representing the DOM, + limiting heading levels to h3 and h4. + + Args: + markdown_text: The markdown text to convert. + + Returns: + A Python list representing the DOM, where each element is a dictionary + with the following keys: + - 'tag': The tag name of the element. + - 'attributes': A dictionary of attributes for the element (optional). + - 'children': A list of child elements (optional). + """ + + # Convert markdown to HTML + html = markdown.markdown( + markdown_text, + extensions=["markdown.extensions.extra", "markdown.extensions.sane_lists"], + ) + + # Parse the HTML with BeautifulSoup + soup = BeautifulSoup(html, "html.parser") + + def parse_element(element): + tag_dict = {"tag": element.name} + if element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]: + if element.name == "h1": + tag_dict["tag"] = "h3" + elif element.name == "h2": + tag_dict["tag"] = "h4" + else: + tag_dict["tag"] = "p" + tag_dict["children"] = [{"tag": "strong", "children": element.contents}] + + # Correctly handle children for h1-h6 + if element.attrs: + tag_dict["attributes"] = element.attrs + if element.contents: + children = [] + for child in element.contents: + if isinstance(child, str): + # Remove leading/trailing whitespace from text nodes + children.append(child.strip()) + else: # it's another tag + children.append(parse_element(child)) + tag_dict["children"] = children + else: + if element.attrs: + tag_dict["attributes"] = element.attrs + if element.contents: + children = [] + for child in element.contents: + if isinstance(child, str): + # Remove leading/trailing whitespace from text nodes + children.append(child.strip()) + else: # it's another tag + children.append(parse_element(child)) + if children: + tag_dict["children"] = children + return tag_dict + + new_dom = [] + for element in soup.contents: + if isinstance(element, str) and not element.strip(): + # Skip empty text nodes + continue + elif isinstance(element, str): + # Treat remaining text nodes as separate elements for clarity + new_dom.append({"tag": "text", "content": element.strip()}) + else: + new_dom.append(parse_element(element)) + + return new_dom + + # `import *` will give you these __all__ = [ "bot_reply_first", "bot_reply_markdown", "enrich_text_with_urls", "image_to_data_uri", + "TelegraphAPI", ] diff --git a/handlers/cohere.py b/handlers/cohere.py new file mode 100644 index 0000000..69b5fc1 --- /dev/null +++ b/handlers/cohere.py @@ -0,0 +1,211 @@ +from os import environ +import time + +from telebot import TeleBot +from telebot.types import Message +from expiringdict import ExpiringDict + +from . import * + +import cohere +from telegramify_markdown import convert +from telegramify_markdown.customize import markdown_symbol + +markdown_symbol.head_level_1 = "📌" # If you want, Customizing the head level 1 symbol +markdown_symbol.link = "🔗" # If you want, Customizing the link symbol + +COHERE_API_KEY = environ.get("COHERE_API_KEY") +COHERE_MODEL = "command-r-plus" + +TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN") +if TELEGRA_PH_TOKEN: + ph = TelegraphAPI(TELEGRA_PH_TOKEN) + +if COHERE_API_KEY: + co = cohere.Client(api_key=COHERE_API_KEY) + +# Global history cache +cohere_player_dict = ExpiringDict(max_len=1000, max_age_seconds=300) + + +def cohere_handler_direct(message: Message, bot: TeleBot) -> None: + """cohere : /cohere """ + m = message.text.strip() + + player_message = [] + if str(message.from_user.id) not in cohere_player_dict: + cohere_player_dict[str(message.from_user.id)] = player_message + else: + player_message = cohere_player_dict[str(message.from_user.id)] + + if m.strip() == "clear": + bot.reply_to( + message, + "Just cleared your Cohere messages history", + ) + player_message.clear() + return + + if m[:4].lower() == "new ": + m = m[4:].strip() + player_message.clear() + + m = enrich_text_with_urls(m) + + who = "Command R Plus" + reply_id = bot_reply_first(message, who, bot) + + player_message.append({"role": "User", "message": m}) + # keep the last 5, every has two ask and answer. + if len(player_message) > 10: + player_message = player_message[2:] + + try: + stream = co.chat_stream( + model=COHERE_MODEL, + message=m, + temperature=0.8, + chat_history=player_message, + prompt_truncation="AUTO", + connectors=[{"id": "web-search"}], + citation_quality="accurate", + ) + + s = "" + source = "" + start = time.time() + for event in stream: + if event.event_type == "text-generation": + s += event.text.encode("utf-8").decode("utf-8") + if time.time() - start > 1.2: + start = time.time() + bot_reply_markdown(reply_id, who, s, bot, split_text=True) + elif event.event_type == "search-results": + for doc in event.documents: + source += f"\n[{doc['title']}]({doc['url']})" + elif event.event_type == "stream-end": + break + + s += "\n" + source + "\n" + + if not bot_reply_markdown(reply_id, who, s, bot): + # maybe not complete + # maybe the same message + player_message.clear() + return + + player_message.append( + { + "role": "Chatbot", + "message": convert(s), + } + ) + + except Exception as e: + print(e) + bot_reply_markdown(reply_id, who, "Answer wrong", bot) + player_message.clear() + return + + +def cohere_handler(message: Message, bot: TeleBot) -> None: + """cohere : /cohere """ + m = message.text.strip() + + player_message = [] + if str(message.from_user.id) not in cohere_player_dict: + cohere_player_dict[str(message.from_user.id)] = player_message + else: + player_message = cohere_player_dict[str(message.from_user.id)] + + if m.strip() == "clear": + bot.reply_to( + message, + "Just cleared your Cohere messages history", + ) + player_message.clear() + return + + if m[:4].lower() == "new ": + m = m[4:].strip() + player_message.clear() + + m = enrich_text_with_urls(m) + + who = "Command R Plus" + reply_id = bot_reply_first(message, who, bot) + + player_message.append({"role": "User", "message": m}) + # keep the last 5, every has two ask and answer. + if len(player_message) > 10: + player_message = player_message[2:] + + try: + stream = co.chat_stream( + model=COHERE_MODEL, + message=m, + temperature=0.8, + chat_history=player_message, + prompt_truncation="AUTO", + connectors=[{"id": "web-search"}], + citation_quality="accurate", + ) + + s = "" + source = "" + start = time.time() + for event in stream: + if event.event_type == "text-generation": + s += event.text.encode("utf-8").decode("utf-8") + if time.time() - start > 1.2: + start = time.time() + bot_reply_markdown(reply_id, who, s, bot, split_text=False) + elif event.event_type == "search-results": + for doc in event.documents: + source += f"\n{doc['title']}\n{doc['url']}\n" + elif event.event_type == "stream-end": + break + content = s + "\n------\n------\n" + source + ph_s = ph.create_page_md(title="Cohere", markdown_text=content) + s += f"\n\n[View]({ph_s})" + + if not bot_reply_markdown(reply_id, who, s, bot): + # maybe not complete + # maybe the same message + player_message.clear() + return + + player_message.append( + { + "role": "Chatbot", + "message": convert(s), + } + ) + + except Exception as e: + print(e) + bot_reply_markdown(reply_id, who, "Answer wrong", bot) + player_message.clear() + return + + +if COHERE_API_KEY: + if not TELEGRA_PH_TOKEN: + + def register(bot: TeleBot) -> None: + bot.register_message_handler( + cohere_handler_direct, commands=["cohere"], pass_bot=True + ) + bot.register_message_handler( + cohere_handler_direct, regexp="^cohere:", pass_bot=True + ) + + else: + + def register(bot: TeleBot) -> None: + bot.register_message_handler( + cohere_handler_direct, commands=["cohere"], pass_bot=True + ) + bot.register_message_handler( + cohere_handler_direct, regexp="^cohere:", pass_bot=True + ) diff --git a/handlers/useful.py b/handlers/useful.py index 686f000..0416502 100644 --- a/handlers/useful.py +++ b/handlers/useful.py @@ -18,6 +18,14 @@ from . import * from telegramify_markdown.customize import markdown_symbol +import cohere + +COHERE_API_KEY = environ.get("COHERE_API_KEY") +TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN") +co = cohere.Client(api_key=COHERE_API_KEY) +ph = TelegraphAPI(TELEGRA_PH_TOKEN) +COHERE_MODEL = "command-r-plus" + chat_message_dict = ExpiringDict(max_len=100, max_age_seconds=120) chat_user_dict = ExpiringDict(max_len=100, max_age_seconds=20) @@ -91,6 +99,7 @@ def latest_handle_messages(message: Message, bot: TeleBot): "sd", "map", "yi", + "cohere", ) ): return @@ -120,6 +129,7 @@ def answer_it_handler(message: Message, bot: TeleBot): latest_message = chat_message_dict.get(chat_id) m = latest_message.text.strip() m = enrich_text_with_urls(m) + full = "" ##### Gemini ##### who = "Gemini Pro" # show something, make it more responsible @@ -141,6 +151,8 @@ def answer_it_handler(message: Message, bot: TeleBot): convo.history.clear() bot_reply_markdown(reply_id, who, "Error", bot) + full += f"{who}:\n{s}" + chat_id_list = [reply_id.message_id] ##### ChatGPT ##### who = "ChatGPT Pro" reply_id = bot_reply_first(latest_message, who, bot) @@ -173,6 +185,83 @@ def answer_it_handler(message: Message, bot: TeleBot): print(e) bot_reply_markdown(reply_id, who, "answer wrong", bot) + full += f"\n---\n{who}:\n{s}" + chat_id_list.append(reply_id.message_id) + + ##### Cohere ##### + if COHERE_API_KEY: + full, chat_id = cohere_answer(latest_message, bot, full, m) + chat_id_list.append(chat_id) + else: + pass + + ##### Answer ##### + if TELEGRA_PH_TOKEN: + final_answer(latest_message, bot, full, chat_id_list) + else: + pass + + +def cohere_answer(latest_message: Message, bot: TeleBot, full, m): + """cohere answer""" + who = "Command R Plus" + reply_id = bot_reply_first(latest_message, who, bot) + + player_message = [{"role": "User", "message": m}] + + try: + r = co.chat_stream( + model=COHERE_MODEL, + message=m, + temperature=0.4, + chat_history=player_message, + prompt_truncation="AUTO", + connectors=[{"id": "web-search"}], + citation_quality="fast", + ) + s = "" + source = "" + start = time.time() + for event in r: + if event.event_type == "text-generation": + s += event.text.encode("utf-8").decode("utf-8") + if time.time() - start > 1.2: + start = time.time() + bot_reply_markdown(reply_id, who, s, bot, split_text=False) + elif event.event_type == "search-results": + for doc in event.documents: + source += f"\n[{doc['title']}]({doc['url']})" + elif event.event_type == "stream-end": + break + + # maybe not complete + # maybe the same message + try: + bot_reply_markdown(reply_id, who, s, bot) + except: + pass + + except Exception as e: + print(e) + bot_reply_markdown(reply_id, who, "Answer wrong", bot) + + content = s + "\n------\n" + source + full += f"\n---\n{who}:\n{content}" + chat_id = reply_id.chat.id + return full, chat_id + + +def final_answer(latest_message: Message, bot: TeleBot, full, list): + """final answer""" + who = "Answer" + reply_id = bot_reply_first(latest_message, who, bot) + ph_s = ph.create_page_md(title="Answer it", markdown_text=full) + bot_reply_markdown(reply_id, who, f"[View]({ph_s})", bot) + # delete the chat message, only leave a telegra.ph link + + # for i in list: + # bot.delete_message(chat_id=chat_id, message_id=i) + if GOOGLE_GEMINI_KEY and CHATGPT_API_KEY: