# Gradio app with three tools: a full-text rephraser and a sentence-wise
# paraphraser (both backed by Parrot/T5), plus a hybrid plagiarism checker
# that searches DuckDuckGo with a Google Custom Search fallback.

import gradio as gr
from parrot import Parrot
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
import re
import time
import os
from difflib import SequenceMatcher
from ddgs import DDGS
from googleapiclient.discovery import build
from dotenv import load_dotenv

load_dotenv()

# Use the locally bundled NLTK data (punkt tokenizer models).
nltk.data.path.append("./nltk_data")

# Parrot model handle, loaded lazily on first use (see get_parrot()).
parrot = None

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CX = os.getenv("GOOGLE_CX")
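# Illustrative .env layout (placeholder values, not real credentials):
#   GOOGLE_API_KEY=<your Google API key>
#   GOOGLE_CX=<your Programmable Search Engine ID>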

if not GOOGLE_API_KEY or not GOOGLE_CX:
    print("⚠️ Warning: GOOGLE_API_KEY or GOOGLE_CX not set. Google fallback may fail.")

# Maximum number of word tokens per chunk fed to the paraphraser.
MAX_TOKENS = 150


def get_parrot():
    """Load the Parrot paraphraser on first call and cache it globally."""
    global parrot
    if parrot is None:
        print("⏳ Loading Parrot model...")
        parrot = Parrot(model_tag="prithivida/parrot_paraphraser_on_T5", use_gpu=False)
        print("✅ Parrot model loaded!")
    return parrot


def clean_sentence(sent):
    """Trim whitespace, capitalize the first letter, and normalize the
    terminal punctuation to a single period."""
    sent = sent.strip()
    sent = re.sub(r"[.!?]+$", "", sent)
    if sent:
        sent = sent[0].upper() + sent[1:]
        if not sent.endswith("."):
            sent += "."
    return sent
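# Quick sanity check of the normalization above, e.g.:
#   clean_sentence("hello world!!") -> "Hello world."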


def clean_sentences(sentences):
    """Clean each sentence and join them back into a single paragraph."""
    cleaned = [clean_sentence(s) for s in sentences if s.strip()]
    return " ".join(cleaned)


def split_long_sentence(sentence, max_tokens=MAX_TOKENS):
    """Split an overly long sentence into chunks of at most max_tokens words.
    Note: word_tokenize separates punctuation, so rejoined chunks may gain
    spaces around punctuation marks."""
    words = word_tokenize(sentence)
    return [" ".join(words[i:i + max_tokens]) for i in range(0, len(words), max_tokens)]


def with_retry(func, *args, retries=1, delay=3, **kwargs):
    """Call func, retrying up to `retries` extra times on any exception.
    Returns func's result, or None if every attempt fails."""
    for attempt in range(retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print(f"⚠️ Attempt {attempt + 1} failed: {e}")
            if attempt < retries:
                print("🔁 Retrying...")
                time.sleep(delay)
    return None
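# Example usage (illustrative):
#   result = with_retry(model.augment, input_phrase="hello", retries=2, delay=1)
#   # -> up to 3 attempts in total; result is None if all of them raise.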


def rephrase(text):
    """Paraphrase a full passage sentence by sentence, keeping the original
    sentence order and falling back to the source chunk when Parrot returns
    no candidates."""
    model = get_parrot()
    sentences = sent_tokenize(text)
    rephrased = []

    for s in sentences:
        chunks = split_long_sentence(s)
        paraphrased_chunks = []
        for c in chunks:
            p = with_retry(
                model.augment,
                input_phrase=c,
                do_diverse=True,
                adequacy_threshold=0.7,
                fluency_threshold=0.7,
            )
            if p:
                # Parrot yields (paraphrase, score) pairs; take the top text.
                paraphrased_chunks.append(p[0][0])
            else:
                paraphrased_chunks.append(c)
        rephrased.append(" ".join(paraphrased_chunks))

    return clean_sentences(rephrased)
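# Example (illustrative):
#   rephrase("AI is changing software. Developers adapt quickly.")
#   -> one paraphrased paragraph preserving the two-sentence structure.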


def generate_unique_paraphrases(sentence, n_options=3):
    """Return up to n_options distinct paraphrases of a sentence, or the
    original sentence if Parrot produces nothing."""
    model = get_parrot()
    paraphrases = with_retry(
        model.augment,
        input_phrase=sentence,
        do_diverse=True,
        adequacy_threshold=0.7,
        fluency_threshold=0.7,
    )
    if not paraphrases:
        return [sentence]
    texts = [p[0] for p in paraphrases]
    unique = []
    for t in texts:
        if t not in unique:
            unique.append(t)
        if len(unique) == n_options:
            break
    return unique


def rephrase_sentencewise_unique(text, n_options=3):
    """For each sentence, list the original followed by up to n_options
    cleaned paraphrase options."""
    sentences = sent_tokenize(text.strip())
    results = []
    for idx, s in enumerate(sentences, 1):
        paraphrases = generate_unique_paraphrases(s, n_options)
        paraphrases = [clean_sentence(p) for p in paraphrases]
        formatted = f"Sentence {idx}: {s}\n"
        for i, opt in enumerate(paraphrases, 1):
            formatted += f" Option {i}: {opt}\n"
        results.append(formatted)
    return "\n".join(results)
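# Example output shape (illustrative):
#   Sentence 1: Cats are great pets.
#    Option 1: Cats make wonderful pets.
#    Option 2: Cats are excellent companions.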


def search_duckduckgo(query):
    """Run an exact-phrase DuckDuckGo search; return (results, result_count)."""
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(f'"{query}"', max_results=3))
        return results, len(results)
    except Exception as e:
        print(f"⚠️ DDG error: {e}")
        return [], 0


def similarity(a, b):
    """Character-level similarity ratio in [0, 1] via difflib.SequenceMatcher."""
    return SequenceMatcher(None, a, b).ratio()


def detect_duckduckgo(text, threshold=0.8):
    """Flag sentences whose DuckDuckGo snippets are at least `threshold`
    similar. Returns a list of (sentence, similarity, url) tuples."""
    sentences = sent_tokenize(text)
    matches = []
    for sent in sentences:
        # Very short sentences produce too many false positives; skip them.
        if len(sent.split()) < 6:
            continue
        results, _ = search_duckduckgo(sent)
        for res in results:
            snippet = res.get("body", "")
            sim = similarity(sent.lower(), snippet.lower())
            if sim >= threshold:
                matches.append((sent, sim, res.get("href", "Unknown")))
                break
        # Throttle requests to avoid DuckDuckGo rate limiting.
        time.sleep(1)
    return matches


def search_google(query):
    """Exact-phrase Google Custom Search; return (items, total_result_count)."""
    if not GOOGLE_API_KEY or not GOOGLE_CX:
        return [], 0
    try:
        service = build("customsearch", "v1", developerKey=GOOGLE_API_KEY)
        res = service.cse().list(q=f'"{query}"', cx=GOOGLE_CX, num=3).execute()
        items = res.get("items", [])
        # totalResults arrives as a string in the API response.
        total = int(res.get("searchInformation", {}).get("totalResults", 0))
        return items, total
    except Exception as e:
        print(f"⚠️ Google search error: {e}")
        return [], 0


def detect_google(text):
    """Flag sentences that return any exact-phrase Google hits.
    Returns a list of (sentence, total_hits, url) tuples."""
    sentences = sent_tokenize(text)
    matches = []
    for sent in sentences:
        results, total = search_google(sent)
        if total > 0:
            url = results[0].get("link", "Unknown")
            matches.append((sent, total, url))
        time.sleep(0.3)
    return matches


def hybrid_detect(text):
    """Check text with DuckDuckGo first, falling back to Google Custom Search
    when DuckDuckGo finds nothing. Matched sentences are bolded in the
    returned text."""
    ddg_matches = detect_duckduckgo(text)
    matches = ddg_matches if ddg_matches else detect_google(text)
    highlighted = text
    urls = []
    for sent, _, url in matches:
        highlighted = highlighted.replace(sent, f"**{sent}**")
        urls.append(url)
    return {"highlighted_text": highlighted, "sources": urls}
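# Example return value (illustrative):
#   {"highlighted_text": "An original line. **A copied line.**",
#    "sources": ["https://example.com/source-page"]}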


def warmup():
    """Run one tiny paraphrase so the first user request isn't slowed by
    model loading."""
    try:
        print("🔥 Warming up Parrot model...")
        model = get_parrot()
        _ = model.augment(input_phrase="hello world", do_diverse=False)
        print("✅ Warmup complete.")
    except Exception as e:
        print(f"⚠️ Warmup skipped: {e}")


warmup()


rephrase_iface = gr.Interface(
    fn=rephrase,
    inputs=gr.Textbox(lines=10, placeholder="Paste your text here..."),
    outputs="text",
    title="🦜 Parrot Rephraser (Long Text)",
    description="Rephrase paragraphs while maintaining meaning.",
)

sentencewise_iface = gr.Interface(
    fn=rephrase_sentencewise_unique,
    inputs=gr.Textbox(lines=10, placeholder="Paste text here..."),
    outputs="text",
    title="🧩 Sentence-wise Paraphraser",
    description="Generates up to 3 diverse rephrases per sentence.",
)

plagiarism_iface = gr.Interface(
    fn=hybrid_detect,
    inputs=gr.Textbox(lines=10, placeholder="Paste text to check for plagiarism..."),
    outputs=gr.JSON(),
    title="🔍 Hybrid Plagiarism Detector",
    description="Detects copied sentences using DuckDuckGo & Google Custom Search.",
)

demo = gr.TabbedInterface(
    [rephrase_iface, sentencewise_iface, plagiarism_iface],
    ["Full Text Rephraser", "Sentence-wise Paraphrases", "Plagiarism Checker"],
)

if __name__ == "__main__":
    demo.launch(server_port=7860, server_name="0.0.0.0", show_error=True)