Files
zod-backend/account/utils.py
2023-08-14 12:13:14 +05:30

237 lines
7.7 KiB
Python

"""Account utils"""
"""Import django"""
from django.conf import settings
from rest_framework import viewsets, status
from rest_framework.response import Response
"""Third party Django app"""
from templated_email import send_templated_mail
import jwt
import string
from datetime import datetime
from calendar import timegm
from uuid import uuid4
# Import secrets module for generating random number
import secrets
from rest_framework import serializers
# Django App Import
# Import models from junior App,
# Import models from guardian App,
# Import models from account App,
# Import messages from base package"""
from junior.models import Junior
from guardian.models import Guardian
from account.models import UserDelete
from base.messages import ERROR_CODE
from django.utils import timezone
from base.constants import NUMBER
from junior.models import JuniorPoints
# Define delete
# user account condition,
# Define delete
# user account
# condition for social
# login account,
# Update junior account,
# Update guardian account,
# Define custom email for otp verification,
# Define support email for user's query,
# Define custom success response,
# Define custom error response,
# Generate access token,
# refresh token by using jwt,
# Define function for generating
# guardian code, junior code,
# referral code,
# Define function for generating
# alphanumeric code
# otp expiry
def delete_user_account_condition(user, user_type_data, user_type, user_tb, data, random_num):
"""delete user account"""
if user_type == '1' and user_type_data == '1':
junior_account_update(user_tb)
elif user_type == '2' and user_type_data == '2':
guardian_account_update(user_tb)
else:
raise serializers.ValidationError({"details": ERROR_CODE['2030'], "code": "400", "status": "failed"})
user_tb.email = str(random_num) + str('@D_') + '{}'.format(user_tb.username).lower()
user_tb.username = str(random_num) + str('@D_') + '{}'.format(user_tb.username).lower()
d_email = user_tb.email
o_mail = user.email
# update user email with dummy email
user_tb.save()
"""create object in user delete model"""
instance = UserDelete.objects.create(user=user_tb, d_email=d_email, old_email=o_mail,
is_active=True, reason=data)
return instance
def delete_user_account_condition_social(user, user_type,user_tb, data, random_num):
"""delete user account"""
if user_type == '1':
junior_account_update(user_tb)
elif user_type == '2':
guardian_account_update(user_tb)
else:
raise serializers.ValidationError({"details": ERROR_CODE['2030'], "code": "400", "status": "failed"})
user_tb.email = str(random_num) + str('@D_') + '{}'.format(user_tb.username).lower()
user_tb.username = str(random_num) + str('@D_') + '{}'.format(user_tb.username).lower()
dummy_email = user_tb.email
old_mail = user.email
# update user email with dummy email
user_tb.save()
"""create object in user delete model"""
instance_data = UserDelete.objects.create(user=user_tb, d_email=dummy_email, old_email=old_mail,
is_active=True, reason=data)
return instance_data
def junior_account_update(user_tb):
"""junior account delete"""
junior_data = Junior.objects.filter(auth__email=user_tb.email).first()
if junior_data:
# Update junior account
junior_data.is_active = False
junior_data.is_verified = False
junior_data.guardian_code = '{}'
junior_data.guardian_code_status = str(NUMBER['one'])
junior_data.save()
JuniorPoints.objects.filter(junior=junior_data).delete()
def guardian_account_update(user_tb):
"""update guardian account after delete the user account"""
guardian_data = Guardian.objects.filter(user__email=user_tb.email).first()
if guardian_data:
# Update guardian account
guardian_data.is_active = False
guardian_data.is_verified = False
guardian_data.save()
jun_data = Junior.objects.filter(guardian_code__icontains=str(guardian_data.guardian_code))
"""Disassociate relation between guardian and junior"""
for data in jun_data:
data.guardian_code.remove(guardian_data.guardian_code)
data.save()
def send_otp_email(recipient_email, otp):
"""Send otp on email with template"""
from_email = settings.EMAIL_FROM_ADDRESS
recipient_list = [recipient_email]
"""Send otp on email"""
send_templated_mail(
template_name='email_otp_verification.email',
from_email=from_email,
recipient_list=recipient_list,
context={
'otp': otp
}
)
return otp
def send_support_email(name, sender, subject, message):
"""Send otp on email with template"""
to_email = [settings.EMAIL_FROM_ADDRESS]
from_email = settings.DEFAULT_ADDRESS
"""Send support email to zod bank support team"""
send_templated_mail(
template_name='support_mail.email',
from_email=from_email,
recipient_list=to_email,
context={
'name': name.title(),
'sender': sender,
'subject': subject,
'message': message
}
)
return name
def custom_response(detail, data=None, response_status=status.HTTP_200_OK, count=None):
"""Custom response code"""
if not data:
"""when data is none"""
data = None
return Response({"data": data, "message": detail, "status": "success", "code": response_status, "count": count})
def custom_error_response(detail, response_status):
"""
function is used for getting same global error response for all
:param detail: error message .
:param response_status: http status.
:return: Json response
"""
if not detail:
"""when details is empty"""
detail = {}
return Response({"error": detail, "status": "failed", "code": response_status})
def get_user_data(attrs):
"""
used to decode token
"""
user_data = jwt.decode(jwt=attrs['token'], options={'verify_signature': False},
algorithms=['RS256'])
return user_data
def generate_jwt_token(token_type: str, now_time: int, data: dict = dict):
"""
used to generate jwt token
"""
if type(data) == type:
data = {}
"""Update data dictionary"""
data.update({
'token_type': token_type,
'iss': 'your_site_url',
'iat': timegm(datetime.utcnow().utctimetuple()),
'jti': uuid4().hex
})
"""Access and Refresh token"""
TOKEN_TYPE = ["access", "refresh"]
if token_type == TOKEN_TYPE[1]:
"""Refresh token"""
exp = now_time + settings.SIMPLE_JWT['REFRESH_TOKEN_LIFETIME']
else:
"""access token"""
exp = now_time + settings.SIMPLE_JWT['ACCESS_TOKEN_LIFETIME']
data.update({
"exp": timegm(exp.utctimetuple())
})
signing_key = secrets.token_hex(32)
return jwt.encode(payload=data, key=signing_key,
algorithm='HS256')
def get_token(data: dict = dict):
""" create access and refresh token """
now_time = datetime.utcnow()
"""generate access token"""
access = generate_jwt_token('access', now_time, data)
"""generate refresh token"""
refresh = generate_jwt_token('refresh', now_time, data)
return {
'access': access,
'refresh': refresh
}
def generate_alphanumeric_code(length):
"""Generate alphanumeric code"""
alphabet = string.ascii_letters + string.digits
code = ''.join(secrets.choice(alphabet) for _ in range(length))
return code
def generate_code(value, user_id):
"""generate referral, junior and guardian code"""
code = value + str(user_id).zfill(3)
return code
OTP_EXPIRY = timezone.now() + timezone.timedelta(days=1)