basic users model and views

This commit is contained in:
2023-05-15 21:14:33 +03:00
parent ffbc6bf88b
commit 5e54e5b0f6
2 changed files with 140 additions and 138 deletions

View File

@@ -1,16 +1,21 @@
# from sqlmodel import SQLModel, Field
from sqlalchemy import Column, String, DateTime, delete
from sqlalchemy import update as sqlalchemy_update, delete as sqlalchemy_delete
from sqlalchemy import Column, String, DateTime
from sqlalchemy import update as sqlalchemy_update
from sqlalchemy.future import select
from app.db import Base, db
from app.database.models import BaseCRUD
from app.database.db import async_session
from datetime import datetime
from uuid import uuid4
from passlib.context import CryptContext
class User(Base):
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class User(BaseCRUD):
__tablename__ = "users"
id = Column(String, primary_key=True)
username = Column(String)
username = Column(String, nullable=False, unique=True)
full_name = Column(String)
email = Column(String, nullable=False, unique=True)
password = Column(String, nullable=False)
created_at = Column(DateTime, index=True, default=datetime.utcnow)
def __repr__(self):
@@ -21,57 +26,60 @@ class User(Base):
f")>"
)
def verify_password(self, plain_password):
return pwd_context.verify(plain_password, self.password)
@staticmethod
def get_password_hash(password):
return pwd_context.hash(password)
@classmethod
async def authenticate_user(cls, username: str, password: str):
# TODO: add exception when noone found
async with async_session() as db:
query = select(cls).where(cls.username == username)
users = await db.execute(query)
(user,) = users.first()
if user:
if not cls.verify_password(user, password):
return False
return user
@classmethod
async def create(cls, **kwargs):
user = cls(id=str(uuid4()), **kwargs)
db.add(user)
try:
await db.commit()
except Exception:
await db.rollback()
raise
return user
if 'plain_password' in kwargs:
kwargs['password'] = cls.get_password_hash(kwargs.pop('plain_password'))
async with async_session() as db:
user = cls(id=str(uuid4()), **kwargs)
db.add(user)
try:
await db.commit()
await db.refresh()
except Exception:
await db.rollback()
raise
return user
@classmethod
async def update(cls, id, **kwargs):
query = (
sqlalchemy_update(cls)
.where(cls.id == id)
.values(**kwargs)
.execution_options(synchronize_session="fetch")
)
await db.execute(query)
try:
await db.commit()
except Exception:
await db.rollback()
raise
return await cls.get(id)
@classmethod
async def get(cls, id):
query = select(cls).where(cls.id == id)
users = await db.execute(query)
(user,) = users.first()
return user
@classmethod
async def get_all(cls):
query = select(cls)
users = await db.execute(query)
users = users.scalars().all()
return users
@classmethod
async def delete(cls, id):
query = sqlalchemy_delete(cls).where(cls.id == id)
await db.execute(query)
try:
await db.commit()
except Exception:
await db.rollback()
raise
return True
if 'plain_password' in kwargs:
kwargs['password'] = cls.get_password_hash(kwargs.pop('plain_password'))
async with async_session() as db:
query = (
sqlalchemy_update(cls)
.where(cls.id == id)
.values(**kwargs)
.execution_options(synchronize_session="fetch")
)
await db.execute(query)
try:
await db.commit()
except Exception:
await db.rollback()
raise
return await cls.get(id)
# class UserBase(SQLModel):
# username: str