boyinfuture's picture
adding the news classifier and parser
c6fb015
raw
history blame
1.75 kB
# backend/main.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from uuid import UUID
import models.analysis_job as model
import schemas
from core.database import SessionLocal, engine
from tasks.data_tasks import run_data_analysis
from tasks.news_tasks import run_intelligence_analysis
from celery import chain
# Create all tables declared on the model metadata at import time
# (no-op for tables that already exist).
model.Base.metadata.create_all(bind=engine)
app = FastAPI(title="Quantitative Analysis Platform API", version="0.1.0")
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# disallowed by the CORS spec (browsers reject wildcard + credentials);
# Starlette compensates by echoing the request Origin, which effectively
# trusts every site. Consider an explicit origin allow-list for production.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]
)
def get_db():
    """FastAPI dependency: yield one database session per request.

    The session is always closed after the request finishes, even if the
    endpoint raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
@app.post("/jobs", response_model=schemas.Job, status_code=201)
def create_analysis_job(job_request: schemas.JobCreate, db: Session = Depends(get_db)):
    """Persist a new analysis job and launch the Celery pipeline for it.

    The ticker is upper-cased before storage. Returns the freshly created
    job row (201) so the client can poll its status by id.
    """
    new_job = model.AnalysisJob(ticker=job_request.ticker.upper())
    db.add(new_job)
    db.commit()
    db.refresh(new_job)

    job_id = str(new_job.id)
    # The second signature is marked immutable so Celery ignores the return
    # value of run_data_analysis and calls run_intelligence_analysis with
    # exactly the argument we supply here (just the job id).
    pipeline = chain(
        run_data_analysis.s(job_id, new_job.ticker),
        run_intelligence_analysis.s(job_id).set(immutable=True),
    )
    pipeline.apply_async()
    return new_job
@app.get("/jobs/{job_id}", response_model=schemas.Job)
def get_job_status(job_id: UUID, db: Session = Depends(get_db)):
    """Look up a single analysis job by its UUID.

    Raises a 404 HTTPException when no job with that id exists.
    """
    job = (
        db.query(model.AnalysisJob)
        .filter(model.AnalysisJob.id == job_id)
        .first()
    )
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return job