"""Database and authentication helpers for user accounts.

Covers session handling, generic insert/delete helpers, password hashing
(including migration of legacy salted SHA-1 hashes), OTP verification and
the password-reset flow.
"""

from datetime import datetime

from fastapi import HTTPException
from passlib.context import CryptContext
from sqlalchemy.inspection import inspect

from app.schemas import *
from constants import SHA1_SALT
from database import SessionLocal
from database.models import *

# One shared context instead of rebuilding it on every hash/verify call.
# "hex_sha1" is listed only so legacy hashes can still be verified; it is
# marked deprecated so new hashes are never produced with it.
_pwd_context = CryptContext(schemes=["bcrypt", "hex_sha1"], deprecated=["hex_sha1"])

# Length of a hex-encoded SHA-1 digest; used to recognise legacy hashes.
_LEGACY_SHA1_HEX_LENGTH = 40


def get_db():
    """Yield a database session and always close it afterwards.

    Written as a generator so the session is closed in ``finally`` once the
    consumer is done (presumably used as a FastAPI dependency -- confirm
    against the routers).

    Yields:
        A session created by ``SessionLocal()``.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def _resolve_model(model):
    """Return the model class whose name is given by the string *model*."""
    # NOTE(review): eval() executes arbitrary Python expressions. Every
    # caller visible in this module passes a hard-coded class name; never
    # forward request-supplied text here. Consider replacing this with an
    # explicit name -> class registry.
    return eval(model)


def instantiate_model(model, data):
    """Build an unsaved model instance from a schema object.

    Args:
        model: Name of the model class, as a string (e.g. ``"Users"``).
        data: Object exposing ``.dict()``; its items become the constructor
            keyword arguments.

    Returns:
        A new instance of the named model (not added to any session).
    """
    table = _resolve_model(model)
    return table(**data.dict())


def insert_data(model, data, db):
    """Create, persist and return a new row.

    Args:
        model: Name of the model class, as a string.
        data: Object exposing ``.dict()`` with the column values.
        db: Database session used for the insert.

    Returns:
        The inserted instance, refreshed from the database after commit.
    """
    item = instantiate_model(model=model, data=data)
    db.add(item)
    db.commit()
    db.refresh(item)
    return item


def delete_data(model, data, db):
    """Delete the row(s) whose primary key equals *data*.

    Only the first primary-key column of the model is compared, so models
    with a composite primary key are matched on that first column alone.

    Args:
        model: Name of the model class, as a string.
        data: Primary-key value to match.
        db: Database session used for the delete.
    """
    table = _resolve_model(model)
    primary_key = inspect(table).primary_key[0]
    db.query(table).filter(primary_key == data).delete()
    db.commit()


def fetch_user_by_key(data, db):
    """Return the first user whose ``access_key`` equals ``data.access_key``.

    Returns:
        The matching ``Users`` row, or ``None`` when there is no match.
    """
    return db.query(Users).filter(Users.access_key == data.access_key).first()


def fetch_user_by_email(data, db):
    """Return the first user whose ``email`` equals ``data.email``.

    Returns:
        The matching ``Users`` row, or ``None`` when there is no match.
    """
    return db.query(Users).filter(Users.email == data.email).first()


def create_user(data, db):
    """Hash the password on *data* and insert it as a new ``Users`` row.

    Note that ``data.password`` is overwritten in place with its hash.

    Returns:
        The inserted ``Users`` instance.
    """
    data.password = create_password_hash(secret=data.password)
    return insert_data(model="Users", data=data, db=db)


def update_otp(data, db):
    """Store ``data.otp`` and ``data.otp_valid_time`` for the user with ``data.email``."""
    db.query(Users).filter(Users.email == data.email).update(
        {Users.otp: data.otp, Users.otp_valid_time: data.otp_valid_time}
    )
    db.commit()


def create_password_hash(secret):
    """Return a hash of *secret* produced by the shared password context."""
    return _pwd_context.hash(secret=secret)


def update_password_hash(user, password, db):
    """Re-hash *password*, store it for ``user.email`` and refresh *user*."""
    new_hash = create_password_hash(secret=password)
    db.query(Users).filter(Users.email == user.email).update({Users.password: new_hash})
    db.commit()
    db.refresh(user)


def check_legacy_hash(db_hash):
    """Return ``True`` when *db_hash* has the length of a SHA-1 hex digest."""
    return len(db_hash) == _LEGACY_SHA1_HEX_LENGTH


def construct_secret(db_hash, password):
    """Build the secret to verify against *db_hash*.

    Legacy hashes were computed over ``SHA1_SALT + password``, so the salt
    is prepended when the stored hash is recognised as legacy.

    Returns:
        A ``(secret, legacy_hash)`` tuple, where ``legacy_hash`` is the
        boolean result of ``check_legacy_hash``.
    """
    legacy_hash = check_legacy_hash(db_hash=db_hash)
    if legacy_hash:
        return SHA1_SALT + password, legacy_hash
    return password, legacy_hash


def verify_password_hash(secret, hash):
    """Return whether *secret* matches *hash* under the shared password context."""
    return _pwd_context.verify(secret=secret, hash=hash)


def verify_password(user, password, db):
    """Check *password* against the hash stored on *user*.

    When the check succeeds against a legacy hash, the password is
    immediately re-hashed and stored via ``update_password_hash``.

    Returns:
        ``True`` if the password is correct, ``False`` otherwise.
    """
    secret, legacy_hash = construct_secret(db_hash=user.password, password=password)
    if not verify_password_hash(secret=secret, hash=user.password):
        return False
    if legacy_hash:
        update_password_hash(user=user, password=password, db=db)
    return True


def authenticate_user(data: UserLogin, db):
    """Authenticate by email and password and return the user.

    Raises:
        HTTPException: 400 when the email is unknown, the password is wrong,
            or the account's ``status`` is falsy.
    """
    user = fetch_user_by_email(data=data, db=db)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    correct_password = verify_password(user=user, password=data.password, db=db)
    if not correct_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    valid_account = user.status
    if not valid_account:
        raise HTTPException(status_code=400, detail="Your account is not active")
    return user


def activate_account(user, db):
    """Set ``status`` to 1 for the row matching ``user.access_key`` and refresh *user*."""
    db.query(Users).filter(Users.access_key == user.access_key).update(
        {Users.status: 1}
    )
    db.commit()
    db.refresh(user)


def deactivate_account(user, db):
    """Set ``status`` to 0 and ``forgot_password`` to 1 for ``user.email``; refresh *user*."""
    db.query(Users).filter(Users.email == user.email).update(
        {Users.status: 0, Users.forgot_password: 1}
    )
    db.commit()
    db.refresh(user)


def unset_forgot_password(user, db):
    """Set ``forgot_password`` to 0 for ``user.email`` and refresh *user*."""
    db.query(Users).filter(Users.email == user.email).update({Users.forgot_password: 0})
    db.commit()
    db.refresh(user)


def verify_otp(data: OTPVerify, db):
    """Validate an OTP and activate the matching account.

    The OTP is accepted only when a user with ``data.access_key`` exists,
    the stored OTP equals ``data.otp`` and the stored expiry is still in the
    future.

    Returns:
        The activated user.

    Raises:
        HTTPException: 400 when any of the conditions above fails.
    """
    user = fetch_user_by_key(data=data, db=db)
    # Unknown access keys and rows without an expiry are rejected as an
    # invalid OTP instead of surfacing AttributeError/TypeError as a 500.
    # NOTE(review): datetime.now() is naive local time -- confirm it matches
    # how otp_valid_time is stored.
    valid_otp = (
        user is not None
        and user.otp_valid_time is not None
        and user.otp == data.otp
        and datetime.now() < user.otp_valid_time
    )
    if not valid_otp:
        raise HTTPException(status_code=400, detail="The OTP is not correct")
    activate_account(user=user, db=db)
    return user


def mark_password_reset(data, db):
    """Flag the account with ``data.email`` as awaiting a password reset.

    Raises:
        HTTPException: 400 when no such user exists, or when the user has a
            ``social_id`` (social-login accounts cannot reset a password).
    """
    user = fetch_user_by_email(data=data, db=db)
    if user is None:
        # Previously crashed with AttributeError; answer with the module's
        # existing generic error so the response stays a controlled 400.
        raise HTTPException(status_code=400, detail="An error has ocurred")
    if user.social_id:
        raise HTTPException(
            status_code=400,
            detail="You logged in with Facebook/Google. You can't reset the password",
        )
    deactivate_account(user=user, db=db)


def verify_password_reset(data, db):
    """Complete a password reset for the account with ``data.email``.

    The reset is applied only when the user exists and both ``status`` and
    ``forgot_password`` are truthy.

    Returns:
        The updated user.

    Raises:
        HTTPException: 400 when the user is unknown or the request is not
            valid for the account's current state.
    """
    user = fetch_user_by_email(data=data, db=db)
    valid_request = user is not None and user.status and user.forgot_password
    if not valid_request:
        raise HTTPException(status_code=400, detail="An error has ocurred")
    update_password_hash(user=user, password=data.password, db=db)
    unset_forgot_password(user=user, db=db)
    return user