from typing import Annotated, Optional

from fastapi import FastAPI, Header, Query
from fastapi.middleware.cors import CORSMiddleware
from bs4 import BeautifulSoup
from datetime import datetime

import html2text
import requests
import httpx
import re
import json
import newspaper
import googleapiclient
import googleapiclient.discovery

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
def news_details(url: str):
    # Scrape title, body text, authors, and publish date from a news article
    # using newspaper3k.
    article = newspaper.Article(url)
    article.download()
    article.parse()
    return {
        "title": article.title,
        "description": article.text,
        "author": article.authors,
        "date": article.publish_date,
    }
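# NOTE: no route decorators appear in this section, so how these helpers are
# exposed is an assumption. A minimal sketch of wiring news_details to the
# FastAPI app (the path and parameter names are illustrative only):
#
# @app.get("/news")
# def get_news_details(url: str):
#     return news_details(url)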
async def linkedin_post_details(post_id: Optional[str] = None, url: Optional[str] = None):
    # Scrape a public LinkedIn post via the JSON-LD metadata embedded in its
    # <script> tags, plus the reaction/comment counters in the markup.
    if not url:
        url = "https://www.linkedin.com/posts/" + post_id
    res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
    soup = BeautifulSoup(res.content, "html.parser")
    desc = full_name = username = user_type = date = None
    for script_tag in soup.find_all("script"):
        try:
            data = json.loads(script_tag.string)
            if data.get("datePublished"):
                desc = data.get("articleBody") or data.get("description")
                author = data.get("author")
                full_name = author.get("name")
                username = author.get("url").rsplit("/", 1)[-1]
                user_type = author.get("@type").lower()
                date = data.get("datePublished")
        except Exception:
            continue
    spans = soup.find_all("span", {"data-test-id": "social-actions__reaction-count"})
    reactions = spans[0].text.strip() if spans else "0"
    try:
        comments = str(
            soup.find("a", {"data-test-id": "social-actions__comments"}).get("data-num-comments")
        )
    except Exception:
        comments = "0"
    return {
        "insights": {
            "likeCount": None,
            # "commentCount": int(comments.replace(",", "")),
            "commentCount": comments,
            "shareCount": None,
            # "reactionCount": int(reactions.replace(",", "")),
            "reactionCount": reactions,
            "reactions": [],
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "userType": user_type,
        "date": date,
    }
# Earlier html2text-based implementation, kept for reference:
# async def linkedin_post_details(post_id: str):
#     url = "https://www.linkedin.com/posts/" + post_id
#     res = requests.get(url, headers={"user-agent": "Googlebot", "accept-language": "en-US"})
#     text_maker = html2text.HTML2Text()
#     text_maker.ignore_links = True
#     text_maker.ignore_images = True
#     text_maker.bypass_tables = False
#     docs = text_maker.handle(res.content.decode("utf-8"))
#     chunks = docs.split("\n\n#")
#     linkedin_content = chunks[1]
#     user = linkedin_content.split("\n\n", 5)
#     full_name = user[1]
#     bio = user[2]
#     try:
#         date, edited = user[3].split(" ")
#         edited = True
#     except:
#         date = user[3].strip()
#         edited = False
#     content = "\n\n".join(user[5:])
#     insights = chunks[3].split("\n\n")[2]
#     likes = insights.split(" ", 1)[0].strip()
#     comments = insights.rsplit(" ", 2)[1].strip()
#     username = url.rsplit("/", 1)[-1].split("_")[0]
#     return {
#         "userDetails": {"full_name": full_name, "username": username, "bio": bio},
#         "content": content,
#         "date": date,
#         "is_edited": edited,
#         "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
#         "username": username,
#     }
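# Example usage (sketch; the post slug below is a placeholder, not a real post):
#
# import asyncio
# details = asyncio.run(linkedin_post_details(post_id="<author>_<post-slug>"))
# print(details["insights"]["reactionCount"], details["date"])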
async def ig_post_detail(post_id: Optional[str] = None, url: Optional[str] = None):
    # Scrape a public Instagram post from its meta tags (description / og:title).
    if not url:
        url = f"https://www.instagram.com/p/{post_id}"
    res = requests.get(
        url,
        headers={"user-agent": "Googlebot", "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    # The description meta tag is expected to look like:
    # '<likes> likes, <comments> comments - <username> on <date>: "<caption>"'
    meta = soup.find("meta", {"name": "description"})
    content = meta.get("content")
    like_split = content.split(" likes, ")
    likes = like_split[0]
    comment_split = like_split[1].split(" comments - ")
    comments = comment_split[0]
    author_split = comment_split[1].split(': "')
    author_date = author_split[0].split(" on ")
    username = author_date[0]
    date = author_date[1].split(":")[0]
    name_desc = (
        soup.find("meta", {"property": "og:title"})
        .get("content")
        .split(" on Instagram: ", 1)
    )
    full_name = name_desc[0]
    desc = name_desc[-1]
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": None,
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "date": date,
    }
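# Example usage (sketch; the shortcode is a placeholder). This parser depends on
# Instagram returning fully populated meta tags to the "Googlebot" user-agent,
# which is not guaranteed:
#
# import asyncio
# post = asyncio.run(ig_post_detail(post_id="<shortcode>"))
# print(post["insights"]["likeCount"], post["username"])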
async def ig_post_detail_api(post_id: Optional[str] = None, url: Optional[str] = None):
    # Fetch Instagram post details through the public GraphQL endpoint instead
    # of scraping meta tags.
    if not post_id:
        post_id = url.rstrip("/").split("/")[-1]
    query_hash = "2b0673e0dc4580674a88d426fe00ea90"
    variables_json = json.dumps({"shortcode": post_id}, separators=(",", ":"))
    url = f"https://www.instagram.com/graphql/query/?query_hash={query_hash}&variables={variables_json}"
    res = requests.get(
        url,
        headers={
            "user-agent": "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36"
        },
    ).json()
    print(res)
    res = res["data"]["shortcode_media"]
    likes = (res.get("edge_media_preview_like") or {}).get("count", 0)
    comments = (res.get("edge_media_to_comment") or {}).get("count", 0)
    captions = []
    for edge in (res.get("edge_media_to_caption") or {}).get("edges", []):
        node = edge.get("node")
        if node:
            captions.append(node.get("text", ""))
    desc = "\n\n".join(captions)
    username = res["owner"].get("username")
    full_name = res["owner"].get("full_name")
    date = str(datetime.fromtimestamp(int(res.get("taken_at_timestamp"))))
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "shareCount": None,
        },
        "description": desc,
        "username": username,
        "name": full_name,
        "date": date,
    }
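# Example usage (sketch): the GraphQL variant returns the same shape as
# ig_post_detail but with numeric counts. Whether the endpoint answers without
# authentication is an assumption that may change on Instagram's side.
#
# import asyncio
# post = asyncio.run(ig_post_detail_api(url="https://www.instagram.com/p/<shortcode>/"))
# print(post["insights"]["commentCount"], post["date"])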
async def fb_post_detail(username: Optional[str] = None, post_id: Optional[str] = None, url: Optional[str] = None, api_access_key: Optional[str] = None):
    # Scrape a public Facebook post's reactions, shares, comments, and message
    # from its inline script data; optionally enrich it via the Graph API when
    # an access token is provided.
    if not url:
        url = f"https://www.facebook.com/{username}/posts/{post_id}"
    else:
        username = url.split("//www.facebook.com/", 1)[-1].split("/", 1)[0]
    user_agent = "Googlebot"
    res = requests.get(
        url,
        headers={"user-agent": user_agent, "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    script_tags = soup.find_all("script")
    print(len(script_tags))
    desc = None
    reactions = []
    likes = 0
    total_react = shares = comments = "0"
    for script_tag in script_tags:
        try:
            if "important_reactors" in script_tag.string:
                splitter = '"reaction_count":{"count":'
                total_react, reaction_split = script_tag.string.split(splitter, 2)[1].split("},", 1)
                total_react = total_react.split(',"')[0]
                reactions = re.search(r"\[.*?\]", reaction_split)
                reactions = json.loads(reactions.group(0)) if reactions else []
                reactions = [
                    dict(
                        name=reaction["node"]["localized_name"].lower(),
                        count=reaction["reaction_count"],
                        is_visible=reaction["visible_in_bling_bar"],
                    )
                    for reaction in reactions
                ]
                splitter = '"share_count":{"count":'
                shares = script_tag.string.split(splitter, 2)[1].split(",")[0]
                splitter = '"comments":{"total_count":'
                comments = script_tag.string.split(splitter, 2)[1].split("}")[0]
                likes = [x.get("count") for x in reactions if x.get("name") == "like"]
                likes = likes[0] if likes else 0
                print(total_react, reactions, shares, comments, likes)
            if '"message":{"text":"' in script_tag.string:
                desc = script_tag.string.split('"message":{"text":"', 1)[-1].split('"},')[0]
        except Exception as e:
            print(e)
            continue
    name = soup.find("meta", {"property": "og:title"}).get("content")
    date = None
    post_details = None
    if api_access_key:
        if not post_id:
            post_id = url.split("/")[-1]
        try:
            post_details = requests.get(
                f"https://graph.facebook.com/v20.0/1066512588151225_{post_id}?fields=place,shares,targeting,updated_time,created_time,description,child_attachments,caption,event,message,message_tags,story,status_type,source,coordinates,backdated_time,story_tags,scheduled_publish_time,properties,attachments&access_token={api_access_key}",
                headers={
                    "user-agent": "Mozilla/5.0 AppleWebKit/537.36 (KHTML, like Gecko; compatible; Googlebot/2.1; +http://www.google.com/bot.html) Chrome/W.X.Y.Z Safari/537.36",
                },
            ).json()
            date = post_details.get("updated_time") or post_details.get("created_time")
        except Exception as e:
            print(e)
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": int(comments),
            "shareCount": int(shares),
            "reactionCount": int(total_react),
            "reactions": reactions,
        },
        "description": desc,
        "username": username,
        "name": name,
        "date": date,
        "details": post_details,
    }
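# Example usage (sketch; page name, post id, and token are placeholders). The
# Graph API call only runs when api_access_key is supplied:
#
# import asyncio
# post = asyncio.run(fb_post_detail(username="<page>", post_id="<post-id>"))
# enriched = asyncio.run(fb_post_detail(url="https://www.facebook.com/<page>/posts/<post-id>",
#                                       api_access_key="<graph-api-token>"))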
async def google_search(q: str, delimiter: str = "\n---\n", sites: Annotated[list[str] | None, Query()] = None):
    # Scrape Google search result snippets, optionally restricted to specific sites.
    print(sites)
    print(type(sites))
    url = f"https://www.google.com/search?q={q} "
    if sites:
        url += " OR ".join(["site:" + site for site in sites])
    texts = ""
    soup = BeautifulSoup(requests.get(url).content, "html.parser")
    for div in soup.find_all("div")[24:]:  # skip the leading page-chrome divs
        if len(div.find_parents("div")) == 8:  # heuristic: result blocks sit eight <div> levels deep
            href = div.find(href=True, recursive=True)
            text = div.find(string=True, recursive=False)
            if href and text:
                print(text)
                text = f'[{text}]({href["href"].split("/url?q=")[-1]})'
            if text is not None and text.strip():
                texts += text + delimiter
    return {"results": texts}
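# Example usage (sketch): returns the snippets joined by the delimiter as one string.
#
# import asyncio
# results = asyncio.run(google_search("fastapi tutorial", sites=["github.com", "stackoverflow.com"]))
# print(results["results"])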
async def google_search_url(q: str, sites: Annotated[list[str] | None, Query()] = None, start: int = 0, user_agent="Twitterbot"):
    # Scrape Google search result URLs (the "/url?q=" redirect links) for a query.
    url = f"https://www.google.com/search?start={start}&q={q} "
    if sites:
        url += " OR ".join(["site:" + site for site in sites])
    res = requests.get(
        url,
        headers={"user-agent": user_agent, "accept-language": "en-US"},
        timeout=(10, 27),
    )
    soup = BeautifulSoup(res.content, "html.parser")
    prefix = "/url?q=h"
    len_prefix = len(prefix)
    docs = []
    for tag in soup.find_all(True):
        if len(tag.find_parents()) == 2:  # heuristic: result containers sit two levels below the root
            for a in tag.find_all("a"):
                doc = a.get("href")
                if (
                    doc[:len_prefix] == prefix
                    and "google.com" not in doc[len_prefix - 1:]
                ):
                    docs.append(
                        doc[len_prefix - 1:]
                        .split("&")[0]
                        .replace("%3F", "?")
                        .replace("%3D", "=")
                    )
    return {"results": docs}
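# Example usage (sketch): the returned URLs can be fed straight into news_details.
#
# import asyncio
# urls = asyncio.run(google_search_url("climate policy", sites=["reuters.com"], start=0))
# articles = [news_details(u) for u in urls["results"]]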
async def tiktok_video_details(username: Optional[str] = None, video_id: Optional[str] = None, url: Optional[str] = None):
    # Scrape TikTok video stats by converting the rendered page to plain text
    # with html2text and parsing the markdown-like output.
    if not url:
        if not username.startswith("@"):
            username = "@" + username
        url = f"https://www.tiktok.com/{username}/video/{video_id}"
    else:
        username = url.split("//www.tiktok.com/", 1)[-1].split("/")[0]
    # user_agent = "LinkedInBot"
    user_agent = "Mozilla/5.0 (compatible; YandexBot/3.0; +http://yandex.com/bots)"
    res = requests.get(url, headers={"user-agent": user_agent})
    # Earlier meta-tag based approach, kept for reference:
    # soup = BeautifulSoup(res.content, "html.parser")
    # insights = soup.find("meta", {"property": "og:description"}).get("content")
    # likes = insights.split(" ", 1)[0]
    # desc = insights.rsplit(" comments. “", 1)[-1][:-1]
    # comments = insights.split(", ", 1)[-1].split(" ", 1)[0]
    # name = soup.find("meta", {"property": "og:title"}).get("content")[9:]
    # return {
    #     "insights": {"likeCount": likes, "commentCount": comments, "shareCount": None, "viewCount": None},
    #     "description": desc,
    #     "username": username,
    #     "name": name,
    # }
    text_maker = html2text.HTML2Text()
    text_maker.ignore_links = True
    text_maker.ignore_images = True
    text_maker.bypass_tables = False
    print("RESPONSE DETAIL", res.content.decode("utf-8"))
    docs = text_maker.handle(res.content.decode("utf-8"))
    print("DOCS", docs)
    content_detail = docs.split("###")[5]
    likes, comments, bookmarks, shares = re.findall(r"\*\*([\w.]+)\*\*", content_detail)
    profile = [x.strip() for x in content_detail.split("\n\nSpeed\n\n", 1)[1].split("\n", 6) if x.strip()]
    username = profile[0]
    date = profile[1].rsplit(" · ", 1)[-1]
    desc = profile[-1][2:].replace("**", "")
    return {
        "insights": {
            "likeCount": likes,
            "commentCount": comments,
            "bookmarkCount": bookmarks,
            "shareCount": shares,
        },
        "username": username,
        "date": date,
        "description": desc,
    }
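# Example usage (sketch; the video id is a placeholder). The parsing is tied to
# html2text's rendering of TikTok's markup and will break if that layout changes:
#
# import asyncio
# video = asyncio.run(tiktok_video_details(username="someuser", video_id="1234567890"))
# print(video["insights"]["likeCount"], video["date"])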
async def yt_vid_detail(api_key: str, video_id: Optional[str] = None, url: Optional[str] = None):
    # Fetch video metadata and statistics from the YouTube Data API v3.
    # yt_ids = [doc.split("?v=")[-1] for doc in docs]
    if url:
        video_id = url.split("?v=")[-1]
    youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)
    # request = youtube.search().list(part="snippet", q="sari roti", type="video")
    request = youtube.videos().list(
        part="snippet,statistics,topicDetails",
        # id=",".join(yt_ids),
        id=video_id,
    )
    return request.execute()["items"]
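# Example usage (sketch; the API key and video id are placeholders). Requires a
# key with the YouTube Data API v3 enabled:
#
# import asyncio
# items = asyncio.run(yt_vid_detail(api_key="<yt-data-api-key>",
#                                   url="https://www.youtube.com/watch?v=<video-id>"))
# print(items[0]["statistics"])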