Spaces:
Running
on
Zero
Running
on
Zero
| import spaces # for ZeroGPU support | |
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| from threading import Thread | |
| from transformers import ( | |
| AutoModelForCausalLM, | |
| AutoTokenizer, | |
| AutoProcessor, | |
| TextIteratorStreamer | |
| ) | |
| # βββ MODEL SETUP ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| MODEL_NAME = "bytedance-research/ChatTS-14B" | |
| tokenizer = AutoTokenizer.from_pretrained( | |
| MODEL_NAME, trust_remote_code=True | |
| ) | |
| processor = AutoProcessor.from_pretrained( | |
| MODEL_NAME, trust_remote_code=True, tokenizer=tokenizer | |
| ) | |
| model = AutoModelForCausalLM.from_pretrained( | |
| MODEL_NAME, | |
| trust_remote_code=True, | |
| device_map="auto", | |
| torch_dtype=torch.float16 | |
| ) | |
| model.eval() | |
| # βββ VISUAL THEME (Dark) βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| CSS = """ | |
| .gradio-container { | |
| box-shadow: none !important; | |
| border: none !important; | |
| } | |
| :root{ | |
| --bg:#0b1020; --bg-2:#10172e; --panel:#141b34; --panel-2:#0f172a; | |
| --border:#263154; --fg:#e7ecf5; --muted:#b9c2d3; --placeholder:#7c8195; | |
| } | |
| /* App background */ | |
| .gradio-container{background: linear-gradient(180deg,var(--bg),var(--bg) 40%, var(--bg-2)); color: var(--fg);} | |
| /* Generic block panels */ | |
| .gradio-container .block, .gradio-container .form, .gradio-container .padded{ | |
| background: linear-gradient(180deg,var(--panel),var(--panel-2)); | |
| border: 1px solid var(--border); | |
| border-radius: 14px; | |
| } | |
| /* Inputs */ | |
| input[type="text"], textarea, select, .gr-textbox, .gr-dropdown, .gr-textbox textarea{ | |
| background: #0f1630 !important; color: var(--fg) !important; | |
| border: 1px solid var(--border) !important; border-radius: 12px !important; | |
| } | |
| .gr-textbox textarea::placeholder{ color: var(--placeholder) !important; } | |
| label, .svelte-1gfkn6j, .svelte-i3tvor, .label{ color: var(--muted) !important; } | |
| /* Upload panel */ | |
| button.svelte-1x5qevo{ background:#0f1630 !important; color:var(--muted) !important; border-color:var(--border) !important; } | |
| .svelte-12ioyct{ color: var(--muted) !important; } | |
| /* Line plot container */ | |
| .vega-embed, canvas.marks{ background:#0b1020 !important; border-radius: 10px; } | |
| /* Buttons base */ | |
| .gr-button{ border-radius:14px !important; border:1px solid var(--border) !important; color:#0b1020 !important; font-weight:700; } | |
| .gr-button:hover{ transform: translateY(-1px); box-shadow:0 6px 18px rgba(0,0,0,.35); } | |
| .gr-button-primary{ background: linear-gradient(180deg,#8ab4ff,#7aa2ff) !important; } | |
| .gr-button-secondary{ background: linear-gradient(180deg,#a78bfa,#7aa2ff) !important; color:#0b1020 !important; } | |
| .btn-chip{ padding:12px 16px !important; } | |
| /* Example cards row */ | |
| .examples-row{ display:grid; grid-template-columns:repeat(auto-fit,minmax(180px,1fr)); gap:12px; margin:12px 0 8px; } | |
| .cardbtn{ border:1px solid var(--border) !important; color:#0b1020 !important; } | |
| .grad-blue{ background: linear-gradient(180deg,#93c5fd,#60a5fa) !important; } | |
| .grad-purple{ background: linear-gradient(180deg,#c4b5fd,#a78bfa) !important; } | |
| .grad-green{ background: linear-gradient(180deg,#bbf7d0,#34d399) !important; } | |
| .grad-amber{ background: linear-gradient(180deg,#fde68a,#f59e0b) !important; } | |
| .grad-rose{ background: linear-gradient(180deg,#fecaca,#fb7185) !important; } | |
| .cardbtn:hover{ filter:brightness(1.02); transform:translateY(-1px); } | |
| /* Subtle spacing fixes */ | |
| .gap, .padded{ padding:12px !important; } | |
| """ | |
| # βββ HELPERS βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def create_default_timeseries(): | |
| x = np.arange(256) | |
| ts1 = np.sin(x / 10) * 5.0 | |
| ts1[103:] -= 10.0 | |
| ts2 = x * 0.01 | |
| ts2[100] += 10.0 | |
| return pd.DataFrame({"TS1": ts1, "TS2": ts2}) | |
| def process_csv_file(csv_file): | |
| if csv_file is None: | |
| return None, "No file uploaded" | |
| try: | |
| df = pd.read_csv(csv_file.name) | |
| df.columns = [str(c).strip() for c in df.columns] | |
| df = df.loc[:, [c for c in df.columns if c]] | |
| df = df.dropna(axis=1, how="all") | |
| if df.shape[1] == 0: | |
| return None, "No valid time-series columns found." | |
| if df.shape[1] > 15: | |
| return None, f"Too many series ({df.shape[1]}). Max allowed = 15." | |
| ts_names = [] | |
| ts_list = [] | |
| for name in df.columns: | |
| series = df[name] | |
| if not pd.api.types.is_float_dtype(series): | |
| series = pd.to_numeric(series, errors='coerce') | |
| last_valid = series.last_valid_index() | |
| if last_valid is None: | |
| continue | |
| trimmed = series.loc[:last_valid].to_numpy(dtype=np.float32) | |
| L = trimmed.shape[0] | |
| if L < 64 or L > 1024: | |
| return None, f"Series '{name}' length {L} invalid. Must be 64 to 1024." | |
| ts_names.append(name); ts_list.append(trimmed) | |
| if not ts_list: | |
| return None, "All time series are empty after trimming NaNs." | |
| return df, f"Loaded {len(ts_names)} series: {', '.join(ts_names)}" | |
| except Exception as e: | |
| return None, f"Error processing file: {str(e)}" | |
| def preview_csv(csv_file, use_default): | |
| if csv_file is None: | |
| return gr.LinePlot(value=pd.DataFrame()), "Please upload a CSV file first", gr.Dropdown(), False | |
| df, message = process_csv_file(csv_file) | |
| if df is None: | |
| return gr.LinePlot(value=pd.DataFrame()), message, gr.Dropdown(), False | |
| choices = list(df.columns) | |
| first = choices[0] | |
| df_idx = df.copy(); df_idx["_i"] = np.arange(len(df[first].values)) | |
| plot = gr.LinePlot(df_idx, x="_i", y=first, title=f"Time Series: {first}") | |
| dropdown = gr.Dropdown(choices=choices, value=first, label="Select a Column to Visualize") | |
| return plot, message, dropdown, False | |
| def clear_csv(): | |
| return gr.LinePlot(value=pd.DataFrame()), "Cleared.", gr.Dropdown() | |
| def update_plot(csv_file, selected_column, use_default_state): | |
| if (csv_file is None and not use_default_state) or selected_column is None: | |
| return gr.LinePlot(value=pd.DataFrame()) | |
| if csv_file is None and use_default_state: | |
| df = create_default_timeseries() | |
| else: | |
| df, _ = process_csv_file(csv_file) | |
| if df is None: | |
| return gr.LinePlot(value=pd.DataFrame()) | |
| df_idx = df.copy(); df_idx["_i"] = np.arange(len(df[selected_column].values)) | |
| return gr.LinePlot(df_idx, x="_i", y=selected_column, title=f"Time Series: {selected_column}") | |
| def initialize_interface(): | |
| df = create_default_timeseries() | |
| choices = list(df.columns); first = choices[0] | |
| df_idx = df.copy(); df_idx["_i"] = np.arange(len(df[first].values)) | |
| plot = gr.LinePlot(df_idx, x="_i", y=first, title=f"Time Series: {first}") | |
| dropdown = gr.Dropdown(choices=choices, value=first, label="Select a Column to Visualize") | |
| msg = "Using default time series (TS1 and TS2). Select a series from the dropdown for visualization." | |
| return plot, msg, dropdown, True | |
| # βββ INFERENCE βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def infer_chatts_stream(prompt: str, csv_file, use_default): | |
| if not prompt.strip(): | |
| yield "Please enter a prompt" | |
| return | |
| if csv_file is None and use_default: | |
| df = create_default_timeseries() | |
| error_msg = None | |
| else: | |
| df, error_msg = process_csv_file(csv_file) | |
| if df is None: | |
| yield "Please upload a CSV file first or the file contains errors" | |
| return | |
| try: | |
| ts_names, ts_list = [], [] | |
| for name in df.columns: | |
| series = df[name]; last_valid = series.last_valid_index() | |
| if last_valid is not None: | |
| trimmed = series.loc[:last_valid].to_numpy(dtype=np.float32) | |
| ts_names.append(name); ts_list.append(trimmed) | |
| if not ts_list: | |
| yield "No valid time series data found. Please upload time series first." | |
| return | |
| clean_prompt = prompt.replace("<ts>", "").replace("<ts/>", "") | |
| prefix = f"I have {len(ts_list)} time series:\n" | |
| for name, arr in zip(ts_names, ts_list): | |
| prefix += f"The {name} is of length {len(arr)}: <ts><ts/>\n" | |
| full_prompt = ( | |
| "<|im_start|>system\nYou are a helpful assistant. Your name is ChatTS. " | |
| "You can analyze time series data and provide insights. If user asks who you are, you should give your name and capabilities " | |
| "in the language of the prompt. If no time series are provided, you should say 'I cannot answer this question as you haven't provide the timeseries I need' " | |
| "in the language of the prompt. Always check if the user has provided at least one time series data before answering." | |
| "<|im_end|><|im_start|>user\n" | |
| f"{prefix}{clean_prompt} Please output a step-by-step analysis about the time series attributes that mentioned in the question first, " | |
| "and then give a detailed result about this question. Always remember to carefully double check the values before answer the results." | |
| "<|im_end|><|im_start|>assistant\n" | |
| ) | |
| inputs = processor(text=[full_prompt], timeseries=ts_list, padding=True, return_tensors="pt") | |
| inputs = {k: v.to(model.device) for k, v in inputs.items()} | |
| streamer = TextIteratorStreamer(tokenizer, timeout=10., skip_prompt=True, skip_special_tokens=True) | |
| inputs.update({"max_new_tokens": 512, "streamer": streamer, "temperature": 0.3}) | |
| thread = Thread(target=model.generate, kwargs=inputs); thread.start() | |
| out = "" | |
| for new_text in streamer: | |
| out += new_text | |
| yield out | |
| except Exception as e: | |
| yield f"Error during inference: {str(e)}" | |
| # βββ EXAMPLES (semantic names) βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| EXAMPLE_PROMPTS = { | |
| "π Detect Spikes": "Identify all spikes in each series and report timestamps and magnitudes. Explain briefly.", | |
| "π Trend & Seasonality": "Describe the trend and seasonality for the provided time series TS1.", | |
| "π Compare Metrics": "Compare the two series (TS1 and TS2). Are there lagged correlations? Estimate the lag and correlation strength.", | |
| "β‘ Local Change Analysis": "Find intervals with >10% rise or drop relative to the prior 20 points for TS1. Return intervals and reasons.", | |
| "π Correlation Strength": "Quantify Pearson correlation between each pair of series (TS1 and TS2) and highlight the strongest positive/negative pairs." | |
| } | |
| # βββ UI ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with gr.Blocks(title="ChatTS Demo", css=CSS) as demo: | |
| use_default_state = gr.State(value=True) | |
| gr.HTML("<h3 style='text-align:center; color:#e8ebfa; font-size:20px; font-weight:700; margin-top:8px;'>π‘ Click an example below and click Run ChatTS to try different questions with default time series or upload your own CSV file.</h3>") | |
| with gr.Row(elem_classes="examples-row"): | |
| btns = {} | |
| grads = ["grad-blue","grad-purple","grad-green","grad-amber","grad-rose"] | |
| for (title, prompt), grad in zip(EXAMPLE_PROMPTS.items(), grads): | |
| btns[title] = gr.Button(title, elem_classes=f"cardbtn btn-chip {grad}") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| upload = gr.File(label="", file_types=[".csv"], type="filepath") | |
| prompt_input = gr.Textbox( | |
| lines=6, | |
| placeholder="Enter your question here...", | |
| label="Analysis Prompt", | |
| value="Find the maximum and minimum values in each series and comment on them." | |
| ) | |
| run_btn = gr.Button("Run ChatTS", variant="primary") | |
| with gr.Column(scale=2): | |
| series_selector = gr.Dropdown(label="Select a Column to Visualize", choices=[], value=None) | |
| plot_out = gr.LinePlot(value=pd.DataFrame(), label="Time Series Visualization") | |
| file_status = gr.Textbox(label="File Status", interactive=False, lines=2) | |
| text_out = gr.Textbox(lines=10, label="ChatTS Analysis Results", interactive=False) | |
| demo.load(fn=initialize_interface, outputs=[plot_out, file_status, series_selector, use_default_state]) | |
| for title, prompt in EXAMPLE_PROMPTS.items(): | |
| btns[title].click(fn=lambda p=prompt: p, outputs=prompt_input) | |
| upload.upload(fn=preview_csv, inputs=[upload, use_default_state], | |
| outputs=[plot_out, file_status, series_selector, use_default_state]) | |
| upload.clear(fn=clear_csv, outputs=[plot_out, file_status, series_selector]) | |
| series_selector.change(fn=update_plot, inputs=[upload, series_selector, use_default_state], outputs=[plot_out]) | |
| run_btn.click(fn=infer_chatts_stream, inputs=[prompt_input, upload, use_default_state], outputs=[text_out]) | |
| if __name__ == '__main__': | |
| demo.launch() | |