Files
zod-backend/account/utils.py
2023-07-11 11:52:07 +05:30

124 lines
3.7 KiB
Python

"""Account utils"""
"""Import django"""
from django.conf import settings
from rest_framework import viewsets, status
from rest_framework.response import Response
"""Third party Django app"""
from templated_email import send_templated_mail
import jwt
from datetime import datetime
from calendar import timegm
from uuid import uuid4
import secrets
from junior.models import Junior
from guardian.models import Guardian
def junior_account_update(user_tb):
    """
    Deactivate the junior profile that belongs to a deleted user account.

    Looks up the first Junior whose auth user has the same email as
    ``user_tb``. When one exists it is flagged inactive and unverified,
    its ``guardian_code`` is reset and the record is saved. Nothing
    happens when no junior matches.

    :param user_tb: user record; only its ``email`` attribute is read.
    :return: None
    """
    junior = Junior.objects.filter(auth__email=user_tb.email).first()
    if not junior:
        return
    # NOTE(review): '{}' is presumably the empty-array literal for the
    # guardian_code column -- confirm against the Junior model.
    resets = (
        ('is_active', False),
        ('is_verified', False),
        ('guardian_code', '{}'),
    )
    for field_name, value in resets:
        setattr(junior, field_name, value)
    junior.save()
def guardian_account_update(user_tb):
    """
    Deactivate the guardian profile that belongs to a deleted user account.

    The first Guardian whose user has the same email as ``user_tb`` is
    flagged inactive and unverified and saved. Its guardian code is then
    removed from every junior that holds it. Nothing happens when no
    guardian matches.

    :param user_tb: user record; only its ``email`` attribute is read.
    :return: None
    """
    guardian = Guardian.objects.filter(user__email=user_tb.email).first()
    if not guardian:
        return

    guardian.is_active = False
    guardian.is_verified = False
    guardian.save()

    code = guardian.guardian_code
    # ``icontains`` is a case-insensitive substring match, so this queryset
    # can include juniors that only hold a *similar* code.
    candidates = Junior.objects.filter(guardian_code__icontains=str(code))
    for junior in candidates:
        held_codes = junior.guardian_code
        # remove() needs an exact element; skip near-matches (and empty
        # values) instead of letting a ValueError abort the whole clean-up.
        if not held_codes or code not in held_codes:
            continue
        held_codes.remove(code)
        junior.save()
def send_otp_email(recipient_email, otp):
    """
    Email a one-time password using the OTP verification template.

    The message is sent from ``settings.EMAIL_FROM_ADDRESS`` to the single
    given recipient, with the OTP exposed to the template as ``otp``.

    :param recipient_email: address the message is sent to.
    :param otp: one-time password placed in the template context.
    :return: the same ``otp`` value that was passed in.
    """
    send_templated_mail(
        template_name='email_otp_verification.email',
        from_email=settings.EMAIL_FROM_ADDRESS,
        recipient_list=[recipient_email],
        context={'otp': otp},
    )
    return otp
def custom_response(detail, data=None, response_status=status.HTTP_200_OK):
    """
    Build the common success response body.

    :param detail: message placed under ``"message"``.
    :param data: payload placed under ``"data"``; any falsy value
        (``None``, empty list/dict/string, 0) is sent as ``None``.
    :param response_status: value placed under ``"code"`` in the body.
        The Response object itself is created without a status argument.
    :return: Response wrapping the success body.
    """
    body = {
        "data": data if data else None,
        "message": detail,
        "status": "success",
        "code": response_status,
    }
    return Response(body)
def custom_error_response(detail, response_status):
    """
    Build the common error response body used across the API.

    :param detail: error message; any falsy value is replaced by ``{}``.
    :param response_status: value placed under ``"code"`` in the body.
        The Response object itself is created without a status argument.
    :return: Response wrapping the error body.
    """
    body = {
        "error": detail or {},
        "status": "failed",
        "code": response_status,
    }
    return Response(body)
def get_user_data(attrs):
    """
    Decode the JWT stored under ``attrs['token']`` and return its claims.

    Signature verification is switched off (``verify_signature: False``),
    so the returned claims are not authenticated by this function.
    NOTE(review): confirm that callers validate the token by other means.

    :param attrs: mapping that holds the encoded token under ``'token'``.
    :return: the decoded token payload.
    """
    decode_options = {'verify_signature': False}
    return jwt.decode(
        jwt=attrs['token'],
        options=decode_options,
        algorithms=['RS256'],
    )
def generate_jwt_token(token_type: str, now_time: datetime, data: dict = dict):
    """
    Build and sign a JWT of the requested type.

    The claims are a copy of ``data`` plus ``token_type``, ``iss``, ``iat``,
    ``jti`` and ``exp``. The caller's dictionary is never modified.

    :param token_type: ``'refresh'`` selects
        ``SIMPLE_JWT['REFRESH_TOKEN_LIFETIME']``; any other value (normally
        ``'access'``) selects ``SIMPLE_JWT['ACCESS_TOKEN_LIFETIME']``.
    :param now_time: datetime the expiry is measured from; the configured
        lifetime is added to it (presumably a ``timedelta`` -- confirm in
        settings).
    :param data: extra claims to embed. The default is the ``dict`` class
        itself, kept for backward compatibility and treated, like ``None``,
        as "no extra claims".
    :return: the encoded token produced by ``jwt.encode``.
    """
    # Work on a private copy so repeated calls with the same dict (as
    # get_token does) cannot leak claims from one token into the next.
    if data is None or isinstance(data, type):
        payload = {}
    else:
        payload = dict(data)

    payload.update({
        'token_type': token_type,
        'iss': 'your_site_url',
        'iat': timegm(datetime.utcnow().utctimetuple()),
        'jti': uuid4().hex,
    })

    if token_type == "refresh":
        lifetime_key = 'REFRESH_TOKEN_LIFETIME'
    else:
        lifetime_key = 'ACCESS_TOKEN_LIFETIME'
    expires_at = now_time + settings.SIMPLE_JWT[lifetime_key]
    payload["exp"] = timegm(expires_at.utctimetuple())

    # NOTE(review): a fresh random key is generated for every token and is
    # neither stored nor returned, so nothing visible here can verify the
    # signature later. Confirm whether a persistent signing key is intended.
    signing_key = secrets.token_hex(32)
    return jwt.encode(payload=payload, key=signing_key,
                      algorithm='HS256')
def get_token(data: dict = dict):
    """
    Create an access token and a refresh token for the given claims.

    Both tokens are generated from the same UTC timestamp, access first
    and refresh second.

    :param data: extra claims forwarded unchanged to ``generate_jwt_token``.
    :return: dict with the encoded tokens under ``'access'`` and
        ``'refresh'``.
    """
    issued_at = datetime.utcnow()
    return {
        kind: generate_jwt_token(kind, issued_at, data)
        for kind in ('access', 'refresh')
    }