315 lines
11 KiB
Python
315 lines
11 KiB
Python
import os
|
|
import json
|
|
import shutil
|
|
from uuid import uuid4
|
|
from typing import Annotated, Optional
|
|
from fastapi import APIRouter, HTTPException, status, Depends, UploadFile, File, Form
|
|
from sqlmodel import Session, select
|
|
from app.core.database import get_session
|
|
from app.core.security import verify_password, get_password_hash, create_access_token, get_token_payload
|
|
from app.models.user import User, DriverProfile, UserRole, VehicleType
|
|
from app.api.deps import oauth2_scheme
|
|
from app.schemas.user import PassengerCreate, Token, UserResponse, LoginRequest, GoogleLoginRequest
|
|
import firebase_admin
|
|
from firebase_admin import auth as firebase_auth, credentials
|
|
|
|
# Firebase Admin SDK bootstrap.
# Credentials are resolved in this order:
#   1. FIREBASE_SERVICE_ACCOUNT_JSON - the full service-account JSON as a string
#      (recommended for Render/production).
#   2. GOOGLE_APPLICATION_CREDENTIALS - path to a JSON key file (local dev).
# When neither variable is set a warning is printed and a default
# initialization is still attempted.
def _init_firebase_admin():
    """Initialize the Firebase app once; print a warning instead of raising."""
    try:
        if firebase_admin._apps:
            # An app is already registered, nothing to do.
            return

        sa_json = os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON")
        if sa_json:
            # The env var holds the JSON document itself, not a path.
            certificate = credentials.Certificate(json.loads(sa_json))
            firebase_admin.initialize_app(certificate)
            print("DEBUG: Firebase Admin initialized from FIREBASE_SERVICE_ACCOUNT_JSON env var")
            return

        if os.environ.get("GOOGLE_APPLICATION_CREDENTIALS"):
            # No explicit credential is passed; the file path comes from
            # GOOGLE_APPLICATION_CREDENTIALS.
            firebase_admin.initialize_app()
            print("DEBUG: Firebase Admin initialized from GOOGLE_APPLICATION_CREDENTIALS file")
            return

        print("WARNING: No Firebase credentials found. Set FIREBASE_SERVICE_ACCOUNT_JSON or GOOGLE_APPLICATION_CREDENTIALS.")
        firebase_admin.initialize_app()
    except Exception as e:
        # Importing this module must not fail because Firebase is misconfigured.
        print(f"WARNING: Firebase Admin could not be initialized: {e}")


_init_firebase_admin()

# Every route in this module is mounted under /api/auth.
router = APIRouter(prefix="/api/auth", tags=["auth"])

# Base directory for uploaded photos.
UPLOAD_DIR = "uploads"
|
|
|
|
|
|
@router.post("/login", response_model=Token)
async def login(
    data: LoginRequest,
    session: Session = Depends(get_session)
):
    """Authenticate with email and password and return a bearer token.

    Args:
        data: Login payload; ``email``, ``password`` and ``keep_session``
            are read from it.
        session: Database session used to look the user up by email.

    Returns:
        A dict with ``access_token``, ``token_type``, ``role``,
        ``full_name`` and ``profile_photo_url``.

    Raises:
        HTTPException: 401 with the same generic message when the email is
            unknown or the password does not verify.
    """
    from datetime import timedelta

    def _unauthorized():
        # Both failure modes share one response body and header set.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    print(f"DEBUG: Login attempt for email: {data.email}")
    by_email = select(User).where(User.email == data.email)
    user = session.exec(by_email).first()
    if not user:
        print(f"DEBUG: User not found: {data.email}")
        raise _unauthorized()

    password_ok = verify_password(data.password, user.hashed_password)
    if not password_ok:
        print(f"DEBUG: Invalid password for: {data.email}")
        raise _unauthorized()

    print(f"DEBUG: Successful login for: {data.email} as {user.role}")

    # keep_session extends the token lifetime from 1 day to 30 days.
    lifetime = timedelta(days=30 if data.keep_session else 1)
    access_token = create_access_token(
        subject=user.id,
        role=user.role,
        full_name=user.full_name,
        expires_delta=lifetime,
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "role": user.role,
        "full_name": user.full_name,
        "profile_photo_url": user.profile_photo_url,
    }
|
|
|
|
|
|
@router.post("/google", response_model=Token)
async def google_login(
    data: GoogleLoginRequest,
    session: Session = Depends(get_session)
):
    """Sign in with a Firebase ID token, creating a passenger on first use.

    The token in ``data.id_token`` is verified through the Firebase Admin
    SDK. The ``email`` claim identifies the account; when no user with that
    email exists, one is created with the PASSENGER role, marked verified,
    and given a random password hash that is never handed out.

    Args:
        data: Request payload carrying ``id_token``.
        session: Database session used to look up / create the user.

    Returns:
        A dict with ``access_token`` (valid for 30 days), ``token_type``,
        ``role``, ``full_name`` and ``profile_photo_url``.

    Raises:
        HTTPException: 401 when the token cannot be verified or carries no
            ``email`` claim. Failures after verification (database, token
            minting) are no longer reported as 401; they propagate.
    """
    import datetime

    # Only token verification counts as an authentication failure. Keeping
    # the database work outside this try prevents server-side errors from
    # being returned as "Invalid Google Token" with their text in the body.
    try:
        decoded_token = firebase_auth.verify_id_token(data.id_token)
    except Exception as e:
        print(f"DEBUG: Google Login failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid Google Token: {str(e)}",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    email = decoded_token.get("email")
    if not email:
        # Without an email there is nothing to key the account on; refuse
        # instead of querying for / inserting a user with email=None.
        print("DEBUG: Google Login failed: token has no email claim")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid Google Token: missing email claim",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # NOTE(review): the "email_verified" claim is not checked here; confirm
    # whether only Google-provider tokens can reach this endpoint.
    full_name = decoded_token.get("name", "")
    profile_photo = decoded_token.get("picture", "")

    user = session.exec(select(User).where(User.email == email)).first()

    if not user:
        # First sign-in for this email: register as passenger by default.
        user = User(
            email=email,
            full_name=full_name,
            hashed_password=get_password_hash(str(uuid4())),  # Random pass, won't be used
            role=UserRole.PASSENGER,
            profile_photo_url=profile_photo,
            is_verified=True
        )
        session.add(user)
        session.commit()
        session.refresh(user)
        print(f"DEBUG: Created new user via Google: {email}")

    expires = datetime.timedelta(days=30)
    access_token = create_access_token(
        subject=user.id,
        role=user.role,
        full_name=user.full_name,
        expires_delta=expires
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "role": user.role,
        "full_name": user.full_name,
        "profile_photo_url": user.profile_photo_url
    }
|
|
|
|
|
|
@router.post("/register/passenger", response_model=UserResponse)
async def register_passenger(
    data: PassengerCreate,
    session: Session = Depends(get_session)
):
    """Create a passenger account.

    Args:
        data: Registration payload; ``email``, ``full_name`` and
            ``password`` are read from it.
        session: Database session used for the lookup and the insert.

    Returns:
        The newly stored ``User`` with the PASSENGER role.

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    same_email = select(User).where(User.email == data.email)
    if session.exec(same_email).first():
        raise HTTPException(status_code=400, detail="Email already registered")

    # Only the hash of the password is stored.
    password_hash = get_password_hash(data.password)
    passenger = User(
        email=data.email,
        full_name=data.full_name,
        hashed_password=password_hash,
        role=UserRole.PASSENGER,
    )

    session.add(passenger)
    session.commit()
    # Reload so database-populated fields are present in the response.
    session.refresh(passenger)
    return passenger
|
|
|
|
|
|
def _store_driver_upload(upload: UploadFile, subdir: str) -> str:
    """Save *upload* under ``UPLOAD_DIR/<subdir>`` with a random name and return its URL path."""
    target_dir = os.path.join(UPLOAD_DIR, subdir)
    # Create the directory on demand so the first upload does not fail with
    # FileNotFoundError (update_me already does the same for profiles).
    os.makedirs(target_dir, exist_ok=True)
    # The client-supplied filename may be missing; only its extension is kept.
    ext = os.path.splitext(upload.filename or "")[1]
    stored_name = f"{uuid4()}{ext}"
    with open(os.path.join(target_dir, stored_name), "wb") as buffer:
        shutil.copyfileobj(upload.file, buffer)
    return f"/uploads/{subdir}/{stored_name}"


@router.post("/register/driver", response_model=UserResponse)
async def register_driver(
    full_name: str = Form(...),
    email: str = Form(...),
    phone_number: str = Form(...),
    password: str = Form(...),
    cedula: str = Form(...),
    vehicle_type: VehicleType = Form(...),
    license_plate: str = Form(...),
    cooperative_name: Optional[str] = Form(None),
    profile_photo: Optional[UploadFile] = File(None),
    vehicle_photo: UploadFile = File(...),
    shift: Optional[str] = Form(None),
    payment_methods: Optional[str] = Form(None),
    speaks_english: bool = Form(False),
    session: Session = Depends(get_session)
):
    """Register a driver account together with its driver profile.

    The uploaded photos are written to disk first, then a ``User`` with the
    DRIVER role (already verified) and a ``DriverProfile`` linked to it are
    inserted in a single transaction.

    Args:
        full_name, email, password: Account fields for the new ``User``.
        phone_number: Accepted from the form; not stored by this function.
        cedula, vehicle_type, license_plate, cooperative_name, shift,
            payment_methods, speaks_english: Copied onto the
            ``DriverProfile``.
        profile_photo: Optional photo, stored under ``uploads/profiles``.
        vehicle_photo: Required photo, stored under ``uploads/vehicles``.
        session: Database session used for the lookup and the inserts.

    Returns:
        The newly stored ``User``.

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    existing_user = session.exec(select(User).where(User.email == email)).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Email already registered")

    # Save photos
    profile_photo_url = None
    if profile_photo:
        profile_photo_url = _store_driver_upload(profile_photo, "profiles")
    vehicle_photo_url = _store_driver_upload(vehicle_photo, "vehicles")

    new_user = User(
        email=email,
        full_name=full_name,
        hashed_password=get_password_hash(password),
        role=UserRole.DRIVER,
        is_verified=True,  # Auto verify since it's admin registered now
        profile_photo_url=profile_photo_url
    )
    session.add(new_user)
    # Flush instead of commit: this assigns new_user.id while keeping the
    # user and the profile in one transaction, so a failed profile insert
    # cannot leave a DRIVER user without a profile.
    session.flush()

    profile = DriverProfile(
        user_id=new_user.id,
        cedula=cedula,
        vehicle_type=vehicle_type,
        license_plate=license_plate,
        photo_url=profile_photo_url,
        vehicle_photo_url=vehicle_photo_url,
        cooperative_name=cooperative_name,
        shift=shift,
        payment_methods=payment_methods,
        speaks_english=speaks_english
    )
    session.add(profile)
    session.commit()
    session.refresh(new_user)

    return new_user
|
|
|
|
@router.get("/me")
async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    session: Session = Depends(get_session)
):
    """Return the account identified by the bearer token.

    The user id is read from the token payload's ``sub`` entry.

    Returns:
        A dict with ``id``, ``email``, ``full_name``, ``role``,
        ``is_verified``, ``profile_photo_url`` and ``driver_profile``. The
        last one is ``None`` unless the user has a driver profile, in which
        case it is a dict of that profile's fields.

    Raises:
        HTTPException: 404 when no user matches the id in the token.
    """
    claims = get_token_payload(token)
    account = session.get(User, claims.get("sub"))
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    response = {
        "id": account.id,
        "email": account.email,
        "full_name": account.full_name,
        "role": account.role,
        "is_verified": account.is_verified,
        "profile_photo_url": account.profile_photo_url,
        "driver_profile": None,
    }

    driver = account.driver_profile
    if not driver:
        # No driver profile attached: the base payload is the full answer.
        return response

    response["driver_profile"] = {
        "cedula": driver.cedula,
        "vehicle_type": driver.vehicle_type,
        "license_plate": driver.license_plate,
        "cooperative_name": driver.cooperative_name,
        "photo_url": driver.photo_url,
        "shift": driver.shift,
        "payment_methods": driver.payment_methods,
        "speaks_english": driver.speaks_english,
    }
    return response
|
|
|
|
@router.patch("/me", response_model=UserResponse)
async def update_me(
    full_name: Optional[str] = Form(None),
    password: Optional[str] = Form(None),
    profile_photo: Optional[UploadFile] = File(None),
    token: Annotated[str, Depends(oauth2_scheme)] = None,
    session: Session = Depends(get_session)
):
    """Update the name, password and/or profile photo of the token's user.

    Only the fields that are supplied (and non-empty) are changed.

    Args:
        full_name: New display name, if any.
        password: New plain-text password; stored as a hash.
        profile_photo: New photo, saved under ``uploads/profiles``.
        token: Bearer token; the user id is read from its ``sub`` entry.
        session: Database session used to load and persist the user.

    Returns:
        The updated ``User``.

    Raises:
        HTTPException: 404 when no user matches the id in the token.
    """
    claims = get_token_payload(token)
    user = session.get(User, claims.get("sub"))
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    if full_name:
        user.full_name = full_name
    if password:
        user.hashed_password = get_password_hash(password)

    if profile_photo:
        # Make sure the destination directory exists before writing.
        profile_dir = os.path.join(UPLOAD_DIR, "profiles")
        os.makedirs(profile_dir, exist_ok=True)

        # Keep only the original extension; the stored name is random.
        _, ext = os.path.splitext(profile_photo.filename)
        filename = f"{uuid4()}{ext}"
        with open(os.path.join(profile_dir, filename), "wb") as out_file:
            shutil.copyfileobj(profile_photo.file, out_file)
        user.profile_photo_url = f"/uploads/profiles/{filename}"

        # Drivers keep a copy of the photo URL on their profile; keep it in
        # sync for backwards compatibility.
        # NOTE(review): the source's indentation was lost; this sync is
        # assumed to run only when a new photo is uploaded - confirm.
        driver = user.driver_profile
        if driver:
            driver.photo_url = user.profile_photo_url
            session.add(driver)

    session.add(user)
    session.commit()
    session.refresh(user)
    return user
|