"""Budget forecast + chat advisor (Gradio app).

The app has two tabs:

* a forecast tab that splits transactions into income and expense series,
  forecasts each one separately and plots history vs. forecast;
* a chat tab that asks a small instruction-tuned LLM for saving tips, using
  a summary of the most recent month found in the transaction table.
"""

import warnings

# Installed right after importing ``warnings`` so that it is already active
# while the remaining libraries are imported.
warnings.filterwarnings("ignore")

import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import gradio as gr
from statsmodels.tsa.holtwinters import ExponentialSmoothing, Holt
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
from typing import List, Tuple, Optional


# UI label -> Hugging Face model id.
MODEL_CHOICES = {
    "Qwen 0.5B Instruct": "Qwen/Qwen2.5-0.5B-Instruct",
    "Llama 3.2 1B Instruct": "meta-llama/Llama-3.2-1B-Instruct",
    "Llama 3.2 3B Instruct": "meta-llama/Llama-3.2-3B-Instruct",
    "Phi-3.5 mini Instruct": "microsoft/Phi-3.5-mini-instruct",
}
CURRENT_MODEL_NAME = "Qwen 0.5B Instruct"

# Lazily populated by load_llm(); reset by the model switcher in the UI.
_tokenizer = None
_model = None
_device = torch.device("cpu")

# Word-boundary match so that e.g. "parameter" or "room" do not count as a
# memory problem.
_MEMORY_ERROR = re.compile(r"out of memory|\boom\b|\bram\b")


def _get_hf_token() -> str:
    """Return the Hugging Face token from the environment, or "" if unset."""
    return os.getenv("HF_TOKEN") or os.getenv("HUGGINGFACEHUB_API_TOKEN") or ""


def load_llm():
    """Load (or reuse) the tokenizer and model for ``CURRENT_MODEL_NAME``.

    Returns:
        A ``(tokenizer, model)`` pair; the model is moved to ``_device`` and
        put into eval mode.

    Raises:
        RuntimeError: if a ``meta-llama/`` model is selected without a token,
            or if loading fails for any reason (the original exception is
            chained as ``__cause__``).
    """
    global _tokenizer, _model

    model_id = MODEL_CHOICES[CURRENT_MODEL_NAME]
    cached = (
        _tokenizer is not None
        and _model is not None
        and getattr(_tokenizer, "_model_id", None) == model_id
    )
    if cached:
        return _tokenizer, _model

    token = _get_hf_token()
    if model_id.startswith("meta-llama/") and not token:
        raise RuntimeError(
            "Для моделей Meta Llama нужен HF токен: примите лицензию на huggingface.co "
            "и установите переменную окружения HF_TOKEN=<ваш_токен>."
        )

    # Load into locals first: the globals are only updated once BOTH the
    # tokenizer and the model are ready, so a failed model load can never
    # leave a cached tokenizer paired with a stale or missing model.
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=token or None)
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            token=token or None,
            torch_dtype=torch.float32,
            low_cpu_mem_usage=True,
        )
        if tokenizer.pad_token_id is None:
            tokenizer.pad_token_id = tokenizer.eos_token_id
        model.to(_device).eval()
    except Exception as e:
        hint = ""
        el = str(e).lower()
        if "gated" in el or "unauthorized" in el or "forbidden" in el:
            hint = " Нет доступа к модели (примите лицензию и используйте HF_TOKEN)."
        elif _MEMORY_ERROR.search(el):
            hint = " Недостаточно памяти: выберите Qwen 0.5B или Llama 1B."
        raise RuntimeError(f"Не удалось загрузить {model_id}: {e}.{hint}") from e

    # Tag the tokenizer with its model id; the cache check above reads it.
    tokenizer._model_id = model_id
    _tokenizer, _model = tokenizer, model
    return _tokenizer, _model


# Everything except Cyrillic letters, digits, basic punctuation and newlines.
_KEEP = re.compile(r"[^А-Яа-яЁё0-9 ,.!?:;()«»\"'–—\-•\n]")


def _clean_ru(text: str) -> str:
    """Strip characters outside ``_KEEP``'s whitelist and re-split bullets.

    All whitespace runs (including newlines) are collapsed to one space;
    afterwards every " • " and " - " separator starts a new line.
    """
    text = _KEEP.sub(" ", text)
    text = re.sub(r"\s+", " ", text).strip()
    text = text.replace(" • ", "\n• ").replace(" - ", "\n- ")
    return text


GLOBAL_DF_CACHE: Optional[pd.DataFrame] = None

# Accepted (lower-cased) column headers -> canonical column name.
_COLUMN_ALIASES = {
    "date": "date",
    "дата": "date",
    "amount": "amount",
    "сумма": "amount",
    "category": "category",
    "категория": "category",
    "type": "type",
    "тип": "type",
}

_EXPENSE_LABELS = frozenset({"expense", "расход", "расходы", "-", "e", "exp"})
# "доходы" mirrors the plural "расходы" accepted for expenses.
_INCOME_LABELS = frozenset({"income", "доход", "доходы", "+", "i", "inc"})


def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with canonical column names and parsed dates.

    Headers are matched case-insensitively against ``_COLUMN_ALIASES``
    (non-string labels are stringified first). Rows whose date cannot be
    parsed are dropped.

    Raises:
        ValueError: if any of ``date``, ``amount``, ``type`` is missing.
    """
    renames = {}
    for col in df.columns:
        canonical = _COLUMN_ALIASES.get(str(col).strip().lower())
        if canonical is not None:
            renames[col] = canonical
    work = df.rename(columns=renames)  # rename() returns a new frame

    missing = {"date", "amount", "type"} - set(map(str, work.columns))
    if missing:
        # Sorted so that the message is deterministic.
        raise ValueError(f"Отсутствуют колонки: {', '.join(sorted(missing))}")

    work["date"] = pd.to_datetime(work["date"], errors="coerce")
    work = work.dropna(subset=["date"])
    return work


def _is_expense(t: str) -> bool:
    """Tell whether a raw ``type`` cell denotes an expense."""
    return str(t).strip().lower() in _EXPENSE_LABELS


def _is_income(t: str) -> bool:
    """Tell whether a raw ``type`` cell denotes income."""
    return str(t).strip().lower() in _INCOME_LABELS


def _prepare_components_series(df: pd.DataFrame, freq: str = "M"):
    """Aggregate transactions into income, expense and net series.

    Args:
        df: transaction table (see ``_normalize_columns`` for the columns).
        freq: pandas resampling frequency used for aggregation.

    Returns:
        ``(inc, exp, net)``: income is summed as given, expenses are forced
        negative, and ``net = inc + exp``. When at least one of the two is
        non-empty, both are reindexed to the same gap-free range with
        missing periods filled by 0.

    Raises:
        ValueError: if ``df`` is ``None``/empty or lacks required columns.
    """
    if df is None or df.empty:
        raise ValueError("Пустая таблица транзакций.")

    work = _normalize_columns(df)
    work["amount"] = pd.to_numeric(work["amount"], errors="coerce").fillna(0.0)
    work["is_expense"] = work["type"].apply(_is_expense)
    work["is_income"] = work["type"].apply(_is_income)

    income_rows = work.loc[work["is_income"]].set_index("date")["amount"]
    expense_rows = work.loc[work["is_expense"]].set_index("date")["amount"]
    inc = income_rows.resample(freq).sum().sort_index()
    # abs() then negate: expenses end up negative whatever sign was entered.
    exp = expense_rows.abs().mul(-1).resample(freq).sum().sort_index()

    present = [s for s in (inc, exp) if not s.empty]
    if present:
        start = min(s.index.min() for s in present)
        end = max(s.index.max() for s in present)
        full_idx = pd.date_range(start, end, freq=freq)
        inc = inc.reindex(full_idx, fill_value=0.0)
        exp = exp.reindex(full_idx, fill_value=0.0)

    inc.index.name = exp.index.name = "period_end"
    net = inc + exp
    return inc, exp, net


def _future_index(last_stamp, steps: int, freq: str) -> pd.DatetimeIndex:
    """Return ``steps`` period stamps starting one ``freq`` after ``last_stamp``."""
    start = last_stamp + pd.tseries.frequencies.to_offset(freq)
    return pd.date_range(start, periods=steps, freq=freq)


def _fit_and_forecast(history: pd.Series, steps: int, freq: str) -> pd.Series:
    """Forecast ``steps`` periods ahead of ``history``.

    Strategy:
      * fewer than 3 points: repeat the last value (0.0 for empty history);
      * frequency starting with "A", or fewer than 24 points: Holt's linear
        trend;
      * otherwise: additive Holt-Winters with ``seasonal_periods=12``;
      * if fitting/forecasting raises: mean of the last up-to-6 points.
    """
    if len(history) < 3:
        if len(history):
            last = float(history.iloc[-1])
            anchor = history.index[-1]
        else:
            last = 0.0
            anchor = pd.Timestamp.today().normalize()
        return pd.Series([last] * steps, index=_future_index(anchor, steps, freq), name="forecast")

    try:
        if not freq.startswith("A") and len(history) >= 24:
            model = ExponentialSmoothing(
                history,
                trend="add",
                seasonal="add",
                seasonal_periods=12,
                initialization_method="estimated",
            )
        else:
            model = Holt(history, initialization_method="estimated")
        fc = model.fit(optimized=True).forecast(steps)
        # Rebuild the index when the result does not carry usable dates.
        if not isinstance(fc.index, pd.DatetimeIndex) or len(fc.index) != steps:
            idx = _future_index(history.index[-1], steps, freq)
            fc = pd.Series(np.asarray(fc), index=idx, name="forecast")
        return fc
    except Exception:
        # Deliberate best-effort: any modelling failure degrades to a flat
        # baseline instead of breaking the UI.
        baseline = float(history.tail(min(6, len(history))).mean())
        idx = _future_index(history.index[-1], steps, freq)
        return pd.Series([baseline] * steps, index=idx, name="forecast")


def _plot_component(ax, hist: pd.Series, fc: pd.Series, color: str,
                    hist_label: str, fc_label: str) -> None:
    """Draw one series: solid history, dashed forecast, dashed connector."""
    ax.plot(hist.index, hist.values, label=hist_label, color=color, linewidth=2)
    ax.plot(fc.index, fc.values, label=fc_label, color=color, linestyle="--", linewidth=2)
    if len(hist) and len(fc):
        # Bridge the gap between the last observed and first predicted point.
        ax.plot(
            [hist.index[-1], fc.index[0]],
            [hist.values[-1], fc.values[0]],
            color=color,
            linestyle="--",
            linewidth=2,
        )


def build_split_plot(inc_hist: pd.Series, inc_fc: pd.Series, exp_hist: pd.Series, exp_fc: pd.Series):
    """Build the income (blue) / expense (red) history-and-forecast figure.

    Returns:
        The matplotlib figure.
    """
    fig, ax = plt.subplots(figsize=(9, 4.8))
    _plot_component(ax, inc_hist, inc_fc, "blue", "Доходы (история)", "Доходы (прогноз)")
    _plot_component(ax, exp_hist, exp_fc, "red", "Расходы (история)", "Расходы (прогноз)")
    ax.axhline(0, linewidth=1, alpha=0.6)
    ax.set_title("Доходы и расходы (конец периода)")
    ax.set_xlabel("Период")
    ax.set_ylabel("Сумма")
    ax.legend(ncol=2)
    fig.tight_layout()
    return fig


def _current_month_summary(df: pd.DataFrame) -> dict:
    """Summarise the most recent calendar month present in ``df``.

    Returns:
        ``{}`` when there is no usable data, otherwise a dict with keys
        ``month``, ``income_total``, ``expense_total`` (negative), ``net``
        and ``top_expense_categories`` (up to 5 ``(category, amount)`` pairs,
        largest spending first, amounts negative).

    Raises:
        ValueError: if required columns are missing.
    """
    if df is None or df.empty:
        return {}

    work = _normalize_columns(df)
    # Same coercion as _prepare_components_series: table cells may arrive as
    # strings or blanks, which must not leak into the sums below.
    work["amount"] = pd.to_numeric(work["amount"], errors="coerce").fillna(0.0)
    if "category" not in work.columns:
        work["category"] = "Без категории"
    work["is_income"] = work["type"].apply(_is_income)
    work["is_expense"] = work["type"].apply(_is_expense)

    periods = work["date"].dt.to_period("M")
    last_period = periods.max()
    cur = work.loc[periods == last_period].copy()
    if cur.empty:
        return {}

    income_total = float(cur.loc[cur["is_income"], "amount"].sum())
    expense_total = -float(cur.loc[cur["is_expense"], "amount"].abs().sum())

    exp_df = cur.loc[cur["is_expense"], ["category", "amount"]].copy()
    exp_df["amount"] = -exp_df["amount"].abs()
    # Ascending sort on negative sums puts the largest expenses first.
    top = exp_df.groupby("category")["amount"].sum().sort_values().head(5)

    return {
        "month": str(last_period),
        "income_total": income_total,
        "expense_total": expense_total,
        "net": income_total + expense_total,
        "top_expense_categories": [(str(k), float(v)) for k, v in top.items()],
    }


EXAMPLE_DF = pd.DataFrame(
    [
        {"date": "2024-12-05", "amount": 120000, "category": "Зарплата", "type": "income"},
        {"date": "2025-01-10", "amount": 30000, "category": "Проект", "type": "income"},
        {"date": "2025-01-12", "amount": 15000, "category": "Кварплата", "type": "expense"},
        {"date": "2025-01-22", "amount": 8000, "category": "Связь", "type": "expense"},
        {"date": "2025-02-05", "amount": 120000, "category": "Зарплата", "type": "income"},
        {"date": "2025-02-14", "amount": 17000, "category": "Еда", "type": "expense"},
        {"date": "2025-02-20", "amount": 6000, "category": "Транспорт", "type": "expense"},
        {"date": "2025-03-05", "amount": 120000, "category": "Зарплата", "type": "income"},
        {"date": "2025-03-14", "amount": 19000, "category": "Еда", "type": "expense"},
        {"date": "2025-03-21", "amount": 7000, "category": "Подписки", "type": "expense"},
    ]
)
GLOBAL_DF_CACHE = EXAMPLE_DF.copy()


def forecast_ui(df: pd.DataFrame, horizon_choice: str, granularity: str):
    """Gradio callback: build the forecast plot, table and note.

    Args:
        df: transaction table from the UI.
        horizon_choice: horizon radio label; a label containing "месяц"
            means one step ahead.
        granularity: granularity radio label; a label ending in "годам"
            selects yearly ("A-DEC") aggregation, otherwise monthly ("M").

    Returns:
        ``(figure, forecast_dataframe, note_text)``.
    """
    global GLOBAL_DF_CACHE
    GLOBAL_DF_CACHE = df.copy() if df is not None else None

    use_year = granularity.lower().endswith("годам")
    freq = "A-DEC" if use_year else "M"
    inc, exp, _ = _prepare_components_series(df, freq=freq)

    # "Next year" is one yearly step or twelve monthly ones.
    if "месяц" in horizon_choice.lower() or use_year:
        steps = 1
    else:
        steps = 12

    inc_fc = _fit_and_forecast(inc, steps, freq)
    exp_fc = _fit_and_forecast(exp, steps, freq)
    fig = build_split_plot(inc, inc_fc, exp, exp_fc)

    out_df = pd.DataFrame({
        "period_end": inc_fc.index.strftime("%Y-%m-%d"),
        "income_forecast": inc_fc.values,
        "expense_forecast": exp_fc.values,
    })
    tip = "Прогноз построен отдельно для доходов и расходов."
    return fig, out_df, tip


def _chat_generate(messages, tok, mdl, max_new_tokens=260, deterministic=True):
    """Run one chat completion and return only the newly generated text.

    Args:
        messages: chat messages (``role``/``content`` dicts) for the
            tokenizer's chat template.
        tok: tokenizer returned by ``load_llm``.
        mdl: model returned by ``load_llm``.
        max_new_tokens: generation budget.
        deterministic: beam search (4 beams) when true, otherwise nucleus
            sampling.
    """
    text = tok.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tok(text, return_tensors="pt", padding=True, truncation=True, max_length=1400).to(_device)

    if deterministic:
        decoding = dict(
            do_sample=False,
            num_beams=4,
            repetition_penalty=1.08,
            no_repeat_ngram_size=5,
        )
    else:
        decoding = dict(
            do_sample=True,
            temperature=0.8,
            top_p=0.9,
            top_k=50,
            repetition_penalty=1.15,
            no_repeat_ngram_size=6,
        )

    with torch.no_grad():
        out = mdl.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            eos_token_id=tok.eos_token_id,
            pad_token_id=tok.pad_token_id,
            **decoding,
        )

    # The returned sequence starts with the prompt tokens; slice them off so
    # the system text and the "- category: value" context lines are not
    # parsed as advice bullets downstream.
    prompt_len = inputs["input_ids"].shape[1]
    return tok.decode(out[0][prompt_len:], skip_special_tokens=True)


# Patterns for _to_bullets, compiled once at import time.
_FIRST_BULLET = re.compile(r"(\n\s*[-*]\s+|\n\s*\d+[\).\s]+)")
_STAR_BULLET = re.compile(r"^\s*[*•]\s+", flags=re.M)
_NUM_BULLET = re.compile(r"^\s*\d+[\).\s]+", flags=re.M)
# Lines that look like echoed prompt/context rather than advice.
_KILL = re.compile(r"(?i)(учитывай данные|данные пользователя|месяц:|доход:|расход:|нетто:|топ стат|вопрос:|assistant)")
_ONLY_PUNCT = re.compile(r"^[-•\s\.\,\;\:\!\?]+$")
_TOO_SHORT = re.compile(r"^-\s{0,2}.$")


def _to_bullets(text: str) -> str:
    """Reduce free-form model output to at most 7 unique "• " bullet lines.

    Text before the first list marker is dropped; "*", "•" and numbered
    markers are unified to "- "; prompt echoes, punctuation-only and
    one-character items are discarded; duplicates are removed
    case-insensitively, keeping the first occurrence.
    """
    if not text:
        return ""

    # Search in "\n" + text so that a bullet on the very first line matches.
    # Because of that one-character prefix, slicing ``text`` at m.start()
    # lands just after the matched newline, i.e. on the bullet's own line.
    m = _FIRST_BULLET.search("\n" + text)
    if m:
        text = text[m.start():]
    text = _STAR_BULLET.sub("- ", text)
    text = _NUM_BULLET.sub("- ", text)

    uniq, seen = [], set()
    for ln in text.split("\n"):
        s = ln.strip()
        if not s.startswith("- "):
            continue
        if _KILL.search(s) or _ONLY_PUNCT.match(s) or _TOO_SHORT.match(s):
            continue
        s = re.sub(r"\s{2,}", " ", s)
        s = re.sub(r"\.\s*\.+$", ".", s)
        key = s.lower()
        if key in seen:
            continue
        seen.add(key)
        uniq.append(s)

    return "\n".join(s.replace("- ", "• ", 1) for s in uniq[:7])


def llm_reply(history: List[Tuple[str, str]], user_msg: str, df_state: Optional[pd.DataFrame]):
    """Produce a bulleted list of saving tips for ``user_msg``.

    Args:
        history: previous chat turns; accepted for interface compatibility
            but not included in the prompt.
        user_msg: the user's question.
        df_state: transaction table used for the current-month context.

    Returns:
        Bullet text from the deterministic pass; if that yields fewer than
        3 bullets, a sampled pass is tried and used when it yields 3 or more.

    Raises:
        RuntimeError: if the model cannot be loaded.
        ValueError: if ``df_state`` lacks required columns.
    """
    s = _current_month_summary(df_state)
    system_msg = (
        "Ты финансовый помощник. Отвечай по-русски. "
        "Верни ТОЛЬКО список из 5–7 конкретных шагов экономии с цифрами (лимиты, проценты, частота). "
        "Каждая строка должна начинаться с символов \"- \". Никаких вступлений, пояснений, заголовков."
    )

    if s:
        ctx_lines = [
            f"Месяц: {s['month']}",
            f"Доход: {s['income_total']:.0f}",
            f"Расход: {abs(s['expense_total']):.0f}",
            f"Нетто: {s['net']:.0f}",
        ]
        if s.get("top_expense_categories"):
            ctx_lines.append("Топ статей расходов:")
            ctx_lines.extend(
                f"- {cat}: {abs(val):.0f}" for cat, val in s["top_expense_categories"]
            )
        context = "\n".join(ctx_lines)
    else:
        context = "Данных за текущий месяц нет."

    messages = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": (
            f"Мои данные за текущий месяц:\n{context}\n\n"
            f"Вопрос: {user_msg}\n"
            "Начни ответ сразу со строки, которая начинается с \"- \". Верни только список из 5–7 пунктов."
        )},
    ]

    tok, mdl = load_llm()
    raw1 = _chat_generate(messages, tok, mdl, max_new_tokens=300, deterministic=True)
    out = _to_bullets(_clean_ru(raw1))
    if out.count("\n") + 1 < 3:
        # Too few bullets: retry once with sampling.
        raw2 = _chat_generate(messages, tok, mdl, max_new_tokens=300, deterministic=False)
        out2 = _to_bullets(_clean_ru(raw2))
        if out2.count("\n") + 1 >= 3:
            return out2
    return out


with gr.Blocks(title="Бюджетный прогноз + чат-советник") as demo:
    # Markdown is written with explicit "\n" so that the heading and the
    # numbered steps keep their own lines regardless of source indentation.
    gr.Markdown(
        "# Бюджетный прогноз + чат-советник\n\n"
        "График разделён на **доходы** (синие) и **расходы** (красные). "
        "История — сплошная линия, прогноз — пунктир. "
        "Период — конец месяца/года."
    )

    with gr.Tab("Прогноз бюджета"):
        gr.Markdown(
            "**Как пользоваться:**\n\n"
            "1) Отредактируйте таблицу или загрузите свою "
            "(колонки: `date`, `amount`, `category`, `type`).\n"
            "2) Выберите горизонт и гранулярность.\n"
            "3) Нажмите **Построить прогноз**."
        )
        df_input = gr.Dataframe(
            value=EXAMPLE_DF,
            headers=["date", "amount", "category", "type"],
            datatype=["date", "number", "str", "str"],
            row_count=(10, "dynamic"),
            col_count=(4, "fixed"),
            label="Транзакции",
        )
        # Hidden sink: the change handler needs an output component.
        cache_ping = gr.Textbox(visible=False)

        def _set_cache(df):
            """Mirror the edited table into GLOBAL_DF_CACHE for the chat tab."""
            global GLOBAL_DF_CACHE
            GLOBAL_DF_CACHE = df
            return "ok"

        df_input.change(_set_cache, inputs=df_input, outputs=cache_ping, queue=False)

        horizon = gr.Radio(["Следующий месяц", "Следующий год"], value="Следующий месяц", label="Горизонт прогноза")
        granularity = gr.Radio(["По месяцам", "По годам"], value="По месяцам", label="Гранулярность графика")
        run_btn = gr.Button("Построить прогноз")
        plot = gr.Plot(label="Доходы и расходы (конец периода)")
        table = gr.Dataframe(label="Таблица прогноза", interactive=False)
        note = gr.Markdown()
        run_btn.click(forecast_ui, inputs=[df_input, horizon, granularity], outputs=[plot, table, note])

    with gr.Tab("Чат-советник"):
        model_choice = gr.Dropdown(choices=list(MODEL_CHOICES.keys()), value=CURRENT_MODEL_NAME, label="Модель для советов")
        model_status = gr.Markdown()

        def _switch_model(name):
            """Activate model ``name``; on failure fall back to Qwen 0.5B."""
            global CURRENT_MODEL_NAME, _tokenizer, _model
            CURRENT_MODEL_NAME = name
            # Drop the previous model before loading the next one.
            _tokenizer = None
            _model = None
            try:
                load_llm()
                return f"Модель активна: **{name}**"
            except Exception as e:
                CURRENT_MODEL_NAME = "Qwen 0.5B Instruct"
                _tokenizer = None
                _model = None
                try:
                    load_llm()
                except Exception:
                    # Best-effort preload of the fallback; if it fails too,
                    # the next chat request retries and reports the error.
                    pass
                return f"Ошибка при загрузке «{name}»: {e}\n↩️ Откат на **Qwen 0.5B Instruct**."

        model_choice.change(_switch_model, inputs=model_choice, outputs=model_status, queue=False)

        gr.Markdown("Советы формируются с учётом **данных текущего месяца** (доход/расход/нетто и топ-категории).")
        chatbot = gr.Chatbot(height=360)
        msg = gr.Textbox(placeholder="Например: «Как мне сэкономить?» или «Как сократить траты на транспорт?»", label="Сообщение")
        send = gr.Button("Отправить")
        clear = gr.Button("Очистить")

        def user_send(user_message, history):
            """Append the user's message and the model reply to the chat."""
            if not user_message:
                return gr.update(), history
            past_turns = [(u, a or "") for u, a in (history or []) if u]
            reply = llm_reply(past_turns, user_message, GLOBAL_DF_CACHE)
            history = (history or []) + [(user_message, reply)]
            return "", history

        send.click(user_send, inputs=[msg, chatbot], outputs=[msg, chatbot])
        msg.submit(user_send, inputs=[msg, chatbot], outputs=[msg, chatbot])
        clear.click(lambda: None, None, chatbot, queue=False)


if __name__ == "__main__":
    demo.launch()