From 9c8638279e84527f03aed237faea99da8c7094c4 Mon Sep 17 00:00:00 2001 From: Alter-xyz <88554920+alterxyz@users.noreply.github.com> Date: Mon, 24 Jun 2024 21:26:44 -0400 Subject: [PATCH] feat: Default skip telegraph account It will print out the token in local terminal by default. If Store_Token = True, it will store token in "token_key.json" View my lite ver and try at --- .gitignore | 2 +- README.md | 14 +- handlers/__init__.py | 313 ++++++++++++++++--------------------------- handlers/cohere.py | 13 +- handlers/useful.py | 18 ++- 5 files changed, 141 insertions(+), 219 deletions(-) diff --git a/.gitignore b/.gitignore index b468ea3..d4d2196 100644 --- a/.gitignore +++ b/.gitignore @@ -169,4 +169,4 @@ nohup.out *.pdf .pdm-python *.wav -telegraph_token.json +token_key.json diff --git a/README.md b/README.md index 790eb95..2eced2d 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,19 @@ Note, currently its support dify Chatbot with instructions(System prompt) and di 2. export COHERE_API_KEY=${the_key} 3. use `cohere: ${message}` to ask -## Bot -> `Telegra.ph` +## Function -> Telegraph + +### Skip token (default) + +You do not need to do anything. + +But you may not be able to edit any generated post since you do not have the token. + +### Store token (recommended) + +Change "Store_Token" to "True" in "handlers/__init__.py" TelegraphAPI/_create_ph_account. It will store the token in "token_key.json". + +### Get token manually from Telegram account 1. https://t.me/telegraph Create or login Telegraph account 2. `Log in as ${Account} on this device` diff --git a/handlers/__init__.py b/handlers/__init__.py index 0788c4b..f2867cc 100644 --- a/handlers/__init__.py +++ b/handlers/__init__.py @@ -200,66 +200,26 @@ def image_to_data_uri(file_path): return f"data:image/png;base64,{encoded_image}" -import requests import json -import markdown # pip install Markdown -from bs4 import BeautifulSoup # pip install beautifulsoup4 - - -def create_ph_account(short_name: str, author_name: str, author_url: str = None) -> str: - """ - Creates a new account on the Telegra.ph platform. - If an account already exists (stored in a local JSON file), returns the existing access token. - Otherwise, creates a new account and stores the information locally. - Sample request - https://api.telegra.ph/editAccountInfo?access_token=d3b25feccb89e508a9114afb82aa421fe2a9712b963b387cc5ad71e58722&short_name=Sandbox&author_name=Anonymous - - Args: - short_name (str): The short name of the account. - author_name (str): The name of the author. - author_url (str, optional): The URL of the author's profile. Defaults to None. - - Returns: - str: The access token for the account. - - Raises: - requests.RequestException: If the API request fails. - json.JSONDecodeError: If the API response is not valid JSON. - KeyError: If the API response does not contain the expected data. - """ - TELEGRAPH_API_URL = "https://api.telegra.ph/createAccount" - - # Try to load existing account information - try: - with open("telegraph_token.json", "r") as f: - account = json.load(f) - return account["result"]["access_token"] - except FileNotFoundError: - # If no existing account, create a new one - data = { - "short_name": short_name, - "author_name": author_name, - } - if author_url: - data["author_url"] = author_url - - # Make API request - response = requests.post(TELEGRAPH_API_URL, data=data) - response.raise_for_status() # Raises an HTTPError for bad responses - - account = response.json() - access_token = account["result"]["access_token"] - - # Store the new account information - with open("telegraph_token.json", "w") as f: - json.dump(account, f) - - return access_token +import requests +import os +from bs4 import BeautifulSoup +import markdown class TelegraphAPI: - def __init__(self, access_token): - self.access_token = access_token + def __init__( + self, + access_token=None, + short_name="tg_bot_collections", + author_name="Telegram Bot Collections", + author_url=None, + ): + self.access_token = ( + access_token + if access_token + else self._create_ph_account(short_name, author_name, author_url) + ) self.base_url = "https://api.telegra.ph" # Get account info on initialization @@ -268,34 +228,58 @@ class TelegraphAPI: self.author_name = account_info.get("author_name") self.author_url = account_info.get("author_url") + def _create_ph_account(self, short_name, author_name, author_url): + Store_Token = False + TELEGRAPH_API_URL = "https://api.telegra.ph/createAccount" + TOKEN_FILE = "token_key.json" + + # Try to load existing token information + try: + with open(TOKEN_FILE, "r") as f: + tokens = json.load(f) + if "TELEGRA_PH_TOKEN" in tokens and tokens["TELEGRA_PH_TOKEN"] != "example": + return tokens["TELEGRA_PH_TOKEN"] + except FileNotFoundError: + tokens = {} + + # If no existing valid token in TOKEN_FILE, create a new account + data = { + "short_name": short_name, + "author_name": author_name, + "author_url": author_url, + } + + # Make API request + response = requests.post(TELEGRAPH_API_URL, data=data) + response.raise_for_status() + + account = response.json() + access_token = account["result"]["access_token"] + + # Update the token in the dictionary + tokens["TELEGRA_PH_TOKEN"] = access_token + + # Store the updated tokens + if Store_Token: + with open(TOKEN_FILE, "w") as f: + json.dump(tokens, f, indent=4) + else: + print(f"Token not stored to file, but here is your token:\n{access_token}") + + # Store it to the environment variable + os.environ["TELEGRA_PH_TOKEN"] = access_token + + return access_token + def create_page( self, title, content, author_name=None, author_url=None, return_content=False ): - """ - Creates a new Telegraph page. - - Args: - title (str): Page title (1-256 characters). - content (list): Content of the page as a list of Node dictionaries. - author_name (str, optional): Author name (0-128 characters). Defaults to account's author_name. - author_url (str, optional): Profile link (0-512 characters). Defaults to account's author_url. - return_content (bool, optional): If True, return the content field in the response. - - Returns: - str: URL of the created page. - - Raises: - requests.exceptions.RequestException: If the request fails. - - - """ url = f"{self.base_url}/createPage" data = { "access_token": self.access_token, "title": title, "content": json.dumps(content), "return_content": return_content, - # Use provided author info or fall back to account info "author_name": author_name if author_name else self.author_name, "author_url": author_url if author_url else self.author_url, } @@ -307,14 +291,7 @@ class TelegraphAPI: return page_url def get_account_info(self): - """ - Gets information about the Telegraph account. - - Returns: - dict: Account information including short_name, author_name, and author_url. - Returns None if there's an error. - """ - url = f"{self.base_url}/getAccountInfo?access_token={self.access_token}" # &fields=[\"author_name\",\"author_url\"] for specific fields + url = f'{self.base_url}/getAccountInfo?access_token={self.access_token}&fields=["short_name","author_name","author_url","auth_url"]' response = requests.get(url) if response.status_code == 200: @@ -332,23 +309,6 @@ class TelegraphAPI: author_url=None, return_content=False, ): - """ - Edits an existing Telegraph page. - - Args: - path (str): Path of the page to edit. - title (str): New page title (1-256 characters). - content (list): New content of the page as a list of Node dictionaries. - author_name (str, optional): Author name (0-128 characters). Defaults to account's author_name. - author_url (str, optional): Profile link (0-512 characters). Defaults to account's author_url. - return_content (bool, optional): If True, return the content field in the response. - - Returns: - str: URL of the edited page. - - Raises: - requests.exceptions.RequestException: If the request fails. - """ url = f"{self.base_url}/editPage" data = { "access_token": self.access_token, @@ -356,7 +316,6 @@ class TelegraphAPI: "title": title, "content": json.dumps(content), "return_content": return_content, - # Use provided author info or fall back to account info "author_name": author_name if author_name else self.author_name, "author_url": author_url if author_url else self.author_url, } @@ -369,15 +328,6 @@ class TelegraphAPI: return page_url def get_page(self, path): - """ - Gets information about a Telegraph page. - - Args: - path (str): Path of the page to get. - - Returns: - dict: Information about the page. - """ url = f"{self.base_url}/getPage/{path}?return_content=true" response = requests.get(url) response.raise_for_status() @@ -391,23 +341,7 @@ class TelegraphAPI: author_url=None, return_content=False, ): - """ - Creates a new Telegraph page from markdown text. - - Args: - title (str): Page title (1-256 characters). - markdown_text (str): Markdown text to convert to HTML. - author_name (str, optional): Author name (0-128 characters). Defaults to account's author_name. - author_url (str, optional): Profile link (0-512 characters). Defaults to account's author_url. - return_content (bool, optional): If True, return the content field in the response. - - Returns: - str: URL of the created page. - - Raises: - requests.exceptions.RequestException: If the request fails. - """ - content = md_to_dom(markdown_text) + content = self._md_to_dom(markdown_text) return self.create_page(title, content, author_name, author_url, return_content) def edit_page_md( @@ -419,86 +353,72 @@ class TelegraphAPI: author_url=None, return_content=False, ): - content = md_to_dom(markdown_text) + content = self._md_to_dom(markdown_text) return self.edit_page( path, title, content, author_name, author_url, return_content ) + def authorize_browser(self): + url = f'{self.base_url}/getAccountInfo?access_token={self.access_token}&fields=["auth_url"]' + response = requests.get(url) + response.raise_for_status() + return response.json()["result"]["auth_url"] -def md_to_dom(markdown_text): - """Converts markdown text to a Python dictionary representing the DOM, - limiting heading levels to h3 and h4. + def _md_to_dom(self, markdown_text): + html = markdown.markdown( + markdown_text, + extensions=["markdown.extensions.extra", "markdown.extensions.sane_lists"], + ) - Args: - markdown_text: The markdown text to convert. + soup = BeautifulSoup(html, "html.parser") - Returns: - A Python list representing the DOM, where each element is a dictionary - with the following keys: - - 'tag': The tag name of the element. - - 'attributes': A dictionary of attributes for the element (optional). - - 'children': A list of child elements (optional). - """ + def parse_element(element): + tag_dict = {"tag": element.name} + if element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]: + if element.name == "h1": + tag_dict["tag"] = "h3" + elif element.name == "h2": + tag_dict["tag"] = "h4" + else: + tag_dict["tag"] = "p" + tag_dict["children"] = [ + {"tag": "strong", "children": element.contents} + ] - # Convert markdown to HTML - html = markdown.markdown( - markdown_text, - extensions=["markdown.extensions.extra", "markdown.extensions.sane_lists"], - ) - - # Parse the HTML with BeautifulSoup - soup = BeautifulSoup(html, "html.parser") - - def parse_element(element): - tag_dict = {"tag": element.name} - if element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]: - if element.name == "h1": - tag_dict["tag"] = "h3" - elif element.name == "h2": - tag_dict["tag"] = "h4" - else: - tag_dict["tag"] = "p" - tag_dict["children"] = [{"tag": "strong", "children": element.contents}] - - # Correctly handle children for h1-h6 - if element.attrs: - tag_dict["attributes"] = element.attrs - if element.contents: - children = [] - for child in element.contents: - if isinstance(child, str): - # Remove leading/trailing whitespace from text nodes - children.append(child.strip()) - else: # it's another tag - children.append(parse_element(child)) - tag_dict["children"] = children - else: - if element.attrs: - tag_dict["attributes"] = element.attrs - if element.contents: - children = [] - for child in element.contents: - if isinstance(child, str): - # Remove leading/trailing whitespace from text nodes - children.append(child.strip()) - else: # it's another tag - children.append(parse_element(child)) - if children: + if element.attrs: + tag_dict["attributes"] = element.attrs + if element.contents: + children = [] + for child in element.contents: + if isinstance(child, str): + children.append(child.strip()) + else: + children.append(parse_element(child)) tag_dict["children"] = children - return tag_dict + else: + if element.attrs: + tag_dict["attributes"] = element.attrs + if element.contents: + children = [] + for child in element.contents: + if isinstance(child, str): + children.append(child.strip()) + else: + children.append(parse_element(child)) + if children: + tag_dict["children"] = children + return tag_dict - new_dom = [] - for element in soup.contents: - if isinstance(element, str) and not element.strip(): - # Skip empty text nodes - continue - elif isinstance(element, str): - # Treat remaining text nodes as separate elements for clarity - new_dom.append({"tag": "text", "content": element.strip()}) - else: - new_dom.append(parse_element(element)) + new_dom = [] + for element in soup.contents: + if isinstance(element, str) and not element.strip(): + continue + elif isinstance(element, str): + new_dom.append({"tag": "text", "content": element.strip()}) + else: + new_dom.append(parse_element(element)) - return new_dom + return new_dom # `import *` will give you these @@ -508,5 +428,4 @@ __all__ = [ "enrich_text_with_urls", "image_to_data_uri", "TelegraphAPI", - "create_ph_account", ] diff --git a/handlers/cohere.py b/handlers/cohere.py index b437fca..ddc0082 100644 --- a/handlers/cohere.py +++ b/handlers/cohere.py @@ -17,19 +17,12 @@ markdown_symbol.link = "🔗" # If you want, Customizing the link symbol COHERE_API_KEY = environ.get("COHERE_API_KEY") COHERE_MODEL = "command-r-plus" - -TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN") -if TELEGRA_PH_TOKEN: - ph = TelegraphAPI(TELEGRA_PH_TOKEN) -else: - TELEGRA_PH_TOKEN = create_ph_account( - short_name="cohere", author_name="A Telegram Bot" - ) - ph = TelegraphAPI(TELEGRA_PH_TOKEN) - if COHERE_API_KEY: co = cohere.Client(api_key=COHERE_API_KEY) +TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN") +ph = TelegraphAPI(TELEGRA_PH_TOKEN) + # Global history cache cohere_player_dict = ExpiringDict(max_len=1000, max_age_seconds=300) diff --git a/handlers/useful.py b/handlers/useful.py index 835adab..75bc760 100644 --- a/handlers/useful.py +++ b/handlers/useful.py @@ -19,20 +19,18 @@ from . import * from telegramify_markdown.customize import markdown_symbol +#### Cohere init #### import cohere COHERE_API_KEY = environ.get("COHERE_API_KEY") -TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN") +COHERE_MODEL = "command-r-plus" if COHERE_API_KEY: co = cohere.Client(api_key=COHERE_API_KEY) -if TELEGRA_PH_TOKEN: - ph = TelegraphAPI(TELEGRA_PH_TOKEN) -else: - TELEGRA_PH_TOKEN = create_ph_account( - short_name="Answer it", author_name="A Telegram Bot" - ) - ph = TelegraphAPI(TELEGRA_PH_TOKEN) -COHERE_MODEL = "command-r-plus" + +#### Telegraph init #### +TELEGRA_PH_TOKEN = environ.get("TELEGRA_PH_TOKEN") +ph = TelegraphAPI(TELEGRA_PH_TOKEN) +#### Telegraph done #### chat_message_dict = ExpiringDict(max_len=100, max_age_seconds=120) chat_user_dict = ExpiringDict(max_len=100, max_age_seconds=20) @@ -203,7 +201,7 @@ def answer_it_handler(message: Message, bot: TeleBot): else: pass - ##### Answer ##### + ##### Telegraph ##### final_answer(latest_message, bot, full, chat_id_list)