Files
zod-backend/account/utils.py
2023-07-12 16:24:30 +05:30

190 lines
6.2 KiB
Python

"""Account utils"""
"""Import django"""
from django.conf import settings
from rest_framework import viewsets, status
from rest_framework.response import Response
"""Third party Django app"""
from templated_email import send_templated_mail
import jwt
import string
from datetime import datetime
from calendar import timegm
from uuid import uuid4
import secrets
from rest_framework import serializers
from junior.models import Junior
from guardian.models import Guardian
from account.models import UserDelete
from base.messages import ERROR_CODE
def delete_user_account_condition(user, user_type_data, user_type, user_tb, data, random_num):
    """Deactivate the profile behind ``user_tb``, anonymise it and log the deletion.

    The junior/guardian helper runs first because it looks the profile up by
    ``user_tb.email``; the address is only replaced afterwards.

    :param user: object whose ``email`` is recorded as the original address.
    :param user_type_data: must equal ``user_type`` ('1' or '2') for the
        deletion to proceed.
    :param user_type: '1' routes to the junior helper, '2' to the guardian one.
    :param user_tb: auth user record that is rewritten and saved.
    :param data: stored as ``reason`` on the ``UserDelete`` row.
    :param random_num: prefix of the placeholder e-mail / username.
    :return: the newly created ``UserDelete`` instance.
    :raises serializers.ValidationError: when the two type values are not both
        '1' or both '2'.
    """
    if user_type == '1' and user_type_data == '1':
        junior_account_update(user_tb)
    elif user_type == '2' and user_type_data == '2':
        guardian_account_update(user_tb)
    else:
        raise serializers.ValidationError(
            {"details": ERROR_CODE['2030'], "code": "400", "status": "failed"}
        )

    # Capture the real address BEFORE user_tb is overwritten: if ``user`` and
    # ``user_tb`` are the same object, reading it later would yield the
    # placeholder and the original address would be lost.
    old_email = user.email

    # Both fields receive the same value, derived from the *original* username.
    placeholder = str(random_num) + '@D_' + '{}'.format(user_tb.username).lower()
    user_tb.email = placeholder
    user_tb.username = placeholder
    # Stores the literal string 'None' (not a hash) in the password field —
    # presumably to make the account impossible to log in to; confirm.
    user_tb.password = 'None'
    user_tb.save()

    return UserDelete.objects.create(user=user_tb, d_email=placeholder, old_email=old_email,
                                     is_active=True, reason=data)
def delete_user_account_condition_social(user, user_type, user_tb, data, random_num):
    """Deactivate the profile behind ``user_tb``, anonymise it and log the deletion.

    Variant that branches on ``user_type`` alone. The junior/guardian helper
    runs first because it looks the profile up by ``user_tb.email``; the
    address is only replaced afterwards.

    :param user: object whose ``email`` is recorded as the original address.
    :param user_type: '1' routes to the junior helper, '2' to the guardian one.
    :param user_tb: auth user record that is rewritten and saved.
    :param data: stored as ``reason`` on the ``UserDelete`` row.
    :param random_num: prefix of the placeholder e-mail / username.
    :return: the newly created ``UserDelete`` instance.
    :raises serializers.ValidationError: when ``user_type`` is neither '1'
        nor '2'.
    """
    if user_type == '1':
        junior_account_update(user_tb)
    elif user_type == '2':
        guardian_account_update(user_tb)
    else:
        raise serializers.ValidationError(
            {"details": ERROR_CODE['2030'], "code": "400", "status": "failed"}
        )

    # Capture the real address BEFORE user_tb is overwritten: if ``user`` and
    # ``user_tb`` are the same object, reading it later would yield the
    # placeholder and the original address would be lost.
    old_email = user.email

    # Both fields receive the same value, derived from the *original* username.
    placeholder = str(random_num) + '@D_' + '{}'.format(user_tb.username).lower()
    user_tb.email = placeholder
    user_tb.username = placeholder
    # Stores the literal string 'None' (not a hash) in the password field —
    # presumably to make the account impossible to log in to; confirm.
    user_tb.password = 'None'
    user_tb.save()

    return UserDelete.objects.create(user=user_tb, d_email=placeholder, old_email=old_email,
                                     is_active=True, reason=data)
def junior_account_update(user_tb):
    """Deactivate the first junior profile whose auth e-mail matches ``user_tb.email``.

    Does nothing when no such profile exists.

    :param user_tb: object providing the ``email`` used for the lookup.
    """
    profile = Junior.objects.filter(auth__email=user_tb.email).first()
    if not profile:
        return

    profile.is_active = False
    profile.is_verified = False
    # Resets the stored guardian codes; '{}' is presumably the database's
    # empty-array literal — confirm against the Junior model.
    profile.guardian_code = '{}'
    profile.save()
def guardian_account_update(user_tb):
    """Deactivate a guardian profile and detach its code from linked juniors.

    The first guardian whose user e-mail matches ``user_tb.email`` is marked
    inactive and unverified. Its ``guardian_code`` is then removed from every
    junior that actually holds it. Does nothing when no guardian matches.

    :param user_tb: object providing the ``email`` used for the lookup.
    """
    guardian = Guardian.objects.filter(user__email=user_tb.email).first()
    if not guardian:
        return

    guardian.is_active = False
    guardian.is_verified = False
    guardian.save()

    code = guardian.guardian_code
    candidates = Junior.objects.filter(guardian_code__icontains=str(code))
    for junior in candidates:
        held_codes = junior.guardian_code
        # ``icontains`` is a case-insensitive substring match, so a candidate
        # may only hold a longer or differently-cased code. Calling
        # ``remove`` on it would raise ValueError and abort the deletion
        # half-way, so such rows are skipped.
        if not held_codes or code not in held_codes:
            continue
        held_codes.remove(code)
        junior.save()
def send_otp_email(recipient_email, otp):
    """Mail ``otp`` to ``recipient_email`` using the OTP verification template.

    The sender is ``settings.EMAIL_FROM_ADDRESS``.

    :param recipient_email: single destination address.
    :param otp: value exposed to the template as ``otp``.
    :return: the ``otp`` argument, unchanged.
    """
    send_templated_mail(
        template_name='email_otp_verification.email',
        from_email=settings.EMAIL_FROM_ADDRESS,
        recipient_list=[recipient_email],
        context={'otp': otp},
    )
    return otp
def send_support_email(name, sender, subject, message):
    """Send a support request to ``settings.EMAIL_FROM_ADDRESS``.

    The mail is sent from ``settings.DEFAULT_ADDRESS`` using the support
    template.

    :param name: requester's name; title-cased before it reaches the template.
    :param sender: passed to the template as ``sender``.
    :param subject: passed to the template as ``subject``.
    :param message: passed to the template as ``message``.
    :return: the ``name`` argument, unchanged (not title-cased).
    """
    support_inbox = [settings.EMAIL_FROM_ADDRESS]
    outgoing_address = settings.DEFAULT_ADDRESS
    template_context = {
        'name': name.title(),
        'sender': sender,
        'subject': subject,
        'message': message,
    }
    send_templated_mail(
        template_name='support_mail.email',
        from_email=outgoing_address,
        recipient_list=support_inbox,
        context=template_context,
    )
    return name
def custom_response(detail, data=None, response_status=status.HTTP_200_OK):
    """Build the common success envelope.

    ``response_status`` is only written into the body's ``code`` field; it is
    not passed to ``Response`` as the HTTP status.

    :param detail: value placed under ``message``.
    :param data: payload placed under ``data``; any falsy value becomes ``None``.
    :param response_status: value placed under ``code``.
    :return: ``Response`` wrapping the envelope.
    """
    envelope = {
        "data": data or None,
        "message": detail,
        "status": "success",
        "code": response_status,
    }
    return Response(envelope)
def custom_error_response(detail, response_status):
    """Build the common failure envelope used for every error response.

    ``response_status`` is only written into the body's ``code`` field; it is
    not passed to ``Response`` as the HTTP status.

    :param detail: error message placed under ``error``; any falsy value
        becomes an empty dict.
    :param response_status: value placed under ``code``.
    :return: ``Response`` wrapping the envelope.
    """
    envelope = {
        "error": detail or {},
        "status": "failed",
        "code": response_status,
    }
    return Response(envelope)
def get_user_data(attrs):
    """Return the claims of the JWT stored in ``attrs['token']``.

    :param attrs: mapping that must contain a ``token`` entry.
    :return: the decoded payload.
    """
    # NOTE(review): signature verification is switched off, so the returned
    # claims are NOT authenticated — anyone can craft a token that decodes
    # here. Callers must not trust these values without verifying the token
    # against the issuer's keys.
    token = attrs['token']
    decode_options = {'verify_signature': False}
    return jwt.decode(jwt=token, options=decode_options, algorithms=['RS256'])
def generate_jwt_token(token_type: str, now_time: datetime, data: dict = dict):
    """Create a signed HS256 JWT of the requested type.

    :param token_type: ``'refresh'`` selects ``REFRESH_TOKEN_LIFETIME``; any
        other value selects ``ACCESS_TOKEN_LIFETIME``.
    :param now_time: datetime the expiry is measured from; the configured
        lifetime is added to it.
    :param data: extra claims to embed. ``None`` or the ``dict`` class itself
        (the legacy default sentinel) means "no extra claims".
    :return: the encoded token as returned by ``jwt.encode``.
    """
    # Work on a private copy: the caller's dict used to be mutated, so the
    # claims of one token leaked into the caller and into later tokens built
    # from the same dict.
    if data is None or isinstance(data, type):
        payload = {}
    else:
        payload = dict(data)

    payload.update({
        'token_type': token_type,
        # NOTE(review): placeholder issuer carried over unchanged.
        'iss': 'your_site_url',
        'iat': timegm(datetime.utcnow().utctimetuple()),
        'jti': uuid4().hex,
    })

    if token_type == 'refresh':
        lifetime = settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME']
    else:
        lifetime = settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME']
    expires_at = now_time + lifetime
    payload['exp'] = timegm(expires_at.utctimetuple())

    # Sign with the project's stable secret. The previous code signed every
    # token with a freshly generated random key that was thrown away, so no
    # token could ever have its signature verified.
    return jwt.encode(payload=payload, key=settings.SECRET_KEY,
                      algorithm='HS256')
def get_token(data: dict = dict):
    """Create an access token and a refresh token from one reference time.

    :param data: extra claims forwarded unchanged to ``generate_jwt_token``.
    :return: dict with the keys ``access`` and ``refresh``.
    """
    reference_time = datetime.utcnow()
    # Built in order: the access token first, then the refresh token.
    return {
        kind: generate_jwt_token(kind, reference_time, data)
        for kind in ('access', 'refresh')
    }
def generate_alphanumeric_code(length):
    """Return a random string of ``length`` ASCII letters and digits.

    Each character is drawn independently with ``secrets.choice``.

    :param length: number of characters to generate.
    :return: the generated code; empty when ``length`` is zero or negative.
    """
    pool = string.ascii_letters + string.digits
    picked = [secrets.choice(pool) for _ in range(length)]
    return ''.join(picked)