Files
assist/check_and_fix_users.py

148 lines
5.7 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from src.core.database import db_manager
from sqlalchemy import text, inspect
print("Checking users table structure...")
try:
with db_manager.get_session() as session:
inspector = inspect(db_manager.engine)
cols = inspector.get_columns('users')
print("\nUsers table columns:")
required_fields = {}
optional_fields = {}
for col in cols:
name = col['name']
nullable = col.get('nullable', True)
default = col.get('default', None)
if nullable or default is not None:
optional_fields[name] = col
print(f" {name}: {col['type']} (nullable: {nullable}, default: {default})")
else:
required_fields[name] = col
print(f" {name}: {col['type']} (REQUIRED, nullable: {nullable})")
print(f"\nRequired fields: {list(required_fields.keys())}")
print(f"Optional fields: {list(optional_fields.keys())}")
# Check for existing admin user
result = session.execute(text("SELECT * FROM users WHERE username = 'admin' LIMIT 1"))
admin_row = result.fetchone()
if admin_row:
print("\nAdmin user found in database")
# Update password
from werkzeug.security import generate_password_hash
password_hash = generate_password_hash('admin123')
session.execute(text("""
UPDATE users
SET password_hash = :password_hash,
is_active = 1,
updated_at = NOW()
WHERE username = 'admin'
"""), {'password_hash': password_hash})
session.commit()
print("Admin password updated successfully")
else:
print("\nAdmin user not found, creating...")
from werkzeug.security import generate_password_hash
password_hash = generate_password_hash('admin123')
# Build INSERT with all required fields
insert_fields = ['username', 'email', 'password_hash', 'role']
insert_values = {
'username': 'admin',
'email': 'admin@tsp.com',
'password_hash': password_hash,
'role': 'admin'
}
# Add optional fields that exist in table
if 'is_active' in optional_fields or 'is_active' not in required_fields:
insert_fields.append('is_active')
insert_values['is_active'] = True
if 'region' in optional_fields or 'region' not in required_fields:
insert_fields.append('region')
insert_values['region'] = None
# Handle full_name if it exists
if 'full_name' in required_fields:
insert_fields.append('full_name')
insert_values['full_name'] = 'Administrator'
elif 'full_name' in optional_fields:
insert_fields.append('full_name')
insert_values['full_name'] = 'Administrator'
# Handle other required fields with defaults
for field_name in required_fields:
if field_name not in insert_fields:
if field_name in ['created_at', 'updated_at']:
insert_fields.append(field_name)
insert_values[field_name] = 'NOW()'
else:
# Use empty string or default value
insert_fields.append(field_name)
insert_values[field_name] = ''
fields_str = ', '.join(insert_fields)
values_str = ', '.join([f':{f}' for f in insert_fields])
sql = f"""
INSERT INTO users ({fields_str})
VALUES ({values_str})
"""
# Fix NOW() placeholders
final_values = {}
for k, v in insert_values.items():
if v == 'NOW()':
# Will use SQL NOW()
continue
final_values[k] = v
# Use raw SQL with NOW()
sql_final = f"""
INSERT INTO users ({fields_str.replace(':created_at', 'created_at').replace(':updated_at', 'updated_at')})
VALUES ({values_str.replace(':created_at', 'NOW()').replace(':updated_at', 'NOW()')})
"""
# Clean up the SQL
for k in ['created_at', 'updated_at']:
if f':{k}' in values_str:
values_str = values_str.replace(f':{k}', 'NOW()')
if k in final_values:
del final_values[k]
# Final SQL construction
final_sql = f"INSERT INTO users ({', '.join([f if f not in ['created_at', 'updated_at'] else f for f in insert_fields])}) VALUES ({', '.join([f':{f}' if f not in ['created_at', 'updated_at'] else 'NOW()' for f in insert_fields])})"
print(f"Executing SQL with fields: {insert_fields}")
session.execute(text(final_sql), final_values)
session.commit()
print("Admin user created successfully")
# Verify
result = session.execute(text("SELECT username, email, role, is_active FROM users WHERE username = 'admin'"))
admin_data = result.fetchone()
if admin_data:
print(f"\nVerification:")
print(f" Username: {admin_data[0]}")
print(f" Email: {admin_data[1]}")
print(f" Role: {admin_data[2]}")
print(f" Is Active: {admin_data[3]}")
print("\nAdmin user ready for login!")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()