""" Tests for auth_service: security utils, AuthService logic, and API endpoints. Validates tasks 2.1 – 2.3. """ import pytest import pytest_asyncio from unittest.mock import patch, AsyncMock from fastapi import HTTPException from shared.models import User from tests.conftest import TestSessionLocal, FakeRedis # Import security utilities from auth_service.app.utils.security import ( hash_password, verify_password, validate_password_strength, create_access_token, decode_access_token, ) from auth_service.app.services.auth_service import AuthService from auth_service.app.schemas.user import UserCreate, UserLogin # ===================== Password utilities ===================== class TestPasswordUtils: def test_hash_and_verify(self): raw = "MyP@ssw0rd" hashed = hash_password(raw) assert verify_password(raw, hashed) def test_wrong_password_rejected(self): hashed = hash_password("Correct1!") assert not verify_password("Wrong1!", hashed) @pytest.mark.parametrize( "pwd, expected_valid", [ ("Ab1!abcd", True), # meets all criteria ("short1A!", True), # 8 chars, has upper/lower/digit/special – valid ("alllower1!", False), # no uppercase ("ALLUPPER1!", False), # no lowercase ("NoDigits!Aa", False), # no digit ("NoSpecial1a", False), # no special char ], ) def test_password_strength(self, pwd, expected_valid): is_valid, _ = validate_password_strength(pwd) assert is_valid == expected_valid def test_password_too_short(self): is_valid, msg = validate_password_strength("Ab1!") assert not is_valid assert "8 characters" in msg # ===================== JWT utilities ===================== class TestJWT: def test_create_and_decode(self): token = create_access_token({"sub": "user-123", "username": "alice"}) payload = decode_access_token(token) assert payload is not None assert payload["sub"] == "user-123" def test_invalid_token_returns_none(self): assert decode_access_token("not.a.valid.token") is None # ===================== Refresh token helpers (with fake Redis) ===================== class TestRefreshToken: @pytest.mark.asyncio async def test_create_verify_revoke(self, fake_redis): """Full lifecycle: create → verify → revoke → verify again returns None.""" async def _fake_get_redis(): return fake_redis with patch( "auth_service.app.utils.security.get_redis", new=_fake_get_redis, ): from auth_service.app.utils.security import ( create_refresh_token, verify_refresh_token, revoke_refresh_token, ) token = await create_refresh_token("user-42") assert isinstance(token, str) and len(token) > 0 uid = await verify_refresh_token(token) assert uid == "user-42" await revoke_refresh_token(token) assert await verify_refresh_token(token) is None # ===================== AuthService business logic ===================== class TestAuthServiceLogic: @pytest_asyncio.fixture async def auth_svc(self, db_session): return AuthService(db_session) @pytest.mark.asyncio async def test_create_user_success(self, auth_svc, db_session): data = UserCreate(username="newuser", email="new@example.com", password="Str0ng!Pass") user = await auth_svc.create_user(data) assert user.username == "newuser" assert user.email == "new@example.com" assert user.hashed_password != "Str0ng!Pass" @pytest.mark.asyncio async def test_create_user_weak_password_rejected(self, auth_svc): # Use a password that passes Pydantic min_length=8 but fails strength check data = UserCreate(username="weakuser", email="weak@example.com", password="weakpassword") with pytest.raises(HTTPException) as exc_info: await auth_svc.create_user(data) assert exc_info.value.status_code == 400 @pytest.mark.asyncio async def test_get_user_by_email(self, auth_svc, db_session): data = UserCreate(username="findme", email="find@example.com", password="Str0ng!Pass") await auth_svc.create_user(data) found = await auth_svc.get_user_by_email("find@example.com") assert found is not None assert found.username == "findme" @pytest.mark.asyncio async def test_check_user_exists(self, auth_svc, db_session): data = UserCreate(username="exists", email="exists@example.com", password="Str0ng!Pass") await auth_svc.create_user(data) email_u, username_u = await auth_svc.check_user_exists("exists@example.com", "other") assert email_u is not None assert username_u is None # ===================== Auth API endpoint tests ===================== class TestAuthAPI: """Integration tests hitting the FastAPI app via httpx.""" @pytest_asyncio.fixture async def client(self, fake_redis): """ Provide an httpx AsyncClient wired to the auth_service app, with DB session overridden to use the test SQLite engine. """ from shared.models import get_db from auth_service.app.main import app from httpx import AsyncClient, ASGITransport from tests.conftest import TEST_ENGINE, TestSessionLocal, Base async with TEST_ENGINE.begin() as conn: await conn.run_sync(Base.metadata.create_all) async def override_get_db(): async with TestSessionLocal() as session: yield session async def _fake_get_redis(): return fake_redis app.dependency_overrides[get_db] = override_get_db with patch( "auth_service.app.utils.security.get_redis", new=_fake_get_redis, ): async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac app.dependency_overrides.clear() @pytest.mark.asyncio async def test_register_and_login(self, client): # Register resp = await client.post("/auth/register", json={ "username": "apiuser", "email": "api@example.com", "password": "Str0ng!Pass1", }) assert resp.status_code == 201 body = resp.json() assert body["success"] is True assert body["data"]["username"] == "apiuser" # Login resp = await client.post("/auth/login", json={ "email": "api@example.com", "password": "Str0ng!Pass1", }) assert resp.status_code == 200 body = resp.json() assert body["success"] is True assert "access_token" in body["data"] assert "refresh_token" in body["data"] @pytest.mark.asyncio async def test_login_wrong_password(self, client): await client.post("/auth/register", json={ "username": "wrongpw", "email": "wrongpw@example.com", "password": "Str0ng!Pass1", }) resp = await client.post("/auth/login", json={ "email": "wrongpw@example.com", "password": "WrongPassword1!", }) assert resp.status_code == 401 @pytest.mark.asyncio async def test_register_duplicate_email(self, client): await client.post("/auth/register", json={ "username": "dup1", "email": "dup@example.com", "password": "Str0ng!Pass1", }) resp = await client.post("/auth/register", json={ "username": "dup2", "email": "dup@example.com", "password": "Str0ng!Pass1", }) assert resp.status_code == 409 @pytest.mark.asyncio async def test_register_weak_password(self, client): resp = await client.post("/auth/register", json={ "username": "weakpwd", "email": "weakpwd@example.com", "password": "weakpassword", }) assert resp.status_code == 400 @pytest.mark.asyncio async def test_me_endpoint(self, client): await client.post("/auth/register", json={ "username": "meuser", "email": "me@example.com", "password": "Str0ng!Pass1", }) login_resp = await client.post("/auth/login", json={ "email": "me@example.com", "password": "Str0ng!Pass1", }) token = login_resp.json()["data"]["access_token"] resp = await client.get( "/auth/me", headers={"Authorization": f"Bearer {token}"}, ) assert resp.status_code == 200 body = resp.json() assert body["data"]["username"] == "meuser" assert body["data"]["email"] == "me@example.com" @pytest.mark.asyncio async def test_refresh_endpoint(self, client): await client.post("/auth/register", json={ "username": "refreshuser", "email": "refresh@example.com", "password": "Str0ng!Pass1", }) login_resp = await client.post("/auth/login", json={ "email": "refresh@example.com", "password": "Str0ng!Pass1", }) refresh_token = login_resp.json()["data"]["refresh_token"] # Refresh resp = await client.post("/auth/refresh", json={ "refresh_token": refresh_token, }) assert resp.status_code == 200 body = resp.json() assert body["success"] is True assert "access_token" in body["data"] new_refresh = body["data"]["refresh_token"] assert new_refresh != refresh_token # rotation # Old token should be revoked resp2 = await client.post("/auth/refresh", json={ "refresh_token": refresh_token, }) assert resp2.status_code == 401 @pytest.mark.asyncio async def test_me_without_token(self, client): resp = await client.get("/auth/me") assert resp.status_code in (401, 403) @pytest.mark.asyncio async def test_unified_error_format(self, client): """Verify error responses follow the unified format.""" resp = await client.post("/auth/login", json={ "email": "nobody@example.com", "password": "Whatever1!", }) body = resp.json() assert body["success"] is False assert body["data"] is None assert "error" in body