Files
weibo_signin/backend/tests/test_auth_service.py

318 lines
10 KiB
Python
Raw Normal View History

2026-03-09 14:05:00 +08:00
"""
Tests for auth_service: security utils, AuthService logic, and API endpoints.
Validates tasks 2.1-2.3.
"""
import pytest
import pytest_asyncio
from unittest.mock import patch, AsyncMock
from fastapi import HTTPException
from shared.models import User
from tests.conftest import TestSessionLocal, FakeRedis
# Import security utilities
from auth_service.app.utils.security import (
hash_password,
verify_password,
validate_password_strength,
create_access_token,
decode_access_token,
)
from auth_service.app.services.auth_service import AuthService
from auth_service.app.schemas.user import UserCreate, UserLogin
# ===================== Password utilities =====================
class TestPasswordUtils:
    """Checks for the password hashing and strength-validation helpers."""

    def test_hash_and_verify(self):
        """A password verifies against the hash generated from it."""
        plaintext = "MyP@ssw0rd"
        assert verify_password(plaintext, hash_password(plaintext))

    def test_wrong_password_rejected(self):
        """A different password does not verify against the stored hash."""
        stored_hash = hash_password("Correct1!")
        matches = verify_password("Wrong1!", stored_hash)
        assert not matches

    @pytest.mark.parametrize(
        ("pwd", "expected_valid"),
        [
            ("Ab1!abcd", True),       # meets all criteria
            ("short1A!", True),       # 8 chars with upper/lower/digit/special -> valid
            ("alllower1!", False),    # no uppercase
            ("ALLUPPER1!", False),    # no lowercase
            ("NoDigits!Aa", False),   # no digit
            ("NoSpecial1a", False),   # no special char
        ],
    )
    def test_password_strength(self, pwd, expected_valid):
        """The validity flag matches the expectation for each sample password."""
        verdict, _message = validate_password_strength(pwd)
        assert verdict == expected_valid

    def test_password_too_short(self):
        """A 4-character password is rejected and the message mentions the length rule."""
        verdict, reason = validate_password_strength("Ab1!")
        assert not verdict
        assert "8 characters" in reason
# ===================== JWT utilities =====================
class TestJWT:
    """Checks for access-token creation and decoding."""

    def test_create_and_decode(self):
        """A freshly created token decodes to a payload carrying the same subject."""
        claims = {"sub": "user-123", "username": "alice"}
        decoded = decode_access_token(create_access_token(claims))
        assert decoded is not None
        assert decoded["sub"] == "user-123"

    def test_invalid_token_returns_none(self):
        """Decoding a malformed token yields None."""
        result = decode_access_token("not.a.valid.token")
        assert result is None
# ===================== Refresh token helpers (with fake Redis) =====================
class TestRefreshToken:
    """Refresh-token helper checks, run with get_redis patched to the fake client."""

    @pytest.mark.asyncio
    async def test_create_verify_revoke(self, fake_redis):
        """Full lifecycle: create → verify → revoke → verify again returns None."""

        async def redis_stub():
            # Stand-in for get_redis(): hands back the fixture-provided fake.
            return fake_redis

        redis_patch = patch(
            "auth_service.app.utils.security.get_redis",
            new=redis_stub,
        )
        with redis_patch:
            from auth_service.app.utils.security import (
                create_refresh_token,
                verify_refresh_token,
                revoke_refresh_token,
            )

            issued = await create_refresh_token("user-42")
            assert isinstance(issued, str)
            assert len(issued) > 0

            owner = await verify_refresh_token(issued)
            assert owner == "user-42"

            await revoke_refresh_token(issued)
            after_revoke = await verify_refresh_token(issued)
            assert after_revoke is None
# ===================== AuthService business logic =====================
class TestAuthServiceLogic:
    """Checks for AuthService methods, using the db_session fixture."""

    @pytest_asyncio.fixture
    async def auth_svc(self, db_session):
        """Build an AuthService bound to the test session."""
        service = AuthService(db_session)
        return service

    @pytest.mark.asyncio
    async def test_create_user_success(self, auth_svc, db_session):
        """Creating a user keeps username/email and does not store the raw password."""
        signup = UserCreate(
            username="newuser",
            email="new@example.com",
            password="Str0ng!Pass",
        )
        created = await auth_svc.create_user(signup)
        assert created.username == "newuser"
        assert created.email == "new@example.com"
        assert created.hashed_password != "Str0ng!Pass"

    @pytest.mark.asyncio
    async def test_create_user_weak_password_rejected(self, auth_svc):
        """A weak password makes create_user raise HTTPException with status 400."""
        # Intended to get past the schema's length check (min_length=8) and
        # fail only the service-level strength check.
        signup = UserCreate(
            username="weakuser",
            email="weak@example.com",
            password="weakpassword",
        )
        with pytest.raises(HTTPException) as raised:
            await auth_svc.create_user(signup)
        assert raised.value.status_code == 400

    @pytest.mark.asyncio
    async def test_get_user_by_email(self, auth_svc, db_session):
        """A created user can be looked up again by email."""
        signup = UserCreate(
            username="findme",
            email="find@example.com",
            password="Str0ng!Pass",
        )
        await auth_svc.create_user(signup)
        located = await auth_svc.get_user_by_email("find@example.com")
        assert located is not None
        assert located.username == "findme"

    @pytest.mark.asyncio
    async def test_check_user_exists(self, auth_svc, db_session):
        """check_user_exists reports the email match and no username match."""
        signup = UserCreate(
            username="exists",
            email="exists@example.com",
            password="Str0ng!Pass",
        )
        await auth_svc.create_user(signup)
        by_email, by_username = await auth_svc.check_user_exists(
            "exists@example.com", "other"
        )
        assert by_email is not None
        assert by_username is None
# ===================== Auth API endpoint tests =====================
class TestAuthAPI:
    """Integration tests hitting the FastAPI app via httpx."""

    # Password used for every account these tests register successfully.
    _PASSWORD = "Str0ng!Pass1"

    @pytest_asyncio.fixture
    async def client(self, fake_redis):
        """
        Provide an httpx AsyncClient wired to the auth_service app.

        The fixture creates the tables on TEST_ENGINE, overrides the ``get_db``
        dependency so requests use sessions from TestSessionLocal, and patches
        ``auth_service.app.utils.security.get_redis`` to return ``fake_redis``.
        The dependency overrides are cleared on teardown.
        """
        from shared.models import get_db
        from auth_service.app.main import app
        from httpx import AsyncClient, ASGITransport
        from tests.conftest import TEST_ENGINE, TestSessionLocal, Base

        async with TEST_ENGINE.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        async def override_get_db():
            async with TestSessionLocal() as session:
                yield session

        async def _fake_get_redis():
            return fake_redis

        app.dependency_overrides[get_db] = override_get_db
        try:
            with patch(
                "auth_service.app.utils.security.get_redis",
                new=_fake_get_redis,
            ):
                async with AsyncClient(
                    transport=ASGITransport(app=app), base_url="http://test"
                ) as ac:
                    yield ac
        finally:
            # Runs even if the generator is closed or an exception is thrown
            # in at the yield, so the override cannot leak into later tests.
            app.dependency_overrides.clear()

    @staticmethod
    async def _register(client, username, email, password=_PASSWORD):
        """Register an account and assert it succeeded (201); return the response.

        Asserting here keeps a broken setup step from letting a later
        negative assertion (401/409) pass for the wrong reason.
        """
        resp = await client.post("/auth/register", json={
            "username": username,
            "email": email,
            "password": password,
        })
        assert resp.status_code == 201, resp.text
        return resp

    @staticmethod
    async def _login(client, email, password=_PASSWORD):
        """Log in and assert it succeeded (200); return the response."""
        resp = await client.post("/auth/login", json={
            "email": email,
            "password": password,
        })
        assert resp.status_code == 200, resp.text
        return resp

    @pytest.mark.asyncio
    async def test_register_and_login(self, client):
        """Registering returns the new user; logging in returns both tokens."""
        # Register
        resp = await self._register(client, "apiuser", "api@example.com")
        body = resp.json()
        assert body["success"] is True
        assert body["data"]["username"] == "apiuser"

        # Login
        resp = await self._login(client, "api@example.com")
        body = resp.json()
        assert body["success"] is True
        assert "access_token" in body["data"]
        assert "refresh_token" in body["data"]

    @pytest.mark.asyncio
    async def test_login_wrong_password(self, client):
        """Logging in to an existing account with the wrong password gives 401."""
        await self._register(client, "wrongpw", "wrongpw@example.com")
        resp = await client.post("/auth/login", json={
            "email": "wrongpw@example.com",
            "password": "WrongPassword1!",
        })
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_register_duplicate_email(self, client):
        """A second registration with an already-used email gives 409."""
        await self._register(client, "dup1", "dup@example.com")
        resp = await client.post("/auth/register", json={
            "username": "dup2",
            "email": "dup@example.com",
            "password": self._PASSWORD,
        })
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_register_weak_password(self, client):
        """Registering with a weak password gives 400."""
        resp = await client.post("/auth/register", json={
            "username": "weakpwd",
            "email": "weakpwd@example.com",
            "password": "weakpassword",
        })
        assert resp.status_code == 400

    @pytest.mark.asyncio
    async def test_me_endpoint(self, client):
        """/auth/me returns the profile of the user owning the bearer token."""
        await self._register(client, "meuser", "me@example.com")
        login_resp = await self._login(client, "me@example.com")
        token = login_resp.json()["data"]["access_token"]

        resp = await client.get(
            "/auth/me",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["data"]["username"] == "meuser"
        assert body["data"]["email"] == "me@example.com"

    @pytest.mark.asyncio
    async def test_refresh_endpoint(self, client):
        """Refreshing rotates the refresh token and rejects the old one afterwards."""
        await self._register(client, "refreshuser", "refresh@example.com")
        login_resp = await self._login(client, "refresh@example.com")
        refresh_token = login_resp.json()["data"]["refresh_token"]

        # Refresh
        resp = await client.post("/auth/refresh", json={
            "refresh_token": refresh_token,
        })
        assert resp.status_code == 200
        body = resp.json()
        assert body["success"] is True
        assert "access_token" in body["data"]
        new_refresh = body["data"]["refresh_token"]
        assert new_refresh != refresh_token  # rotation

        # Old token should be revoked
        resp2 = await client.post("/auth/refresh", json={
            "refresh_token": refresh_token,
        })
        assert resp2.status_code == 401

    @pytest.mark.asyncio
    async def test_me_without_token(self, client):
        """/auth/me without an Authorization header gives 401 or 403."""
        resp = await client.get("/auth/me")
        assert resp.status_code in (401, 403)

    @pytest.mark.asyncio
    async def test_unified_error_format(self, client):
        """Verify error responses follow the unified format."""
        resp = await client.post("/auth/login", json={
            "email": "nobody@example.com",
            "password": "Whatever1!",
        })
        body = resp.json()
        assert body["success"] is False
        assert body["data"] is None
        assert "error" in body