mirror of
https://github.com/HamzaSha1/zod-backend.git
synced 2025-07-15 18:07:02 +00:00
103 lines
2.8 KiB
Python
103 lines
2.8 KiB
Python
"""Account utils"""
|
|
"""Import django"""
|
|
from django.conf import settings
|
|
from rest_framework import viewsets, status
|
|
from rest_framework.response import Response
|
|
"""Third party Django app"""
|
|
from templated_email import send_templated_mail
|
|
import jwt
|
|
from datetime import datetime
|
|
from calendar import timegm
|
|
from uuid import uuid4
|
|
import secrets
|
|
|
|
def send_otp_email(recipient_email, otp):
    """Email the OTP to one recipient using the OTP verification template.

    :param recipient_email: address the message is sent to.
    :param otp: value exposed to the template under the ``otp`` key.
    :return: the ``otp`` argument, unchanged.
    """
    mail_kwargs = {
        'template_name': 'email_otp_verification.email',
        'from_email': settings.EMAIL_FROM_ADDRESS,
        # send_templated_mail takes a list, so wrap the single address.
        'recipient_list': [recipient_email],
        'context': {'otp': otp},
    }
    send_templated_mail(**mail_kwargs)
    return otp
|
|
|
|
def custom_response(detail, data=None, response_status=status.HTTP_200_OK):
    """Build the shared success envelope.

    :param detail: value placed under the ``message`` key.
    :param data: payload placed under ``data``; any falsy value (``None``,
        empty container, ``0``, ``""``) is sent as ``None``.
    :param response_status: value placed under the ``code`` key of the body.
        It is not passed to ``Response`` as the HTTP status.
    :return: ``Response`` wrapping the envelope.
    """
    envelope = {
        "data": data or None,
        "message": detail,
        "status": "success",
        "code": response_status,
    }
    return Response(envelope)
|
|
|
|
|
|
def custom_error_response(detail, response_status):
    """Build the shared error envelope used for every failure response.

    :param detail: error message; any falsy value is replaced by ``{}``.
    :param response_status: value placed under the ``code`` key of the body.
        It is not passed to ``Response`` as the HTTP status.
    :return: ``Response`` wrapping the envelope.
    """
    envelope = {
        "error": detail if detail else {},
        "status": "failed",
        "code": response_status,
    }
    return Response(envelope)
|
|
|
|
|
|
def get_user_data(attrs):
    """Decode the JWT stored under ``attrs['token']`` and return its payload.

    NOTE(review): signature verification is switched off below, so the
    returned claims are not authenticated by this function. Confirm the
    token is validated elsewhere before trusting its contents.

    :param attrs: mapping that contains the encoded token under ``'token'``.
    :return: the payload returned by ``jwt.decode``.
    """
    decode_options = {'verify_signature': False}
    return jwt.decode(
        jwt=attrs['token'],
        algorithms=['RS256'],
        options=decode_options,
    )
|
|
|
|
|
|
def generate_jwt_token(token_type: str, now_time: datetime, data: dict = dict):
    """Build and sign a JWT of the requested type.

    :param token_type: ``'refresh'`` selects the refresh lifetime; any other
        value is treated as an access token.
    :param now_time: datetime the expiry is measured from; ``exp`` is
        ``now_time`` plus the configured lifetime from ``settings.SIMPLE_JWT``.
    :param data: extra claims to include. The mapping is copied, so the
        caller's object is never modified. The ``dict`` class default (or
        ``None``) means "no extra claims".
    :return: the encoded token produced by ``jwt.encode``.
    """
    # Work on a private copy: updating the argument in place would leak the
    # token claims (token_type, iat, jti, exp, ...) into the caller's dict.
    if data is None or isinstance(data, type):
        payload = {}
    else:
        payload = dict(data)

    lifetime_key = (
        'REFRESH_TOKEN_LIFETIME' if token_type == 'refresh'
        else 'ACCESS_TOKEN_LIFETIME'
    )
    expires_at = now_time + settings.SIMPLE_JWT[lifetime_key]

    payload.update({
        'token_type': token_type,
        'iss': 'your_site_url',
        'iat': timegm(datetime.utcnow().utctimetuple()),
        'jti': uuid4().hex,
        'exp': timegm(expires_at.utctimetuple()),
    })

    # NOTE(review): a fresh random key is generated on every call and is
    # neither stored nor returned, so the signature of the issued token can
    # never be verified afterwards. Replace with a persistent, configured
    # signing key once the intended source of that key is confirmed.
    signing_key = secrets.token_hex(32)

    return jwt.encode(payload=payload, key=signing_key, algorithm='HS256')
|
|
|
|
|
|
def get_token(data: dict = dict):
    """Create an access/refresh token pair from one shared timestamp.

    :param data: extra claims forwarded unchanged to ``generate_jwt_token``
        for both tokens.
    :return: dict with the encoded tokens under ``'access'`` and ``'refresh'``.
    """
    issued_at = datetime.utcnow()
    # The access token is generated first, then the refresh token.
    return {
        kind: generate_jwt_token(kind, issued_at, data)
        for kind in ('access', 'refresh')
    }
|