Files
weibo_signin/backend/tests/test_auth_service.py
2026-03-09 14:05:00 +08:00

318 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Tests for auth_service: security utils, AuthService logic, and API endpoints.
Validates tasks 2.1 through 2.3.
"""
import pytest
import pytest_asyncio
from unittest.mock import patch, AsyncMock
from fastapi import HTTPException
from shared.models import User
from tests.conftest import TestSessionLocal, FakeRedis
# Import security utilities
from auth_service.app.utils.security import (
hash_password,
verify_password,
validate_password_strength,
create_access_token,
decode_access_token,
)
from auth_service.app.services.auth_service import AuthService
from auth_service.app.schemas.user import UserCreate, UserLogin
# ===================== Password utilities =====================
class TestPasswordUtils:
    """Checks for password hashing, verification, and strength validation."""

    def test_hash_and_verify(self):
        """A password must verify against the hash produced from it."""
        plaintext = "MyP@ssw0rd"
        digest = hash_password(plaintext)
        assert verify_password(plaintext, digest)

    def test_wrong_password_rejected(self):
        """A different password must not verify against the stored hash."""
        stored_digest = hash_password("Correct1!")
        assert not verify_password("Wrong1!", stored_digest)

    @pytest.mark.parametrize(
        "pwd, expected_valid",
        [
            ("Ab1!abcd", True),  # meets all criteria
            ("short1A!", True),  # 8 chars, has upper/lower/digit/special -> valid
            ("alllower1!", False),  # no uppercase
            ("ALLUPPER1!", False),  # no lowercase
            ("NoDigits!Aa", False),  # no digit
            ("NoSpecial1a", False),  # no special char
        ],
    )
    def test_password_strength(self, pwd, expected_valid):
        """The validity flag must match the expectation for each sample."""
        accepted, _reason = validate_password_strength(pwd)
        assert accepted == expected_valid

    def test_password_too_short(self):
        """A 4-character password is rejected and the message names the limit."""
        accepted, reason = validate_password_strength("Ab1!")
        assert not accepted
        assert "8 characters" in reason
# ===================== JWT utilities =====================
class TestJWT:
    """Round-trip and failure-path checks for the access-token helpers."""

    def test_create_and_decode(self):
        """A freshly created token decodes to a payload carrying its subject."""
        claims = {"sub": "user-123", "username": "alice"}
        encoded = create_access_token(claims)
        decoded = decode_access_token(encoded)
        assert decoded is not None
        assert decoded["sub"] == "user-123"

    def test_invalid_token_returns_none(self):
        """Decoding a string that is not a valid token must yield None."""
        outcome = decode_access_token("not.a.valid.token")
        assert outcome is None
# ===================== Refresh token helpers (with fake Redis) =====================
class TestRefreshToken:
    """Refresh-token helper checks, run against the `fake_redis` fixture."""

    @pytest.mark.asyncio
    async def test_create_verify_revoke(self, fake_redis):
        """Full lifecycle: create -> verify -> revoke -> verify again returns None."""

        async def redis_stub():
            # Stand-in for get_redis: hands back the fixture instead of a real client.
            return fake_redis

        redis_patch = patch(
            "auth_service.app.utils.security.get_redis",
            new=redis_stub,
        )
        with redis_patch:
            from auth_service.app.utils.security import (
                create_refresh_token,
                verify_refresh_token,
                revoke_refresh_token,
            )

            issued = await create_refresh_token("user-42")
            assert isinstance(issued, str)
            assert len(issued) > 0

            owner = await verify_refresh_token(issued)
            assert owner == "user-42"

            await revoke_refresh_token(issued)
            after_revoke = await verify_refresh_token(issued)
            assert after_revoke is None
# ===================== AuthService business logic =====================
class TestAuthServiceLogic:
    """Exercises AuthService methods directly against the `db_session` fixture."""

    @pytest_asyncio.fixture
    async def auth_svc(self, db_session):
        """Build an AuthService bound to the test database session."""
        service = AuthService(db_session)
        return service

    @pytest.mark.asyncio
    async def test_create_user_success(self, auth_svc, db_session):
        """Creating a user keeps username/email and does not store the raw password."""
        payload = UserCreate(
            username="newuser",
            email="new@example.com",
            password="Str0ng!Pass",
        )
        created = await auth_svc.create_user(payload)
        assert created.username == "newuser"
        assert created.email == "new@example.com"
        assert created.hashed_password != "Str0ng!Pass"

    @pytest.mark.asyncio
    async def test_create_user_weak_password_rejected(self, auth_svc):
        """A weak password makes create_user raise HTTPException with status 400."""
        # Use a password that passes Pydantic min_length=8 but fails strength check
        payload = UserCreate(
            username="weakuser",
            email="weak@example.com",
            password="weakpassword",
        )
        with pytest.raises(HTTPException) as raised:
            await auth_svc.create_user(payload)
        assert raised.value.status_code == 400

    @pytest.mark.asyncio
    async def test_get_user_by_email(self, auth_svc, db_session):
        """A user that was just created can be looked up by e-mail address."""
        payload = UserCreate(
            username="findme",
            email="find@example.com",
            password="Str0ng!Pass",
        )
        await auth_svc.create_user(payload)

        located = await auth_svc.get_user_by_email("find@example.com")
        assert located is not None
        assert located.username == "findme"

    @pytest.mark.asyncio
    async def test_check_user_exists(self, auth_svc, db_session):
        """check_user_exists reports the e-mail match and no username match."""
        payload = UserCreate(
            username="exists",
            email="exists@example.com",
            password="Str0ng!Pass",
        )
        await auth_svc.create_user(payload)

        by_email, by_username = await auth_svc.check_user_exists(
            "exists@example.com", "other"
        )
        assert by_email is not None
        assert by_username is None
# ===================== Auth API endpoint tests =====================
class TestAuthAPI:
    """Integration tests hitting the FastAPI app via httpx."""

    @pytest_asyncio.fixture
    async def client(self, fake_redis):
        """
        Provide an httpx AsyncClient wired to the auth_service app,
        with DB session overridden to use the test SQLite engine.

        The `get_redis` lookup in the security utilities is patched to return
        the `fake_redis` fixture for the lifetime of the client.
        """
        from shared.models import get_db
        from auth_service.app.main import app
        from httpx import AsyncClient, ASGITransport
        from tests.conftest import TEST_ENGINE, Base

        # Make sure every table exists before the first request is served.
        async with TEST_ENGINE.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        async def override_get_db():
            async with TestSessionLocal() as session:
                yield session

        async def _fake_get_redis():
            return fake_redis

        app.dependency_overrides[get_db] = override_get_db
        try:
            with patch(
                "auth_service.app.utils.security.get_redis",
                new=_fake_get_redis,
            ):
                async with AsyncClient(
                    transport=ASGITransport(app=app), base_url="http://test"
                ) as ac:
                    yield ac
        finally:
            # Always undo the override so a failure here cannot leak into
            # other tests that share the same app object.
            app.dependency_overrides.clear()

    @staticmethod
    async def _register(client, username, email, password="Str0ng!Pass1"):
        """Register a user as test setup and fail fast unless the API answers 201.

        Without this check a broken registration would let later assertions
        (for example "wrong password gives 401") pass for the wrong reason.
        Returns the httpx response.
        """
        resp = await client.post("/auth/register", json={
            "username": username,
            "email": email,
            "password": password,
        })
        assert resp.status_code == 201, resp.text
        return resp

    @staticmethod
    async def _login(client, email, password="Str0ng!Pass1"):
        """Log in as test setup and fail fast unless the API answers 200.

        Returns the httpx response so callers can read the issued tokens.
        """
        resp = await client.post("/auth/login", json={
            "email": email,
            "password": password,
        })
        assert resp.status_code == 200, resp.text
        return resp

    @pytest.mark.asyncio
    async def test_register_and_login(self, client):
        """Registering then logging in succeeds and returns both tokens."""
        # Register
        resp = await self._register(client, "apiuser", "api@example.com")
        body = resp.json()
        assert body["success"] is True
        assert body["data"]["username"] == "apiuser"

        # Login
        resp = await self._login(client, "api@example.com")
        body = resp.json()
        assert body["success"] is True
        assert "access_token" in body["data"]
        assert "refresh_token" in body["data"]

    @pytest.mark.asyncio
    async def test_login_wrong_password(self, client):
        """Logging in to an existing account with a wrong password gives 401."""
        await self._register(client, "wrongpw", "wrongpw@example.com")

        resp = await client.post("/auth/login", json={
            "email": "wrongpw@example.com",
            "password": "WrongPassword1!",
        })
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_register_duplicate_email(self, client):
        """Registering a second user with an already-used e-mail gives 409."""
        await self._register(client, "dup1", "dup@example.com")

        resp = await client.post("/auth/register", json={
            "username": "dup2",
            "email": "dup@example.com",
            "password": "Str0ng!Pass1",
        })
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_register_weak_password(self, client):
        """Registering with a weak password gives 400."""
        resp = await client.post("/auth/register", json={
            "username": "weakpwd",
            "email": "weakpwd@example.com",
            "password": "weakpassword",
        })
        assert resp.status_code == 400

    @pytest.mark.asyncio
    async def test_me_endpoint(self, client):
        """/auth/me returns the profile of the user owning the bearer token."""
        await self._register(client, "meuser", "me@example.com")
        login_resp = await self._login(client, "me@example.com")
        token = login_resp.json()["data"]["access_token"]

        resp = await client.get(
            "/auth/me",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["data"]["username"] == "meuser"
        assert body["data"]["email"] == "me@example.com"

    @pytest.mark.asyncio
    async def test_refresh_endpoint(self, client):
        """Refreshing rotates the refresh token and revokes the old one."""
        await self._register(client, "refreshuser", "refresh@example.com")
        login_resp = await self._login(client, "refresh@example.com")
        refresh_token = login_resp.json()["data"]["refresh_token"]

        # Refresh
        resp = await client.post("/auth/refresh", json={
            "refresh_token": refresh_token,
        })
        assert resp.status_code == 200
        body = resp.json()
        assert body["success"] is True
        assert "access_token" in body["data"]
        new_refresh = body["data"]["refresh_token"]
        assert new_refresh != refresh_token  # rotation

        # Old token should be revoked
        resp2 = await client.post("/auth/refresh", json={
            "refresh_token": refresh_token,
        })
        assert resp2.status_code == 401

    @pytest.mark.asyncio
    async def test_me_without_token(self, client):
        """/auth/me without an Authorization header is refused (401 or 403)."""
        resp = await client.get("/auth/me")
        assert resp.status_code in (401, 403)

    @pytest.mark.asyncio
    async def test_unified_error_format(self, client):
        """Verify error responses follow the unified format."""
        resp = await client.post("/auth/login", json={
            "email": "nobody@example.com",
            "password": "Whatever1!",
        })
        body = resp.json()
        assert body["success"] is False
        assert body["data"] is None
        assert "error" in body