# Project_folder/main.py
from app.core.inits import initialize_app
app = initialize_app()
# Project_folder/app/apis/accounts.py
import os
import uuid
from typing import Optional
from fastapi import APIRouter, status, Depends, Response, Request, HTTPException, Form, UploadFile, File
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from fastapi_mail import MessageSchema, MessageType
from pydantic import EmailStr, TypeAdapter, ValidationError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db, PROFILE_IMAGE_UPLOAD_URL, ARTICLE_THUMBNAIL_UPLOAD_DIR, ARTICLE_EDITOR_USER_IMG_UPLOAD_DIR, ARTICLE_EDITOR_USER_VIDEO_UPLOAD_DIR
from app.core.redis import ACCESS_COOKIE_MAX_AGE, redis_client, CODE_TTL_SECONDS
from app.core.settings import CONFIG
from app.dependencies.auth import get_optional_current_user, get_current_user
from app.models.user import User
from app.schemas.accounts import EmailRequest, VerifyRequest, UserOut, UserLostPasswordIn, UserPasswordUpdate, UserIn, UserUpdate, UserResetPasswordIn
from app.schemas.auth import TokenResponse, LoginRequest
from app.services.account_service import UserService, get_user_service
from app.services.auth_service import AuthService, get_auth_service
from app.services.token_service import AsyncTokenService, REFRESH_TOKEN_PREFIX
from app.utils.accounts import verify_password
from app.utils.auth import get_token_expiry
from app.utils.commons import refresh_expire, random_string, upload_single_image, remove_dir_with_files, old_image_remove
from app.utils.cookies import compute_cookie_attrs
from app.utils.email import AUTHCODE_EMAIL_HTML_TEMPLATE, fastapi_email
from app.utils.exc_handler import CustomErrorException
router = APIRouter()
@router.post("/authcode/request",
summary="인증 코드", description="인증 코드 생성",
responses={401: {
"description": "유효하지 않은 인증 코드 확인 시도",
"content": {"application/json": {"example": {"detail": "유효하지 않은 인증 코드입니다."}}}
}})
async def authcode_request_email(payload: EmailRequest,
current_user: Optional[User] = Depends(get_optional_current_user),
_user_service: UserService = Depends(get_user_service),
):
email = str(payload.email).lower().strip()
_type = payload.type
print("_type: ", _type)
existed_email_user = await _user_service.get_user_by_email(payload.email)
if _type == "register":
if existed_email_user:
print("신규 가입: 존재하는 이메일")
raise CustomErrorException(status_code=499, detail="존재하는 이메일입니다.")
else:
pass
elif _type == "lost":
if not existed_email_user:
print("비밀번호 분실/설정 요청중 본인 확인: 가입되어 있지 않은 이메일")
raise CustomErrorException(status_code=499, detail="가입되어 있지 않은 이메일입니다.")
else:
pass
elif _type == "email":
if existed_email_user and existed_email_user.email == current_user.email:
print("이메일 변경 요청중 본인 확인: 로그인한 사용자의 동일한 이메일")
raise CustomErrorException(status_code=499, detail="동일한 이메일입니다.")
if existed_email_user and existed_email_user.email != current_user.email: # 이런 경우는 없는 것 같은데...
print("이메일 변경 요청중 본인 확인: 다른 사용자 이메일")
raise CustomErrorException(status_code=499, detail="다른 사용자의 이메일입니다.")
else:
pass
else:
print("요청 type 없슴: Bad request")
raise CustomErrorException(status_code=410, detail="잘못된 요청입니다.")
# 비번 분실 그리고 통과한 신규가입과 이메일 변경
recent_key = f"verify_recent:{email}"
if await redis_client.exists(recent_key):
print("CustomErrorException STATUS_CODE: ", 439, "과도한 요청")
raise CustomErrorException(status_code=439, detail="과도한 요청: 잠시 후에 다시 진행해 주세요")
session_key = f"user:{email}" # Redis 해시 키 (세션 역할)
await redis_client.hset(session_key, mapping={"email": email}) # Redis에 이메일 저장 (hset)
await redis_client.expire(session_key, CODE_TTL_SECONDS)
""" Redis의 hset() 명령 자체는 **만료시간(TTL)**을 직접 지정할 수 없어요.
대신에 **키 전체(session_key)**에 대해 만료시간을 따로 expire() 또는 expireat()으로 설정해야 합니다.
즉, hset()으로 저장한 뒤에 expire()를 호출하는 방식으로 처리합니다. """
authcode = str(await random_string(7, "number"))
code_key = f"verify:{email}"
await redis_client.set(code_key, authcode, ex=CODE_TTL_SECONDS) # Redis에 인증코드 저장하고 TTL 설정 (10분)
await redis_client.set(recent_key, "1", ex=30) # 최근 요청 키(예: 30초 내 재요청 방지) - 선택
print("await redis_client.get(code_key): ", await redis_client.get(code_key))
title = None
if _type == "register":
title = "[서비스] 회원가입 인증번호"
elif _type == "lost":
title = "[서비스] 비밀번호 설정 인증번호"
elif _type == "email":
title = "[서비스] 이메일 변경 인증번호"
from jinja2 import Template
html_body = Template(AUTHCODE_EMAIL_HTML_TEMPLATE).render(code=authcode, title=title) # , verify_link=verify_link) # 직접 입력으로 교체
"""fastapi-mail 1.5.0(1.5.8버전은 recipients=[NameEmail(email=email)]을 쓰라는데
작동을 하지 않는다."""
message = MessageSchema(
subject=title,
recipients=[payload.email],
body=html_body,
subtype=MessageType.html,
)
try:
await fastapi_email.send_message(message)
except Exception as e:
await redis_client.delete(code_key) # 실패 시 Redis에 저장된 코드 제거
print("이메일 전송 실패: ", e)
raise CustomErrorException(status_code=600, detail="이메일 전송이 실패했습니다.")
return JSONResponse({"message": "인증번호를 이메일로 발송했습니다. (10분간 유효)"})
@router.post("/authcode/verify",
summary="인증 코드", description="인증 코드 확인",
responses={401: {
"description": "유효하지 않은 인증 코드 확인 시도",
"content": {"application/json": {"example": {"detail": "유효하지 않은 인증 코드입니다."}}}
}})
async def authcode_verify(payload: VerifyRequest,
current_user: Optional[User] = Depends(get_optional_current_user),
db: AsyncSession = Depends(get_db)):
""" 회원 가입시 인증 코드로 본인 확인(단순 인증하기)
### 이메일 변경 로직도 여기에 넣었다.(인증과 동시에 이메일 변경)
javascript 코드도 authVerifyForm.addEventListener('submit' 에서 같은 흐름의 로직을 유지했다."""
email = str(payload.email).lower().strip()
authcode = payload.authcode.strip()
_type = payload.type
password = payload.password
old_email = payload.old_email
code_key = f"verify:{email}"
session_key = f"user:{email}"
stored_code = await redis_client.get(code_key) # Redis에서 코드 확인
session_data = await redis_client.hgetall(session_key) # 세션에 저장된 이메일 확인
if not stored_code:
print("CustomErrorException STATUS_CODE: ", 410, "유효하지 않은 인증코드")
raise CustomErrorException(status_code=410, detail="유효하지 않은 인증코드입니다.") # 만료되었거나 존재하지 않습니다.
if stored_code != authcode:
print("CustomErrorException STATUS_CODE: ", 410, "인증코드 불일치")
raise CustomErrorException(status_code=410, detail="인증코드가 일치하지 않습니다.")
if not session_data or session_data.get("email") != email:
print("CustomErrorException STATUS_CODE: ", 410, "세션 이메일 불일치")
raise CustomErrorException(status_code=410, detail="세션 이메일이 일치하지 않습니다.")
if _type == "email":
""" email_update 과정 """
if not old_email:
raise HTTPException(status_code=400, detail="과거 이메일이 반드시 필요합니다.")
_user_service = UserService(db=db)
old_user = await _user_service.get_user_by_email(old_email)
if old_user and old_user.id == current_user.id:
password_ok = await verify_password(password, str(old_user.password))
if password_ok:
await _user_service.update_email(old_email, payload.email)
await redis_client.delete(code_key) # 검증 성공 -> 코드 삭제(한번만 사용)
return JSONResponse({"message": "이메일 변경 성공: 확인을 클릭하면, 새로운 이메일로 로그인됩니다."})
else:
raise CustomErrorException(status_code=411,
detail="비밀번호가 일치하지 않습니다.",
headers={"WWW-Authenticate": "Bearer"})
else:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="이메일 변경 권한이 없습니다."
)
await redis_client.delete(code_key) # 검증 성공 -> 코드 삭제(한번만 사용)
verified_token = str(uuid.uuid4())
verified_key = f"verified:{email}"
await redis_client.set(verified_key, verified_token, ex=CODE_TTL_SECONDS) # 10분 동안 유지
if _type == "register":
message = "이메일 인증 성공: 회원가입을 진행하세요."
else: # _type == "lost" # 비번 분실/설정
message = "이메일 인증 성공: 비밀번호 설정을 진행하세요."
return JSONResponse({"message": message,
"verified_token": verified_token})
@router.post("/register",
response_model=UserOut, )
async def register_user(username: str = Form(...),
email: str = Form(...),
token: str = Form(...),
password: str = Form(...),
password2: str = Form(...),
imagefile: UploadFile | None = File(None),
user_service: UserService = Depends(get_user_service), ):
"""이메일과 토큰은 입력값이 없어도 진입된다.
username(닉네임), 비밀번호는 js단에서 빈값 및 validation을 처리한다. 이미지는 없어도 들어온다.
이미지등의 파일을 받는 경우는 pydantic schema로 검증이 안된다. formData로 받아서 검증해야 한다.
하지만, 이미지등의 파일이 없는 경우는 json으로 받아서 pydantic schema로 검증할 수 있고, 그렇게 하는 것을 권장한다."""
print("In register_user: ", username, email, token, password, imagefile)
verified_key = f"verified:{email}"
session_key = f"user:{email}"
verified_token = await redis_client.get(verified_key)
session_data = await redis_client.hgetall(session_key)
if not verified_token: # email이 빈칸이어도 여기로 오지만, CustomError 발생시킨다.
print("CustomErrorException STATUS_CODE: ", 410, "유효하지 않은 인증토큰")
raise CustomErrorException(status_code=410, detail="유효하지 않은 인증토큰입니다.")
if verified_token != token: # token이 빈칸이어도 여기로 오지만, CustomError 발생시킨다.
print("CustomErrorException STATUS_CODE: ", 410, "인증토큰 불일치")
raise CustomErrorException(status_code=410, detail="인증토큰이 일치하지 않습니다.")
if not session_data or session_data.get("email") != email: # 들어온 이메일 값이 세션에 저장된 이메일과 다르면, CustomError 발생시킨다.
print("CustomErrorException STATUS_CODE: ", 410, "세션 이메일 불일치")
raise CustomErrorException(status_code=410, detail="세션 이메일이 일치하지 않습니다.")
try:
validated_email: EmailStr = TypeAdapter(EmailStr).validate_python(email)
user_in = UserIn(username=username, email=validated_email, password=password, confirmPassword=password2) # 템플릿 단에서 넘어온 UserIn validation
except ValidationError as e:
"""UserIn pydantic schema 검증 에러 결과의 첫번째 실제 메시지만 출력하기 위한 로직"""
first = next(iter(e.errors()), None)
if not first:
raise CustomErrorException(status_code=422, detail={"non_field": ["잘못된 입력입니다."]})
field = str((first.get("loc") or ["non_field"])[-1])
msg = first.get("msg", "잘못된 입력입니다.")
# 기존 포맷(dict of list)을 유지
raise CustomErrorException(status_code=422, detail={field: [msg]})
existed_username = await user_service.get_user_by_username(user_in.username)
if existed_username:
print("CustomErrorException STATUS_CODE: ", 499, "존재하는 닉네임")
raise CustomErrorException(status_code=499, detail="이미 사용하는 닉네임입니다.")
existed_user_email = await user_service.get_user_by_email(user_in.email)
if existed_user_email:
print("CustomErrorException STATUS_CODE: ", 499, "존재하는 이메일")
raise CustomErrorException(status_code=499, detail="이미 사용하고 있는 이메일입니다.")
created_user = await user_service.create_user(user_in)
img_path = None
if imagefile:
img_path = await upload_single_image(PROFILE_IMAGE_UPLOAD_URL, created_user, imagefile)
created_user = await user_service.user_image_update(created_user.id, img_path)
await redis_client.delete(verified_key)
await redis_client.delete(session_key)
return JSONResponse(status_code=201, content=jsonable_encoder(created_user))
@router.patch("/lost/password/resetting", response_model=UserOut,
summary="회원 비밀번호 분실/설정", description="특정 회원의 비밀번호를 분실하여 재설정합니다.",
responses={404: {
"description": "회원 비밀번호 설정 실패",
"content": {"application/json": {"example": {"detail": "회원을 찾을 수 없습니다."}}},
403: {
"description": "회원 비밀번호 수정 권한 없슴",
"content": {"application/json": {"example": {"detail": "접근 권한이 없습니다."}}}
}
}})
async def lost_password_resetting(lost_password_in: UserLostPasswordIn,
_user_service: UserService = Depends(get_user_service)):
email = str(lost_password_in.email).lower().strip()
token = lost_password_in.token
print("token: ", token)
newpassword = lost_password_in.newpassword
verified_key = f"verified:{email}"
session_key = f"user:{email}"
verified_token = await redis_client.get(verified_key)
print("verified_token: ", verified_token)
session_data = await redis_client.hgetall(session_key)
if not verified_token: # email이 빈칸이어도 여기로 오지만, CustomError 발생시킨다.
raise CustomErrorException(status_code=410, detail="유효하지 않은 인증토큰입니다.")
if verified_token != token: # token이 빈칸이어도 여기로 오지만, CustomError 발생시킨다.
raise CustomErrorException(status_code=410, detail="인증토큰이 일치하지 않습니다.")
if not session_data or session_data.get("email") != email: # 들어온 이메일 값이 세션에 저장된 이메일과 다르면, CustomError 발생시킨다.
raise CustomErrorException(status_code=410, detail="세션 이메일이 일치하지 않습니다.")
user = await _user_service.get_user_by_email(lost_password_in.email)
if user:
_password_update = UserPasswordUpdate(password=newpassword)
await _user_service.update_password(user.id, _password_update)
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="회원을 찾을 수 없습니다."
)
await redis_client.delete(verified_key)
await redis_client.delete(session_key)
return user
# return JSONResponse({"message": "비밀번호 설정 성공: 재설정된 비밀번호로 로그인됩니다."})
@router.post(
"/login",
response_model=TokenResponse,
summary="사용자 로그인",
description="사용자 로그인 후 JWT 토큰을 발급합니다.",
responses={
401: {
"description": "인증 실패",
"content": {"application/json": {"example": {"detail": "인증 실패"}}}
}})
async def login(response: Response, request: Request,
login_data: LoginRequest,
auth_service: AuthService = Depends(get_auth_service)):
user = await auth_service.authenticate_user(login_data)
if not user:
from app.utils.exc_handler import CustomErrorException
raise CustomErrorException(status_code=411,
detail="인증에 실패했습니다.",
headers={"WWW-Authenticate": "Bearer"})
token_data = await auth_service.create_user_token(user)
_access_token = token_data.get(CONFIG.ACCESS_COOKIE_NAME)
_refresh_token = token_data.get(CONFIG.REFRESH_COOKIE_NAME)
# 요청 정보로 HTTPS 여부 판별 (프록시가 있다면 x-forwarded-proto 우선)
attrs = compute_cookie_attrs(request, cross_site=False)
print("1. attrs['secure']: ", attrs["secure"])
print("2. attrs['samesite']: ", attrs["samesite"])
_REFRESH_COOKIE_EXPIRE = refresh_expire()
response.set_cookie(
key=CONFIG.ACCESS_COOKIE_NAME,
value=_access_token,
httponly=True,
secure=attrs["secure"], # HTTPS라면 True
samesite=attrs["samesite"],
max_age=ACCESS_COOKIE_MAX_AGE,
)
response.set_cookie(
key=CONFIG.REFRESH_COOKIE_NAME,
value=_refresh_token,
httponly=True, # JavaScript에서 쿠키에 접근 불가능하도록 설정
secure=attrs["secure"], # HTTPS 환경에서만 쿠키 전송
samesite=attrs["samesite"], # CSRF 공격 방지
expires=_REFRESH_COOKIE_EXPIRE
)
return token_data
@router.patch("/account/update/{user_id}", response_model=UserOut,
summary="회원 정보 수정", description="특정 회원의 정보를 수정합니다.",
responses={404: {
"description": "회원 정보 수정 실패",
"content": {"application/json": {"example": {"detail": "회원을 찾을 수 없습니다."}}},
403: {
"description": "회원 정보 수정 권한 없슴",
"content": {"application/json": {"example": {"detail": "접근 권한이 없습니다."}}}
}
}})
async def update_user(user_id: int,
username: str | None = Form(None),
email: EmailStr | None = Form(None), # 별도로 변경 라우터 만듦(여기서는 None으로 들어옴)
password: str | None = Form(None),
imagefile: UploadFile | None = File(None),
current_user: User = Depends(get_current_user),
_user_service: UserService = Depends(get_user_service)):
''' username, 프로필 이미지 변경용
email 변경은 별도로 이메일 인증코드 방식으로 구성
password은 변경 권한으로 사용함 '''
from app.utils.exc_handler import CustomErrorException
user = await _user_service.get_user_by_id(user_id)
if user != current_user:
raise CustomErrorException(status_code=411, detail="접근 권한이 없습니다.")
if username:
existed_username_user = await _user_service.get_user_by_username(username)
if username == user.username:
raise CustomErrorException(status_code=499, detail="기존 닉네임과 동일합니다.")
if existed_username_user and existed_username_user.username != user.username:
raise CustomErrorException(status_code=499, detail="이미 사용하는 닉네임입니다.")
if email: # 여기서는 None으로 들어옴(우선 코드는 남겨두었다.)
existed_email_user = await _user_service.get_user_by_email(email)
if existed_email_user and existed_email_user.email != user.email:
raise CustomErrorException(status_code=499, detail="이미 가입되어 있는 이메일입니다.")
try:
user_update = UserUpdate(username=username, email=email)
password_ok = await verify_password(password, str(user.password))
if password_ok:
updated_user = await _user_service.update_user(user_id, user_update)
if imagefile is not None:
filename_only, ext = os.path.splitext(imagefile.filename)
if len(imagefile.filename.strip()) > 0 and len(filename_only) > 0:
if user.img_path:
await old_image_remove(imagefile.filename, user.img_path)
img_path = await upload_single_image(PROFILE_IMAGE_UPLOAD_URL, updated_user, imagefile)
updated_user = await _user_service.user_image_update(updated_user.id, img_path)
else:
img_path = user.img_path # user.img_path인 경우도 적용된다.
updated_user = await _user_service.user_image_update(updated_user.id, img_path)
else:
raise CustomErrorException(status_code=411, detail="비밀번호가 일치하지 않습니다.")
if not updated_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="회원을 찾을 수 없습니다."
)
return updated_user
except ValidationError as e:
print("except ValidationError as e:", e.errors())
print("except ValidationError as e.errors():", e.errors()[0]["loc"][0])
# 요청 본문에 대한 자동 422 변환이 아닌, 수동으로 422로 변환해 주는 것이 좋습니다.
if email is not None and e.errors()[0]["loc"][0] == "email":
raise CustomErrorException(status_code=432, detail="이메일 형식 부적합")
elif username is not None and e.errors()[0]["loc"][0] == "username":
raise CustomErrorException(status_code=432, detail="닉네임 정책 위반")
@router.patch("/account/password/update/{user_id}", response_model=UserOut,
summary="회원 비밀번호 수정", description="특정 회원의 비밀번호를 수정합니다.",
responses={404: {
"description": "회원 비밀번호 수정 실패",
"content": {"application/json": {"example": {"detail": "회원을 찾을 수 없습니다."}}},
403: {
"description": "회원 비밀번호 수정 권한 없슴",
"content": {"application/json": {"example": {"detail": "접근 권한이 없습니다."}}}
}
}})
async def password_update(password_in: UserResetPasswordIn,
current_user: User = Depends(get_current_user),
_user_service: UserService = Depends(get_user_service)):
user = await _user_service.get_user_by_id(password_in.user_id)
if user != current_user:
raise CustomErrorException(status_code=411, detail="접근 권한이 없습니다.")
if user:
_password_update = UserPasswordUpdate(password=password_in.newpassword)
password_ok = await verify_password(password_in.password, str(user.password))
if password_ok:
await _user_service.update_password(password_in.user_id, _password_update)
else:
raise CustomErrorException(status_code=411, detail="기존의 비밀번호와 일치하지 않습니다.")
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="회원을 찾을 수 없습니다."
)
return user
@router.patch("/account/email/update/{user_id}")
async def email_update(user_id: int):
"""email_update 과정은
@router.post("/authcode/verify")
async def authcode_verify
authcode_verify 에 담았다."""
pass
@router.post(
"/logout",
summary="사용자 로그아웃",
description="로그아웃 후 JWT 토큰을 삭제합니다.",
responses={
200: {
"description": "로그아웃 성공",
"content": {"application/json": {"example": {"message": "로그아웃 되었습니다."}}}
}})
async def logout(response: Response,
request: Request):
access_token = request.cookies.get(CONFIG.ACCESS_COOKIE_NAME)
refresh_token = request.cookies.get(CONFIG.REFRESH_COOKIE_NAME)
# 쿠키 삭제
response.delete_cookie(key=CONFIG.ACCESS_COOKIE_NAME, path="/")
response.delete_cookie(key=CONFIG.REFRESH_COOKIE_NAME, path="/")
# 블랙리스트 처리
if access_token:
expiry = get_token_expiry(access_token)
await AsyncTokenService.blacklist_token(access_token, expiry)
if refresh_token:
expiry = get_token_expiry(refresh_token)
await AsyncTokenService.blacklist_token(refresh_token, expiry)
print("로그아웃")
return {"message": "로그아웃되었습니다."}
@router.delete("/account/delete/{user_id}",
summary="회원 탈퇴",
description="특정 회원을 탈퇴 시킵니다.",
responses={200: {
"description": "회원 탈퇴 성공",
"content": {"application/json": {"example": {"detail": "회원의 탈퇴가 성공적으로 이루어 졌습니다."}}},
404: {
"description": "회원 탈퇴 실패",
"content": {"application/json": {"example": {"detail": "회원을 찾을 수 없습니다."}}}
}
}})
async def delete_user(user_id: int,
request: Request,
current_user: User = Depends(get_current_user),
_user_service: UserService = Depends(get_user_service)):
_user = await _user_service.get_user_by_id(user_id)
if _user != current_user:
raise CustomErrorException(status_code=411, detail="접근 권한이 없습니다.")
if _user is False:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="회원을 찾을 수 없습니다."
)
# article(게시글)의 썸네일, quill 내용의 이미지/동영상 삭제
# article(게시글)의 내용 자체는 모델에서 삭제되도록 ORM정의해놓음 """cascade="all, delete-orphan","""
article_thumb_dir = f'{ARTICLE_THUMBNAIL_UPLOAD_DIR}' + '/' + f'{user_id}'
content_img_dir = f'{ARTICLE_EDITOR_USER_IMG_UPLOAD_DIR}' + '/' + f'{user_id}'
content_video_dir = f'{ARTICLE_EDITOR_USER_VIDEO_UPLOAD_DIR}' + '/' + f'{user_id}'
await remove_dir_with_files(article_thumb_dir)
await remove_dir_with_files(content_img_dir)
await remove_dir_with_files(content_video_dir)
user_thumb_dir = f'{PROFILE_IMAGE_UPLOAD_URL}' + '/' + f'{user_id}'
await remove_dir_with_files(user_thumb_dir)
"""프로필 이미지는 삭제한다. 하지만,
게시글의 author_id는 남겨두고, 해당 회원이 작성했던 게시글은 비활성화 하는 것으로 처리하자."""
print("delete_user user_id:", user_id)
await _user_service.delete_user(user_id)
#############################
access_token = request.cookies.get(CONFIG.ACCESS_COOKIE_NAME)
refresh_token = request.cookies.get(CONFIG.REFRESH_COOKIE_NAME)
resp = JSONResponse(
status_code=200,
content={"detail": "회원의 탈퇴가 성공적으로 이루어졌습니다."}
)
domain = request.url.hostname
resp.delete_cookie(
key=CONFIG.ACCESS_COOKIE_NAME,
path="/",
domain=domain, # 필요 시 명시적으로 지정
httponly=True
)
resp.delete_cookie(
key=CONFIG.REFRESH_COOKIE_NAME,
path="/",
domain=domain,
httponly=True
)
if access_token:
access_exp = get_token_expiry(access_token)
await AsyncTokenService.blacklist_token(access_token, access_exp)
if refresh_token:
refresh_exp = get_token_expiry(refresh_token)
await AsyncTokenService.blacklist_token(refresh_token, refresh_exp)
# 만약 Redis에 별도 키로 저장했다면 삭제:
await redis_client.delete(f"{REFRESH_TOKEN_PREFIX}{user_id}")
request.state.skip_set_cookie = True
print("Final headers:", resp.headers.getlist("set-cookie"))
print("쿠키 삭제 확인 request.cookies.get(ACCESS_COOKIE_NAME): ", request.cookies.get(CONFIG.ACCESS_COOKIE_NAME))
print("쿠키 삭제 확인 request.cookies.get(REFRESH_COOKIE_NAME): ", request.cookies.get(CONFIG.REFRESH_COOKIE_NAME))
print("response.raw_headers: ", resp.raw_headers)
return resp
# Project_folder/app/apis/auth.py
from fastapi import APIRouter, Request
router = APIRouter()
@router.get("/csrf_token", summary="CSRF_TOKEN End Point",)
def csrf_token_endpoint(request: Request):
# middleware sets cookie 'csrf_token' and possibly request.state.csrf_token
token = request.cookies.get("csrf_token")
if not token:
token = getattr(request.state, "csrf_token", "")
return {"csrf_token": token}
# Project_folder/app/core/database.py
import os
from datetime import datetime, timezone
from typing import AsyncGenerator
from sqlalchemy import Integer, DateTime
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import declarative_base, Mapped, mapped_column
from app.core.settings import CONFIG, MEDIA_DIR
PROFILE_IMAGE_UPLOAD_URL = os.path.join(MEDIA_DIR, CONFIG.PROFILE_IMAGE_URL)
ARTICLE_THUMBNAIL_UPLOAD_DIR = os.path.join(MEDIA_DIR, CONFIG.ARTICLE_THUMBNAIL_DIR)
ARTICLE_EDITOR_USER_IMG_UPLOAD_DIR = os.path.join(MEDIA_DIR, CONFIG.ARTICLE_EDITOR_USER_IMG_DIR)
ARTICLE_EDITOR_USER_VIDEO_UPLOAD_DIR = os.path.join(MEDIA_DIR, CONFIG.ARTICLE_EDITOR_USER_VIDEO_DIR)
DATABASE_URL = f"{CONFIG.DB_TYPE}+{CONFIG.DB_DRIVER}://{CONFIG.DB_USER}:{CONFIG.DB_PASSWORD}@{CONFIG.DB_HOST}:{CONFIG.DB_PORT}/{CONFIG.DB_NAME}?charset=utf8"
print("2. DATABASE_URL:", DATABASE_URL.split("//")[0])
ASYNC_ENGINE = create_async_engine(DATABASE_URL,
echo=CONFIG.DEBUG,
future=True,
pool_size=10, max_overflow=0, pool_recycle=300, # 5분마다 연결 재활용
# encoding="utf-8"
)
# 세션 로컬 클래스 생성
AsyncSessionLocal = async_sessionmaker(
ASYNC_ENGINE,
class_=AsyncSession, # add
expire_on_commit=False,
autocommit=False,
autoflush=False,
# poolclass=NullPool, # SQLite에서는 NullPool 권장
)
"""
autocommit=False 부분은 주의하자.
autocommit=False로 설정하면 데이터를 변경했을때 commit 이라는 사인을 주어야만 실제 저장이 된다.
만약 autocommit=True로 설정할 경우에는 commit이라는 사인이 없어도 즉시 데이터베이스에 변경사항이 적용된다.
그리고 autocommit=False인 경우에는 데이터를 잘못 저장했을 경우 rollback 사인으로 되돌리는 것이 가능하지만
autocommit=True인 경우에는 commit이 필요없는 것처럼 rollback도 동작하지 않는다는 점에 주의해야 한다.
"""
Base = declarative_base() # Base 클래스 (모든 모델이 상속)
class BaseModel(Base):
__abstract__ = True
id: Mapped[int] = mapped_column(Integer, primary_key=True)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc))
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc))
async def get_db() -> AsyncGenerator[AsyncSession, None]:
session: AsyncSession = AsyncSessionLocal()
print(f"[get_session] new session: {id(session)}")
try:
yield session
except Exception as e:
print(f"Session rollback triggered due to exception: {e}")
await session.rollback()
raise
finally:
print(f"[get_session] close session: {id(session)}")
await session.close()
# Project_folder/app/core/inits.py
from contextlib import asynccontextmanager
import redis
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi_csrf_jinja.middleware import FastAPICSRFJinjaMiddleware
from app.apis import accounts as apis_accounts
from app.apis import auth as apis_auth
from app.core.database import ASYNC_ENGINE
from app.core.redis import redis_client
from app.core.settings import STATIC_DIR, MEDIA_DIR, CONFIG, templates
from app.utils.commons import to_kst
from app.utils.middleware import AccessTokenSetCookieMiddleware
from app.views import index
from app.views import accounts as views_accounts
@asynccontextmanager
async def lifespan(app: FastAPI):
print("Initializing database......")
# FastAPI 인스턴스 기동시 필요한 작업 수행.
try:
await redis_client.ping() # Redis 연결 테스트
print("Redis connection established......")
except redis.exceptions.ConnectionError:
print("Failed to connect to Redis......")
print("Starting up...")
yield
# FastAPI 인스턴스 종료시 필요한 작업 수행
await redis_client.aclose()
print("Redis connection closed......")
print("Shutting down...")
await ASYNC_ENGINE.dispose()
def including_middleware(app):
app.add_middleware(CORSMiddleware,
allow_origins=[
"http://localhost:5173", # 프론트엔드 origin (예시)
"http://127.0.0.1:5500", # 필요시 추가
"http://localhost:8000", # 백엔드가 템플릿 제공하는 경우
], # 실제 프론트 주소
allow_methods=["*"],
allow_headers=["*"],
allow_credentials=True,
max_age=-1)
app.add_middleware(FastAPICSRFJinjaMiddleware,
secret=CONFIG.SECRET_KEY,
cookie_name="csrf_token",
header_name="X-CSRF-Token",)
# swagger를 CSRF_TOKEN적용에서 제외시키는 방법: 라이브러리에서 제공하는 예외 옵션 이름에 맞춰 사용하세요.
# (예: exclude_paths, exempt_paths, exempt_urls 등)
# 이 프로젝트는 swagger로 커스터마이징해서 csrf_token을 적용시켰다.
# exempt_urls=["/swagger/custom/docs", "/swagger/custom/redoc", "/swagger/custom/openapi.json"])
""" AccessTokenSetCookieMiddleware: access_token이 만료되면,
get_current_user 리프레시로 폴백하면서 액세스토큰을 만들때 가로채서 쿠키에 심는다."""
app.add_middleware(AccessTokenSetCookieMiddleware)
def including_router(app):
app.include_router(index.router, prefix="", tags=["IndexViews"])
app.include_router(apis_accounts.router, prefix="/apis/accounts", tags=["AccountsAPI"])
app.include_router(apis_auth.router, prefix="/apis/auth", tags=["Auth"])
app.include_router(views_accounts.router, prefix="/views/accounts", tags=["AccountsViews"])
def initialize_app():
app = FastAPI(title=CONFIG.APP_NAME,
version=CONFIG.APP_VERSION,
description=CONFIG.APP_DESCRIPTION,
lifespan=lifespan,
docs_url=None, redoc_url=None, openapi_url="/swagger/custom/openapi.json",
)
app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
app.mount("/media", StaticFiles(directory=MEDIA_DIR), name="media")
templates.env.globals["STATIC_URL"] = "/static"
templates.env.globals["MEDIA_URL"] = "/media"
templates.env.filters["to_kst"] = to_kst
including_middleware(app)
including_router(app)
return app
# Project_folder/app/core/redis.py
from redis.asyncio import Redis, ConnectionPool
from app.core.settings import CONFIG
# # Connection Pool 기반 access ##############################
""" # AI Chat # 비동기적으로 ConnectionPool 적용
비동기 Redis 클라이언트와 함께 쓰려면 동기용 ConnectionPool이 아니라 asyncio 전용 풀을 써야 합니다.
즉, redis.ConnectionPool이 아니라 redis.asyncio.ConnectionPool을 사용해야 합니다.
from redis.asyncio import Redis, ConnectionPool
예시 1) asyncio용 ConnectionPool을 직접 생성해서 사용
"""
'''마땅히 설정할 곳이 없어서... 서버 구동시작시 여기를 지나가므로 전역변수처럼 사용'''
ACCESS_COOKIE_MAX_AGE = CONFIG.ACCESS_TOKEN_EXPIRE * 60 # 초 1800 : 30분
CODE_TTL_SECONDS = 10 * 60 # 10분
host = CONFIG.REDIS_HOST if CONFIG.APP_ENV == "production" else "localhost"
port = CONFIG.REDIS_PORT if CONFIG.APP_ENV == "production" else 6379
password = CONFIG.REDIS_PASSWORD if CONFIG.APP_ENV == "production" else None
db = CONFIG.REDIS_DB if CONFIG.APP_ENV == "production" else 0
redis_pool = ConnectionPool(
host=host,
port=port,
db=db,
password=password,
decode_responses=True, # 문자열 응답을 자동으로 디코딩
max_connections=10)
redis_client = Redis(connection_pool=redis_pool)
# Project_folder/app/core/settings.py
import os
from functools import lru_cache
from pathlib import Path
from typing import Optional, List
from fastapi.templating import Jinja2Templates
from fastapi_csrf_jinja.jinja_processor import csrf_token_processor
from pydantic import Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
PRESENT_DIR = os.path.dirname(os.path.abspath(__file__))
APP_DIR = Path(__file__).resolve().parent.parent
ROOT_DIR = Path(__file__).resolve().parent.parent.parent ## root폴더
TEMPLATE_DIR = os.path.join(APP_DIR, 'templates')
STATIC_DIR = os.path.join(APP_DIR, 'static')
MEDIA_DIR = os.path.join(APP_DIR, 'media')
templates = Jinja2Templates(
directory=TEMPLATE_DIR,
context_processors=[csrf_token_processor("csrf_token", "X-CSRF-Token")]
)
class Settings(BaseSettings):
"""여기에서 값이 비어있는 것은 .env 파일에서 채워진다.
그 값은 override 되지 않는다. 흐름상 그럴것 같기는 한데...???
최종적으로 .env에 설정된 값으로 채워져 버리는 것인것 같다."""
APP_ENV: str = "development"
APP_NAME: str = "FastAPI_Jinja"
APP_VERSION: str = "0.0.0"
APP_DESCRIPTION: str = ("FastAPI_Jinja/Javascript을 이용한 프로젝트 개발: \n"
"Jinja Template를 이용하여 Server Side Rendering을 적용하고, \n"
"데이터 입력과정을 pydantic schema적용하고 POST 메서드에서 vanilla javascript를 이용하여 개발 진행")
DEBUG: bool = False
# 기본값은 두지 않고 .env에서 읽히도록 둡니다.
SECRET_KEY: Optional[str] = None
ALGORITHM: str
DB_TYPE: str
DB_DRIVER: str
# 이메일/인증코드
SMTP_FROM: str # = os.getenv("SMTP_FROM", SMTP_USERNAME)
SMTP_USERNAME: str
SMTP_PASSWORD: str
SMTP_PORT: int # = int(os.getenv("SMTP_PORT", 587))
SMTP_HOST: str # = os.getenv("SMTP_HOST", "smtp.gmail.com")
# 액세스/리프페시 토큰
ACCESS_COOKIE_NAME: str
REFRESH_COOKIE_NAME: str
NEW_ACCESS_COOKIE_NAME: str
NEW_REFRESH_COOKIE_NAME: str
ACCESS_TOKEN_EXPIRE: int
REFRESH_TOKEN_EXPIRE: int
# Media Store Path
PROFILE_IMAGE_URL: str
ARTICLE_THUMBNAIL_DIR: str
ARTICLE_EDITOR_USER_IMG_DIR: str
ARTICLE_EDITOR_USER_VIDEO_DIR: str
ORIGINS: List[str] = Field(default_factory=list)
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
case_sensitive=False,
)
class DevSettings(Settings):
DEBUG: bool = Field(..., validation_alias="DEBUG_TRUE")
ORIGINS: str = Field(..., validation_alias="DEV_ORIGINS")
DB_NAME: str = Field(..., validation_alias="DEV_DB_NAME")
DB_HOST: str = Field(..., validation_alias="DEV_DB_HOST")
DB_PORT: str = Field(..., validation_alias="DEV_DB_PORT")
DB_USER: str = Field(..., validation_alias="DEV_DB_USER")
DB_PASSWORD: str = Field(..., validation_alias="DEV_DB_PASSWORD")
class ProdSettings(Settings):
APP_NAME: str = Field(..., validation_alias="PROD_APP_NAME")
APP_VERSION: str = Field(..., validation_alias="PROD_APP_VERSION")
APP_DESCRIPTION: str = Field(..., validation_alias="PROD_APP_DESCRIPTION")
ORIGINS: str = Field(..., validation_alias="PROD_ORIGINS")
DB_NAME: str = Field(..., validation_alias="PROD_DB_NAME")
DB_HOST: str = Field(..., validation_alias="PROD_DB_HOST")
DB_PORT: str = Field(..., validation_alias="PROD_DB_PORT")
DB_USER: str = Field(..., validation_alias="PROD_DB_USER")
DB_PASSWORD: str = Field(..., validation_alias="PROD_DB_PASSWORD")
REDIS_HOST: str = Field(..., validation_alias="REDIS_HOST")
REDIS_PORT: str = Field(..., validation_alias="REDIS_PORT")
REDIS_DB: str = Field(..., validation_alias="REDIS_DB")
REDIS_PASSWORD: str = Field(..., validation_alias="REDIS_PASSWORD")
# 운영에서는 SECRET_KEY 필수
@model_validator(mode="after")
def ensure_secret_key(self):
if not self.SECRET_KEY or len(self.SECRET_KEY) < 16:
raise ValueError("In production, SECRET_KEY must be set and sufficiently long.")
return self
@lru_cache(maxsize=1)
def get_settings() -> Settings:
print("1. setting start... Only One")
# model_config의 env_file 설정만으로도 충분하지만, 아래 호출이 있어도 문제는 없습니다.
try:
from dotenv import load_dotenv
load_dotenv(".env", override=False, encoding="utf-8")
except Exception as e:
print(f"load_dotenv error: {e}")
# python-dotenv 미설치 등인 경우에도 설정 로딩은 pydantic-settings가 처리하므로 무시 가능
pass
app_env = os.getenv("APP_ENV", "development").strip().lower()
if app_env == "production":
return ProdSettings()
print("2. settings...APP_ENV: ", app_env)
return DevSettings()
CONFIG = get_settings()
# Project_folder/app/dependencies/auth.py
from typing import Optional, Set
from fastapi import Depends, HTTPException, Request, status, Response, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose.exceptions import ExpiredSignatureError
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.redis import ACCESS_COOKIE_MAX_AGE
from app.core.settings import CONFIG
from app.models.user import User
from app.services.auth_service import AuthService, get_auth_service
from app.services.token_service import AsyncTokenService
from app.utils.auth import payload_to_user, get_token_expiry, verify_token
from app.utils.cookies import compute_cookie_attrs
# 헤더는 선택적으로만 받도록 설정 (없어도 에러 발생 X)
bearer_scheme = HTTPBearer(auto_error=False)
"""
토큰에서 현재 사용자 정보를 가져오는 의존성 함수
"""
async def get_current_user(
request: Request, response: Response,
credentials: Optional[HTTPAuthorizationCredentials] = Security(bearer_scheme),
db: AsyncSession = Depends(get_db),
):
# 1) 헤더
auth = request.headers.get("authorization")
print("1. get_current_user 시작 ::: auth::::::: ", auth)
access_token = auth.split(" ", 1)[1].strip() if auth and auth.lower().startswith("bearer ") else None
print("2. get_current_user 시작 ::: access_token::::::: ", access_token)
# 2) 쿠키 폴백
if not access_token:
access_token = request.cookies.get(CONFIG.ACCESS_COOKIE_NAME)
print("3. get_current_user 시작 ::: request.cookies.get access_token::::::: ", access_token)
if access_token:
try:
user = await payload_to_user(access_token, db)
print("get_current_user 끝 if access_token: user::::::: ", user)
return user
except ExpiredSignatureError:
pass # 3) 리프레시 시도
# 3) 리프레시 폴백 (공통 함수 호출)
refresh_token = request.cookies.get(CONFIG.REFRESH_COOKIE_NAME)
print("4. refresh_access_token_from_cookie: refresh_token: ", refresh_token)
if not refresh_token:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
auth_service = AuthService(db=db)
token_payload = await auth_service.refresh_access_token(refresh_token)
if not token_payload:
# refresh 토큰이 유효하지 않거나 만료된 경우
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh Token is invalid or expired",
)
new_access = token_payload.get(CONFIG.ACCESS_COOKIE_NAME)
if not new_access:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to issue access token")
# request.state에 보관 (뷰에서 꺼내서 실제 TemplateResponse에 심을 수 있게)
request.state.new_access = new_access
attrs = compute_cookie_attrs(request, cross_site=False)
print("1. attrs['secure']: ", attrs["secure"])
print("2. attrs['samesite']: ", attrs["samesite"])
response.set_cookie(
key=CONFIG.ACCESS_COOKIE_NAME,
value=new_access,
httponly=True,
secure=attrs["secure"], # HTTPS라면 True
samesite=attrs["samesite"],
path="/",
max_age=ACCESS_COOKIE_MAX_AGE,
)
user = await payload_to_user(new_access, db)
print("get_current_user 끝 refresh_access_token) 리프레시 폴백: user::::::: ", user)
return user
async def get_optional_current_user(request: Request, response: Response,
db: AsyncSession = Depends(get_db)) -> Optional[User]:
"""
인증 토큰이 없거나 유효하지 않은 경우 None을 반환하고, 다른 예외는 그대로 전달합니다.
AI Chat:
- 익명 접근을 허용하는 엔드포인트에서는 Depends(get_optional_current_user)를 사용하세요.
이것을 적용하고 if current_user is None or current_user != user 로 분기하여 "Not authorized: 접근권한이 없습니다."로 raise 날려도 된다.
그러면, Depends(get_current_user) 주입해서, "Not authenticated: 로그인하지 않았습니다."를 raise 날리는 것과 효과가 같다.
효과는 같지만, 엄밀한 의미에서는 다르다.
- 인증이 반드시 필요한 엔드포인트는 기존처럼 Depends(get_current_user)를 유지하면 됩니다.
"""
try:
# 핵심: get_current_user를 직접 호출하되, db를 명시적으로 전달
return await get_current_user(request=request, response=response, db=db)
except HTTPException as e:
if e.status_code in (status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN):
return None
raise
# def allow_usernames(*allowed_names: str):
def allow_usernames(allowed_names: list):
""" AI Chat이 알려준 방식임
특정 username만 접근을 허용하는 FastAPI 의존성 팩토리.
ADMINS = ["admin", "owner"]
사용 예) Depends(allow_usernames(ADMINS))
"""
allowed: Set[str] = {n.strip() for n in allowed_names if n and n.strip()}
if not allowed:
raise ValueError("allow_usernames requires at least one non-empty username")
async def dependency(user = Depends(get_current_user)):
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
)
username = getattr(user, "username", None)
if not username or username not in allowed:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You Don't Have Permission to Access This Resource.",
)
return user # 필요 시 엔드포인트에서 user를 그대로 사용 가능
return dependency
# Project_folder/app/models/user.py
from typing import Optional
from sqlalchemy import String, Boolean
from sqlalchemy.orm import Mapped, mapped_column
from app.core.database import BaseModel
class User(BaseModel):
__tablename__ = "users"
# String은 제한 글자수를 지정해야 한다.
username: Mapped[str] = mapped_column(String(20), unique=True, nullable=False)
email: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
password: Mapped[str] = mapped_column(String(255), nullable=False)
img_path: Mapped[Optional[str]] = mapped_column(String(200), nullable=True)
is_admin: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
"""
- unique 와 index 중복
- unique=True만으로도 고유 인덱스가 생성됩니다. index=True는 중복이므로 제거해도 됩니다(권장).
- 비밀번호 길이
- password는 해시를 저장하므로 길이를 넉넉히 두는 것이 안전합니다(예: String(255)). 해시 종류에 따라 100자를 넘길 수 있습니다.
- 기본 키/타입
- id는 Integer(primary_key=True)면 충분합니다. 아주 큰 규모를 예상하면 BigInteger도 고려할 수 있습니다.
"""
# Project_folder/app/schemas/accounts.py
from datetime import datetime
from pydantic import BaseModel, field_validator, EmailStr, ConfigDict, Field
from pydantic_core import PydanticCustomError
from pydantic_core.core_schema import FieldValidationInfo
from app.utils.accounts import optimal_password
from app.utils.commons import strict_email
class UserBase(BaseModel):
username: str
email: EmailStr
@field_validator('username')
def validate_username_min_length(cls, v: str):
if len(v) < 3:
raise PydanticCustomError('value_error', '닉네임은 최소 3글자이상 이어야 합니다.')
return v
@field_validator('email', mode='before')
def validate_email_strict(cls, v):
email = strict_email(v)
return email
class UserIn(UserBase):
password: str
confirmPassword: str
@field_validator('username', 'password', 'confirmPassword', 'email')
def not_empty(cls, v):
if not v or not v.strip():
# raise ValueError('빈 값은 허용되지 않습니다.')
# 접두사 없이 메시지 그대로 내려감
raise PydanticCustomError('empty_value', '빈 값은 허용되지 않습니다.')
return v
@field_validator("password")
def password_validator(cls, password: str | None) -> str | None:
if not password:
return password
optimal_password(password)
return password
@field_validator('confirmPassword')
def passwords_match(cls, v, info: FieldValidationInfo):
if 'password' in info.data and v != info.data['password']:
# raise ValueError('비밀번호가 일치하지 않습니다')
# 접두사 없이 메시지 그대로 내려감
raise PydanticCustomError('empty_value', '비밀번호가 일치하지 않습니다.')
return v
class UserOut(UserBase):
id: int
created_at: datetime
model_config = ConfigDict(from_attributes=True)
class UserUpdate(BaseModel):
username: str | None = Field(None)
email: EmailStr | None = None
@field_validator('username')
def validate_username_min_length(cls, v: str):
if v is None:
print("username is None")
return None
if isinstance(v, str) and v.strip() == '':
return None
if len(v) < 3:
raise PydanticCustomError('value_error', '닉네임은 최소 3글자이상 이어야 합니다.')
return v
@field_validator('email', mode='before')
def validate_email_none(cls, v):
if v is None:
print("email is None")
return None
if isinstance(v, str) and v.strip() == '':
return None
email = strict_email(v)
return email
class UserResetPasswordIn(BaseModel):
user_id: int
password: str
newpassword: str
confirmPassword: str
@field_validator('password', 'newpassword', 'confirmPassword')
def not_empty(cls, v):
if not v or not v.strip():
# raise ValueError('빈 값은 허용되지 않습니다.')
# 접두사 없이 메시지 그대로 내려감
raise PydanticCustomError('empty_value', '빈 값은 허용되지 않습니다.')
return v
@field_validator("newpassword")
def password_validator(cls, v: str | None) -> str | None:
if not v:
return v
optimal_password(v)
return v
@field_validator('confirmPassword')
def passwords_match(cls, v, info: FieldValidationInfo):
if 'newpassword' in info.data and v != info.data['newpassword']:
# raise ValueError('비밀번호가 일치하지 않습니다')
# 접두사 없이 메시지 그대로 내려감
raise PydanticCustomError('empty_value', '비밀번호가 일치하지 않습니다.')
return v
class UserLostPasswordIn(BaseModel):
email: EmailStr
token: str
newpassword: str
confirmPassword: str
@field_validator('email', mode='before')
def validate_email_strict(cls, v):
email = strict_email(v)
return email
@field_validator('token', 'newpassword', 'confirmPassword')
def not_empty(cls, v):
if not v or not v.strip():
raise PydanticCustomError('empty_value', '빈 값은 허용되지 않습니다.')
return v
@field_validator("newpassword")
def password_validator(cls, v: str | None) -> str | None:
if not v:
return v
optimal_password(v)
return v
@field_validator('confirmPassword')
def passwords_match(cls, v, info: FieldValidationInfo):
if 'newpassword' in info.data and v != info.data['newpassword']:
# raise ValueError('비밀번호가 일치하지 않습니다')
# 접두사 없이 메시지 그대로 내려감
raise PydanticCustomError('empty_value', '비밀번호가 일치하지 않습니다.')
return v
class UserPasswordUpdate(BaseModel):
password: str | None = Field(None)
@field_validator("password")
def password_validator(cls, password: str | None) -> str | None:
if not password:
return password
optimal_password(password)
return password
class EmailRequest(BaseModel):
email: EmailStr
type:str
@field_validator('email', mode='before')
def validate_email_strict(cls, v):
email = strict_email(v)
return email
class VerifyRequest(BaseModel):
type: str | None = None
old_email: EmailStr | None = None
email: EmailStr | None = None # ← 이메일도 상황에 따라 비울 수 있게 None 허용
authcode: str | None = None # ← type에 따라 필수가 다르므로 None 허용
password: str | None = None
@field_validator('old_email', 'email', mode='before')
def validate_email_strict(cls, v):
"""인증코드만 검증할 때는 old_email/email이 들어오지 않는다.
이메일 변경할 때는 인증코드와 함께 old_email과 email이 들어온다.
old_email과 email이 들어오지 않는 경우는 인증코드만 검증할 수 있도록 None을 반환"""
if v is None or v == "":
return None
"""# 그 외는 strict_email 통과"""
email = strict_email(v)
return email
# Project_folder/app/schemas/auth.py
from pydantic import BaseModel, EmailStr, field_validator
from pydantic_core import PydanticCustomError
from app.utils.commons import strict_email
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str
class LoginRequest(BaseModel):
email: EmailStr
password: str
@field_validator('email', mode='before')
def validate_email_strict(cls, v):
email = strict_email(v)
return email
@field_validator('password')
def not_empty(cls, v):
print(f"Login Request: ", v)
if not v or not v.strip():
# raise ValueError('빈 값은 허용되지 않습니다.')
# 접두사 없이 메시지 그대로 내려감
raise PydanticCustomError('empty_value', '빈 값은 허용되지 않습니다.')
return v
# Project_folder/app/services/account_service.py
from pydantic import EmailStr
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends
from app.core.database import get_db
from app.models.user import User
from app.schemas.accounts import UserIn, UserPasswordUpdate, UserUpdate
from app.utils.accounts import get_password_hash
class UserService:
def __init__(self, db: AsyncSession):
self.db = db
async def create_user(self, user_in: UserIn):
hashed_password = await get_password_hash(user_in.confirmPassword)
db_user = User(
email=str(user_in.email),
username=user_in.username,
password=hashed_password,
)
self.db.add(db_user)
await self.db.commit()
await self.db.refresh(db_user)
return db_user
async def get_user_by_email(self, email: EmailStr):
query = (select(User).where(User.email == email))
result = await self.db.execute(query) # await 추가
return result.scalar_one_or_none()
async def get_user_by_username(self, username: str):
query = (select(User).where(User.username == username))
result = await self.db.execute(query) # await 추가
return result.scalar_one_or_none()
async def get_users(self):
query = (select(User).order_by(User.created_at.desc()))
result = await self.db.execute(query)
users = result.scalars().all()
return users
async def get_user_by_id(self, user_id: int):
query = (select(User).where(User.id == user_id))
result = await self.db.execute(query)
user = result.scalar_one_or_none()
return user
async def update_user(self, user_id: int, user_update: UserUpdate):
user = await self.get_user_by_id(user_id)
if user is None:
return None
if user_update.username is not None:
user.username = user_update.username
if user_update.email is not None:
user.email = str(user_update.email)
await self.db.commit()
await self.db.refresh(user)
return user
async def update_email(self, old_email: EmailStr, email: EmailStr):
user = await self.get_user_by_email(old_email)
print("user", user)
if user is None:
return None
user.email = str(email)
await self.db.commit()
await self.db.refresh(user)
return user
async def update_password(self, user_id: int, password_update: UserPasswordUpdate):
user = await self.get_user_by_id(user_id)
if user is None:
return None
hashed_password = await get_password_hash(password_update.password)
user.password = hashed_password
await self.db.commit()
await self.db.refresh(user)
return user
async def user_image_update(self, user_id: int, img_path: str):
user = await self.get_user_by_id(user_id)
if user is None:
return None
user.img_path = img_path
await self.db.commit()
await self.db.refresh(user)
return user
async def delete_user(self, user_id: int):
user = await self.get_user_by_id(user_id)
if user is None:
return False
await self.db.delete(user)
await self.db.commit()
return True
def get_user_service(db: AsyncSession = Depends(get_db)) -> 'UserService':
return UserService(db)
# Project_folder/app/services/auth_service.py
from datetime import timedelta, datetime, timezone
from fastapi import Depends
from jose import jwt
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.settings import CONFIG
from app.models.user import User
from app.schemas.auth import LoginRequest
from app.services.token_service import AsyncTokenService
from app.utils.accounts import verify_password
from app.utils.auth import create_access_token, create_refresh_token, verify_token
from app.utils.exc_handler import CustomErrorException
class AuthService:
def __init__(self, db: AsyncSession):
self.db = db
"""
사용자 인증을 수행합니다.
"""
async def authenticate_user(self, login_data: LoginRequest):
# 사용자 조회
print("login_data: ", login_data)
print("login_data.email: ", login_data.email)
if not login_data.email or not login_data.password:
raise CustomErrorException(status_code=410,
detail="인증 실패: 빈칸을 채워주세요.",
headers={"WWW-Authenticate": "Bearer"})
query = (
select(User).
where(User.email == login_data.email)
)
result = await self.db.execute(query)
user = result.scalar_one_or_none()
print("user: ", user)
print("user.email: ", user.email)
print("login_data.email: ", login_data.email)
print("login_data.password: ", login_data.password)
print("user.password: ", user.password)
if not user:
# return None
# 사용자 없음
raise CustomErrorException(status_code=411,
detail="인증 실패: 가입된 이메일이 존재하지 않습니다.",
headers={"WWW-Authenticate": "Bearer"})
password_ok = await verify_password(login_data.password, str(user.password))
if not password_ok:
# return None
# raise PydanticCustomError를 던지면 Internal Server Error가 터져버린다.
# raise PydanticCustomError('empty_value', '비밀번호 불일치')
# 비밀번호 불일치
raise CustomErrorException(status_code=411,
detail="인증 실패: 비밀번호가 일치하지 않습니다.",
headers={"WWW-Authenticate": "Bearer"})
print("user: ", user)
return user
"""
사용자 정보를 기반으로 액세스 토큰을 생성합니다.
"""
# AI Chat 권장: 정말 self/cls를 쓰지 않는다면 정적 메서드로 전환
# - 메서드 선언에 @staticmethod를 붙이고 self 인자를 제거하세요.
@staticmethod
async def create_user_token(user: User):
# async def create_user_token(self, user: User):
# 토큰에 포함될 데이터
token_data = {
"username": user.username,
"email": user.email,
"user_id": user.id,
}
# 만료 시간 설정
access_token_expires = timedelta(minutes=CONFIG.ACCESS_TOKEN_EXPIRE)
# 액세스 토큰 생성
access_token = await create_access_token(
data=token_data,
expires_delta=access_token_expires
)
try:
unverified = jwt.get_unverified_claims(access_token)
exp_ts = unverified.get("exp")
if exp_ts:
print("============== access_token의 만료기간 관련 정보 ==============")
print("exp:", datetime.fromtimestamp(exp_ts, tz=timezone.utc))
print("now:", datetime.now(timezone.utc))
print("seconds_left:", int(exp_ts - datetime.now(timezone.utc).timestamp()))
except Exception as e:
print("get_unverified_claims 실패는 단순 디버깅 용도이므로 그대로 진행", e)
pass
# 리프레시 토큰 생성
refresh_token = await create_refresh_token(
data=token_data
)
try:
unverified = jwt.get_unverified_claims(refresh_token)
exp_ts = unverified.get("exp")
if exp_ts:
print("============== refresh_token의 만료기간 관련 정보 ==============")
print("exp:", datetime.fromtimestamp(exp_ts, tz=timezone.utc))
print("now:", datetime.now(timezone.utc))
print("seconds_left:", int(exp_ts - datetime.now(timezone.utc).timestamp()))
except Exception as e:
print("get_unverified_claims 실패는 단순 디버깅 용도이므로 그대로 진행", e)
pass
# refresh_token을 Redis에 저장
await AsyncTokenService.store_refresh_token(user.id, refresh_token)
return {
CONFIG.ACCESS_COOKIE_NAME: access_token,
CONFIG.REFRESH_COOKIE_NAME: refresh_token,
"token_type": "bearer"
}
"""
리프레시 토큰을 사용하여 새 액세스 토큰을 발급합니다.
"""
async def refresh_access_token(self, refresh_token: str):
print("refresh_access_token 리프레시 토큰으로 액세스 토큰 생성 시작: refresh_token: ", refresh_token)
# 리프레시 토큰 검증
payload = verify_token(refresh_token)
if not payload:
print("refresh_access_token payload 없다.: ", payload)
return None
user_id = payload.get("user_id")
if not user_id:
print("refresh_access_token user_id 없다.: ", user_id)
return None
""" # refresh_token이 만료기간이 남아 있는 경우 redis에 저장하는 로직을 한번 더 실행
# 뭔가 redis 관련 문제로 인해 is_valid가 None으로 반환되어 버리는 오류?를 없애기 위해 인위적으로 redis에 저장하는 로직을 한번더 실행한다. """
print("refresh_token 인위적 다시 redis 저장 시작")
await AsyncTokenService.store_refresh_token(user_id, refresh_token)
print("refresh_token 인위적 다시 저장 끝 ===> Redis에서 리프레시 토큰 유효성 확인")
# Redis에서 리프레시 토큰 유효성 확인
is_valid = await AsyncTokenService.validate_refresh_token(user_id, refresh_token)
if not is_valid:
print("refresh_access_token is_valid 안됐다.: ", is_valid)
return None
# 사용자 조회
query = (select(User).where(User.id == user_id))
result = await self.db.execute(query)
user = result.scalar_one_or_none()
if not user:
print("refresh_access_token user 없다.: ", user)
return None
# 토큰에 포함될 데이터
token_data = {
"username": user.username,
"email": user.email,
"user_id": user.id,
}
# 새 액세스 토큰 생성
access_token = await create_access_token(token_data)
print("refresh_access_token access_token 만들었다.: ", access_token)
return {
CONFIG.ACCESS_COOKIE_NAME: access_token,
"token_type": "bearer"
}
def get_auth_service(db: AsyncSession = Depends(get_db)):
return AuthService(db)
# Project_folder/migrations/env.py
import asyncio
from logging.config import fileConfig
import os
import sys
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
sys.path.insert(0, os.path.realpath(os.path.join(os.path.dirname(__file__), "..")))
from app.core.database import Base, DATABASE_URL
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
# if config.config_file_name is not None:
# fileConfig(config.config_file_name)
if config.config_file_name is not None:
try:
fileConfig(config.config_file_name, encoding='utf-8')
except TypeError:
# older versions of fileConfig don't support the encoding parameter
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
''' 반드시 모델들을 임포트 하라. '''
from app.models import user
# from app.test import exam
# target_metadata = mymodel.Base.metadata
# target_metadata = None
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_migrations_online() -> None:
"""Run migrations in 'online' mode for an async application."""
# 환경 변수가 설정되지 않았다면 alembic.ini의 기본값을 사용하도록 폴백 설정 (선택 사항)
db_url = DATABASE_URL
if not db_url:
db_url = config.get_main_option("sqlalchemy.url")
if not db_url:
raise ValueError("데이터베이스 URL을 찾을 수 없습니다. .env 파일이나 환경 변수를 확인하세요.")
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
future=True, # SQLAlchemy 2.0
url=db_url # 여기에 환경 변수에서 가져온 URL을 전달 (개발 및 배포모드 모두 커버)
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
if context.is_offline_mode():
run_migrations_offline()
else:
asyncio.run(run_migrations_online())
# Project_folder/alembic.ini
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
# sqlalchemy.url = mysql+aiomysql://root:981011@localhost:3306/advanced_db?charset=utf8mb4
sqlalchemy.url =
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
# Project_folder/.env
# 배포시에 주석해제
#APP_ENV=production
DEBUG_TRUE=true
DEBUG_FALSE=false
# production시 DEBUG_FALSE = false
SECRET_KEY=7b6b057992ae6b79d3fecc6d3ebe40626ef4e2ed8221303202e60fdff204d2a7d334b0
ALGORITHM = HS256
DB_TYPE=mysql
DB_DRIVER=aiomysql
DEV_DB_NAME=advanced_db
DEV_DB_HOST=localhost
DEV_DB_PORT=3306
DEV_DB_USER=root
DEV_DB_PASSWORD=<비밀번호>
PROD_APP_NAME=Deployed APP
PROD_APP_VERSION=Deployed V_0
PROD_APP_DESCRIPTION=FastAPI_Svelte를 이용해 개발한 프로젝트에 대한 배포판
PROD_DB_NAME=advanced_db
PROD_DB_HOST=<배포IP>
PROD_DB_PORT=<포트번호>
PROD_DB_USER=root
PROD_DB_PASSWORD=<비밀번호>
# redis_config.py 연결 설정: REDIS_PASSWORD None은 에러 유발, 배포 과정에 비번 지정
REDIS_HOST=<배포IP>
REDIS_PORT=<포트번호>
REDIS_DB=0
REDIS_PASSWORD=<비밀번호>
# 이메일 보내기 sender 정보
SMTP_USERNAME=<email>
SMTP_PASSWORD=iccxyzmnkyfvszux
SMTP_FROM=<email>
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
ACCESS_COOKIE_NAME = access_token
REFRESH_COOKIE_NAME = refresh_token
NEW_ACCESS_COOKIE_NAME = new_access_token
NEW_REFRESH_COOKIE_NAME = new_refresh_token
ACCESS_TOKEN_EXPIRE = 30
REFRESH_TOKEN_EXPIRE = 7
PROFILE_IMAGE_URL = user_images/accounts/profiles
ARTICLE_THUMBNAIL_DIR = user_images/articles/thumbnails
ARTICLE_EDITOR_USER_IMG_DIR = user_images/articles/quills
ARTICLE_EDITOR_USER_VIDEO_DIR = user_videos/articles/quills
# 프론트엔드 개발에 사용되는 포트: 아래 두개(127.0.0.1, localhost)는 별개로 인식한다. 둘다 필요하다.
# 리스트는 한 줄로..., 파싱 가능한 형태로 바꾸기
#DEV_ORIGINS="http://localhost:5173,http://localhost:5100,http://127.0.0.1:5173,http://127.0.0.1:5100"
DEV_ORIGINS=["http://localhost:5173","http://localhost:5174","http://localhost:7100","http://localhost:7101","http://127.0.0.1:5173","http://127.0.0.1:5174","http://127.0.0.1:7100","http://127.0.0.1:7101"]
PROD_ORIGINS=[]