from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime, text
import aiohttp
from bs4 import BeautifulSoup
from datetime import datetime
import os

# Read the database secrets from environment variables
DATABASE_URL = os.getenv("DB_URL")
DATABASE_USERNAME = os.getenv("DB_USERNAME")
DATABASE_PASSWORD = os.getenv("DB_PASSWORD")
DATABASE_NAME = os.getenv("DB_NAME")

# Build the full connection URL for the database
DATABASE_URL_FULL = f"postgresql+asyncpg://{DATABASE_USERNAME}:{DATABASE_PASSWORD}@{DATABASE_URL}/{DATABASE_NAME}"
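
# A minimal sketch of the expected environment, assuming DB_URL carries only
# the host (and optional port). These values are hypothetical examples, not
# taken from the original deployment:
#   export DB_URL="localhost:5432"
#   export DB_USERNAME="app_user"
#   export DB_PASSWORD="change-me"
#   export DB_NAME="lois_db"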

engine = create_async_engine(DATABASE_URL_FULL, echo=True)
AsyncSessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
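
# On SQLAlchemy 2.0+ the dedicated async factory can be used instead; a
# sketch with equivalent behavior here:
#   from sqlalchemy.ext.asyncio import async_sessionmaker
#   AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)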

app = FastAPI()


class Loi(Base):
    __tablename__ = "lois"
    id = Column(Integer, primary_key=True, index=True)
    titre = Column(String, index=True)
    url = Column(String, unique=True)
    date = Column(DateTime, default=datetime.utcnow)


async def get_db():
    # Yield a request-scoped session; it is closed when the request ends.
    async with AsyncSessionLocal() as session:
        yield session


@app.on_event("startup")
async def startup():
    # Create the tables at startup if they do not already exist
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
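
# Recent FastAPI versions prefer a lifespan handler over @app.on_event. A
# sketch of the equivalent, assuming no other startup work and that app is
# constructed with the lifespan argument instead:
#   from contextlib import asynccontextmanager
#   @asynccontextmanager
#   async def lifespan(app: FastAPI):
#       async with engine.begin() as conn:
#           await conn.run_sync(Base.metadata.create_all)
#       yield
#   app = FastAPI(lifespan=lifespan)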


async def scrape_legifrance():
    # Runs as a FastAPI background task after the response has been sent, so
    # the request-scoped session from get_db may already be closed; open a
    # dedicated session below instead of accepting one as a parameter.
    url = "https://www.legifrance.gouv.fr/jorf/"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            html = await response.text()

    soup = BeautifulSoup(html, "html.parser")
    lois = []
    for article in soup.select(".mainListJorf .titleArticle"):
        titre = article.get_text(strip=True)
        lien = article.find("a")
        if lien is None:  # skip entries without a link rather than crashing
            continue
        full_url = f"https://www.legifrance.gouv.fr{lien['href']}"
        lois.append((titre, full_url))

    async with AsyncSessionLocal() as db:
        for titre, full_url in lois:
            # Plain SQL strings must be wrapped in text() on SQLAlchemy 1.4+
            existing = await db.execute(
                text("SELECT id FROM lois WHERE url = :url"), {"url": full_url}
            )
            if existing.fetchone() is None:
                db.add(Loi(titre=titre, url=full_url))
        await db.commit()
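
# For reference, the same duplicate check in the 2.0-style ORM API instead of
# raw SQL (a sketch; it would also need `from sqlalchemy import select`):
#   existing = await db.execute(select(Loi.id).where(Loi.url == full_url))
#   if existing.scalar_one_or_none() is None:
#       db.add(Loi(titre=titre, url=full_url))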


@app.get("/scrape")
async def trigger_scrape(background_tasks: BackgroundTasks):
    # The task opens its own session, so no db dependency is passed to it.
    background_tasks.add_task(scrape_legifrance)
    return {"message": "Scraping in progress"}


@app.get("/lois")
async def list_lois(db: AsyncSession = Depends(get_db)):
    result = await db.execute(text("SELECT * FROM lois ORDER BY date DESC LIMIT 10"))
    # Convert rows to plain dicts so FastAPI can serialize them to JSON
    return [dict(row) for row in result.mappings().all()]
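
# A usage sketch (not part of the original file): run the app with uvicorn
# and hit the two endpoints; the module name "main" is an assumption.
#   uvicorn main:app --reload
#   curl http://localhost:8000/scrape   # schedules the background scrape
#   curl http://localhost:8000/lois     # returns the 10 most recent rows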