Files
SIB/backend/app/api/auth/__init__.py

299 lines
10 KiB
Python

import os
import shutil
from uuid import uuid4
from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status, Depends, UploadFile, File, Form
from sqlmodel import Session, select
from app.core.database import get_session
from app.core.security import verify_password, get_password_hash, create_access_token, get_token_payload
from app.models.user import User, DriverProfile, UserRole, VehicleType
from app.api.deps import oauth2_scheme
from app.schemas.user import PassengerCreate, Token, UserResponse, LoginRequest, GoogleLoginRequest
import firebase_admin
from firebase_admin import auth as firebase_auth, credentials
# Initialize Firebase Admin
try:
if not firebase_admin._apps:
# Default initialization (uses GOOGLE_APPLICATION_CREDENTIALS)
firebase_admin.initialize_app()
except Exception as e:
print(f"WARNING: Firebase Admin could not be initialized: {e}")
router = APIRouter(prefix="/api/auth", tags=["auth"])
UPLOAD_DIR = "uploads"
@router.post("/login", response_model=Token)
async def login(
data: LoginRequest,
session: Session = Depends(get_session)
):
print(f"DEBUG: Login attempt for email: {data.email}")
user = session.exec(select(User).where(User.email == data.email)).first()
if not user:
print(f"DEBUG: User not found: {data.email}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not verify_password(data.password, user.hashed_password):
print(f"DEBUG: Invalid password for: {data.email}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
print(f"DEBUG: Successful login for: {data.email} as {user.role}")
# Token expiration can be extended if keep_session is true
import datetime
expires = datetime.timedelta(days=30) if data.keep_session else datetime.timedelta(days=1)
access_token = create_access_token(
subject=user.id,
role=user.role,
full_name=user.full_name,
expires_delta=expires
)
return {
"access_token": access_token,
"token_type": "bearer",
"role": user.role,
"full_name": user.full_name,
"profile_photo_url": user.profile_photo_url
}
@router.post("/google", response_model=Token)
async def google_login(
data: GoogleLoginRequest,
session: Session = Depends(get_session)
):
try:
# Verify the ID token sent by the frontend
decoded_token = firebase_auth.verify_id_token(data.id_token)
email = decoded_token.get("email")
full_name = decoded_token.get("name", "")
profile_photo = decoded_token.get("picture", "")
# Check if user exists
user = session.exec(select(User).where(User.email == email)).first()
if not user:
# Create new user if it doesn't exist (Passenger as default)
user = User(
email=email,
full_name=full_name,
hashed_password=get_password_hash(str(uuid4())), # Random pass, won't be used
role=UserRole.PASSENGER,
profile_photo_url=profile_photo,
is_verified=True
)
session.add(user)
session.commit()
session.refresh(user)
print(f"DEBUG: Created new user via Google: {email}")
# Create access token
import datetime
expires = datetime.timedelta(days=30)
access_token = create_access_token(
subject=user.id,
role=user.role,
full_name=user.full_name,
expires_delta=expires
)
return {
"access_token": access_token,
"token_type": "bearer",
"role": user.role,
"full_name": user.full_name,
"profile_photo_url": user.profile_photo_url
}
except Exception as e:
print(f"DEBUG: Google Login failed: {e}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid Google Token: {str(e)}",
headers={"WWW-Authenticate": "Bearer"},
)
@router.post("/register/passenger", response_model=UserResponse)
async def register_passenger(
data: PassengerCreate,
session: Session = Depends(get_session)
):
# Check if user exists
existing_user = session.exec(select(User).where(User.email == data.email)).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
new_user = User(
email=data.email,
full_name=data.full_name,
hashed_password=get_password_hash(data.password),
role=UserRole.PASSENGER
)
session.add(new_user)
session.commit()
session.refresh(new_user)
return new_user
@router.post("/register/driver", response_model=UserResponse)
async def register_driver(
full_name: str = Form(...),
email: str = Form(...),
phone_number: str = Form(...),
password: str = Form(...),
cedula: str = Form(...),
vehicle_type: VehicleType = Form(...),
license_plate: str = Form(...),
cooperative_name: Optional[str] = Form(None),
profile_photo: Optional[UploadFile] = File(None),
vehicle_photo: UploadFile = File(...),
shift: Optional[str] = Form(None),
payment_methods: Optional[str] = Form(None),
speaks_english: bool = Form(False),
session: Session = Depends(get_session)
):
# Check if user exists
existing_user = session.exec(select(User).where(User.email == email)).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
# Save photos
profile_photo_url = None
if profile_photo:
ext = os.path.splitext(profile_photo.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, "profiles", filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(profile_photo.file, buffer)
profile_photo_url = f"/uploads/profiles/{filename}"
ext_v = os.path.splitext(vehicle_photo.filename)[1]
v_filename = f"{uuid4()}{ext_v}"
v_path = os.path.join(UPLOAD_DIR, "vehicles", v_filename)
with open(v_path, "wb") as buffer:
shutil.copyfileobj(vehicle_photo.file, buffer)
vehicle_photo_url = f"/uploads/vehicles/{v_filename}"
# Create User
new_user = User(
email=email,
full_name=full_name,
hashed_password=get_password_hash(password),
role=UserRole.DRIVER,
is_verified=True, # Auto verify since it's admin registered now
profile_photo_url=profile_photo_url
)
session.add(new_user)
session.commit()
session.refresh(new_user)
# Create Driver Profile
profile = DriverProfile(
user_id=new_user.id,
cedula=cedula,
vehicle_type=vehicle_type,
license_plate=license_plate,
photo_url=profile_photo_url,
vehicle_photo_url=vehicle_photo_url,
cooperative_name=cooperative_name,
shift=shift,
payment_methods=payment_methods,
speaks_english=speaks_english
)
session.add(profile)
session.commit()
return new_user
@router.get("/me")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Session = Depends(get_session)
):
"""Get current logged in user details."""
payload = get_token_payload(token)
user_id = payload.get("sub")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
result = {
"id": user.id,
"email": user.email,
"full_name": user.full_name,
"role": user.role,
"is_verified": user.is_verified,
"profile_photo_url": user.profile_photo_url,
"driver_profile": None
}
if user.driver_profile:
dp = user.driver_profile
result["driver_profile"] = {
"cedula": dp.cedula,
"vehicle_type": dp.vehicle_type,
"license_plate": dp.license_plate,
"cooperative_name": dp.cooperative_name,
"photo_url": dp.photo_url,
"shift": dp.shift,
"payment_methods": dp.payment_methods,
"speaks_english": dp.speaks_english
}
return result
@router.patch("/me", response_model=UserResponse)
async def update_me(
full_name: Optional[str] = Form(None),
password: Optional[str] = Form(None),
profile_photo: Optional[UploadFile] = File(None),
token: Annotated[str, Depends(oauth2_scheme)] = None,
session: Session = Depends(get_session)
):
"""Update current user profile info."""
payload = get_token_payload(token)
user_id = payload.get("sub")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if full_name:
user.full_name = full_name
if password:
user.hashed_password = get_password_hash(password)
if profile_photo:
# Create directory if not exists
profile_dir = os.path.join(UPLOAD_DIR, "profiles")
os.makedirs(profile_dir, exist_ok=True)
ext = os.path.splitext(profile_photo.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(profile_dir, filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(profile_photo.file, buffer)
user.profile_photo_url = f"/uploads/profiles/{filename}"
# If user is driver, also update driver profile photo_url for backwards compatibility/sync
if user.driver_profile:
user.driver_profile.photo_url = user.profile_photo_url
session.add(user.driver_profile)
session.add(user)
session.commit()
session.refresh(user)
return user