93 lines
2.8 KiB
Python
93 lines
2.8 KiB
Python
from sqlalchemy import Column, String, DateTime
|
|
from sqlalchemy import update as sqlalchemy_update
|
|
from sqlalchemy.future import select
|
|
from app.database.models import BaseCRUD
|
|
from app.database.db import async_session
|
|
from datetime import datetime
|
|
|
|
from uuid import uuid4
|
|
from passlib.context import CryptContext
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
|
|
|
class User(BaseCRUD):
|
|
__tablename__ = "users"
|
|
username = Column(String, nullable=False, unique=True)
|
|
full_name = Column(String)
|
|
email = Column(String, nullable=False, unique=True)
|
|
password = Column(String, nullable=False)
|
|
created_at = Column(DateTime, index=True, default=datetime.utcnow)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"<{self.__class__.__name__}("
|
|
f"id={self.id}, "
|
|
f"username={self.username}, "
|
|
f")>"
|
|
)
|
|
|
|
def verify_password(self, plain_password):
|
|
return pwd_context.verify(plain_password, self.password)
|
|
|
|
@staticmethod
|
|
def get_password_hash(password):
|
|
return pwd_context.hash(password)
|
|
|
|
@classmethod
|
|
async def authenticate_user(cls, username: str, password: str):
|
|
# TODO: add exception when noone found
|
|
async with async_session() as db:
|
|
query = select(cls).where(cls.username == username)
|
|
users = await db.execute(query)
|
|
(user,) = users.first()
|
|
|
|
if user:
|
|
if not cls.verify_password(user, password):
|
|
return False
|
|
|
|
return user
|
|
|
|
@classmethod
|
|
async def create(cls, **kwargs):
|
|
if 'plain_password' in kwargs:
|
|
kwargs['password'] = cls.get_password_hash(kwargs.pop('plain_password'))
|
|
async with async_session() as db:
|
|
user = cls(id=str(uuid4()), **kwargs)
|
|
db.add(user)
|
|
try:
|
|
await db.commit()
|
|
await db.refresh()
|
|
except Exception:
|
|
await db.rollback()
|
|
raise
|
|
return user
|
|
|
|
@classmethod
|
|
async def update(cls, id, **kwargs):
|
|
if 'plain_password' in kwargs:
|
|
kwargs['password'] = cls.get_password_hash(kwargs.pop('plain_password'))
|
|
async with async_session() as db:
|
|
query = (
|
|
sqlalchemy_update(cls)
|
|
.where(cls.id == id)
|
|
.values(**kwargs)
|
|
.execution_options(synchronize_session="fetch")
|
|
)
|
|
await db.execute(query)
|
|
try:
|
|
await db.commit()
|
|
except Exception:
|
|
await db.rollback()
|
|
raise
|
|
return await cls.get(id)
|
|
|
|
# class UserBase(SQLModel):
|
|
# username: str
|
|
# full_name: str
|
|
# email: str
|
|
# hashed_password: str
|
|
# disabled: bool
|
|
|
|
# class User(UserBase, table=True):
|
|
# id: int = Field(default=None, primary_key=True)
|