#!/usr/bin/env python3
"""Centralized access to Hugging Face models with ensemble sentiment."""
from __future__ import annotations

import logging
import os
import sys
import threading
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Sequence

from config import HUGGINGFACE_MODELS, get_settings

# Set environment variables to avoid TensorFlow/Keras issues;
# we force the PyTorch framework instead.
os.environ.setdefault('TRANSFORMERS_NO_ADVISORY_WARNINGS', '1')
os.environ.setdefault('TRANSFORMERS_VERBOSITY', 'error')
os.environ.setdefault('TF_CPP_MIN_LOG_LEVEL', '3')
os.environ.setdefault('TRANSFORMERS_FRAMEWORK', 'pt')


# Mock tf_keras in sys.modules before transformers is imported so a broken
# tf-keras installation cannot be picked up and cause import errors.
class TfKerasMock:
    """Mock tf_keras to prevent import errors when transformers checks for TensorFlow."""
    pass


sys.modules['tf_keras'] = TfKerasMock()
sys.modules['tf_keras.src'] = TfKerasMock()
sys.modules['tf_keras.src.utils'] = TfKerasMock()

try:
    from transformers import pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

logger = logging.getLogger(__name__)
settings = get_settings()

HF_MODE = os.getenv("HF_MODE", "off").lower()
HF_TOKEN_ENV = os.getenv("HF_TOKEN")

if HF_MODE not in ("off", "public", "auth"):
    logger.warning(f"Invalid HF_MODE '{HF_MODE}', defaulting to 'off'")
    HF_MODE = "off"

if HF_MODE == "auth" and not HF_TOKEN_ENV:
    HF_MODE = "off"
    logger.warning("HF_MODE='auth' but HF_TOKEN not set, defaulting to 'off'")

ACTIVE_MODELS = [
    "ElKulako/cryptobert",
    "kk08/CryptoBERT",
    "ProsusAI/finbert",
]

LEGACY_MODELS = [
    "burakutf/finetuned-finbert-crypto",
    "mathugo/crypto_news_bert",
    "svalabs/twitter-xlm-roberta-bitcoin-sentiment",
    "mayurjadhav/crypto-sentiment-model",
    "cardiffnlp/twitter-roberta-base-sentiment",
    "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
    "agarkovv/CryptoTrader-LM",
]

CRYPTO_SENTIMENT_MODELS = ACTIVE_MODELS[:2] + LEGACY_MODELS[:2]
SOCIAL_SENTIMENT_MODELS = LEGACY_MODELS[2:4]
FINANCIAL_SENTIMENT_MODELS = [ACTIVE_MODELS[2], LEGACY_MODELS[4]]
NEWS_SENTIMENT_MODELS = [LEGACY_MODELS[5]]
DECISION_MODELS = [LEGACY_MODELS[6]]


@dataclass(frozen=True)
class PipelineSpec:
    key: str
    task: str
    model_id: str
    requires_auth: bool = False
    category: str = "sentiment"


MODEL_SPECS: Dict[str, PipelineSpec] = {}

# Legacy keys sourced from config.HUGGINGFACE_MODELS
for lk in ["sentiment_twitter", "sentiment_financial", "summarization", "crypto_sentiment"]:
    if lk in HUGGINGFACE_MODELS:
        MODEL_SPECS[lk] = PipelineSpec(
            key=lk,
            task="sentiment-analysis" if "sentiment" in lk else "summarization",
            model_id=HUGGINGFACE_MODELS[lk],
            category="legacy",
        )

for i, mid in enumerate(ACTIVE_MODELS):
    MODEL_SPECS[f"active_{i}"] = PipelineSpec(
        key=f"active_{i}",
        task="sentiment-analysis",
        model_id=mid,
        category="crypto_sentiment" if i < 2 else "financial_sentiment",
        requires_auth=("ElKulako" in mid),
    )

for i, mid in enumerate(CRYPTO_SENTIMENT_MODELS):
    MODEL_SPECS[f"crypto_sent_{i}"] = PipelineSpec(
        key=f"crypto_sent_{i}",
        task="sentiment-analysis",
        model_id=mid,
        category="crypto_sentiment",
        requires_auth=("ElKulako" in mid),
    )

for i, mid in enumerate(SOCIAL_SENTIMENT_MODELS):
    MODEL_SPECS[f"social_sent_{i}"] = PipelineSpec(
        key=f"social_sent_{i}",
        task="sentiment-analysis",
        model_id=mid,
        category="social_sentiment",
    )

for i, mid in enumerate(FINANCIAL_SENTIMENT_MODELS):
    MODEL_SPECS[f"financial_sent_{i}"] = PipelineSpec(
        key=f"financial_sent_{i}",
        task="sentiment-analysis",
        model_id=mid,
        category="financial_sentiment",
    )

for i, mid in enumerate(NEWS_SENTIMENT_MODELS):
    MODEL_SPECS[f"news_sent_{i}"] = PipelineSpec(
        key=f"news_sent_{i}",
        task="sentiment-analysis",
        model_id=mid,
        category="news_sentiment",
    )


class ModelNotAvailable(RuntimeError):
    """Raised when a requested pipeline cannot be provided in the current configuration."""
    pass


class ModelRegistry:
    """Thread-safe, lazy loader and cache for Hugging Face pipelines."""

    def __init__(self):
        self._pipelines: Dict[str, Any] = {}
        self._lock = threading.Lock()
        self._initialized = False

    def get_pipeline(self, key: str):
        """Return the cached pipeline for `key`, loading it on first use."""
        if not TRANSFORMERS_AVAILABLE:
            raise ModelNotAvailable("transformers not installed")
        if key not in MODEL_SPECS:
            raise ModelNotAvailable(f"Unknown key: {key}")
        spec = MODEL_SPECS[key]
        if key in self._pipelines:
            return self._pipelines[key]
        with self._lock:
            if key in self._pipelines:
                return self._pipelines[key]
            if HF_MODE == "off":
                raise ModelNotAvailable("HF_MODE=off")
            token_value = None
            if HF_MODE == "auth":
                token_value = HF_TOKEN_ENV or settings.hf_token
            if spec.requires_auth and not token_value:
                raise ModelNotAvailable("Model requires auth but no token available")
            logger.info(f"Loading model: {spec.model_id} (mode: {HF_MODE})")
            try:
                pipeline_kwargs = {
                    'task': spec.task,
                    'model': spec.model_id,
                    'tokenizer': spec.model_id,
                    'framework': 'pt',  # force PyTorch; TensorFlow is mocked out above
                    'device': -1,       # CPU
                    'token': token_value,
                }
                self._pipelines[key] = pipeline(**pipeline_kwargs)
            except Exception as e:
                error_lower = str(e).lower()
                try:
                    from huggingface_hub.errors import RepositoryNotFoundError, HfHubHTTPError
                    hf_errors = (RepositoryNotFoundError, HfHubHTTPError)
                except ImportError:
                    hf_errors = ()
                is_auth_error = any(
                    kw in error_lower
                    for kw in ['401', 'unauthorized', 'repository not found', 'expired', 'token']
                )
                if isinstance(e, hf_errors) or is_auth_error:
                    logger.warning(f"HF error for {spec.model_id}: {type(e).__name__}")
                    raise ModelNotAvailable(f"HF error: {spec.model_id}") from e
                if any(kw in error_lower for kw in ['keras', 'tensorflow', 'tf_keras', 'framework']):
                    # Retry once with an explicit dtype if a framework mix-up caused the failure.
                    try:
                        pipeline_kwargs['torch_dtype'] = 'float32'
                        self._pipelines[key] = pipeline(**pipeline_kwargs)
                        return self._pipelines[key]
                    except Exception:
                        raise ModelNotAvailable(f"Framework error: {spec.model_id}") from e
                raise ModelNotAvailable(f"Load failed: {spec.model_id}") from e
        return self._pipelines[key]

    def get_loaded_models(self):
        """Get the list of all loaded model keys."""
        return list(self._pipelines.keys())

    def get_available_sentiment_models(self):
        """Get the list of all available sentiment model keys."""
        return [key for key in MODEL_SPECS if "sent" in key or "sentiment" in key]

    def initialize_models(self):
        """Eagerly load the ACTIVE_MODELS pipelines and return a summary dict."""
        if self._initialized:
            return {"status": "already_initialized", "mode": HF_MODE, "models_loaded": len(self._pipelines)}
        if HF_MODE == "off":
            self._initialized = True
            return {"status": "disabled", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []}
        if not TRANSFORMERS_AVAILABLE:
            return {"status": "transformers_not_available", "mode": HF_MODE, "models_loaded": 0, "loaded": [], "failed": []}
        loaded, failed = [], []
        active_keys = [f"active_{i}" for i in range(len(ACTIVE_MODELS))]
        for key in active_keys:
            try:
                self.get_pipeline(key)
                loaded.append(key)
            except ModelNotAvailable as e:
                failed.append((key, str(e)[:100]))
            except Exception as e:
                failed.append((key, str(e)[:100]))
        self._initialized = True
        if loaded and not failed:
            status = "initialized"
        elif loaded:
            status = "partial"
        else:
            status = "failed"
        return {"status": status, "mode": HF_MODE, "models_loaded": len(loaded), "loaded": loaded, "failed": failed}


_registry = ModelRegistry()
AI_MODELS_SUMMARY = {"status": "not_initialized", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []} def initialize_models(): global AI_MODELS_SUMMARY result = _registry.initialize_models() AI_MODELS_SUMMARY = result return result def ensemble_crypto_sentiment(text: str) -> Dict[str, Any]: if not TRANSFORMERS_AVAILABLE or HF_MODE == "off": return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "HF disabled" if HF_MODE == "off" else "transformers N/A"} results, labels_count, total_conf = {}, {"bullish": 0, "bearish": 0, "neutral": 0}, 0.0 loaded_keys = _registry.get_loaded_models() available_keys = [key for key in loaded_keys if "sent" in key or "sentiment" in key or key.startswith("active_")] if not available_keys: return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "No models loaded"} for key in available_keys: try: pipe = _registry.get_pipeline(key) res = pipe(text[:512]) if isinstance(res, list) and res: res = res[0] label = res.get("label", "NEUTRAL").upper() score = res.get("score", 0.5) mapped = "bullish" if "POSITIVE" in label or "BULLISH" in label else ("bearish" if "NEGATIVE" in label or "BEARISH" in label else "neutral") spec = MODEL_SPECS.get(key) if spec: results[spec.model_id] = {"label": mapped, "score": score} else: results[key] = {"label": mapped, "score": score} labels_count[mapped] += 1 total_conf += score except ModelNotAvailable: continue except Exception as e: logger.warning(f"Ensemble failed for {key}: {e}") if not results: return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "All models failed"} final = max(labels_count, key=labels_count.get) avg_conf = total_conf / len(results) return {"label": final, "confidence": avg_conf, "scores": results, "model_count": len(results)} def analyze_crypto_sentiment(text: str): return ensemble_crypto_sentiment(text) def analyze_financial_sentiment(text: str): if not TRANSFORMERS_AVAILABLE: return {"label": "neutral", "score": 0.5, "error": "transformers N/A"} try: pipe = _registry.get_pipeline("financial_sent_0") res = pipe(text[:512]) if isinstance(res, list) and res: res = res[0] return {"label": res.get("label", "neutral").lower(), "score": res.get("score", 0.5)} except Exception as e: logger.error(f"Financial sentiment failed: {e}") return {"label": "neutral", "score": 0.5, "error": str(e)} def analyze_social_sentiment(text: str): if not TRANSFORMERS_AVAILABLE: return {"label": "neutral", "score": 0.5, "error": "transformers N/A"} try: pipe = _registry.get_pipeline("social_sent_0") res = pipe(text[:512]) if isinstance(res, list) and res: res = res[0] return {"label": res.get("label", "neutral").lower(), "score": res.get("score", 0.5)} except Exception as e: logger.error(f"Social sentiment failed: {e}") return {"label": "neutral", "score": 0.5, "error": str(e)} def analyze_market_text(text: str): return ensemble_crypto_sentiment(text) def analyze_chart_points(data: Sequence[Mapping[str, Any]], indicators: Optional[List[str]] = None): if not data: return {"trend": "neutral", "strength": 0, "analysis": "No data"} prices = [float(p.get("price", 0)) for p in data if p.get("price")] if not prices: return {"trend": "neutral", "strength": 0, "analysis": "No price data"} first, last = prices[0], prices[-1] change = ((last - first) / first * 100) if first > 0 else 0 if change > 5: trend, strength = "bullish", min(abs(change) / 10, 1.0) elif change < -5: trend, strength = "bearish", min(abs(change) / 10, 1.0) else: trend, 
strength = "neutral", abs(change) / 5 return {"trend": trend, "strength": strength, "change_pct": change, "support": min(prices), "resistance": max(prices), "analysis": f"Price moved {change:.2f}% showing {trend} trend"} def analyze_news_item(item: Dict[str, Any]): text = item.get("title", "") + " " + item.get("description", "") sent = ensemble_crypto_sentiment(text) return {**item, "sentiment": sent["label"], "sentiment_confidence": sent["confidence"], "sentiment_details": sent} def get_model_info(): return { "transformers_available": TRANSFORMERS_AVAILABLE, "hf_mode": HF_MODE, "hf_token_configured": bool(HF_TOKEN_ENV or settings.hf_token) if HF_MODE == "auth" else False, "models_initialized": _registry._initialized, "models_loaded": len(_registry._pipelines), "active_models": ACTIVE_MODELS, "total_models": len(MODEL_SPECS) } def registry_status(): return { "initialized": _registry._initialized, "pipelines_loaded": len(_registry._pipelines), "available_models": list(MODEL_SPECS.keys()), "transformers_available": TRANSFORMERS_AVAILABLE }