from sqlalchemy import Column, String, DateTime from sqlalchemy import update as sqlalchemy_update from sqlalchemy.future import select from app.database.models import BaseCRUD from app.database.db import async_session from datetime import datetime from uuid import uuid4 from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class User(BaseCRUD): __tablename__ = "users" username = Column(String, nullable=False, unique=True) full_name = Column(String) email = Column(String, nullable=False, unique=True) password = Column(String, nullable=False) created_at = Column(DateTime, index=True, default=datetime.utcnow) def __repr__(self): return ( f"<{self.__class__.__name__}(" f"id={self.id}, " f"username={self.username}, " f")>" ) def verify_password(self, plain_password): return pwd_context.verify(plain_password, self.password) @staticmethod def get_password_hash(password): return pwd_context.hash(password) @classmethod async def authenticate_user(cls, username: str, password: str): # TODO: add exception when noone found async with async_session() as db: query = select(cls).where(cls.username == username) users = await db.execute(query) (user,) = users.first() if user: if not cls.verify_password(user, password): return False return user @classmethod async def create(cls, **kwargs): if 'plain_password' in kwargs: kwargs['password'] = cls.get_password_hash(kwargs.pop('plain_password')) async with async_session() as db: user = cls(id=str(uuid4()), **kwargs) db.add(user) try: await db.commit() await db.refresh() except Exception: await db.rollback() raise return user @classmethod async def update(cls, id, **kwargs): if 'plain_password' in kwargs: kwargs['password'] = cls.get_password_hash(kwargs.pop('plain_password')) async with async_session() as db: query = ( sqlalchemy_update(cls) .where(cls.id == id) .values(**kwargs) .execution_options(synchronize_session="fetch") ) await db.execute(query) try: await db.commit() except Exception: await db.rollback() raise return await cls.get(id) # class UserBase(SQLModel): # username: str # full_name: str # email: str # hashed_password: str # disabled: bool # class User(UserBase, table=True): # id: int = Field(default=None, primary_key=True)