Merge pull request #37 from alterxyz/main

feat: Cohere Telegraph
This commit is contained in:
yihong 2024-06-25 19:22:27 +08:00 committed by GitHub
commit 4b1e1a2683
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 591 additions and 1 deletions

3
.gitignore vendored
View File

@ -168,4 +168,5 @@ nohup.out
*.mp4 *.mp4
*.pdf *.pdf
.pdm-python .pdm-python
*.wav *.wav
token_key.json

View File

@ -75,6 +75,34 @@ Note, if you are using third party service, you need to `export OPENAI_API_BASE=
Note, currently its support dify Chatbot with instructions(System prompt) and different MODEL with its parameters. Note, currently its support dify Chatbot with instructions(System prompt) and different MODEL with its parameters.
## Bot -> Cohere
1. Visit https://dashboard.cohere.com/api-keys and get an API key
2. export COHERE_API_KEY=${the_key}
3. use `cohere: ${message}` to ask
## Function -> Telegraph
### Skip token (default)
You do not need to do anything.
However, the token is only printed to the console in this mode, so unless you copy it you will not be able to edit generated posts later.
### Store token (recommended)
Set `Store_Token = True` in `TelegraphAPI._create_ph_account` (in `handlers/__init__.py`). The token will then be saved to `token_key.json`.
### Get token manually from Telegram account
1. Open https://t.me/telegraph and create or log in to a Telegraph account
2. Choose `Log in as ${Account} on this device`
3. In your browser, open https://telegra.ph/ and press F12 (or right-click and choose Inspect)
4. Go to Application -> Storage -> Cookies -> https://telegra.ph/
5. The value of the `tph_token` cookie is your token for the telegra.ph API
Do not share the token with anyone; treat it like a password.
## HOW TO Install and Run ## HOW TO Install and Run
### Manually install ### Manually install

View File

@ -200,10 +200,232 @@ def image_to_data_uri(file_path):
return f"data:image/png;base64,{encoded_image}" return f"data:image/png;base64,{encoded_image}"
import json
import requests
import os
from bs4 import BeautifulSoup
import markdown
class TelegraphAPI:
    """Small client for the telegra.ph API, used to publish text as pages.

    If no access token is supplied, one is loaded from ``token_key.json`` in
    the current working directory or, failing that, a new Telegraph account
    is created.
    """

    # The Telegraph API documents only these attributes on a node element.
    _ALLOWED_ATTRS = ("href", "src")
    # Markdown headings are shifted down: h1 -> h3 and h2 -> h4.
    _HEADING_TAGS = {"h1": "h3", "h2": "h4"}
    # Deeper headings are rendered as a bold paragraph instead.
    _MINOR_HEADINGS = ("h3", "h4", "h5", "h6")

    def __init__(
        self,
        access_token=None,
        short_name="tg_bot_collections",
        author_name="Telegram Bot Collections",
        author_url=None,
    ):
        """Set up the client and load the account's profile fields.

        Args:
            access_token: Existing Telegraph token; when falsy, a token is
                loaded from disk or a new account is created.
            short_name: Account short name used when creating an account.
            author_name: Default author name used when creating an account.
            author_url: Default author URL used when creating an account.
        """
        self.base_url = "https://api.telegra.ph"
        self.access_token = (
            access_token
            if access_token
            else self._create_ph_account(short_name, author_name, author_url)
        )

        # get_account_info() returns None when the lookup fails; fall back to
        # the constructor arguments instead of crashing on ``None.get``.
        account_info = self.get_account_info() or {}
        self.short_name = account_info.get("short_name", short_name)
        self.author_name = account_info.get("author_name", author_name)
        self.author_url = account_info.get("author_url", author_url)

    def _create_ph_account(self, short_name, author_name, author_url):
        """Return a stored Telegraph token, or create an account and return its token.

        The new token is always exported as the ``TELEGRA_PH_TOKEN``
        environment variable. It is written to ``token_key.json`` only when
        ``Store_Token`` below is set to True; otherwise it is printed.

        Raises:
            requests.HTTPError: If the createAccount request fails.
        """
        Store_Token = False
        TELEGRAPH_API_URL = "https://api.telegra.ph/createAccount"
        TOKEN_FILE = "token_key.json"

        # Try to load existing token information
        try:
            with open(TOKEN_FILE, "r") as f:
                tokens = json.load(f)
            # "example" is treated as a placeholder, not a real token.
            if "TELEGRA_PH_TOKEN" in tokens and tokens["TELEGRA_PH_TOKEN"] != "example":
                return tokens["TELEGRA_PH_TOKEN"]
        except FileNotFoundError:
            tokens = {}

        # If no existing valid token in TOKEN_FILE, create a new account
        data = {
            "short_name": short_name,
            "author_name": author_name,
            "author_url": author_url,
        }

        # Make API request
        response = requests.post(TELEGRAPH_API_URL, data=data)
        response.raise_for_status()

        account = response.json()
        access_token = account["result"]["access_token"]

        # Update the token in the dictionary
        tokens["TELEGRA_PH_TOKEN"] = access_token

        # Store the updated tokens
        if Store_Token:
            with open(TOKEN_FILE, "w") as f:
                json.dump(tokens, f, indent=4)
        else:
            print(f"Token not stored to file, but here is your token:\n{access_token}")

        # Store it to the environment variable
        os.environ["TELEGRA_PH_TOKEN"] = access_token

        return access_token

    def _post_page(self, endpoint, data):
        """POST ``data`` to a page endpoint and return the resulting page URL."""
        response = requests.post(f"{self.base_url}/{endpoint}", data=data)
        response.raise_for_status()
        return response.json()["result"]["url"]

    def create_page(
        self, title, content, author_name=None, author_url=None, return_content=False
    ):
        """Create a page from a list of Telegraph nodes and return its URL.

        ``author_name`` / ``author_url`` default to the account's values.

        Raises:
            requests.HTTPError: If the HTTP request fails.
        """
        data = {
            "access_token": self.access_token,
            "title": title,
            "content": json.dumps(content),
            "return_content": return_content,
            "author_name": author_name if author_name else self.author_name,
            "author_url": author_url if author_url else self.author_url,
        }
        return self._post_page("createPage", data)

    def get_account_info(self):
        """Return the account's profile fields as a dict, or None on failure."""
        fields = ["short_name", "author_name", "author_url", "auth_url"]
        # Let requests build the query string so the JSON list is URL-encoded.
        response = requests.get(
            f"{self.base_url}/getAccountInfo",
            params={"access_token": self.access_token, "fields": json.dumps(fields)},
        )
        if response.status_code == 200:
            # A response without "result" is treated as a failure as well.
            result = response.json().get("result")
            if result is not None:
                return result
        print(f"Fail getting telegra.ph token info: {response.status_code}")
        return None

    def edit_page(
        self,
        path,
        title,
        content,
        author_name=None,
        author_url=None,
        return_content=False,
    ):
        """Replace the page at ``path`` with new nodes and return its URL.

        ``author_name`` / ``author_url`` default to the account's values.

        Raises:
            requests.HTTPError: If the HTTP request fails.
        """
        data = {
            "access_token": self.access_token,
            "path": path,
            "title": title,
            "content": json.dumps(content),
            "return_content": return_content,
            "author_name": author_name if author_name else self.author_name,
            "author_url": author_url if author_url else self.author_url,
        }
        return self._post_page("editPage", data)

    def get_page(self, path):
        """Fetch the page at ``path`` (with content) and return the API result."""
        url = f"{self.base_url}/getPage/{path}?return_content=true"
        response = requests.get(url)
        response.raise_for_status()
        return response.json()["result"]

    def create_page_md(
        self,
        title,
        markdown_text,
        author_name=None,
        author_url=None,
        return_content=False,
    ):
        """Convert Markdown to Telegraph nodes, create a page, return its URL."""
        content = self._md_to_dom(markdown_text)
        return self.create_page(title, content, author_name, author_url, return_content)

    def edit_page_md(
        self,
        path,
        title,
        markdown_text,
        author_name=None,
        author_url=None,
        return_content=False,
    ):
        """Convert Markdown to Telegraph nodes, update ``path``, return its URL."""
        content = self._md_to_dom(markdown_text)
        return self.edit_page(
            path, title, content, author_name, author_url, return_content
        )

    def authorize_browser(self):
        """Return the account's ``auth_url`` as reported by getAccountInfo.

        Raises:
            requests.HTTPError: If the HTTP request fails.
        """
        response = requests.get(
            f"{self.base_url}/getAccountInfo",
            params={
                "access_token": self.access_token,
                "fields": json.dumps(["auth_url"]),
            },
        )
        response.raise_for_status()
        return response.json()["result"]["auth_url"]

    def _md_to_dom(self, markdown_text):
        """Render Markdown to HTML and convert it to a list of Telegraph nodes.

        Each node is either a plain string or a dict with ``tag`` and optional
        ``attrs`` / ``children`` keys.
        """
        html = markdown.markdown(
            markdown_text,
            extensions=["markdown.extensions.extra", "markdown.extensions.sane_lists"],
        )
        soup = BeautifulSoup(html, "html.parser")

        def parse_children(element):
            children = []
            for child in element.contents:
                if isinstance(child, str):
                    # Drop the line breaks the HTML renderer puts around
                    # elements, but keep spaces: stripping them would glue
                    # words to neighbouring inline elements.
                    text = child.strip("\n")
                    if text:
                        children.append(text)
                else:
                    children.append(parse_element(child))
            return children

        def parse_element(element):
            children = parse_children(element)
            if element.name in self._MINOR_HEADINGS:
                node = {"tag": "p"}
                if children:
                    children = [{"tag": "strong", "children": children}]
            else:
                node = {"tag": self._HEADING_TAGS.get(element.name, element.name)}

            # Telegraph node elements carry attributes under "attrs".
            attrs = {
                key: value
                for key, value in element.attrs.items()
                if key in self._ALLOWED_ATTRS
            }
            if attrs:
                node["attrs"] = attrs
            if children:
                node["children"] = children
            return node

        new_dom = []
        for element in soup.contents:
            if isinstance(element, str):
                # Top-level text is emitted as a plain string node.
                text = element.strip()
                if text:
                    new_dom.append(text)
            else:
                new_dom.append(parse_element(element))

        return new_dom
# `import *` will give you these # `import *` will give you these
__all__ = [ __all__ = [
"bot_reply_first", "bot_reply_first",
"bot_reply_markdown", "bot_reply_markdown",
"enrich_text_with_urls", "enrich_text_with_urls",
"image_to_data_uri", "image_to_data_uri",
"TelegraphAPI",
] ]

231
handlers/cohere.py Normal file
View File

@ -0,0 +1,231 @@
from os import environ
import time
import datetime
from telebot import TeleBot
from telebot.types import Message
from expiringdict import ExpiringDict
from . import *
import cohere
from telegramify_markdown import convert
from telegramify_markdown.customize import markdown_symbol
# Optional cosmetic overrides for telegramify_markdown output.
markdown_symbol.head_level_1 = "📌"  # symbol used for level-1 headings
markdown_symbol.link = "🔗"  # symbol used for links

# Cohere credentials and model; the client is only created when a key is set.
COHERE_API_KEY = environ.get("COHERE_API_KEY")
COHERE_MODEL = "command-r-plus"
if COHERE_API_KEY:
    co = cohere.Client(api_key=COHERE_API_KEY)

# Telegraph client, built at import time whether or not a Cohere key is set.
# NOTE(review): TelegraphAPI presumably comes from `from . import *`; if so,
# an unset token makes it load one from disk or create a new account, and it
# queries the account info either way - confirm this is wanted on import.
TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN")
ph = TelegraphAPI(TELEGRA_PH_TOKEN)

# Global history cache
# Keyed by the user id as a string; entries expire after 300 seconds.
cohere_player_dict = ExpiringDict(max_len=1000, max_age_seconds=300)
def cohere_handler_direct(message: Message, bot: TeleBot) -> None:
    """cohere : /cohere <question>"""
    # NOTE(review): the docstring above is kept verbatim because it looks like
    # user-facing command help text - confirm before rewording it.
    #
    # Answers the question in message.text with Cohere, streaming progress
    # updates into one reply message. A per-user history is kept in
    # cohere_player_dict; "clear" wipes it and a "new " prefix starts afresh.
    m = message.text.strip()

    player_message = []
    if str(message.from_user.id) not in cohere_player_dict:
        cohere_player_dict[str(message.from_user.id)] = player_message
    else:
        player_message = cohere_player_dict[str(message.from_user.id)]

    if m.strip() == "clear":
        bot.reply_to(
            message,
            "Just cleared your Cohere messages history",
        )
        player_message.clear()
        return

    if m[:4].lower() == "new ":
        m = m[4:].strip()
        player_message.clear()

    m = enrich_text_with_urls(m)

    who = "Command R Plus"
    reply_id = bot_reply_first(message, who, bot)

    player_message.append({"role": "User", "message": m})
    # Keep roughly the last 5 exchanges (each is a question plus an answer).
    # Trim in place: rebinding the name to a slice would leave the cached list
    # untrimmed and send later appends to a throwaway copy.
    if len(player_message) > 10:
        del player_message[:2]

    try:
        stream = co.chat_stream(
            model=COHERE_MODEL,
            message=m,
            temperature=0.8,
            # The current question already travels in `message`; chat_history
            # carries only the earlier turns so it is not sent twice.
            chat_history=player_message[:-1],
            prompt_truncation="AUTO",
            connectors=[{"id": "web-search"}],
            citation_quality="accurate",
            # NOTE(review): the first timestamp is the server's local time and
            # the Los Angeles one uses a fixed UTC-7 offset - confirm intent.
            preamble=f"You are Command R+, a large language model trained to have polite, helpful, inclusive conversations with people. The current time in Tornoto is {datetime.datetime.now(datetime.timezone.utc).astimezone().strftime('%Y-%m-%d %H:%M:%S')}, in Los Angeles is {datetime.datetime.now(datetime.timezone.utc).astimezone().astimezone(datetime.timezone(datetime.timedelta(hours=-7))).strftime('%Y-%m-%d %H:%M:%S')}, and in China is {datetime.datetime.now(datetime.timezone.utc).astimezone(datetime.timezone(datetime.timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')}.",
        )

        s = ""
        source = ""
        start = time.time()
        for event in stream:
            if event.event_type == "stream-start":
                bot_reply_markdown(reply_id, who, "Thinking...", bot)
            elif event.event_type == "search-queries-generation":
                bot_reply_markdown(reply_id, who, "Searching online...", bot)
            elif event.event_type == "search-results":
                bot_reply_markdown(reply_id, who, "Reading...", bot)
                for doc in event.documents:
                    source += f"\n[{doc['title']}]({doc['url']})"
            elif event.event_type == "text-generation":
                s += event.text
                # Throttle progress edits to at most one every 0.4 seconds.
                if time.time() - start > 0.4:
                    start = time.time()
                    bot_reply_markdown(
                        reply_id,
                        who,
                        f"\nStill thinking{len(s)}...",
                        bot,
                        split_text=True,
                    )
            elif event.event_type == "stream-end":
                break

        s += "\n" + source + "\n"

        # Best effort: a failed final edit must not discard the answer.
        try:
            bot_reply_markdown(reply_id, who, s, bot, split_text=True)
        except Exception as e:
            print(e)

        player_message.append(
            {
                "role": "Chatbot",
                "message": convert(s),
            }
        )

    except Exception as e:
        print(e)
        bot_reply_markdown(reply_id, who, "Answer wrong", bot)
        player_message.clear()
def cohere_handler(message: Message, bot: TeleBot) -> None:
    """cohere : /cohere <question> This will return a telegraph link"""
    # NOTE(review): the docstring above is kept verbatim because it looks like
    # user-facing command help text - confirm before rewording it.
    #
    # Same flow as the direct Cohere handler, but the answer and its sources
    # are also published as a Telegraph page whose link is added to the reply.
    m = message.text.strip()

    player_message = []
    if str(message.from_user.id) not in cohere_player_dict:
        cohere_player_dict[str(message.from_user.id)] = player_message
    else:
        player_message = cohere_player_dict[str(message.from_user.id)]

    if m.strip() == "clear":
        bot.reply_to(
            message,
            "Just cleared your Cohere messages history",
        )
        player_message.clear()
        return

    if m[:4].lower() == "new ":
        m = m[4:].strip()
        player_message.clear()

    m = enrich_text_with_urls(m)

    who = "Command R Plus"
    reply_id = bot_reply_first(message, who, bot)

    player_message.append({"role": "User", "message": m})
    # Keep roughly the last 5 exchanges (each is a question plus an answer).
    # Trim in place: rebinding the name to a slice would leave the cached list
    # untrimmed and send later appends to a throwaway copy.
    if len(player_message) > 10:
        del player_message[:2]

    try:
        stream = co.chat_stream(
            model=COHERE_MODEL,
            message=m,
            temperature=0.8,
            # The current question already travels in `message`; chat_history
            # carries only the earlier turns so it is not sent twice.
            chat_history=player_message[:-1],
            prompt_truncation="AUTO",
            connectors=[{"id": "web-search"}],
            citation_quality="accurate",
            # NOTE(review): the first timestamp is the server's local time and
            # the Los Angeles one uses a fixed UTC-7 offset - confirm intent.
            preamble=f"You are Command R+, a large language model trained to have polite, helpful, inclusive conversations with people. The current time in Tornoto is {datetime.datetime.now(datetime.timezone.utc).astimezone().strftime('%Y-%m-%d %H:%M:%S')}, in Los Angeles is {datetime.datetime.now(datetime.timezone.utc).astimezone().astimezone(datetime.timezone(datetime.timedelta(hours=-7))).strftime('%Y-%m-%d %H:%M:%S')}, and in China is {datetime.datetime.now(datetime.timezone.utc).astimezone(datetime.timezone(datetime.timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')}.",
        )

        s = ""
        source = ""
        start = time.time()
        for event in stream:
            if event.event_type == "stream-start":
                bot_reply_markdown(reply_id, who, "Thinking...", bot)
            elif event.event_type == "search-queries-generation":
                bot_reply_markdown(reply_id, who, "Searching online...", bot)
            elif event.event_type == "search-results":
                bot_reply_markdown(reply_id, who, "Reading...", bot)
                for doc in event.documents:
                    source += f"\n{doc['title']}\n{doc['url']}\n"
            elif event.event_type == "text-generation":
                s += event.text
                # Throttle progress edits to at most one every 0.4 seconds.
                if time.time() - start > 0.4:
                    start = time.time()
                    bot_reply_markdown(
                        reply_id,
                        who,
                        f"\nStill thinking{len(s)}...",
                        bot,
                        split_text=True,
                    )
            elif event.event_type == "stream-end":
                break

        content = (
            s
            + "\n------\n------\n"
            + source
            + f"\n------\n------\nLast Update{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )

        # Publishing is best effort: if Telegraph fails, still deliver the
        # generated answer (without the link) instead of discarding it.
        try:
            ph_s = ph.create_page_md(
                title="Cohere", markdown_text=content
            )  # or edit_page with get_page so not producing massive pages
            s += f"\n\n[View]({ph_s})"
        except Exception as e:
            print(e)

        # Best effort: a failed final edit must not discard the answer.
        try:
            bot_reply_markdown(reply_id, who, s, bot, split_text=True)
        except Exception as e:
            print(e)

        player_message.append(
            {
                "role": "Chatbot",
                "message": convert(s),
            }
        )

    except Exception as e:
        print(e)
        bot_reply_markdown(reply_id, who, "Answer wrong", bot)
        player_message.clear()
if COHERE_API_KEY:

    def register(bot: TeleBot) -> None:
        """Register both Cohere handlers on ``bot``.

        ``cohere`` replies with a Telegraph link; ``cohere_no_ph`` replies
        in chat only. Both are registered from a single function: defining
        ``register`` twice would let the second definition shadow the first,
        so only one set of handlers would ever be installed.
        """
        bot.register_message_handler(
            cohere_handler_direct, commands=["cohere_no_ph"], pass_bot=True
        )
        bot.register_message_handler(
            cohere_handler_direct, regexp="^cohere_no_ph:", pass_bot=True
        )
        bot.register_message_handler(cohere_handler, commands=["cohere"], pass_bot=True)
        bot.register_message_handler(cohere_handler, regexp="^cohere:", pass_bot=True)

View File

@ -5,6 +5,7 @@ from telebot.types import Message
from expiringdict import ExpiringDict from expiringdict import ExpiringDict
from os import environ from os import environ
import time import time
import datetime
from openai import OpenAI from openai import OpenAI
import google.generativeai as genai import google.generativeai as genai
@ -18,6 +19,21 @@ from . import *
from telegramify_markdown.customize import markdown_symbol from telegramify_markdown.customize import markdown_symbol
#### Cohere init ####
import cohere

COHERE_API_KEY = environ.get("COHERE_API_KEY")
COHERE_MODEL = "command-r-plus"
# Set to True to have Cohere answer as well; it is only used when
# COHERE_API_KEY is also set. (The spelling "CHHERE" is kept because the
# name is referenced elsewhere in the file.)
USE_CHHERE = False
if COHERE_API_KEY:
    co = cohere.Client(api_key=COHERE_API_KEY)

#### Telegraph init ####
# Built at import time, whether or not a Cohere key is set.
TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN")
ph = TelegraphAPI(TELEGRA_PH_TOKEN)
#### Telegraph done ####
chat_message_dict = ExpiringDict(max_len=100, max_age_seconds=120) chat_message_dict = ExpiringDict(max_len=100, max_age_seconds=120)
chat_user_dict = ExpiringDict(max_len=100, max_age_seconds=20) chat_user_dict = ExpiringDict(max_len=100, max_age_seconds=20)
@ -91,6 +107,7 @@ def latest_handle_messages(message: Message, bot: TeleBot):
"sd", "sd",
"map", "map",
"yi", "yi",
"cohere",
) )
): ):
return return
@ -120,6 +137,7 @@ def answer_it_handler(message: Message, bot: TeleBot):
latest_message = chat_message_dict.get(chat_id) latest_message = chat_message_dict.get(chat_id)
m = latest_message.text.strip() m = latest_message.text.strip()
m = enrich_text_with_urls(m) m = enrich_text_with_urls(m)
full = ""
##### Gemini ##### ##### Gemini #####
who = "Gemini Pro" who = "Gemini Pro"
# show something, make it more responsible # show something, make it more responsible
@ -141,6 +159,8 @@ def answer_it_handler(message: Message, bot: TeleBot):
convo.history.clear() convo.history.clear()
bot_reply_markdown(reply_id, who, "Error", bot) bot_reply_markdown(reply_id, who, "Error", bot)
full += f"{who}:\n{s}"
chat_id_list = [reply_id.message_id]
##### ChatGPT ##### ##### ChatGPT #####
who = "ChatGPT Pro" who = "ChatGPT Pro"
reply_id = bot_reply_first(latest_message, who, bot) reply_id = bot_reply_first(latest_message, who, bot)
@ -173,6 +193,94 @@ def answer_it_handler(message: Message, bot: TeleBot):
print(e) print(e)
bot_reply_markdown(reply_id, who, "answer wrong", bot) bot_reply_markdown(reply_id, who, "answer wrong", bot)
full += f"\n---\n{who}:\n{s}"
chat_id_list.append(reply_id.message_id)
##### Cohere #####
if USE_CHHERE and COHERE_API_KEY:
full, chat_id = cohere_answer(latest_message, bot, full, m)
chat_id_list.append(chat_id)
else:
pass
##### Telegraph #####
final_answer(latest_message, bot, full, chat_id_list)
def cohere_answer(latest_message: Message, bot: TeleBot, full, m):
    """cohere answer

    Ask Cohere the question ``m``, streaming progress into a new reply to
    ``latest_message``, and append the answer plus its sources to ``full``.

    Returns:
        A ``(full, message_id)`` tuple: the (possibly extended) accumulated
        text and the id of the reply message. On failure ``full`` is
        returned unchanged.
    """
    who = "Command R Plus"
    reply_id = bot_reply_first(latest_message, who, bot)

    try:
        # This is a one-shot question, so no chat_history is sent: the
        # question already travels in `message` and repeating it as history
        # would send it twice.
        stream = co.chat_stream(
            model=COHERE_MODEL,
            message=m,
            temperature=0.3,
            prompt_truncation="AUTO",
            connectors=[{"id": "web-search"}],
            citation_quality="accurate",
            # NOTE(review): the first timestamp is the server's local time and
            # the Los Angeles one uses a fixed UTC-7 offset - confirm intent.
            preamble=f"You are Command R+, a large language model trained to have polite, helpful, inclusive conversations with people. The current time in Tornoto is {datetime.datetime.now(datetime.timezone.utc).astimezone().strftime('%Y-%m-%d %H:%M:%S')}, in Los Angeles is {datetime.datetime.now(datetime.timezone.utc).astimezone().astimezone(datetime.timezone(datetime.timedelta(hours=-7))).strftime('%Y-%m-%d %H:%M:%S')}, and in China is {datetime.datetime.now(datetime.timezone.utc).astimezone(datetime.timezone(datetime.timedelta(hours=8))).strftime('%Y-%m-%d %H:%M:%S')}.",
        )

        s = ""
        source = ""
        start = time.time()
        for event in stream:
            if event.event_type == "stream-start":
                bot_reply_markdown(reply_id, who, "Thinking...", bot)
            elif event.event_type == "search-queries-generation":
                bot_reply_markdown(reply_id, who, "Searching online...", bot)
            elif event.event_type == "search-results":
                bot_reply_markdown(reply_id, who, "Reading...", bot)
                for doc in event.documents:
                    source += f"\n{doc['title']}\n{doc['url']}\n"
            elif event.event_type == "text-generation":
                s += event.text
                # Throttle progress edits to at most one every 0.4 seconds.
                if time.time() - start > 0.4:
                    start = time.time()
                    bot_reply_markdown(
                        reply_id,
                        who,
                        f"\nStill thinking{len(s)}...",
                        bot,
                        split_text=True,
                    )
            elif event.event_type == "stream-end":
                break

        # The chat reply shows only the answer; the text returned for
        # publishing also carries the sources and a timestamp.
        content = (
            s
            + "\n------\n------\n"
            + source
            + f"\n------\n------\nLast Update{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )

        # Best effort: a failed final edit must not discard the answer.
        try:
            bot_reply_markdown(reply_id, who, s, bot, split_text=True)
        except Exception as e:
            print(e)
    except Exception as e:
        print(e)
        bot_reply_markdown(reply_id, who, "Answer wrong", bot)
        return full, reply_id.message_id

    full += f"\n---\n{who}:\n{content}"
    return full, reply_id.message_id
def final_answer(latest_message: Message, bot: TeleBot, full, answers_list):
    """final answer

    Publish the combined text ``full`` as a Telegraph page, reply with its
    link, then delete the individual answer messages whose ids are listed
    in ``answers_list``.
    """
    who = "Answer"
    reply_id = bot_reply_first(latest_message, who, bot)

    try:
        ph_s = ph.create_page_md(title="Answer it", markdown_text=full)
    except Exception as e:
        # Without a page the individual answers are the only copy, so report
        # the failure and leave them in the chat.
        print(e)
        bot_reply_markdown(reply_id, who, "Answer wrong", bot)
        return

    bot_reply_markdown(reply_id, who, f"[View]({ph_s})", bot)

    # delete the chat message, only leave a telegra.ph link
    for i in answers_list:
        # One message that cannot be deleted must not stop the rest.
        try:
            bot.delete_message(latest_message.chat.id, i)
        except Exception as e:
            print(e)
if GOOGLE_GEMINI_KEY and CHATGPT_API_KEY: if GOOGLE_GEMINI_KEY and CHATGPT_API_KEY: