Initial commit: SIBU 2.0 MISSION

This commit is contained in:
2026-02-21 09:53:31 -05:00
commit 0c7aa53c8b
400 changed files with 67708 additions and 0 deletions

2
backend/app/__init__.py Normal file
View File

@ -0,0 +1,2 @@
"""SIBU Backend Application."""

View File

@ -0,0 +1,2 @@
"""API routes."""

View File

@ -0,0 +1,122 @@
from fastapi import APIRouter, Depends
from sqlmodel import Session, select, func
from app.core.database import get_session
from app.models.analytics import AnalyticsEvent
from app.models.user import User
from typing import Optional, Dict
from datetime import datetime, timedelta
from app.api.deps import get_current_user_optional
router = APIRouter()
@router.post("/event")
async def log_event(
event: Dict,
session: Session = Depends(get_session),
current_user: Optional[User] = Depends(get_current_user_optional)
):
user_id = current_user.id if current_user else None
new_event = AnalyticsEvent(
event_name=event.get("event_name"),
user_id=user_id,
screen_name=event.get("screen_name"),
item_id=event.get("item_id"),
properties=event.get("properties", {})
)
session.add(new_event)
session.commit()
return {"status": "ok"}
@router.get("/strategic")
async def get_strategic_analysis(
session: Session = Depends(get_session)
):
"""Deep analysis of how businesses and shuttles are performing."""
# 1. SHUTTLE PERFORMANCE
shuttle_previews = session.exec(
select(AnalyticsEvent.item_id, func.count(AnalyticsEvent.id))
.where(AnalyticsEvent.event_name == "shuttle_view")
.group_by(AnalyticsEvent.item_id)
).all()
shuttle_contacts = session.exec(
select(AnalyticsEvent.item_id, func.count(AnalyticsEvent.id))
.where(AnalyticsEvent.event_name == "shuttle_contact")
.group_by(AnalyticsEvent.item_id)
).all()
shuttle_map = {r[0]: {"views": r[1], "contacts": 0} for r in shuttle_previews if r[0]}
for r in shuttle_contacts:
if r[0] in shuttle_map:
shuttle_map[r[0]]["contacts"] = r[1]
else:
shuttle_map[r[0]] = {"views": 0, "contacts": r[1]}
# 2. BUSINESS PERFORMANCE
biz_views = session.exec(
select(AnalyticsEvent.item_id, func.count(AnalyticsEvent.id))
.where(AnalyticsEvent.event_name == "business_view")
.group_by(AnalyticsEvent.item_id)
).all()
promo_clicks = session.exec(
select(AnalyticsEvent.item_id, func.count(AnalyticsEvent.id))
.where(AnalyticsEvent.event_name == "promo_click")
.group_by(AnalyticsEvent.item_id)
).all()
biz_map = {r[0]: {"views": r[1], "promos": 0} for r in biz_views if r[0]}
for r in promo_clicks:
if r[0] in biz_map:
biz_map[r[0]]["promos"] = r[1]
else:
biz_map[r[0]] = {"views": 0, "promos": r[1]}
# 3. TOP STOPS (CASETAS CON MÁS GENTE)
top_stops = session.exec(
select(AnalyticsEvent.item_id, func.count(AnalyticsEvent.id))
.where(AnalyticsEvent.event_name == "stop_selected")
.group_by(AnalyticsEvent.item_id)
.order_by(func.count(AnalyticsEvent.id).desc())
.limit(10)
).all()
# 4. ACTIVE USERS
total_active_users = session.exec(select(func.count(func.distinct(AnalyticsEvent.user_id))).where(AnalyticsEvent.user_id != None)).one()
# 5. PEAK HOURS BY USER TYPE
hour_expr = func.extract('hour', AnalyticsEvent.timestamp)
reg_usage = session.exec(select(hour_expr, func.count(AnalyticsEvent.id)).where(AnalyticsEvent.user_id != None).group_by(hour_expr)).all()
guest_usage = session.exec(select(hour_expr, func.count(AnalyticsEvent.id)).where(AnalyticsEvent.user_id == None).group_by(hour_expr)).all()
usage_patterns = {
"registered": {int(h): c for h, c in reg_usage},
"guests": {int(h): c for h, c in guest_usage}
}
return {
"shuttles": shuttle_map,
"businesses": biz_map,
"top_stops": [{"id": r[0], "count": r[1]} for r in top_stops],
"users": {
"registered_active": total_active_users,
"patterns": usage_patterns
},
"summary": {
"total_shuttle_contacts": sum(sh[1] for sh in shuttle_contacts),
"total_promo_clicks": sum(p[1] for p in promo_clicks),
"total_biz_views": sum(b[1] for b in biz_views)
}
}
@router.get("/dashboard/stats")
async def get_dashboard_stats(
session: Session = Depends(get_session)
):
# Base dashboard stats for general overview
total_events = session.exec(select(func.count(AnalyticsEvent.id))).one()
return {
"total_events": total_events
}

View File

@ -0,0 +1,233 @@
import os
import shutil
from uuid import uuid4
from typing import Annotated, Optional
from fastapi import APIRouter, HTTPException, status, Depends, UploadFile, File, Form
from sqlmodel import Session, select
from app.core.database import get_session
from app.core.security import verify_password, get_password_hash, create_access_token, get_token_payload
from app.models.user import User, DriverProfile, UserRole, VehicleType
from app.api.deps import oauth2_scheme
from app.schemas.user import PassengerCreate, Token, UserResponse, LoginRequest
router = APIRouter(prefix="/api/auth", tags=["auth"])
UPLOAD_DIR = "uploads"
@router.post("/login", response_model=Token)
async def login(
data: LoginRequest,
session: Session = Depends(get_session)
):
print(f"DEBUG: Login attempt for email: {data.email}")
user = session.exec(select(User).where(User.email == data.email)).first()
if not user:
print(f"DEBUG: User not found: {data.email}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not verify_password(data.password, user.hashed_password):
print(f"DEBUG: Invalid password for: {data.email}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
print(f"DEBUG: Successful login for: {data.email} as {user.role}")
# Token expiration can be extended if keep_session is true
import datetime
expires = datetime.timedelta(days=30) if data.keep_session else datetime.timedelta(days=1)
access_token = create_access_token(
subject=user.id,
role=user.role,
full_name=user.full_name,
expires_delta=expires
)
return {
"access_token": access_token,
"token_type": "bearer",
"role": user.role,
"full_name": user.full_name,
"profile_photo_url": user.profile_photo_url
}
@router.post("/register/passenger", response_model=UserResponse)
async def register_passenger(
data: PassengerCreate,
session: Session = Depends(get_session)
):
# Check if user exists
existing_user = session.exec(select(User).where(User.email == data.email)).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
new_user = User(
email=data.email,
full_name=data.full_name,
hashed_password=get_password_hash(data.password),
role=UserRole.PASSENGER
)
session.add(new_user)
session.commit()
session.refresh(new_user)
return new_user
@router.post("/register/driver", response_model=UserResponse)
async def register_driver(
full_name: str = Form(...),
email: str = Form(...),
phone_number: str = Form(...),
password: str = Form(...),
cedula: str = Form(...),
vehicle_type: VehicleType = Form(...),
license_plate: str = Form(...),
cooperative_name: Optional[str] = Form(None),
profile_photo: Optional[UploadFile] = File(None),
vehicle_photo: UploadFile = File(...),
shift: Optional[str] = Form(None),
payment_methods: Optional[str] = Form(None),
speaks_english: bool = Form(False),
session: Session = Depends(get_session)
):
# Check if user exists
existing_user = session.exec(select(User).where(User.email == email)).first()
if existing_user:
raise HTTPException(status_code=400, detail="Email already registered")
# Save photos
profile_photo_url = None
if profile_photo:
ext = os.path.splitext(profile_photo.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, "profiles", filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(profile_photo.file, buffer)
profile_photo_url = f"/uploads/profiles/{filename}"
ext_v = os.path.splitext(vehicle_photo.filename)[1]
v_filename = f"{uuid4()}{ext_v}"
v_path = os.path.join(UPLOAD_DIR, "vehicles", v_filename)
with open(v_path, "wb") as buffer:
shutil.copyfileobj(vehicle_photo.file, buffer)
vehicle_photo_url = f"/uploads/vehicles/{v_filename}"
# Create User
new_user = User(
email=email,
full_name=full_name,
hashed_password=get_password_hash(password),
role=UserRole.DRIVER,
is_verified=True, # Auto verify since it's admin registered now
profile_photo_url=profile_photo_url
)
session.add(new_user)
session.commit()
session.refresh(new_user)
# Create Driver Profile
profile = DriverProfile(
user_id=new_user.id,
cedula=cedula,
vehicle_type=vehicle_type,
license_plate=license_plate,
photo_url=profile_photo_url,
vehicle_photo_url=vehicle_photo_url,
cooperative_name=cooperative_name,
shift=shift,
payment_methods=payment_methods,
speaks_english=speaks_english
)
session.add(profile)
session.commit()
return new_user
@router.get("/me")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Session = Depends(get_session)
):
"""Get current logged in user details."""
payload = get_token_payload(token)
user_id = payload.get("sub")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
result = {
"id": user.id,
"email": user.email,
"full_name": user.full_name,
"role": user.role,
"is_verified": user.is_verified,
"profile_photo_url": user.profile_photo_url,
"driver_profile": None
}
if user.driver_profile:
dp = user.driver_profile
result["driver_profile"] = {
"cedula": dp.cedula,
"vehicle_type": dp.vehicle_type,
"license_plate": dp.license_plate,
"cooperative_name": dp.cooperative_name,
"photo_url": dp.photo_url,
"shift": dp.shift,
"payment_methods": dp.payment_methods,
"speaks_english": dp.speaks_english
}
return result
@router.patch("/me", response_model=UserResponse)
async def update_me(
full_name: Optional[str] = Form(None),
password: Optional[str] = Form(None),
profile_photo: Optional[UploadFile] = File(None),
token: Annotated[str, Depends(oauth2_scheme)] = None,
session: Session = Depends(get_session)
):
"""Update current user profile info."""
payload = get_token_payload(token)
user_id = payload.get("sub")
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if full_name:
user.full_name = full_name
if password:
user.hashed_password = get_password_hash(password)
if profile_photo:
# Create directory if not exists
profile_dir = os.path.join(UPLOAD_DIR, "profiles")
os.makedirs(profile_dir, exist_ok=True)
ext = os.path.splitext(profile_photo.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(profile_dir, filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(profile_photo.file, buffer)
user.profile_photo_url = f"/uploads/profiles/{filename}"
# If user is driver, also update driver profile photo_url for backwards compatibility/sync
if user.driver_profile:
user.driver_profile.photo_url = user.profile_photo_url
session.add(user.driver_profile)
session.add(user)
session.commit()
session.refresh(user)
return user

View File

@ -0,0 +1,91 @@
"""Bus stops API endpoints."""
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import Session, select
from typing import List
from app.core.database import get_session
from app.models.bus_stop import BusStop
from app.schemas.bus_stop import BusStopResponse, BusStopCreate, BusStopUpdate
from app.api.deps import get_current_admin
router = APIRouter(prefix="/api/bus-stops", tags=["bus-stops"])
@router.get("", response_model=List[BusStopResponse])
async def get_bus_stops(session: Session = Depends(get_session)):
"""Get all bus stops."""
statement = select(BusStop)
stops = session.exec(statement).all()
return stops
@router.get("/{stop_id}", response_model=BusStopResponse)
async def get_bus_stop(stop_id: str, session: Session = Depends(get_session)):
"""Get a single bus stop by ID."""
stop = session.get(BusStop, stop_id)
if not stop:
raise HTTPException(status_code=404, detail="Bus stop not found")
return stop
@router.get("/{stop_id}/routes")
async def get_bus_stop_routes(stop_id: str, session: Session = Depends(get_session)):
"""Get all routes passing through a bus stop."""
from app.models.route_stop import RouteStop
from app.models.route import Route
statement = select(Route).join(
RouteStop, RouteStop.route_id == Route.id
).where(RouteStop.stop_id == stop_id)
routes = session.exec(statement).all()
return routes
@router.post("", response_model=BusStopResponse)
async def create_bus_stop(
bus_stop: BusStopCreate,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Create a new bus stop (Admin only)."""
db_stop = BusStop.model_validate(bus_stop)
session.add(db_stop)
session.commit()
session.refresh(db_stop)
return db_stop
@router.put("/{stop_id}", response_model=BusStopResponse)
async def update_bus_stop(
stop_id: str,
stop_update: BusStopUpdate,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Update a bus stop (Admin only)."""
db_stop = session.get(BusStop, stop_id)
if not db_stop:
raise HTTPException(status_code=404, detail="Bus stop not found")
stop_data = stop_update.model_dump(exclude_unset=True)
for key, value in stop_data.items():
setattr(db_stop, key, value)
session.add(db_stop)
session.commit()
session.refresh(db_stop)
return db_stop
@router.delete("/{stop_id}")
async def delete_bus_stop(
stop_id: str,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Delete a bus stop (Admin only)."""
db_stop = session.get(BusStop, stop_id)
if not db_stop:
raise HTTPException(status_code=404, detail="Bus stop not found")
session.delete(db_stop)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,158 @@
from fastapi import APIRouter, Depends, HTTPException, status, Form, File, UploadFile
from sqlmodel import Session, select
from typing import List, Optional
from app.core.database import get_session
from app.models.business import Business
from app.models.user import User, UserRole
from app.api.deps import get_current_user
router = APIRouter(prefix="/api/businesses", tags=["businesses"])
@router.get("", response_model=List[Business])
async def list_businesses(
*,
session: Session = Depends(get_session)
):
"""List all businesses."""
statement = select(Business)
businesses = session.exec(statement).all()
return businesses
@router.post("", response_model=Business)
async def create_business(
*,
session: Session = Depends(get_session),
name: str = Form(...),
category: str = Form(...),
address: str = Form(...),
phone: Optional[str] = Form(None),
social_media: Optional[str] = Form(None),
latitude: Optional[float] = Form(None),
longitude: Optional[float] = Form(None),
image: Optional[UploadFile] = File(None),
current_user: User = Depends(get_current_user)
):
"""Create a new business (Promoters and Admins only)."""
if current_user.role not in [UserRole.ADMIN, UserRole.PROMOTER]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only promoters and admins can manage businesses"
)
image_url = None
if image:
import os
import shutil
from uuid import uuid4
UPLOAD_DIR = "uploads/businesses"
os.makedirs(UPLOAD_DIR, exist_ok=True)
ext = os.path.splitext(image.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(image.file, buffer)
image_url = f"/uploads/businesses/{filename}"
db_business = Business(
name=name,
category=category,
address=address,
phone=phone,
social_media=social_media,
latitude=latitude,
longitude=longitude,
image_url=image_url
)
session.add(db_business)
session.commit()
session.refresh(db_business)
return db_business
@router.patch("/{business_id}", response_model=Business)
async def update_business(
*,
session: Session = Depends(get_session),
business_id: str,
name: Optional[str] = Form(None),
category: Optional[str] = Form(None),
address: Optional[str] = Form(None),
phone: Optional[str] = Form(None),
social_media: Optional[str] = Form(None),
latitude: Optional[float] = Form(None),
longitude: Optional[float] = Form(None),
image: Optional[UploadFile] = File(None),
current_user: User = Depends(get_current_user)
):
"""Update a business (Promoters and Admins only)."""
if current_user.role not in [UserRole.ADMIN, UserRole.PROMOTER]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only promoters and admins can manage businesses"
)
db_business = session.get(Business, business_id)
if not db_business:
raise HTTPException(status_code=404, detail="Business not found")
if name is not None:
db_business.name = name
if category is not None:
db_business.category = category
if address is not None:
db_business.address = address
if phone is not None:
db_business.phone = phone
if social_media is not None:
db_business.social_media = social_media
if latitude is not None:
db_business.latitude = latitude
if longitude is not None:
db_business.longitude = longitude
if image:
import os
import shutil
from uuid import uuid4
UPLOAD_DIR = "uploads/businesses"
os.makedirs(UPLOAD_DIR, exist_ok=True)
ext = os.path.splitext(image.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(image.file, buffer)
db_business.image_url = f"/uploads/businesses/{filename}"
session.add(db_business)
session.commit()
session.refresh(db_business)
return db_business
@router.get("/{business_id}", response_model=Business)
async def get_business(business_id: str, session: Session = Depends(get_session)):
"""Get a single business by ID."""
business = session.get(Business, business_id)
if not business:
raise HTTPException(status_code=404, detail="Business not found")
return business
@router.delete("/{business_id}")
async def delete_business(
*,
session: Session = Depends(get_session),
business_id: str,
current_user: User = Depends(get_current_user)
):
"""Delete a business (Promoters and Admins only)."""
if current_user.role not in [UserRole.ADMIN, UserRole.PROMOTER]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only promoters and admins can manage businesses"
)
db_business = session.get(Business, business_id)
if not db_business:
raise HTTPException(status_code=404, detail="Business not found")
session.delete(db_business)
session.commit()
return {"status": "success", "message": "Business deleted"}

View File

@ -0,0 +1,94 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from sqlalchemy.orm import joinedload
from typing import List
from app.core.database import get_session
from app.models.coupon import Coupon, CouponCreate, CouponUpdate
from app.models.user import User, UserRole
from app.api.deps import get_current_user
router = APIRouter(prefix="/api/coupons", tags=["coupons"])
@router.get("", response_model=List[Coupon])
async def list_coupons(
*,
session: Session = Depends(get_session),
active_only: bool = True
):
"""List all coupons."""
statement = select(Coupon).options(joinedload(Coupon.business))
if active_only:
statement = statement.where(Coupon.is_active)
coupons = session.exec(statement).all()
return coupons
@router.post("", response_model=Coupon)
async def create_coupon(
*,
session: Session = Depends(get_session),
coupon_in: CouponCreate,
current_user: User = Depends(get_current_user)
):
"""Create a new coupon (Promoters and Admins only)."""
if current_user.role not in [UserRole.ADMIN, UserRole.PROMOTER]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only promoters and admins can create coupons"
)
db_coupon = Coupon.from_orm(coupon_in)
session.add(db_coupon)
session.commit()
session.refresh(db_coupon)
return db_coupon
@router.patch("/{coupon_id}", response_model=Coupon)
async def update_coupon(
*,
session: Session = Depends(get_session),
coupon_id: str,
coupon_in: CouponUpdate,
current_user: User = Depends(get_current_user)
):
"""Update a coupon (Promoters and Admins only)."""
if current_user.role not in [UserRole.ADMIN, UserRole.PROMOTER]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only promoters and admins can update coupons"
)
db_coupon = session.get(Coupon, coupon_id)
if not db_coupon:
raise HTTPException(status_code=404, detail="Coupon not found")
coupon_data = coupon_in.dict(exclude_unset=True)
for key, value in coupon_data.items():
setattr(db_coupon, key, value)
session.add(db_coupon)
session.commit()
session.refresh(db_coupon)
return db_coupon
@router.delete("/{coupon_id}")
async def delete_coupon(
*,
session: Session = Depends(get_session),
coupon_id: str,
current_user: User = Depends(get_current_user)
):
"""Delete a coupon (Promoters and Admins only)."""
if current_user.role not in [UserRole.ADMIN, UserRole.PROMOTER]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only promoters and admins can delete coupons"
)
db_coupon = session.get(Coupon, coupon_id)
if not db_coupon:
raise HTTPException(status_code=404, detail="Coupon not found")
session.delete(db_coupon)
session.commit()
return {"status": "success", "message": "Coupon deleted"}

76
backend/app/api/deps.py Normal file
View File

@ -0,0 +1,76 @@
from typing import Annotated, Dict, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from app.core.config import settings
from app.core.security import ALGORITHM
from sqlmodel import Session
from app.core.database import get_session
from app.models.user import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/auth/login", auto_error=False)
def get_token_payload(token: Annotated[str, Depends(oauth2_scheme)]) -> Dict:
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
return payload
except JWTError:
raise credentials_exception
async def get_current_user_token(token: Annotated[str, Depends(oauth2_scheme)]) -> Dict:
return get_token_payload(token)
async def get_current_admin(token: Annotated[str, Depends(oauth2_scheme)]) -> bool:
from app.models.user import UserRole
payload = get_token_payload(token)
role: str = payload.get("role")
# Check for both "admin" and "ADMIN" for robust authorization
if role not in ["admin", "ADMIN", UserRole.ADMIN]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="The user doesn't have enough privileges",
)
return True
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
session: Session = Depends(get_session)
) -> User:
payload = get_token_payload(token)
user_id = payload.get("sub")
if not user_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
async def get_current_user_optional(
token: Optional[str] = Depends(oauth2_scheme),
session: Session = Depends(get_session)
) -> Optional[User]:
if not token:
return None
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
user_id = payload.get("sub")
if not user_id:
return None
return session.get(User, user_id)
except JWTError:
return None

View File

@ -0,0 +1,109 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlmodel import Session, select
from typing import List
from app.core.database import get_session
from app.models.favorite import Favorite
from app.api.deps import get_current_user
from app.models.user import User
from pydantic import BaseModel
router = APIRouter(prefix="/api/favorites", tags=["favorites"])
class FavoriteCreate(BaseModel):
item_type: str # 'coupon', 'business', 'taxi', 'route'
item_id: str
item_name: str | None = None
item_image: str | None = None
@router.get("")
async def get_favorites(
item_type: str | None = None,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
) -> List[Favorite]:
"""Get all favorites for the current user, optionally filtered by type."""
statement = select(Favorite).where(Favorite.user_id == current_user.id)
if item_type:
statement = statement.where(Favorite.item_type == item_type)
statement = statement.order_by(Favorite.created_at.desc())
favorites = session.exec(statement).all()
return list(favorites)
@router.post("")
async def add_favorite(
favorite_data: FavoriteCreate,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
) -> Favorite:
"""Add an item to favorites."""
# Check if already favorited
existing = session.exec(
select(Favorite).where(
Favorite.user_id == current_user.id,
Favorite.item_type == favorite_data.item_type,
Favorite.item_id == favorite_data.item_id
)
).first()
if existing:
raise HTTPException(status_code=400, detail="Item already in favorites")
favorite = Favorite(
user_id=current_user.id,
item_type=favorite_data.item_type,
item_id=favorite_data.item_id,
item_name=favorite_data.item_name,
item_image=favorite_data.item_image
)
session.add(favorite)
session.commit()
session.refresh(favorite)
return favorite
@router.delete("/{item_type}/{item_id}")
async def remove_favorite(
item_type: str,
item_id: str,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
):
"""Remove an item from favorites."""
favorite = session.exec(
select(Favorite).where(
Favorite.user_id == current_user.id,
Favorite.item_type == item_type,
Favorite.item_id == item_id
)
).first()
if not favorite:
raise HTTPException(status_code=404, detail="Favorite not found")
session.delete(favorite)
session.commit()
return {"ok": True}
@router.get("/check/{item_type}/{item_id}")
async def check_favorite(
item_type: str,
item_id: str,
session: Session = Depends(get_session),
current_user: User = Depends(get_current_user)
) -> dict:
"""Check if an item is favorited."""
favorite = session.exec(
select(Favorite).where(
Favorite.user_id == current_user.id,
Favorite.item_type == item_type,
Favorite.item_id == item_id
)
).first()
return {"is_favorite": favorite is not None}

View File

@ -0,0 +1,96 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from typing import List, Optional
from uuid import UUID
from app.core.database import get_session
from app.api.deps import get_current_admin, get_current_user_optional
from app.models.report import Report
from app.models.user import User
from app.schemas.report import ReportCreate, ReportUpdate, ReportResponse
router = APIRouter(prefix="/api/reports", tags=["reports"])
@router.post("", response_model=ReportResponse, status_code=status.HTTP_201_CREATED)
async def create_report(
report_in: ReportCreate,
session: Session = Depends(get_session),
current_user: Optional[User] = Depends(get_current_user_optional)
):
"""Create a new user report."""
report = Report(
message=report_in.message,
user_id=current_user.id if current_user else None
)
session.add(report)
session.commit()
session.refresh(report)
return ReportResponse(
id=report.id,
user_id=report.user_id,
user_name=current_user.full_name if current_user else "Anónimo",
message=report.message,
status=report.status,
created_at=report.created_at
)
@router.get("", response_model=List[ReportResponse])
async def get_reports(
session: Session = Depends(get_session),
admin_auth: bool = Depends(get_current_admin)
):
"""Get all reports (Admin only)."""
statement = select(Report)
results = session.exec(statement).all()
reports = []
for report in results:
user_name = "Anónimo"
if report.user_id:
user = session.get(User, report.user_id)
if user:
user_name = user.full_name
reports.append(ReportResponse(
id=report.id,
user_id=report.user_id,
user_name=user_name,
message=report.message,
status=report.status,
created_at=report.created_at
))
return reports
@router.patch("/{report_id}", response_model=ReportResponse)
async def update_report_status(
report_id: UUID,
report_update: ReportUpdate,
session: Session = Depends(get_session),
admin_auth: bool = Depends(get_current_admin)
):
"""Update report status (Admin only)."""
report = session.get(Report, report_id)
if not report:
raise HTTPException(status_code=404, detail="Report not found")
report.status = report_update.status
session.add(report)
session.commit()
session.refresh(report)
user_name = "Anónimo"
if report.user_id:
user = session.get(User, report.user_id)
if user:
user_name = user.full_name
return ReportResponse(
id=report.id,
user_id=report.user_id,
user_name=user_name,
message=report.message,
status=report.status,
created_at=report.created_at
)

View File

@ -0,0 +1,229 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select
from typing import List, Optional
from sqlalchemy import func
from app.core.database import get_session
from app.models.route import Route
from app.models.route_stop import RouteStop
from app.schemas.route import RouteResponse, RouteCreate, RouteUpdate
from app.schemas.route_stop import RouteStopCreate, RouteStopUpdate
from app.api.deps import get_current_admin
router = APIRouter(prefix="/api/routes", tags=["routes"])
@router.get("", response_model=List[RouteResponse])
async def get_routes(
origin_city: Optional[str] = Query(None),
destination_city: Optional[str] = Query(None),
session: Session = Depends(get_session)
):
"""Get all routes with optional filtering by origin and destination city."""
statement = select(Route)
if origin_city:
statement = statement.where(Route.origin_city.contains(origin_city))
if destination_city:
statement = statement.where(Route.destination_city.contains(destination_city))
routes = session.exec(statement).all()
return routes
@router.get("/{route_id}", response_model=RouteResponse)
async def get_route(route_id: str, session: Session = Depends(get_session)):
"""Get a single route by ID."""
route = session.get(Route, route_id)
if not route:
raise HTTPException(status_code=404, detail="Route not found")
return route
@router.get("/{route_id}/stops")
async def get_route_stops(route_id: str, session: Session = Depends(get_session)):
"""Get all stops for a route."""
from app.models.route_stop import RouteStop
from app.models.bus_stop import BusStop
statement = select(RouteStop, BusStop).join(
BusStop, RouteStop.stop_id == BusStop.id
).where(RouteStop.route_id == route_id).order_by(RouteStop.stop_order)
results = session.exec(statement).all()
# Merge RouteStop data into BusStop response
stops = []
for route_stop, bus_stop in results:
stop_data = bus_stop.model_dump()
stop_data['stop_order'] = route_stop.stop_order
stop_data['travel_time_minutes'] = route_stop.travel_time_minutes
stop_data['stop_delay_minutes'] = route_stop.stop_delay_minutes
stop_data['is_pickup_point'] = route_stop.is_pickup_point
stop_data['is_dropoff_point'] = route_stop.is_dropoff_point
stops.append(stop_data)
return stops
@router.post("", response_model=RouteResponse)
async def create_route(
route: RouteCreate,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Create a new route (Admin only)."""
db_route = Route.model_validate(route)
session.add(db_route)
session.commit()
session.refresh(db_route)
return db_route
@router.put("/{route_id}", response_model=RouteResponse)
async def update_route(
route_id: str,
route_update: RouteUpdate,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Update a route (Admin only)."""
db_route = session.get(Route, route_id)
if not db_route:
raise HTTPException(status_code=404, detail="Route not found")
route_data = route_update.model_dump(exclude_unset=True)
for key, value in route_data.items():
setattr(db_route, key, value)
session.add(db_route)
session.commit()
session.refresh(db_route)
return db_route
@router.delete("/{route_id}")
async def delete_route(
route_id: str,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Delete a route (Admin only)."""
db_route = session.get(Route, route_id)
if not db_route:
raise HTTPException(status_code=404, detail="Route not found")
session.delete(db_route)
session.commit()
return {"ok": True}
# Route Stop Management with Cascade
@router.post("/{route_id}/stops")
async def add_stop_to_route(
route_id: str,
stop_data: RouteStopCreate,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Add a stop to a route with cascading order adjustment."""
# 1. Check if route exists
route = session.get(Route, route_id)
if not route:
raise HTTPException(status_code=404, detail="Route not found")
# 2. Determine stop order
if stop_data.stop_order is None:
# Append to end
max_order = session.exec(select(func.max(RouteStop.stop_order)).where(RouteStop.route_id == route_id)).one()
# handle case where max_order is None (no stops)
stop_data.stop_order = (max_order or 0) + 1
else:
# Shift existing stops equal to or greater than new order
existing_stops = session.exec(
select(RouteStop).where(RouteStop.route_id == route_id, RouteStop.stop_order >= stop_data.stop_order)
).all()
for stop in existing_stops:
stop.stop_order += 1
session.add(stop)
# 3. Create new RouteStop
new_stop = RouteStop(
route_id=route_id,
stop_id=stop_data.stop_id,
stop_order=stop_data.stop_order,
travel_time_minutes=stop_data.travel_time_minutes,
stop_delay_minutes=stop_data.stop_delay_minutes or 0,
is_pickup_point=stop_data.is_pickup_point,
is_dropoff_point=stop_data.is_dropoff_point
)
session.add(new_stop)
session.commit()
session.refresh(new_stop)
return new_stop
@router.put("/{route_id}/stops/{stop_id}")
async def update_route_stop_order(
route_id: str,
stop_id: str,
update_data: RouteStopUpdate,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Update a route stop, potentially reordering others."""
# This assumes we find the connection by route_id and stop_id.
# NOTE: If a stop is on a route multiple times, this logic needs ID, but for now assuming unique stop per route.
# Actually RouteStop has its own ID but we are using stop_id in path. Let's find the RouteStop entry.
route_stop = session.exec(
select(RouteStop).where(RouteStop.route_id == route_id, RouteStop.stop_id == stop_id)
).first()
if not route_stop:
raise HTTPException(status_code=404, detail="Stop not found on this route")
old_order = route_stop.stop_order
# Update fields
if update_data.is_pickup_point is not None:
route_stop.is_pickup_point = update_data.is_pickup_point
if update_data.is_dropoff_point is not None:
route_stop.is_dropoff_point = update_data.is_dropoff_point
if update_data.travel_time_minutes is not None:
route_stop.travel_time_minutes = update_data.travel_time_minutes
if update_data.stop_delay_minutes is not None:
route_stop.stop_delay_minutes = update_data.stop_delay_minutes
# Reordering logic
if update_data.stop_order is not None and update_data.stop_order != old_order:
new_order = update_data.stop_order
if new_order > old_order:
# Moving down: shift stops between old+1 and new DOWN (-1)
stops_to_shift = session.exec(
select(RouteStop).where(
RouteStop.route_id == route_id,
RouteStop.stop_order > old_order,
RouteStop.stop_order <= new_order
)
).all()
for s in stops_to_shift:
s.stop_order -= 1
session.add(s)
else:
# Moving up: shift stops between new and old-1 UP (+1)
stops_to_shift = session.exec(
select(RouteStop).where(
RouteStop.route_id == route_id,
RouteStop.stop_order >= new_order,
RouteStop.stop_order < old_order
)
).all()
for s in stops_to_shift:
s.stop_order += 1
session.add(s)
route_stop.stop_order = new_order
session.add(route_stop)
session.commit()
session.refresh(route_stop)
return route_stop

View File

@ -0,0 +1,86 @@
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlmodel import Session, select
from typing import Optional
from uuid import UUID
from app.core.database import get_session
from app.models.bus_schedule import BusSchedule
from app.api.deps import get_current_admin
router = APIRouter(prefix="/api/schedules", tags=["schedules"])
@router.get("")
async def get_schedules(
route_id: Optional[UUID] = Query(None),
stop_id: Optional[UUID] = Query(None),
only_published: bool = Query(True),
session: Session = Depends(get_session)
):
"""Get schedules for a route or stop."""
statement = select(BusSchedule)
if only_published:
statement = statement.where(BusSchedule.is_published)
if route_id:
statement = statement.where(BusSchedule.route_id == route_id)
if stop_id:
from app.models.route_stop import RouteStop
statement = statement.join(
RouteStop, BusSchedule.route_id == RouteStop.route_id
).where(RouteStop.stop_id == stop_id)
schedules = session.exec(statement).all()
return schedules
@router.post("")
async def create_schedule(
schedule: BusSchedule,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Create a new bus schedule (Admin only)."""
db_schedule = BusSchedule.model_validate(schedule)
session.add(db_schedule)
session.commit()
session.refresh(db_schedule)
return db_schedule
@router.put("/{schedule_id}")
async def update_schedule(
schedule_id: UUID,
schedule_update: dict,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Update a bus schedule (Admin only)."""
db_schedule = session.get(BusSchedule, schedule_id)
if not db_schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
for key, value in schedule_update.items():
if hasattr(db_schedule, key):
setattr(db_schedule, key, value)
session.add(db_schedule)
session.commit()
session.refresh(db_schedule)
return db_schedule
@router.delete("/{schedule_id}")
async def delete_schedule(
schedule_id: UUID,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Delete a bus schedule (Admin only)."""
db_schedule = session.get(BusSchedule, schedule_id)
if not db_schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
session.delete(db_schedule)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,161 @@
from fastapi import APIRouter, Depends, Query, HTTPException, UploadFile, File, Form
from sqlmodel import Session, select
from typing import Optional, List
import os
import shutil
from uuid import uuid4, UUID
from app.core.database import get_session
from app.models.shuttle import Shuttle
from app.api.deps import get_current_admin
router = APIRouter(prefix="/api/shuttles", tags=["shuttles"])
UPLOAD_DIR = "uploads"
@router.get("", response_model=List[Shuttle])
async def get_shuttles(
origin: Optional[str] = Query(None),
destination: Optional[str] = Query(None),
company_name: Optional[str] = Query(None),
trip_type: Optional[str] = Query(None),
is_active: bool = Query(True),
session: Session = Depends(get_session)
):
"""Get all shuttles with optional filters."""
statement = select(Shuttle).where(Shuttle.is_active == is_active)
if origin:
statement = statement.where(Shuttle.origin.contains(origin))
if destination:
statement = statement.where(Shuttle.destination.contains(destination))
if company_name:
statement = statement.where(Shuttle.company_name.contains(company_name))
if trip_type:
statement = statement.where(Shuttle.trip_type == trip_type)
shuttles = session.exec(statement).all()
return shuttles
@router.post("", response_model=Shuttle)
async def create_shuttle(
route_name: str = Form(...),
origin: str = Form(...),
destination: str = Form(...),
vehicle_type: str = Form(...),
company_name: Optional[str] = Form(None),
trip_type: str = Form("one_way"),
price_per_person: Optional[float] = Form(None),
price_private_trip: Optional[float] = Form(None),
estimated_duration: str = Form(...),
contact_whatsapp: str = Form(...),
phone_number: Optional[str] = Form(None),
english_speaking: bool = Form(False),
description: Optional[str] = Form(None),
departure_times: Optional[str] = Form(None),
is_active: bool = Form(True),
image: Optional[UploadFile] = File(None),
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Create a new shuttle trip (Admin only)."""
image_url = None
if image:
ext = os.path.splitext(image.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, "vehicles", filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(image.file, buffer)
image_url = f"/uploads/vehicles/{filename}"
shuttle = Shuttle(
route_name=route_name,
origin=origin,
destination=destination,
vehicle_type=vehicle_type,
company_name=company_name,
trip_type=trip_type,
price_per_person=price_per_person,
price_private_trip=price_private_trip,
estimated_duration=estimated_duration,
contact_whatsapp=contact_whatsapp,
phone_number=phone_number,
english_speaking=english_speaking,
description=description,
departure_times=departure_times,
image_url=image_url,
is_active=is_active
)
session.add(shuttle)
session.commit()
session.refresh(shuttle)
return shuttle
@router.put("/{shuttle_id}", response_model=Shuttle)
async def update_shuttle(
shuttle_id: UUID,
route_name: str = Form(...),
origin: str = Form(...),
destination: str = Form(...),
vehicle_type: str = Form(...),
company_name: Optional[str] = Form(None),
trip_type: str = Form("one_way"),
price_per_person: Optional[float] = Form(None),
price_private_trip: Optional[float] = Form(None),
estimated_duration: str = Form(...),
contact_whatsapp: str = Form(...),
phone_number: Optional[str] = Form(None),
english_speaking: bool = Form(False),
description: Optional[str] = Form(None),
departure_times: Optional[str] = Form(None),
is_active: bool = Form(True),
image: Optional[UploadFile] = File(None),
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Update a shuttle trip (Admin only)."""
db_shuttle = session.get(Shuttle, shuttle_id)
if not db_shuttle:
raise HTTPException(status_code=404, detail="Shuttle not found")
db_shuttle.route_name = route_name
db_shuttle.origin = origin
db_shuttle.destination = destination
db_shuttle.vehicle_type = vehicle_type
db_shuttle.company_name = company_name
db_shuttle.trip_type = trip_type
db_shuttle.price_per_person = price_per_person
db_shuttle.price_private_trip = price_private_trip
db_shuttle.estimated_duration = estimated_duration
db_shuttle.contact_whatsapp = contact_whatsapp
db_shuttle.phone_number = phone_number
db_shuttle.english_speaking = english_speaking
db_shuttle.description = description
db_shuttle.departure_times = departure_times
db_shuttle.is_active = is_active
if image:
ext = os.path.splitext(image.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, "vehicles", filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(image.file, buffer)
db_shuttle.image_url = f"/uploads/vehicles/{filename}"
session.add(db_shuttle)
session.commit()
session.refresh(db_shuttle)
return db_shuttle
@router.delete("/{shuttle_id}")
async def delete_shuttle(
shuttle_id: UUID,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Delete a shuttle trip (Admin only)."""
db_shuttle = session.get(Shuttle, shuttle_id)
if not db_shuttle:
raise HTTPException(status_code=404, detail="Shuttle not found")
session.delete(db_shuttle)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,145 @@
from fastapi import APIRouter, Depends, Query, HTTPException, UploadFile, File, Form
from sqlmodel import Session, select
from typing import Optional
import os
import shutil
from uuid import uuid4
from app.core.database import get_session
from app.models.taxi import Taxi
from app.api.deps import get_current_admin
router = APIRouter(prefix="/api/taxis", tags=["taxis"])
UPLOAD_DIR = "uploads"
@router.get("")
async def get_taxis(
corregimiento: Optional[str] = Query(None),
shift: Optional[str] = Query(None),
english_speaking: Optional[bool] = Query(None),
is_active: Optional[bool] = Query(None),
session: Session = Depends(get_session)
):
"""Get all taxis with optional filters."""
statement = select(Taxi)
if is_active is not None:
statement = statement.where(Taxi.is_active == is_active)
if corregimiento:
statement = statement.where(Taxi.corregimiento.contains(corregimiento))
if shift:
statement = statement.where(Taxi.shift == shift)
if english_speaking is not None:
statement = statement.where(Taxi.english_speaking == english_speaking)
taxis = session.exec(statement).all()
return taxis
@router.post("")
async def create_taxi(
owner_name: str = Form(...),
phone_number: str = Form(...),
license_plate: str = Form(...),
corregimiento: str = Form(...),
shift: str = Form(...),
cooperative: Optional[str] = Form(None),
rating: float = Form(5.0),
english_speaking: bool = Form(False),
is_active: bool = Form(True),
image: Optional[UploadFile] = File(None),
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Create a new taxi entry (Admin only)."""
image_url = None
if image:
ext = os.path.splitext(image.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, "profiles", filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(image.file, buffer)
image_url = f"/uploads/profiles/{filename}"
taxi = Taxi(
owner_name=owner_name,
phone_number=phone_number,
license_plate=license_plate,
cooperative=cooperative,
corregimiento=corregimiento,
shift=shift,
rating=rating,
english_speaking=english_speaking,
image_url=image_url,
is_active=is_active
)
session.add(taxi)
session.commit()
session.refresh(taxi)
return taxi
@router.put("/{taxi_id}")
async def update_taxi(
taxi_id: str,
owner_name: str = Form(...),
phone_number: str = Form(...),
license_plate: str = Form(...),
corregimiento: str = Form(...),
shift: str = Form(...),
cooperative: Optional[str] = Form(None),
rating: float = Form(5.0),
english_speaking: bool = Form(False),
is_active: bool = Form(True),
image: Optional[UploadFile] = File(None),
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Update a taxi entry (Admin only)."""
db_taxi = session.get(Taxi, taxi_id)
if not db_taxi:
raise HTTPException(status_code=404, detail="Taxi not found")
# Update fields
db_taxi.owner_name = owner_name
db_taxi.phone_number = phone_number
db_taxi.license_plate = license_plate
db_taxi.corregimiento = corregimiento
db_taxi.shift = shift
db_taxi.cooperative = cooperative
db_taxi.rating = rating
db_taxi.english_speaking = english_speaking
db_taxi.is_active = is_active
# Handle image upload
if image:
ext = os.path.splitext(image.filename)[1]
filename = f"{uuid4()}{ext}"
path = os.path.join(UPLOAD_DIR, "profiles", filename)
with open(path, "wb") as buffer:
shutil.copyfileobj(image.file, buffer)
db_taxi.image_url = f"/uploads/profiles/{filename}"
session.add(db_taxi)
session.commit()
session.refresh(db_taxi)
return db_taxi
@router.delete("/{taxi_id}")
async def delete_taxi(
taxi_id: str,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Delete a taxi entry (Admin only)."""
db_taxi = session.get(Taxi, taxi_id)
if not db_taxi:
raise HTTPException(status_code=404, detail="Taxi not found")
session.delete(db_taxi)
session.commit()
return {"ok": True}

View File

@ -0,0 +1,84 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select
from typing import List
from datetime import datetime, timedelta
from app.core.database import get_session
from app.models.telemetry import Telemetry, TelemetryCreate, VehicleStatus
from app.models.user import User
from app.api.deps import get_current_user
router = APIRouter(prefix="/api/telemetry", tags=["telemetry"])
@router.post("", response_model=Telemetry)
async def create_telemetry_record(
*,
session: Session = Depends(get_session),
telemetry_in: TelemetryCreate,
current_user: User = Depends(get_current_user)
):
"""
Create a new telemetry record for the current driver.
"""
if current_user.role != "driver":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only drivers can send telemetry data"
)
db_telemetry = Telemetry(
user_id=current_user.id,
latitude=telemetry_in.latitude,
longitude=telemetry_in.longitude,
speed=telemetry_in.speed,
heading=telemetry_in.heading,
status=telemetry_in.status
)
session.add(db_telemetry)
session.commit()
session.refresh(db_telemetry)
return db_telemetry
@router.get("/active", response_model=List[dict])
async def get_active_units(
*,
session: Session = Depends(get_session)
):
"""
Get the latest location of all active units (last 5 minutes).
"""
# Subquery to get the latest timestamp per user
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
# This is a bit complex in SQLModel/SQLAlchemy for "latest record per group"
# We'll use a simpler approach: get all records from last 5 mins and filter in python
# or use a more efficient distinct on if supported.
statement = (
select(Telemetry)
.where(Telemetry.timestamp >= five_minutes_ago)
.where(Telemetry.status == VehicleStatus.ACTIVE)
.order_by(Telemetry.user_id, Telemetry.timestamp.desc())
)
results = session.exec(statement).all()
# Filter to get only the latest unique user_id
latest_units = {}
for t in results:
if t.user_id not in latest_units:
# We also want the driver name and vehicle info
user = session.get(User, t.user_id)
latest_units[t.user_id] = {
"user_id": t.user_id,
"full_name": user.full_name if user else "Unknown",
"latitude": t.latitude,
"longitude": t.longitude,
"speed": t.speed,
"heading": t.heading,
"timestamp": t.timestamp,
"vehicle_type": user.driver_profile.vehicle_type if user and user.driver_profile else "unknown",
"license_plate": user.driver_profile.license_plate if user and user.driver_profile else "unknown"
}
return list(latest_units.values())

116
backend/app/api/users.py Normal file
View File

@ -0,0 +1,116 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import Session, select
from uuid import UUID
from app.core.database import get_session
from app.models.user import User
from app.api.deps import get_current_admin
router = APIRouter(prefix="/api/users", tags=["users"])
@router.get("/search")
async def search_users(
email: str = Query(..., description="Email to search for"),
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Search for users by email (Admin only)."""
statement = select(User).where(User.email.contains(email))
users = session.exec(statement).all()
# Clean response (don't send hashed passwords)
return [
{
"id": user.id,
"email": user.email,
"full_name": user.full_name,
"role": user.role,
"is_verified": user.is_verified,
"created_at": user.created_at
} for user in users
]
@router.get("/{user_id}")
async def get_user_details(
user_id: UUID,
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Get detailed user info including driver profile (Admin only)."""
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
result = {
"id": user.id,
"email": user.email,
"full_name": user.full_name,
"role": user.role,
"is_active": user.is_active,
"is_verified": user.is_verified,
"created_at": user.created_at,
"driver_profile": None
}
if user.driver_profile:
dp = user.driver_profile
result["driver_profile"] = {
"cedula": dp.cedula,
"vehicle_type": dp.vehicle_type,
"license_plate": dp.license_plate,
"cooperative_name": dp.cooperative_name,
"photo_url": dp.photo_url,
"vehicle_photo_url": dp.vehicle_photo_url,
"shift": dp.shift,
"payment_methods": dp.payment_methods,
"speaks_english": dp.speaks_english
}
return result
@router.get("/pending-drivers")
async def get_pending_drivers(
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""List drivers waiting for verification (Admin only)."""
# Find users with DRIVER role who are NOT verified
from app.models.user import UserRole
statement = select(User).where(User.role == UserRole.DRIVER, User.is_verified.is_(False))
return [
{
"id": driver.id,
"email": driver.email,
"full_name": driver.full_name,
"created_at": driver.created_at,
"driver_profile": {
"cedula": driver.driver_profile.cedula,
"vehicle_type": driver.driver_profile.vehicle_type,
"license_plate": driver.driver_profile.license_plate,
"cooperative_name": driver.driver_profile.cooperative_name,
"shift": driver.driver_profile.shift,
"payment_methods": driver.driver_profile.payment_methods,
"speaks_english": driver.driver_profile.speaks_english
} if driver.driver_profile else None
} for driver in session.exec(statement).all()
]
@router.post("/{user_id}/verify")
async def verify_user(
user_id: UUID,
is_verified: bool = Query(..., description="True to approve, False to stay unverified/reject"),
session: Session = Depends(get_session),
_: bool = Depends(get_current_admin)
):
"""Approve or Reject a user verification (Admin only)."""
user = session.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
user.is_verified = is_verified
session.add(user)
session.commit()
session.refresh(user)
return {"id": user.id, "email": user.email, "is_verified": user.is_verified}

View File

@ -0,0 +1,2 @@
"""Core configuration and utilities."""

View File

@ -0,0 +1,46 @@
"""Configuration settings using pydantic-settings."""
import os
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Literal
def get_env_file() -> str:
"""Get the appropriate env file based on ENVIRONMENT variable."""
env = os.getenv("ENVIRONMENT", "development")
env_file = f".env.{env}"
# Check if env file exists, fallback to .env.development
if not os.path.exists(env_file):
env_file = ".env.development"
return env_file
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# Database
database_url: str = "postgresql+asyncpg://sibu:sibu@localhost:5432/sibu"
# Google Maps (for server-side APIs)
google_maps_api_key: str = ""
google_maps_url_signing_secret: str = ""
# Environment
environment: Literal["development", "production", "testing"] = "development"
debug: bool = False
# Security
admin_password: str = "admin" # Default for development, override in .env
secret_key: str = "insecure-secret-key-dev" # Default for development, override in .env
model_config = SettingsConfigDict(
env_file=get_env_file(),
env_file_encoding="utf-8",
case_sensitive=False,
extra="ignore",
)
# Global settings instance
settings = Settings()

View File

@ -0,0 +1,27 @@
"""Database connection and session management."""
from sqlmodel import SQLModel, create_engine, Session
from typing import Generator
from app.core.config import settings
# Create database engine
# Convert asyncpg URL to psycopg2 for synchronous operations
database_url = settings.database_url.replace("+asyncpg", "+psycopg2")
engine = create_engine(
database_url,
echo=settings.debug,
future=True,
)
def init_db() -> None:
"""Initialize database by creating all tables."""
SQLModel.metadata.create_all(engine)
def get_session() -> Generator[Session, None, None]:
"""Dependency for getting database session."""
with Session(engine) as session:
yield session

View File

@ -0,0 +1,251 @@
"""Export current database data and generate a seeder script."""
from sqlmodel import Session, select
from typing import Dict, Any
import json
from app.core.database import engine
from app.models.route import Route
from app.models.bus_stop import BusStop
from app.models.route_stop import RouteStop
from app.models.bus_schedule import BusSchedule
def export_all_data() -> Dict[str, Any]:
"""Export all data from the database."""
with Session(engine) as session:
# Export routes
routes = session.exec(select(Route)).all()
routes_data = []
for route in routes:
routes_data.append({
"id": str(route.id),
"name": route.name,
"description": route.description,
"origin_city": route.origin_city,
"destination_city": route.destination_city,
"distance_km": route.distance_km,
"estimated_duration_minutes": route.estimated_duration_minutes,
"status": route.status.value if route.status else "active",
})
# Export bus stops
bus_stops = session.exec(select(BusStop)).all()
bus_stops_data = []
for stop in bus_stops:
bus_stops_data.append({
"id": str(stop.id),
"name": stop.name,
"latitude": stop.latitude,
"longitude": stop.longitude,
"city": stop.city,
"address": stop.address,
"stop_type": stop.stop_type.value if stop.stop_type else "regular",
"has_shelter": stop.has_shelter,
"has_seating": stop.has_seating,
"is_accessible": stop.is_accessible,
})
# Export route stops
route_stops = session.exec(select(RouteStop)).all()
route_stops_data = []
for route_stop in route_stops:
route_stops_data.append({
"id": str(route_stop.id),
"route_id": str(route_stop.route_id),
"stop_id": str(route_stop.stop_id),
"stop_order": route_stop.stop_order,
"travel_time_minutes": route_stop.travel_time_minutes,
"is_pickup_point": route_stop.is_pickup_point,
"is_dropoff_point": route_stop.is_dropoff_point,
})
# Export bus schedules
bus_schedules = session.exec(select(BusSchedule)).all()
bus_schedules_data = []
for schedule in bus_schedules:
bus_schedules_data.append({
"id": str(schedule.id),
"route_id": str(schedule.route_id),
"departure_time": schedule.departure_time.strftime("%H:%M:%S") if schedule.departure_time else None,
"frequency_minutes": schedule.frequency_minutes,
"schedule_type": schedule.schedule_type.value if schedule.schedule_type else "weekday",
"is_active": schedule.is_active,
"notes": schedule.notes,
})
return {
"routes": routes_data,
"bus_stops": bus_stops_data,
"route_stops": route_stops_data,
"bus_schedules": bus_schedules_data,
}
def generate_seeder_code(data: Dict[str, Any]) -> str:
"""Generate Python seeder code from exported data."""
code = '''"""Database seeding script generated from current database."""
from sqlmodel import Session, select, create_engine
from uuid import UUID
from datetime import time
from app.core.config import settings
from app.models.route import Route, RouteStatus
from app.models.bus_stop import BusStop, StopType
from app.models.route_stop import RouteStop
from app.models.bus_schedule import BusSchedule, BusScheduleType
def seed_database():
"""Seed the database with exported data."""
# Use synchronous engine for seeding (replace asyncpg with psycopg2)
sync_database_url = settings.database_url.replace("+asyncpg", "+psycopg2")
sync_engine = create_engine(sync_database_url, echo=False)
with Session(sync_engine) as session:
# Check if data already exists
try:
existing_routes = session.exec(select(Route)).first()
if existing_routes:
print("Database already has data. Skipping seed.")
print("To reseed, drop the tables first or use: make db-reset")
return
except Exception as e:
# If tables don't exist yet, that's fine - we'll create the data
print(f"Note: {e}")
print("Proceeding with seed...")
'''
# Generate routes
code += " # Insert Routes\n"
code += " routes = [\n"
for route in data["routes"]:
status_enum = route["status"].upper().replace("-", "_")
code += f''' Route(
id=UUID("{route["id"]}"),
name={repr(route["name"])},
description={repr(route["description"])},
origin_city={repr(route["origin_city"])},
destination_city={repr(route["destination_city"])},
distance_km={route["distance_km"] if route["distance_km"] is not None else "None"},
estimated_duration_minutes={route["estimated_duration_minutes"] if route["estimated_duration_minutes"] is not None else "None"},
status=RouteStatus.{status_enum},
),
'''
code += " ]\n"
code += " for route in routes:\n"
code += " session.add(route)\n"
code += " session.flush() # Flush routes so we can reference them in foreign keys\n\n"
# Generate bus stops
code += " # Insert Bus Stops\n"
code += " bus_stops = [\n"
for stop in data["bus_stops"]:
stop_type_enum = stop["stop_type"].upper().replace("-", "_")
code += f''' BusStop(
id=UUID("{stop["id"]}"),
name={repr(stop["name"])},
latitude={stop["latitude"]},
longitude={stop["longitude"]},
city={repr(stop["city"])},
address={repr(stop["address"]) if stop["address"] else "None"},
stop_type=StopType.{stop_type_enum},
has_shelter={stop["has_shelter"]},
has_seating={stop["has_seating"]},
is_accessible={stop["is_accessible"]},
),
'''
code += " ]\n"
code += " for stop in bus_stops:\n"
code += " session.add(stop)\n"
code += " session.flush() # Flush stops so we can reference them in route_stops\n\n"
# Generate route stops
code += " # Insert Route Stops\n"
code += " route_stops = [\n"
for route_stop in data["route_stops"]:
code += f''' RouteStop(
route_id=UUID("{route_stop["route_id"]}"),
stop_id=UUID("{route_stop["stop_id"]}"),
stop_order={route_stop["stop_order"]},
travel_time_minutes={route_stop["travel_time_minutes"] if route_stop["travel_time_minutes"] is not None else "None"},
is_pickup_point={route_stop["is_pickup_point"]},
is_dropoff_point={route_stop["is_dropoff_point"]},
),
'''
code += " ]\n"
code += " for route_stop in route_stops:\n"
code += " session.add(route_stop)\n"
code += " session.flush() # Flush route_stops before adding schedules\n\n"
# Generate bus schedules
code += " # Insert Bus Schedules\n"
code += " bus_schedules = [\n"
for schedule in data["bus_schedules"]:
if schedule["departure_time"]:
hour, minute, second = schedule["departure_time"].split(":")
# Convert to int to remove leading zeros
hour_int = int(hour)
minute_int = int(minute)
time_str = f"time({hour_int}, {minute_int})"
else:
time_str = "None"
schedule_type_enum = schedule["schedule_type"].upper().replace("-", "_")
code += f''' BusSchedule(
route_id=UUID("{schedule["route_id"]}"),
departure_time={time_str},
frequency_minutes={schedule["frequency_minutes"] if schedule["frequency_minutes"] is not None else "None"},
schedule_type=BusScheduleType.{schedule_type_enum},
is_active={schedule["is_active"]},
notes={repr(schedule["notes"]) if schedule["notes"] else "None"},
),
'''
code += " ]\n"
code += " for schedule in bus_schedules:\n"
code += " session.add(schedule)\n\n"
code += " session.commit()\n"
code += ' print("Database seeded successfully!")\n\n'
code += '''
if __name__ == "__main__":
seed_database()
'''
return code
def main():
"""Main function to export database and generate seeder."""
print("Exporting database data...")
try:
data = export_all_data()
# Save JSON export
json_file = "database_export.json"
with open(json_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, default=str, ensure_ascii=False)
print(f"✅ Exported data to {json_file}")
print(f" - {len(data['routes'])} routes")
print(f" - {len(data['bus_stops'])} bus stops")
print(f" - {len(data['route_stops'])} route stops")
print(f" - {len(data['bus_schedules'])} bus schedules")
# Generate seeder code
seeder_code = generate_seeder_code(data)
seeder_file = "app/core/seed.py"
with open(seeder_file, "w", encoding="utf-8") as f:
f.write(seeder_code)
print(f"✅ Generated seeder script: {seeder_file}")
except Exception as e:
print(f"❌ Error exporting database: {e}")
raise
if __name__ == "__main__":
main()

View File

@ -0,0 +1,92 @@
"""Script to export route data from Supabase to update seed script."""
import os
from supabase import create_client, Client
from typing import Dict, Any
import json
def get_supabase_client() -> Client:
"""Create Supabase client from environment variables."""
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_ANON_KEY")
if not supabase_url or not supabase_key:
raise ValueError("SUPABASE_URL and SUPABASE_ANON_KEY must be set in environment")
return create_client(supabase_url, supabase_key)
def export_route_data(route_name: str = "Boquete>David") -> Dict[str, Any]:
"""Export route data including all stops from Supabase."""
supabase = get_supabase_client()
# Get route
route_response = supabase.table("routes").select("*").eq("name", route_name).execute()
if not route_response.data:
raise ValueError(f"Route '{route_name}' not found in Supabase")
route = route_response.data[0]
route_id = route["id"]
# Get all route stops with stop details
route_stops_response = supabase.table("route_stops").select(
"*, bus_stops(*)"
).eq("route_id", route_id).order("stop_order").execute()
route_stops = route_stops_response.data
return {
"route": route,
"route_stops": route_stops,
"total_stops": len(route_stops)
}
def export_all_routes() -> Dict[str, Any]:
"""Export all routes and their stops from Supabase."""
supabase = get_supabase_client()
# Get all routes
routes_response = supabase.table("routes").select("*").execute()
routes = routes_response.data
all_data = {}
for route in routes:
route_id = route["id"]
route_name = route["name"]
# Get all route stops
route_stops_response = supabase.table("route_stops").select(
"*, bus_stops(*)"
).eq("route_id", route_id).order("stop_order").execute()
all_data[route_name] = {
"route": route,
"route_stops": route_stops_response.data,
"total_stops": len(route_stops_response.data)
}
return all_data
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--all":
# Export all routes
data = export_all_routes()
output_file = "supabase_export_all.json"
else:
# Export specific route
route_name = sys.argv[1] if len(sys.argv) > 1 else "Boquete>David"
data = export_route_data(route_name)
output_file = f"supabase_export_{route_name.replace('>', '_')}.json"
# Save to JSON file
with open(output_file, "w") as f:
json.dump(data, f, indent=2, default=str)
print(f"✅ Exported data to {output_file}")
if isinstance(data, dict) and "total_stops" in data:
print(f" Total stops: {data['total_stops']}")
elif isinstance(data, dict):
for route_name, route_data in data.items():
print(f" {route_name}: {route_data['total_stops']} stops")

View File

@ -0,0 +1,99 @@
"""Helper script to generate intermediate stops along a route."""
from typing import List, Tuple
def interpolate_point(
start: Tuple[float, float],
end: Tuple[float, float],
fraction: float
) -> Tuple[float, float]:
"""Interpolate a point between start and end coordinates."""
lat = start[0] + (end[0] - start[0]) * fraction
lng = start[1] + (end[1] - start[1]) * fraction
return (lat, lng)
def generate_intermediate_stops(
start_coords: Tuple[float, float],
end_coords: Tuple[float, float],
num_stops: int,
start_name: str = "Start",
end_name: str = "End",
city: str = "Route"
) -> List[dict]:
"""Generate intermediate stops along a route.
Args:
start_coords: (latitude, longitude) of start point
end_coords: (latitude, longitude) of end point
num_stops: Total number of stops to generate (including start and end)
start_name: Name of the start stop
end_name: Name of the end stop
city: City name for intermediate stops
Returns:
List of stop dictionaries with name, lat, lng, city
"""
if num_stops < 2:
return [
{"name": start_name, "lat": start_coords[0], "lng": start_coords[1], "city": city},
{"name": end_name, "lat": end_coords[0], "lng": end_coords[1], "city": city}
]
stops = []
# Add start stop
stops.append({
"name": start_name,
"lat": start_coords[0],
"lng": start_coords[1],
"city": city
})
# Generate intermediate stops
for i in range(1, num_stops - 1):
fraction = i / (num_stops - 1)
lat, lng = interpolate_point(start_coords, end_coords, fraction)
# Generate realistic stop names
stop_name = f"Parada {i}"
if i % 10 == 0:
stop_name = f"Parada Principal {i // 10}"
elif i % 5 == 0:
stop_name = f"Intersección {i // 5}"
stops.append({
"name": stop_name,
"lat": lat,
"lng": lng,
"city": city
})
# Add end stop
stops.append({
"name": end_name,
"lat": end_coords[0],
"lng": end_coords[1],
"city": city
})
return stops
# Example: Generate 61 stops for Boquete>David route
if __name__ == "__main__":
# Boquete coordinates (Terminal)
boquete_start = (8.7697, -82.4328)
# David coordinates (Terminal)
david_end = (8.4177, -82.4270)
stops = generate_intermediate_stops(
boquete_start,
david_end,
num_stops=61,
start_name="Terminal de Boquete",
end_name="Terminal de David",
city="Ruta Boquete-David"
)
print(f"Generated {len(stops)} stops:")
for i, stop in enumerate(stops, 1):
print(f"{i:2d}. {stop['name']:30s} ({stop['lat']:.6f}, {stop['lng']:.6f})")

View File

@ -0,0 +1,223 @@
"""Script to import bus stop coordinates from Supabase that follow actual roads."""
import os
from pathlib import Path
from supabase import create_client, Client
from sqlmodel import Session, select, create_engine
import sys
from app.core.config import settings, get_env_file
from app.models.bus_stop import BusStop
from app.models.route import Route
from app.models.route_stop import RouteStop
def load_env_file():
"""Load environment variables from .env.development file."""
env_file = get_env_file()
env_path = Path(env_file)
# If relative path, make it relative to backend directory
if not env_path.is_absolute():
backend_dir = Path(__file__).parent.parent.parent
env_path = backend_dir / env_file
if env_path.exists():
from dotenv import load_dotenv
load_dotenv(env_path)
print(f"✓ Loaded environment from {env_path}")
else:
print(f"⚠️ Warning: {env_path} not found, using system environment variables")
def get_supabase_client() -> Client:
"""Create Supabase client from environment variables."""
# Load .env.development file first
load_env_file()
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_ANON_KEY")
if not supabase_url or not supabase_key:
raise ValueError(
"SUPABASE_URL and SUPABASE_ANON_KEY must be set in environment or .env.development file.\n"
f"Checked file: {get_env_file()}"
)
return create_client(supabase_url, supabase_key)
def import_coordinates_from_supabase(route_name: str = "Boquete>David", supabase_route_name: str = None):
"""Import bus stop coordinates from Supabase for a specific route.
Args:
route_name: Route name in local database format (e.g., "Boquete>David")
supabase_route_name: Route name in Supabase format (e.g., "Boquete David").
If None, will try to find it automatically.
"""
supabase = get_supabase_client()
# If supabase_route_name is provided, use it directly
if supabase_route_name:
route_response = supabase.table("routes").select("*").eq("name", supabase_route_name).execute()
else:
# Get route from Supabase - try exact match first, then try variations
route_response = supabase.table("routes").select("*").eq("name", route_name).execute()
# If not found, try with different separators (Supabase uses " " em dash)
if not route_response.data:
# Try with different separators
variations = [
route_name.replace(">", " "), # Em dash (Supabase format)
route_name.replace(">", "-"), # Regular dash
route_name.replace(">", " to "), # " to "
route_name.replace(">", " -> "), # " -> "
]
for variant in variations:
route_response = supabase.table("routes").select("*").eq("name", variant).execute()
if route_response.data:
print(f"Found route with name variation: '{variant}'")
break
# If still not found, list available routes
if not route_response.data:
all_routes = supabase.table("routes").select("name").execute()
available = [r["name"] for r in all_routes.data] if all_routes.data else []
raise ValueError(
f"Route '{route_name}' not found in Supabase.\n"
f"Available routes: {', '.join(available) if available else 'None found'}"
)
supabase_route = route_response.data[0]
supabase_route_id = supabase_route["id"]
# Get all route stops with stop details from Supabase
# Old app uses 'stops' table with 'lat' and 'lng' columns, and 'seq' for order
# Try different query formats to match Supabase schema
try:
# Try the format used by old Flutter app: stops:stop_id with seq
route_stops_response = supabase.table("route_stops").select(
"seq, stops:stop_id(id, name, lat, lng)"
).eq("route_id", supabase_route_id).order("seq").execute()
except Exception as e1:
try:
# Try with stop_order instead of seq
route_stops_response = supabase.table("route_stops").select(
"stop_order, stops:stop_id(id, name, lat, lng)"
).eq("route_id", supabase_route_id).order("stop_order").execute()
except Exception as e2:
try:
# Try bus_stops table (new schema)
route_stops_response = supabase.table("route_stops").select(
"stop_order, bus_stops(*)"
).eq("route_id", supabase_route_id).order("stop_order").execute()
except Exception as e3:
raise Exception(f"Could not query Supabase: {e1}, {e2}, {e3}")
if not route_stops_response.data:
print(f"No stops found for route '{route_name}' in Supabase")
return
# Connect to local database
sync_database_url = settings.database_url.replace("+asyncpg", "+psycopg2")
sync_engine = create_engine(sync_database_url, echo=False)
with Session(sync_engine) as session:
# Find the route in local database
local_route = session.exec(select(Route).where(Route.name == route_name)).first()
if not local_route:
print(f"Route '{route_name}' not found in local database. Please create it first.")
return
print(f"Found route '{route_name}' in local database (ID: {local_route.id})")
# Get local route stops ordered by stop_order
local_route_stops = session.exec(
select(RouteStop, BusStop)
.join(BusStop, RouteStop.stop_id == BusStop.id)
.where(RouteStop.route_id == local_route.id)
.order_by(RouteStop.stop_order)
).all()
if len(local_route_stops) != len(route_stops_response.data):
print(f"⚠️ Warning: Local database has {len(local_route_stops)} stops, Supabase has {len(route_stops_response.data)} stops")
print(" Updating coordinates for matching stops...")
# Update coordinates for each stop
updated_count = 0
for i, supabase_stop_data in enumerate(route_stops_response.data):
if i >= len(local_route_stops):
break
# Try different possible field names for the stop data
supabase_stop = (
supabase_stop_data.get("bus_stops") or
supabase_stop_data.get("stops") or
supabase_stop_data # If the stop data is directly in the response
)
if not supabase_stop or not isinstance(supabase_stop, dict):
continue
# Get the local stop
local_route_stop, local_bus_stop = local_route_stops[i]
# Update coordinates from Supabase
# Supabase may use 'lat'/'lng' (old schema) or 'latitude'/'longitude' (new schema)
new_latitude = supabase_stop.get("latitude") or supabase_stop.get("lat")
new_longitude = supabase_stop.get("longitude") or supabase_stop.get("lng")
if new_latitude is None or new_longitude is None:
print(f"⚠️ Skipping stop {i+1}: missing coordinates in Supabase")
continue
# Check if coordinates are different
if abs(local_bus_stop.latitude - new_latitude) > 0.0001 or \
abs(local_bus_stop.longitude - new_longitude) > 0.0001:
local_bus_stop.latitude = new_latitude
local_bus_stop.longitude = new_longitude
session.add(local_bus_stop)
updated_count += 1
print(f"✓ Updated stop {i+1} ({local_bus_stop.name}): "
f"({new_latitude:.6f}, {new_longitude:.6f})")
session.commit()
print(f"\n✅ Successfully updated {updated_count} stops with coordinates from Supabase")
def import_all_routes():
"""Import coordinates for all routes from Supabase."""
supabase = get_supabase_client()
# Get all routes from Supabase
routes_response = supabase.table("routes").select("*").execute()
routes = routes_response.data
print(f"Found {len(routes)} routes in Supabase")
for route in routes:
supabase_route_name = route["name"] # Format: "Boquete David"
# Convert Supabase format to local format for matching
local_route_name = supabase_route_name.replace(" ", ">").replace(" - ", ">").replace(" to ", ">").replace(" -> ", ">")
print(f"\n{'='*60}")
print(f"Importing coordinates for route: {supabase_route_name}")
print(f"Looking for local route: {local_route_name}")
print(f"{'='*60}")
try:
import_coordinates_from_supabase(local_route_name, supabase_route_name=supabase_route_name)
except Exception as e:
print(f"❌ Error importing route '{supabase_route_name}': {e}")
continue
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--all":
# Import all routes
import_all_routes()
else:
# Import specific route
route_name = sys.argv[1] if len(sys.argv) > 1 else "Boquete>David"
import_coordinates_from_supabase(route_name)

View File

@ -0,0 +1,46 @@
import bcrypt
from datetime import datetime, timedelta, timezone
from typing import Any, Union, Optional
from jose import jwt
from app.core.config import settings
ALGORITHM = "HS256"
def create_access_token(
subject: Union[str, Any],
role: str,
full_name: str,
expires_delta: Optional[timedelta] = None
) -> str:
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=1440)
to_encode = {
"exp": expire,
"sub": str(subject),
"role": role,
"full_name": full_name
}
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
def get_password_hash(password: str) -> str:
return bcrypt.hashpw(
password.encode('utf-8'),
bcrypt.gensalt()
).decode('utf-8')
def get_token_payload(token: str) -> dict:
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
return payload
except Exception:
return {}

5900
backend/app/core/seed.py Normal file

File diff suppressed because it is too large Load Diff

70
backend/app/main.py Normal file
View File

@ -0,0 +1,70 @@
"""FastAPI application entry point."""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
import os
from app.core.config import settings
from app.api.routes import router as routes_router
from app.api.bus_stops import router as bus_stops_router
from app.api.schedules import router as schedules_router
from app.api.coupons import router as coupons_router
from app.api.taxis import router as taxis_router
from app.api.auth import router as auth_router
from app.api.users import router as users_router
from app.api.favorites import router as favorites_router
from app.api.telemetry import router as telemetry_router
from app.api.businesses import router as businesses_router
from app.api.analytics import router as analytics_router
from app.api.reports import router as reports_router
from app.api.shuttles import router as shuttles_router
app = FastAPI(
title="SIBU Transportation API",
description="API for SIBU public transportation system",
version="1.0.0",
debug=settings.debug,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure properly for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Ensure upload directories exist
for sub in ["profiles", "vehicles", "businesses"]:
os.makedirs(os.path.join("uploads", sub), exist_ok=True)
# Mount static files
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
# Include routers
app.include_router(routes_router)
app.include_router(bus_stops_router)
app.include_router(schedules_router)
app.include_router(coupons_router)
app.include_router(taxis_router)
app.include_router(auth_router)
app.include_router(users_router)
app.include_router(favorites_router)
app.include_router(telemetry_router)
app.include_router(businesses_router)
app.include_router(analytics_router, prefix="/api/analytics", tags=["analytics"])
app.include_router(reports_router)
app.include_router(shuttles_router)
@app.get("/")
async def root():
"""Root endpoint."""
return {"message": "SIBU Transportation API", "version": "1.0.0"}
@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "healthy", "environment": settings.environment}

View File

@ -0,0 +1,18 @@
"""Database models."""
from app.models.route import Route
from app.models.bus_stop import BusStop
from app.models.route_stop import RouteStop
from app.models.bus_schedule import BusSchedule
from app.models.user import User, DriverProfile
from app.models.taxi import Taxi
from app.models.favorite import Favorite
from app.models.telemetry import Telemetry
from app.models.coupon import Coupon
from app.models.business import Business
from app.models.user_coupon import UserCoupon
from app.models.analytics import AnalyticsEvent
from app.models.report import Report
from app.models.shuttle import Shuttle
__all__ = ["Route", "BusStop", "RouteStop", "BusSchedule", "User", "DriverProfile", "Taxi", "Favorite", "Telemetry", "Coupon", "Business", "UserCoupon", "AnalyticsEvent", "Report", "Shuttle"]

View File

@ -0,0 +1,19 @@
from sqlmodel import SQLModel, Field
from typing import Optional, Dict
from uuid import UUID, uuid4
from datetime import datetime
from sqlalchemy import Column, DateTime, func, JSON
class AnalyticsEvent(SQLModel, table=True):
__tablename__ = "analytics_events"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
event_name: str = Field(index=True)
user_id: Optional[UUID] = Field(default=None, index=True, foreign_key="users.id")
screen_name: Optional[str] = None
item_id: Optional[str] = None # route_id, stop_id, promo_id, etc.
properties: Optional[Dict] = Field(default_factory=dict, sa_column=Column(JSON))
timestamp: datetime = Field(
default_factory=datetime.utcnow,
sa_column=Column(DateTime(timezone=True), server_default=func.now(), index=True)
)

View File

@ -0,0 +1,31 @@
"""Bus schedule model."""
from sqlmodel import SQLModel, Field, Column
from datetime import datetime, time
from typing import Optional
from enum import Enum
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
class BusScheduleType(str, Enum):
"""Schedule type enumeration."""
WEEKDAY = "weekday"
WEEKEND = "weekend"
HOLIDAY = "holiday"
class BusSchedule(SQLModel, table=True):
"""Bus schedule model."""
__tablename__ = "bus_schedules"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
route_id: UUID = Field(foreign_key="routes.id")
departure_time: time
frequency_minutes: Optional[int] = 30
schedule_type: BusScheduleType = BusScheduleType.WEEKDAY
is_active: bool = Field(default=True)
is_published: bool = Field(default=False)
notes: Optional[str] = None
created_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now()))

View File

@ -0,0 +1,35 @@
"""Bus stop model."""
from sqlmodel import SQLModel, Field, Column
from datetime import datetime
from typing import Optional
from enum import Enum
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
class StopType(str, Enum):
"""Stop type enumeration."""
TERMINAL = "terminal"
REGULAR = "regular"
EXPRESS_ONLY = "express_only"
class BusStop(SQLModel, table=True):
"""Bus stop model."""
__tablename__ = "bus_stops"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
name: str
latitude: float
longitude: float
city: str
address: Optional[str] = None
stop_type: StopType = StopType.REGULAR
has_shelter: bool = False
has_seating: bool = False
is_accessible: bool = False
stop_order: Optional[int] = None
created_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now()))
updated_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now(), onupdate=func.now()))

View File

@ -0,0 +1,58 @@
from sqlmodel import SQLModel, Field, Column, Relationship
from datetime import datetime
from typing import Optional, List, TYPE_CHECKING
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
if TYPE_CHECKING:
from app.models.coupon import Coupon
class Business(SQLModel, table=True):
"""Business record for local partners."""
__tablename__ = "businesses"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
name: str
address: Optional[str] = None
phone: Optional[str] = None
image_url: Optional[str] = None
social_media: Optional[str] = None
category: Optional[str] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
area: Optional[str] = Field(default="Boquete")
# Relationship to coupons
coupons: List["Coupon"] = Relationship(back_populates="business")
created_at: Optional[datetime] = Field(
default=None,
sa_column=Column(DateTime(timezone=True), server_default=func.now())
)
updated_at: Optional[datetime] = Field(
default=None,
sa_column=Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
)
class BusinessCreate(SQLModel):
name: str
address: Optional[str] = None
phone: Optional[str] = None
image_url: Optional[str] = None
social_media: Optional[str] = None
category: Optional[str] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
area: Optional[str] = "Boquete"
class BusinessUpdate(SQLModel):
name: Optional[str] = None
address: Optional[str] = None
phone: Optional[str] = None
image_url: Optional[str] = None
social_media: Optional[str] = None
category: Optional[str] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
area: Optional[str] = None

View File

@ -0,0 +1,79 @@
from sqlmodel import SQLModel, Field, Column, Relationship
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
if TYPE_CHECKING:
from app.models.business import Business
class Coupon(SQLModel, table=True):
"""Coupon record for promotions."""
__tablename__ = "coupons"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
business_id: Optional[UUID] = Field(default=None, foreign_key="businesses.id")
title: str = Field(index=True)
description: Optional[str] = None
# Relationship to business
business: Optional["Business"] = Relationship(back_populates="coupons")
business_name: Optional[str] = None
business_address: Optional[str] = None
business_phone: Optional[str] = None
image_url: Optional[str] = None
social_media: Optional[str] = None
terms: Optional[str] = None
discount_percentage: Optional[int] = None
discount_amount: Optional[float] = None
category: Optional[str] = None
valid_from: Optional[datetime] = Field(
sa_column=Column(DateTime(timezone=True), nullable=True)
)
valid_until: Optional[datetime] = Field(
sa_column=Column(DateTime(timezone=True), nullable=True)
)
is_active: bool = Field(default=True)
created_at: Optional[datetime] = Field(
default=None,
sa_column=Column(DateTime(timezone=True), server_default=func.now())
)
updated_at: Optional[datetime] = Field(
default=None,
sa_column=Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
)
class CouponCreate(SQLModel):
title: str
business_id: Optional[UUID] = None
description: Optional[str] = None
business_name: Optional[str] = None
business_address: Optional[str] = None
business_phone: Optional[str] = None
image_url: Optional[str] = None
social_media: Optional[str] = None
terms: Optional[str] = None
discount_percentage: Optional[int] = None
discount_amount: Optional[float] = None
category: Optional[str] = None
valid_from: Optional[datetime] = None
valid_until: Optional[datetime] = None
is_active: Optional[bool] = True
class CouponUpdate(SQLModel):
title: Optional[str] = None
business_id: Optional[UUID] = None
description: Optional[str] = None
business_name: Optional[str] = None
business_address: Optional[str] = None
business_phone: Optional[str] = None
image_url: Optional[str] = None
social_media: Optional[str] = None
terms: Optional[str] = None
discount_percentage: Optional[int] = None
discount_amount: Optional[float] = None
category: Optional[str] = None
valid_from: Optional[datetime] = None
valid_until: Optional[datetime] = None
is_active: Optional[bool] = None

View File

@ -0,0 +1,31 @@
from sqlmodel import SQLModel, Field
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
class Favorite(SQLModel, table=True):
__tablename__ = "favorites"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
user_id: UUID = Field(foreign_key="users.id", index=True)
# Type of favorite: 'coupon', 'business', 'taxi', 'route'
item_type: str = Field(index=True)
item_id: str = Field(index=True)
# Optional metadata
item_name: Optional[str] = None
item_image: Optional[str] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
json_schema_extra = {
"example": {
"user_id": "user-123",
"item_type": "coupon",
"item_id": "coupon-456",
"item_name": "50% descuento en restaurante",
"item_image": "/uploads/coupon.jpg"
}
}

View File

@ -0,0 +1,27 @@
from sqlmodel import SQLModel, Field, Relationship
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from app.models.user import User
from uuid import UUID, uuid4
from datetime import datetime
from sqlalchemy import Column, DateTime, func
from enum import Enum
class ReportStatus(str, Enum):
PENDING = "pending"
RESOLVED = "resolved"
ARCHIVED = "archived"
class Report(SQLModel, table=True):
__tablename__ = "reports"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
user_id: Optional[UUID] = Field(default=None, foreign_key="users.id")
message: str
status: ReportStatus = Field(default=ReportStatus.PENDING)
created_at: Optional[datetime] = Field(
sa_column=Column(DateTime, server_default=func.now())
)
# Relationships
user: Optional["User"] = Relationship()

View File

@ -0,0 +1,33 @@
"""Route model."""
from sqlmodel import SQLModel, Field, Column
from datetime import datetime
from typing import Optional
from enum import Enum
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
class RouteStatus(str, Enum):
"""Route status enumeration."""
ACTIVE = "active"
INACTIVE = "inactive"
MAINTENANCE = "maintenance"
class Route(SQLModel, table=True):
"""Route model representing a bus route."""
__tablename__ = "routes"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
name: str = Field(unique=True, index=True)
description: Optional[str] = None
origin_city: str
destination_city: str
distance_km: Optional[float] = None
estimated_duration_minutes: Optional[int] = None
average_speed_kmh: Optional[float] = None
status: RouteStatus = RouteStatus.ACTIVE
created_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now()))
updated_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now(), onupdate=func.now()))

View File

@ -0,0 +1,23 @@
"""Route stop junction model."""
from sqlmodel import SQLModel, Field, Column
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
class RouteStop(SQLModel, table=True):
"""Route stop junction table connecting routes to their stops."""
__tablename__ = "route_stops"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
route_id: UUID = Field(foreign_key="routes.id")
stop_id: UUID = Field(foreign_key="bus_stops.id")
stop_order: int
travel_time_minutes: Optional[int] = None
stop_delay_minutes: int = 0
is_pickup_point: bool = True
is_dropoff_point: bool = True
created_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now()))

View File

@ -0,0 +1,32 @@
"""Shuttle model for intercity and tourist trips."""
from sqlmodel import SQLModel, Field, Column
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
class Shuttle(SQLModel, table=True):
"""Model representing an intercity shuttle or tourist trip."""
__tablename__ = "shuttles"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
route_name: str = Field(index=True) # e.g. "Boquete - Santa Catalina"
description: Optional[str] = None
origin: str = Field(index=True)
destination: str = Field(index=True)
vehicle_type: str # Private Bus, Private Car, Van
company_name: Optional[str] = None # e.g. "Chiriqui Transfers"
trip_type: str = "one_way" # one_way, round_trip, both
price_per_person: Optional[float] = None
price_private_trip: Optional[float] = None
estimated_duration: str # e.g. "4.5 hours"
departure_times: Optional[str] = None # e.g. "Every Day at 8:00 AM"
contact_whatsapp: str
phone_number: Optional[str] = None
english_speaking: bool = Field(default=False)
image_url: Optional[str] = None
is_active: bool = Field(default=True)
created_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now()))
updated_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now(), onupdate=func.now()))

View File

@ -0,0 +1,26 @@
"""Taxi model."""
from sqlmodel import SQLModel, Field, Column
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
class Taxi(SQLModel, table=True):
"""Taxi model representing an authorized taxi."""
__tablename__ = "taxis"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
owner_name: str
phone_number: str
license_plate: str = Field(unique=True, index=True)
cooperative: Optional[str] = None
corregimiento: str = Field(index=True)
shift: str = "day" # day, night, 24h
rating: float = Field(default=5.0)
english_speaking: bool = Field(default=False)
image_url: Optional[str] = None
is_active: bool = Field(default=True)
created_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now()))
updated_at: Optional[datetime] = Field(sa_column=Column(DateTime, server_default=func.now(), onupdate=func.now()))

View File

@ -0,0 +1,35 @@
"""Telemetry model for real-time tracking."""
from sqlmodel import SQLModel, Field, Column
from datetime import datetime
from typing import Optional
from uuid import UUID, uuid4
from sqlalchemy import DateTime, func
from enum import Enum
class VehicleStatus(str, Enum):
ACTIVE = "active"
OFFLINE = "offline"
BREAK = "break"
class Telemetry(SQLModel, table=True):
"""Telemetry record for a driver's vehicle."""
__tablename__ = "telemetry"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
user_id: UUID = Field(foreign_key="users.id", index=True)
latitude: float
longitude: float
speed: Optional[float] = None
heading: Optional[float] = None
status: VehicleStatus = Field(default=VehicleStatus.ACTIVE)
timestamp: datetime = Field(
sa_column=Column(DateTime, server_default=func.now(), index=True)
)
class TelemetryCreate(SQLModel):
latitude: float
longitude: float
speed: Optional[float] = None
heading: Optional[float] = None
status: VehicleStatus = VehicleStatus.ACTIVE

View File

@ -0,0 +1,59 @@
"""User and DriverProfile models."""
from sqlmodel import SQLModel, Field, Relationship
from typing import Optional
from enum import Enum
from uuid import UUID, uuid4
from datetime import datetime
from sqlalchemy import Column, DateTime, func
class UserRole(str, Enum):
ADMIN = "ADMIN"
PASSENGER = "PASSENGER"
DRIVER = "DRIVER"
PROMOTER = "PROMOTER"
class VehicleType(str, Enum):
TAXI = "taxi"
BUS = "bus"
class User(SQLModel, table=True):
__tablename__ = "users"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
email: str = Field(unique=True, index=True)
hashed_password: str
full_name: str = Field(index=True)
role: UserRole = Field(default=UserRole.PASSENGER)
is_active: bool = Field(default=True)
is_verified: bool = Field(default=False) # For drivers/admins verification
profile_photo_url: Optional[str] = None
created_at: Optional[datetime] = Field(
sa_column=Column(DateTime, server_default=func.now())
)
# Relationships
driver_profile: Optional["DriverProfile"] = Relationship(
back_populates="user", sa_relationship_kwargs={"uselist": False}
)
class DriverProfile(SQLModel, table=True):
__tablename__ = "driver_profiles"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
user_id: UUID = Field(foreign_key="users.id")
cedula: str
vehicle_type: VehicleType
license_plate: str
photo_url: Optional[str] = None
vehicle_photo_url: Optional[str] = None
cooperative_name: Optional[str] = None # Specifically for Bus
shift: Optional[str] = None # For Taxi schedules (e.g. "Dia,Noche")
payment_methods: Optional[str] = None # e.g. "Efectivo,Yappi"
speaks_english: bool = Field(default=False)
# Relationship
user: User = Relationship(back_populates="driver_profile")

View File

@ -0,0 +1,49 @@
"""UserCoupon model for tracking claimed coupons."""
from sqlmodel import SQLModel, Field, Relationship
from datetime import datetime
from typing import Optional, TYPE_CHECKING
from uuid import UUID, uuid4
from sqlalchemy import Column, DateTime, func, String
from enum import Enum
if TYPE_CHECKING:
from app.models.user import User
from app.models.coupon import Coupon
class UserCouponStatus(str, Enum):
CLAIMED = "claimed"
REDEEMED = "redeemed"
EXPIRED = "expired"
class UserCoupon(SQLModel, table=True):
__tablename__ = "user_coupons"
id: Optional[UUID] = Field(default_factory=uuid4, primary_key=True)
user_id: UUID = Field(foreign_key="users.id")
coupon_id: UUID = Field(foreign_key="coupons.id")
status: UserCouponStatus = Field(default=UserCouponStatus.CLAIMED)
redemption_code: str = Field(
sa_column=Column(String, unique=True, index=True)
)
claimed_at: datetime = Field(
sa_column=Column(DateTime(timezone=True), server_default=func.now())
)
redeemed_at: Optional[datetime] = Field(
sa_column=Column(DateTime(timezone=True), nullable=True)
)
# Relationships
user: "User" = Relationship()
coupon: "Coupon" = Relationship()
class UserCouponRead(SQLModel):
id: UUID
user_id: UUID
coupon_id: UUID
status: UserCouponStatus
redemption_code: str
claimed_at: datetime
redeemed_at: Optional[datetime]
coupon: Optional["Coupon"] = None

View File

@ -0,0 +1,2 @@
"""Pydantic schemas for request/response validation."""

View File

@ -0,0 +1,53 @@
"""Bus stop schemas."""
from pydantic import BaseModel, field_serializer
from typing import Optional
from datetime import datetime
from uuid import UUID
from app.models.bus_stop import StopType
class BusStopBase(BaseModel):
"""Base bus stop schema."""
name: str
latitude: float
longitude: float
city: str
address: Optional[str] = None
stop_type: StopType = StopType.REGULAR
has_shelter: bool = False
has_seating: bool = False
is_accessible: bool = False
stop_order: Optional[int] = None
class BusStopCreate(BusStopBase):
"""Schema for creating a bus stop."""
pass
class BusStopUpdate(BaseModel):
"""Schema for updating a bus stop."""
name: Optional[str] = None
latitude: Optional[float] = None
longitude: Optional[float] = None
city: Optional[str] = None
address: Optional[str] = None
stop_type: Optional[StopType] = None
has_shelter: Optional[bool] = None
has_seating: Optional[bool] = None
is_accessible: Optional[bool] = None
class BusStopResponse(BusStopBase):
"""Schema for bus stop response."""
id: UUID
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@field_serializer('id')
def serialize_id(self, value: UUID) -> str:
return str(value)
class Config:
from_attributes = True

View File

@ -0,0 +1,22 @@
from pydantic import BaseModel
from typing import Optional
from uuid import UUID
from datetime import datetime
from app.models.report import ReportStatus
class ReportCreate(BaseModel):
message: str
class ReportUpdate(BaseModel):
status: ReportStatus
class ReportResponse(BaseModel):
id: UUID
user_id: Optional[UUID] = None
user_name: Optional[str] = None
message: str
status: ReportStatus
created_at: datetime
class Config:
from_attributes = True

View File

@ -0,0 +1,50 @@
"""Route schemas."""
from pydantic import BaseModel, field_serializer
from typing import Optional
from datetime import datetime
from uuid import UUID
from app.models.route import RouteStatus
class RouteBase(BaseModel):
"""Base route schema."""
name: str
description: Optional[str] = None
origin_city: str
destination_city: str
distance_km: Optional[float] = None
estimated_duration_minutes: Optional[int] = None
average_speed_kmh: Optional[float] = None
status: RouteStatus = RouteStatus.ACTIVE
class RouteCreate(RouteBase):
"""Schema for creating a route."""
pass
class RouteUpdate(BaseModel):
"""Schema for updating a route."""
name: Optional[str] = None
description: Optional[str] = None
origin_city: Optional[str] = None
destination_city: Optional[str] = None
distance_km: Optional[float] = None
estimated_duration_minutes: Optional[int] = None
average_speed_kmh: Optional[float] = None
status: Optional[RouteStatus] = None
class RouteResponse(RouteBase):
"""Schema for route response."""
id: UUID
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
@field_serializer('id')
def serialize_id(self, value: UUID) -> str:
return str(value)
class Config:
from_attributes = True

View File

@ -0,0 +1,21 @@
from pydantic import BaseModel
from typing import Optional
from uuid import UUID
class RouteStopBase(BaseModel):
stop_id: UUID
stop_order: Optional[int] = None
travel_time_minutes: Optional[int] = None
stop_delay_minutes: int = 0
is_pickup_point: Optional[bool] = True
is_dropoff_point: Optional[bool] = True
class RouteStopCreate(RouteStopBase):
pass
class RouteStopUpdate(BaseModel):
stop_order: Optional[int] = None
travel_time_minutes: Optional[int] = None
stop_delay_minutes: Optional[int] = None
is_pickup_point: Optional[bool] = None
is_dropoff_point: Optional[bool] = None

View File

@ -0,0 +1,45 @@
"""Shuttle schemas."""
from typing import Optional
from uuid import UUID
from pydantic import BaseModel
class ShuttleBase(BaseModel):
route_name: str
description: Optional[str] = None
origin: str
destination: str
vehicle_type: str
company_name: Optional[str] = None
trip_type: str = "one_way"
price_per_person: Optional[float] = None
price_private_trip: Optional[float] = None
estimated_duration: str
departure_times: Optional[str] = None
contact_whatsapp: str
image_url: Optional[str] = None
is_active: bool = True
class ShuttleCreate(ShuttleBase):
pass
class ShuttleUpdate(BaseModel):
route_name: Optional[str] = None
description: Optional[str] = None
origin: Optional[str] = None
destination: Optional[str] = None
vehicle_type: Optional[str] = None
company_name: Optional[str] = None
trip_type: Optional[str] = None
price_per_person: Optional[float] = None
price_private_trip: Optional[float] = None
estimated_duration: Optional[str] = None
departure_times: Optional[str] = None
contact_whatsapp: Optional[str] = None
image_url: Optional[str] = None
is_active: Optional[bool] = None
class ShuttleRead(ShuttleBase):
id: UUID
class Config:
from_attributes = True

View File

@ -0,0 +1,61 @@
"""User and Auth schemas."""
from pydantic import BaseModel, EmailStr
from typing import Optional
from uuid import UUID
from app.models.user import UserRole, VehicleType
class UserBase(BaseModel):
email: EmailStr
full_name: str
profile_photo_url: Optional[str] = None
class UserUpdate(BaseModel):
full_name: Optional[str] = None
password: Optional[str] = None
profile_photo_url: Optional[str] = None
class PassengerCreate(UserBase):
password: str
class DriverProfileBase(BaseModel):
cedula: str
vehicle_type: VehicleType
license_plate: str
cooperative_name: Optional[str] = None
class DriverCreate(UserBase):
password: str
cedula: str
vehicle_type: VehicleType
license_plate: str
cooperative_name: Optional[str] = None
# Photos will be handled via UploadFile in FastAPI, not in the Pydantic schema for JSON data
class UserResponse(UserBase):
id: UUID
role: UserRole
is_active: bool
is_verified: bool
class Config:
from_attributes = True
class LoginRequest(BaseModel):
email: str
password: str
keep_session: bool = False
class Token(BaseModel):
access_token: str
token_type: str
role: str
full_name: str
profile_photo_url: Optional[str] = None

View File