Files
zod-backend/guardian/views.py
2023-08-10 15:30:46 +05:30

379 lines
18 KiB
Python

"""Views of Guardian"""
# django imports
# Import IsAuthenticated
# Import viewsets and status
# Import PageNumberPagination
# Import User
# Import timezone
from rest_framework.permissions import IsAuthenticated
from rest_framework import viewsets, status
from rest_framework.pagination import PageNumberPagination
from django.contrib.auth.models import User
from rest_framework.filters import SearchFilter
from django.utils import timezone
# Import guardian's model,
# Import junior's model,
# Import account's model,
# Import constant from
# base package,
# Import messages from
# base package,
# Import some functions
# from utils file
# Import account's serializer
# Import account's task
# Import notification constant
# Import send_notification function
from .serializers import (UserSerializer, CreateGuardianSerializer, TaskSerializer, TaskDetailsSerializer,
TopJuniorSerializer, ApproveJuniorSerializer, ApproveTaskSerializer,
GuardianDetailListSerializer)
from .models import Guardian, JuniorTask
from junior.models import Junior, JuniorPoints, JuniorGuardianRelationship
from account.models import UserEmailOtp, UserNotification
from .tasks import generate_otp
from account.utils import custom_response, custom_error_response, OTP_EXPIRY, send_otp_email
from base.messages import ERROR_CODE, SUCCESS_CODE
from base.constants import NUMBER, GUARDIAN_CODE_STATUS
from .utils import upload_image_to_alibaba
from notifications.constants import REGISTRATION, TASK_CREATED, LEADERBOARD_RANKING
from notifications.utils import send_notification
""" Define APIs """
# Define Signup API,
# update guardian profile,
# list of all task
# list of task according to the status of the task
# create task API
# search task by name of the task API
# top junior API,
# approve junior API
# approve task API
# Create your views here.
# create approve task API
class SignupViewset(viewsets.ModelViewSet):
"""Signup view set"""
queryset = User.objects.all()
serializer_class = UserSerializer
def create(self, request, *args, **kwargs):
"""Create user profile"""
if request.data['user_type'] in [str(NUMBER['one']), str(NUMBER['two'])]:
serializer = UserSerializer(context=request.data['user_type'], data=request.data)
if serializer.is_valid():
user = serializer.save()
"""Generate otp"""
otp = generate_otp()
# expire otp after 1 day
expiry = OTP_EXPIRY
# create user email otp object
UserEmailOtp.objects.create(email=request.data['email'], otp=otp,
user_type=str(request.data['user_type']), expired_at=expiry)
"""Send email to the register user"""
send_otp_email(request.data['email'], otp)
# send push notification for registration
send_notification.delay(REGISTRATION, None, user.id, {})
return custom_response(SUCCESS_CODE['3001'],
response_status=status.HTTP_200_OK)
return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST)
else:
return custom_error_response(ERROR_CODE['2028'], response_status=status.HTTP_400_BAD_REQUEST)
class UpdateGuardianProfile(viewsets.ViewSet):
"""Update guardian profile"""
serializer_class = CreateGuardianSerializer
permission_classes = [IsAuthenticated]
def create(self, request, *args, **kwargs):
"""Create guardian profile"""
try:
data = request.data
image = request.data.get('image')
image_url = ''
if image:
if image and image.size == NUMBER['zero']:
return custom_error_response(ERROR_CODE['2035'], response_status=status.HTTP_400_BAD_REQUEST)
filename = f"images/{image.name}"
# upload image on ali baba
image_url = upload_image_to_alibaba(image, filename)
data = {"image":image_url}
serializer = CreateGuardianSerializer(context={"user":request.user,
"first_name":request.data.get('first_name'),
"last_name": request.data.get('last_name'),
"image":image_url},
data=data)
if serializer.is_valid():
"""save serializer"""
serializer.save()
return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class AllTaskListAPIView(viewsets.ModelViewSet):
"""Update guardian profile"""
serializer_class = TaskDetailsSerializer
permission_classes = [IsAuthenticated]
def list(self, request, *args, **kwargs):
"""Create guardian profile"""
queryset = JuniorTask.objects.filter(guardian__user=request.user)
# use TaskDetailsSerializer serializer
serializer = TaskDetailsSerializer(queryset, many=True)
return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
# class TaskListAPIView(viewsets.ModelViewSet):
# """Update guardian profile"""
# serializer_class = TaskDetailsSerializer
# permission_classes = [IsAuthenticated]
# pagination_class = PageNumberPagination
# http_method_names = ('get',)
#
# def list(self, request, *args, **kwargs):
# """Create guardian profile"""
# try:
# status_value = self.request.GET.get('status')
# search = self.request.GET.get('search')
# if search and str(status_value) == '0':
# queryset = JuniorTask.objects.filter(guardian__user=request.user,
# task_name__icontains=search).order_by('due_date', 'created_at')
# elif search and str(status_value) != '0':
# queryset = JuniorTask.objects.filter(guardian__user=request.user,task_name__icontains=search,
# task_status=status_value).order_by('due_date', 'created_at')
# if search is None and str(status_value) == '0':
# queryset = JuniorTask.objects.filter(guardian__user=request.user).order_by('due_date', 'created_at')
# elif search is None and str(status_value) != '0':
# queryset = JuniorTask.objects.filter(guardian__user=request.user,
# task_status=status_value).order_by('due_date','created_at')
# paginator = self.pagination_class()
# # use Pagination
# paginated_queryset = paginator.paginate_queryset(queryset, request)
# # use TaskDetailsSerializer serializer
# serializer = TaskDetailsSerializer(paginated_queryset, many=True)
# return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
# except Exception as e:
# return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class TaskListAPIView(viewsets.ModelViewSet):
"""Update guardian profile"""
serializer_class = TaskDetailsSerializer
permission_classes = [IsAuthenticated]
filter_backends = (SearchFilter,)
search_fields = ['task_name', ]
pagination_class = PageNumberPagination
http_method_names = ('get',)
def get_queryset(self):
queryset = JuniorTask.objects.filter(guardian__user=self.request.user
).prefetch_related('junior', 'junior__auth'
).order_by('due_date', 'created_at')
queryset = self.filter_queryset(queryset)
return queryset
def list(self, request, *args, **kwargs):
"""Create guardian profile"""
status_value = self.request.GET.get('status')
queryset = self.get_queryset()
if status_value and status_value != '0':
queryset = queryset.filter(task_status=status_value)
paginator = self.pagination_class()
# use Pagination
paginated_queryset = paginator.paginate_queryset(queryset, request)
# use TaskDetailsSerializer serializer
serializer = self.serializer_class(paginated_queryset, many=True)
return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
class CreateTaskAPIView(viewsets.ModelViewSet):
"""create task for junior"""
serializer_class = TaskSerializer
queryset = JuniorTask.objects.all()
http_method_names = ('post', )
def create(self, request, *args, **kwargs):
try:
image = request.data['default_image']
junior = request.data['junior']
allowed_extensions = ['.jpg', '.jpeg', '.png']
if not any(extension in str(image) for extension in allowed_extensions):
return custom_error_response(ERROR_CODE['2048'], response_status=status.HTTP_400_BAD_REQUEST)
if not junior.isnumeric():
"""junior value must be integer"""
return custom_error_response(ERROR_CODE['2047'], response_status=status.HTTP_400_BAD_REQUEST)
data = request.data
if 'https' in str(image):
image_data = image
else:
filename = f"images/{image}"
if image and image.size == NUMBER['zero']:
return custom_error_response(ERROR_CODE['2035'], response_status=status.HTTP_400_BAD_REQUEST)
image_url = upload_image_to_alibaba(image, filename)
image_data = image_url
data.pop('default_image')
# use TaskSerializer serializer
serializer = TaskSerializer(context={"user":request.user, "image":image_data}, data=data)
if serializer.is_valid():
# save serializer
serializer.save()
junior_id = Junior.objects.filter(id=junior).last()
send_notification.delay(TASK_CREATED, None, junior_id.auth.id, {})
return custom_response(SUCCESS_CODE['3018'], serializer.data, response_status=status.HTTP_200_OK)
return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class SearchTaskListAPIView(viewsets.ModelViewSet):
"""Update guardian profile"""
serializer_class = TaskDetailsSerializer
permission_classes = [IsAuthenticated]
pagination_class = PageNumberPagination
def get_queryset(self):
"""Get the queryset for the view"""
title = self.request.GET.get('title')
# fetch junior query
junior_queryset = JuniorTask.objects.filter(guardian__user=self.request.user, task_name__icontains=title)\
.order_by('due_date', 'created_at')
return junior_queryset
def list(self, request, *args, **kwargs):
"""Create guardian profile"""
try:
queryset = self.get_queryset()
paginator = self.pagination_class()
# use pagination
paginated_queryset = paginator.paginate_queryset(queryset, request)
# use TaskSerializer serializer
serializer = TaskDetailsSerializer(paginated_queryset, many=True)
return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class TopJuniorListAPIView(viewsets.ModelViewSet):
"""Top juniors list"""
queryset = JuniorPoints.objects.all()
serializer_class = TopJuniorSerializer
permission_classes = [IsAuthenticated]
def get_serializer_context(self):
# context list
context = super().get_serializer_context()
context.update({'view': self})
return context
def list(self, request, *args, **kwargs):
"""Fetch junior list of those who complete their tasks"""
try:
junior_total_points = self.get_queryset().order_by('-total_points')
# Update the position field for each JuniorPoints object
for index, junior in enumerate(junior_total_points):
junior.position = index + 1
send_notification.delay(LEADERBOARD_RANKING, None, junior.junior.auth.id, {})
junior.save()
serializer = self.get_serializer(junior_total_points[:15], many=True)
return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class ApproveJuniorAPIView(viewsets.ViewSet):
"""approve junior by guardian"""
serializer_class = ApproveJuniorSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Get the queryset for the view"""
guardian = Guardian.objects.filter(user__email=self.request.user).last()
# fetch junior query
junior_queryset = Junior.objects.filter(id=self.request.data.get('junior_id')).last()
return guardian, junior_queryset
def create(self, request, *args, **kwargs):
""" junior list"""
try:
queryset = self.get_queryset()
# action 1 is use for approve and 2 for reject
if request.data['action'] == '1':
# use ApproveJuniorSerializer serializer
serializer = ApproveJuniorSerializer(context={"guardian_code": queryset[0].guardian_code,
"junior": queryset[1], "action": request.data['action']},
data=request.data)
if serializer.is_valid():
# save serializer
serializer.save()
return custom_response(SUCCESS_CODE['3023'], serializer.data, response_status=status.HTTP_200_OK)
else:
queryset[1].guardian_code = None
queryset[1].guardian_code_status = str(NUMBER['one'])
queryset[1].save()
return custom_response(SUCCESS_CODE['3024'], response_status=status.HTTP_200_OK)
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class ApproveTaskAPIView(viewsets.ViewSet):
"""approve junior by guardian"""
serializer_class = ApproveTaskSerializer
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""Get the queryset for the view"""
guardian = Guardian.objects.filter(user__email=self.request.user).last()
# task query
task_queryset = JuniorTask.objects.filter(id=self.request.data.get('task_id'),
guardian=guardian,
junior=self.request.data.get('junior_id')).last()
return guardian, task_queryset
def create(self, request, *args, **kwargs):
""" junior list"""
# action 1 is use for approve and 2 for reject
try:
queryset = self.get_queryset()
# use ApproveJuniorSerializer serializer
serializer = ApproveTaskSerializer(context={"guardian_code": queryset[0].guardian_code,
"task_instance": queryset[1],
"action": str(request.data['action']),
"junior": self.request.data['junior_id']},
data=request.data)
unexpected_task_status = [str(NUMBER['five']), str(NUMBER['six'])]
if (str(request.data['action']) == str(NUMBER['one']) and serializer.is_valid()
and queryset[1] and queryset[1].task_status not in unexpected_task_status):
# save serializer
serializer.save()
return custom_response(SUCCESS_CODE['3025'], response_status=status.HTTP_200_OK)
elif (str(request.data['action']) == str(NUMBER['two']) and serializer.is_valid()
and queryset[1] and queryset[1].task_status not in unexpected_task_status):
# save serializer
serializer.save()
return custom_response(SUCCESS_CODE['3026'], response_status=status.HTTP_200_OK)
else:
return custom_response(ERROR_CODE['2038'], response_status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class GuardianListAPIView(viewsets.ModelViewSet):
"""Guardian list of assosicated junior"""
serializer_class = GuardianDetailListSerializer
permission_classes = [IsAuthenticated]
http_method_names = ('get',)
def list(self, request, *args, **kwargs):
""" junior list"""
try:
guardian_data = JuniorGuardianRelationship.objects.filter(junior__auth__email=self.request.user)
# fetch junior object
if guardian_data:
# use GuardianDetailListSerializer serializer
serializer = GuardianDetailListSerializer(guardian_data, many=True)
return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
return custom_response({"status": GUARDIAN_CODE_STATUS[1][0]}, response_status=status.HTTP_200_OK)
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)