Files
zod-backend/guardian/views.py

404 lines
19 KiB
Python

"""Views of Guardian"""
import math
# django imports
# Import IsAuthenticated
# Import viewsets and status
# Import PageNumberPagination
# Import User
# Import timezone
from django.db.models import F, Window
from django.db.models.functions.window import Rank
from rest_framework.permissions import IsAuthenticated
from rest_framework import viewsets, status
from rest_framework.pagination import PageNumberPagination
from django.contrib.auth.models import User
from base.constants import guardian_code_tuple
from rest_framework.filters import SearchFilter
from django.utils import timezone
from base.pagination import CustomPageNumberPagination
# Import guardian's model,
# Import junior's model,
# Import account's model,
# Import constant from
# base package,
# Import messages from
# base package,
# Import some functions
# from utils file
# Import account's serializer
# Import account's task
# Import notification constant
# Import send_notification function
from .serializers import (UserSerializer, CreateGuardianSerializer, TaskSerializer, TaskDetailsSerializer,
TopJuniorSerializer, ApproveJuniorSerializer, ApproveTaskSerializer,
GuardianDetailListSerializer)
from .models import Guardian, JuniorTask
from junior.models import Junior, JuniorPoints, JuniorGuardianRelationship
from account.models import UserEmailOtp, UserNotification, UserDeviceDetails
from .tasks import generate_otp
from account.utils import custom_response, custom_error_response, send_otp_email, task_status_fun
from base.messages import ERROR_CODE, SUCCESS_CODE
from base.constants import NUMBER, GUARDIAN_CODE_STATUS, GUARDIAN
from .utils import upload_image_to_alibaba
from notifications.constants import REGISTRATION, TASK_ASSIGNED, ASSOCIATE_APPROVED, ASSOCIATE_REJECTED
from notifications.utils import send_notification
""" Define APIs """
# APIs defined in this module:
#   - signup
#   - update guardian profile
#   - list all tasks
#   - list tasks filtered by task status
#   - create task
#   - search tasks by task name
#   - top juniors
#   - approve junior
#   - approve task
# Create your views here.
class SignupViewset(viewsets.ModelViewSet):
    """Signup endpoint: register a user and e-mail them a one-time password.

    Only POST is exposed.
    """
    queryset = User.objects.all()
    serializer_class = UserSerializer
    http_method_names = ('post',)

    def create(self, request, *args, **kwargs):
        """Create a user, store an e-mail OTP for them and record their device.

        Reads ``user_type`` and ``email`` from the request body and the device
        id from the ``HTTP_DEVICE_ID`` entry of ``request.META``.

        Returns:
            200 with ``SUCCESS_CODE['3001']`` when the user was created,
            400 with ``ERROR_CODE['2028']`` when ``user_type`` is missing or is
            not ``str(NUMBER['one'])`` / ``str(NUMBER['two'])``,
            400 with the serializer errors when validation fails.
        """
        device_id = request.META.get('HTTP_DEVICE_ID')
        # .get() instead of [...]: a request without `user_type` must end in the
        # 400 response below rather than an unhandled KeyError (HTTP 500).
        user_type = request.data.get('user_type')
        if user_type not in [str(NUMBER['one']), str(NUMBER['two'])]:
            return custom_error_response(ERROR_CODE['2028'], response_status=status.HTTP_400_BAD_REQUEST)

        serializer = UserSerializer(context=user_type, data=request.data)
        if not serializer.is_valid():
            return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST)
        user = serializer.save()

        # Generate the OTP; it expires after 1 day.
        otp = generate_otp()
        expiry = timezone.now() + timezone.timedelta(days=1)
        # Create the user e-mail OTP object.
        UserEmailOtp.objects.create(email=request.data['email'], otp=otp,
                                    user_type=str(user_type), expired_at=expiry)
        # Send the OTP e-mail to the registered user (queued via .delay()).
        send_otp_email.delay(request.data['email'], otp)
        UserDeviceDetails.objects.create(user=user, device_id=device_id)
        return custom_response(SUCCESS_CODE['3001'],
                               response_status=status.HTTP_200_OK)
class UpdateGuardianProfile(viewsets.ModelViewSet):
    """Guardian profile endpoint (POST only, authenticated users)."""
    serializer_class = CreateGuardianSerializer
    permission_classes = [IsAuthenticated]
    http_method_names = ('post',)

    def create(self, request, *args, **kwargs):
        """Save the guardian profile, uploading the optional ``image`` first.

        When an ``image`` is supplied it is uploaded through
        ``upload_image_to_alibaba`` and the serializer payload becomes
        ``{"image": <url>}``; otherwise the raw request data is validated.
        ``first_name`` / ``last_name`` always reach the serializer via context.

        Returns:
            200 with the serialized data on success,
            400 with ``ERROR_CODE['2035']`` for a zero-size image,
            400 with the serializer errors, or with ``str(exc)`` for any
            exception raised while handling the request.
        """
        try:
            payload = request.data
            uploaded = request.data.get('image')
            image_url = ''
            if uploaded:
                if uploaded.size == NUMBER['zero']:
                    return custom_error_response(ERROR_CODE['2035'],
                                                 response_status=status.HTTP_400_BAD_REQUEST)
                target_name = f"images/{uploaded.name}"
                # upload the image to Alibaba storage
                image_url = upload_image_to_alibaba(uploaded, target_name)
                payload = {"image": image_url}

            serializer_context = {
                "user": request.user,
                "first_name": request.data.get('first_name'),
                "last_name": request.data.get('last_name'),
                "image": image_url,
            }
            serializer = CreateGuardianSerializer(context=serializer_context, data=payload)
            if not serializer.is_valid():
                return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST)
            serializer.save()
            return custom_response(None, serializer.data, response_status=status.HTTP_200_OK)
        except Exception as e:
            return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class AllTaskListAPIView(viewsets.ModelViewSet):
    """List all tasks whose guardian is the authenticated user."""
    serializer_class = TaskDetailsSerializer
    permission_classes = [IsAuthenticated]

    def list(self, request, *args, **kwargs):
        """Return every task of the requesting guardian in one unpaginated response."""
        guardian_tasks = JuniorTask.objects.filter(guardian__user=request.user)
        # serialize with TaskDetailsSerializer
        task_payload = TaskDetailsSerializer(guardian_tasks, many=True).data
        return custom_response(None, task_payload, response_status=status.HTTP_200_OK)
class TaskListAPIView(viewsets.ModelViewSet):
    """Paginated, searchable task list for the authenticated guardian.

    Query params:
        status: mapped through ``task_status_fun`` to the ``task_status``
            values to keep
        search: matched against ``task_name`` by ``SearchFilter``
        page: page number
        junior: integer id of the junior whose tasks should be kept
    """
    serializer_class = TaskDetailsSerializer
    permission_classes = [IsAuthenticated]
    filter_backends = (SearchFilter,)
    search_fields = ['task_name', ]
    pagination_class = PageNumberPagination
    http_method_names = ('get',)

    def get_queryset(self):
        """Return the guardian's tasks, search-filtered, ordered by due date then creation time."""
        queryset = JuniorTask.objects.filter(
            guardian__user=self.request.user
        ).select_related(
            'junior', 'junior__auth'
        ).order_by('due_date', 'created_at')
        return self.filter_queryset(queryset)

    def list(self, request, *args, **kwargs):
        """Return one page of the guardian's tasks, narrowed by ``status`` / ``junior``.

        Returns:
            The paginated response built by ``CustomPageNumberPagination``, or
            400 with ``ERROR_CODE['2047']`` when ``junior`` is not an integer.
        """
        status_value = request.GET.get('status')
        junior = request.GET.get('junior')
        queryset = self.get_queryset()
        task_status = task_status_fun(status_value)
        if status_value:
            queryset = queryset.filter(task_status__in=task_status)
        if junior:
            # A non-numeric `junior` used to raise an unhandled ValueError
            # (HTTP 500); reject it with the same error code the task-creation
            # view uses for a non-integer junior value.
            try:
                junior_pk = int(junior)
            except ValueError:
                return custom_error_response(ERROR_CODE['2047'], response_status=status.HTTP_400_BAD_REQUEST)
            queryset = queryset.filter(junior=junior_pk)

        # Paginate explicitly with the project's paginator.
        paginator = CustomPageNumberPagination()
        paginated_queryset = paginator.paginate_queryset(queryset, request)
        # use TaskDetailsSerializer serializer
        serializer = self.serializer_class(paginated_queryset, many=True)
        return paginator.get_paginated_response(serializer.data)
# Task-creation endpoint (POST only). Validates the guardian/junior link and the
# task image, then saves the task through TaskSerializer and queues a
# TASK_ASSIGNED notification for the junior.
# NOTE(review): indentation is missing in this copy of the file, so the nesting
# described in the comments below is the most plausible reading, not a certainty.
class CreateTaskAPIView(viewsets.ModelViewSet):
"""create task for junior"""
serializer_class = TaskSerializer
queryset = JuniorTask.objects.all()
http_method_names = ('post', )
# Reads `default_image` and `junior` from request.data; both are accessed with
# [...], so a missing key raises KeyError, which the except below turns into a
# 400 response carrying str(e).
def create(self, request, *args, **kwargs):
"""
image should be in form data
"""
try:
image = request.data['default_image']
# `junior` is split on commas, i.e. it is expected to be a comma-separated string of ids
juniors = request.data['junior'].split(',')
# NOTE(review): every visible path after this point ends in `return`, so it looks
# like only the first id in `juniors` is ever processed — confirm against the
# properly indented file.
for junior in juniors:
junior_id = Junior.objects.filter(id=junior).last()
if junior_id:
guardian_data = Guardian.objects.filter(user=request.user).last()
# list.index raises ValueError when the guardian's code is not among the junior's
# codes; that is reported through the generic except below
index = junior_id.guardian_code.index(guardian_data.guardian_code)
status_index = junior_id.guardian_code_status[index]
# a guardian-code status of str(NUMBER['three']) blocks task creation (ERROR_CODE['2078'])
if status_index == str(NUMBER['three']):
return custom_error_response(ERROR_CODE['2078'], response_status=status.HTTP_400_BAD_REQUEST)
allowed_extensions = ['.jpg', '.jpeg', '.png']
# substring test on str(image): the extension may appear anywhere in the name/URL
if not any(extension in str(image) for extension in allowed_extensions):
return custom_error_response(ERROR_CODE['2048'], response_status=status.HTTP_400_BAD_REQUEST)
# NOTE(review): this numeric check runs after `junior` was already used in the
# Junior.objects.filter(id=junior) lookup above
if not junior.isnumeric():
"""junior value must be integer"""
return custom_error_response(ERROR_CODE['2047'], response_status=status.HTTP_400_BAD_REQUEST)
data = request.data
# a value containing 'https' is used as-is; anything else is uploaded
if 'https' in str(image):
image_data = image
else:
filename = f"images/{image}"
if image and image.size == NUMBER['zero']:
return custom_error_response(ERROR_CODE['2035'],
response_status=status.HTTP_400_BAD_REQUEST)
image_url = upload_image_to_alibaba(image, filename)
image_data = image_url
# `data` is request.data itself (no copy), so these pops mutate the request payload
data.pop('default_image')
junior_data = data.pop('junior')
# use TaskSerializer serializer
serializer = TaskSerializer(context={"user":request.user, "image":image_data,
"junior_data":junior_data}, data=data)
if serializer.is_valid():
# save serializer
task = serializer.save()
# queue the TASK_ASSIGNED notification for the junior's auth user
send_notification.delay(TASK_ASSIGNED, request.auth.payload['user_id'], GUARDIAN,
junior_id.auth.id, {'task_id': task.id})
return custom_response(SUCCESS_CODE['3018'], response_status=status.HTTP_200_OK)
return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST)
# presumably pairs with `if junior_id:` (no Junior row for the given id) — confirm
else:
return custom_error_response(ERROR_CODE['2047'], response_status=status.HTTP_400_BAD_REQUEST)
# any exception raised above is reported to the client as a 400 with its message
except Exception as e:
return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class SearchTaskListAPIView(viewsets.ModelViewSet):
    """Search the authenticated guardian's tasks by task name.

    Query params:
        title: text looked up case-insensitively inside ``task_name``
        page: page number
    """
    serializer_class = TaskDetailsSerializer
    permission_classes = [IsAuthenticated]
    pagination_class = PageNumberPagination
    http_method_names = ('get',)

    def get_queryset(self):
        """Return the guardian's tasks whose name contains ``title``, ordered by due date then creation time."""
        search_text = self.request.GET.get('title')
        matching_tasks = JuniorTask.objects.filter(
            guardian__user=self.request.user,
            task_name__icontains=search_text,
        )
        return matching_tasks.order_by('due_date', 'created_at')

    def list(self, request, *args, **kwargs):
        """Return the requested page of matching tasks.

        Any exception raised while building the response is reported as a 400
        carrying ``str(exc)``.
        """
        try:
            matching_tasks = self.get_queryset()
            page_builder = self.pagination_class()
            # paginate, then serialize only the current page
            current_page = page_builder.paginate_queryset(matching_tasks, request)
            page_data = TaskDetailsSerializer(current_page, many=True).data
            return custom_response(None, page_data, response_status=status.HTTP_200_OK)
        except Exception as e:
            return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class TopJuniorListAPIView(viewsets.ModelViewSet):
    """Leaderboard of verified juniors ranked by total points.

    Takes no query params; GET only, authentication required.
    """
    serializer_class = TopJuniorSerializer
    permission_classes = [IsAuthenticated]
    http_method_names = ('get',)

    def get_serializer_context(self):
        """Return the parent serializer context with this view stored under ``'view'``."""
        serializer_context = super().get_serializer_context()
        serializer_context['view'] = self
        return serializer_context

    def get_queryset(self):
        """Return verified juniors' points rows, each annotated with its ``rank``.

        Rank and final ordering both use highest ``total_points`` first, then
        ``junior__created_at``.
        """
        rank_window = Window(
            expression=Rank(),
            order_by=[F('total_points').desc(), 'junior__created_at'],
        )
        verified_points = JuniorPoints.objects.filter(junior__is_verified=True)
        verified_points = verified_points.select_related('junior', 'junior__auth')
        ranked_points = verified_points.annotate(rank=rank_window)
        return ranked_points.order_by('-total_points', 'junior__created_at')

    def list(self, request, *args, **kwargs):
        """Return the first 15 rows of the ranking.

        Any exception is reported as a 400 carrying ``str(exc)``.
        """
        try:
            leaders = self.get_queryset()[:15]
            leaderboard = self.get_serializer(leaders, many=True).data
            return custom_response(None, leaderboard, response_status=status.HTTP_200_OK)
        except Exception as e:
            return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class ApproveJuniorAPIView(viewsets.ModelViewSet):
    """Approve or reject a junior's association request (guardian side, POST only)."""
    serializer_class = ApproveJuniorSerializer
    permission_classes = [IsAuthenticated]
    http_method_names = ('post',)

    def create(self, request, *args, **kwargs):
        """Approve (action "1") or reject (any other action) a junior.

        Use below param
        {"junior_id":"75",
        "action":"1"}

        Returns:
            200 with ``SUCCESS_CODE['3023']`` and the serializer data on approval,
            200 with ``SUCCESS_CODE['3024']`` on rejection,
            400 with ``ERROR_CODE['2073']`` when the junior is deleted or inactive,
            400 with the serializer errors when approval data is invalid,
            400 with ``str(exc)`` for any exception raised while handling the request.
        """
        try:
            guardian = Guardian.objects.filter(user__email=self.request.user).last()
            # fetch junior query
            junior_queryset = Junior.objects.filter(id=self.request.data.get('junior_id')).last()
            if junior_queryset and (junior_queryset.is_deleted or not junior_queryset.is_active):
                return custom_error_response(ERROR_CODE['2073'], response_status=status.HTTP_400_BAD_REQUEST)

            # action 1 is used for approve and 2 for reject
            if request.data['action'] == '1':
                # use ApproveJuniorSerializer serializer
                serializer = ApproveJuniorSerializer(context={"guardian_code": guardian.guardian_code,
                                                              "junior": junior_queryset,
                                                              "action": request.data['action']},
                                                     data=request.data)
                if not serializer.is_valid():
                    # Without this return the method fell off the end and handed
                    # None to DRF, which rejects a non-Response return value
                    # (HTTP 500) instead of reporting the validation errors.
                    return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST)
                # save serializer
                serializer.save()
                send_notification.delay(ASSOCIATE_APPROVED, guardian.user.id, GUARDIAN,
                                        junior_queryset.auth.id, {})
                return custom_response(SUCCESS_CODE['3023'], serializer.data, response_status=status.HTTP_200_OK)

            # Reject: drop any '-' entries from both parallel lists first, then
            # remove this guardian's code together with the status stored at
            # the same index.
            if junior_queryset.guardian_code and ('-' in junior_queryset.guardian_code):
                junior_queryset.guardian_code.remove('-')
            if junior_queryset.guardian_code_status and ('-' in junior_queryset.guardian_code_status):
                junior_queryset.guardian_code_status.remove('-')
            index = junior_queryset.guardian_code.index(guardian.guardian_code)
            junior_queryset.guardian_code.remove(guardian.guardian_code)
            junior_queryset.guardian_code_status.pop(index)
            junior_queryset.save()
            send_notification.delay(ASSOCIATE_REJECTED, guardian.user.id, GUARDIAN, junior_queryset.auth.id, {})
            return custom_response(SUCCESS_CODE['3024'], response_status=status.HTTP_200_OK)
        except Exception as e:
            return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
class ApproveTaskAPIView(viewsets.ModelViewSet):
    """Approve or reject a junior's task (guardian side, POST only)."""
    serializer_class = ApproveTaskSerializer
    permission_classes = [IsAuthenticated]
    http_method_names = ('post',)

    def create(self, request, *args, **kwargs):
        """Approve or reject one task of the requesting guardian.

        Params
        {"junior_id":"82",
        "task_id":"43",
        "action":"1"}
        action 1 for approve
        2 for reject

        Returns:
            200 with ``SUCCESS_CODE['3025']`` (approve) or ``SUCCESS_CODE['3026']`` (reject),
            400 with ``ERROR_CODE['2084']`` when the guardian's code is not among the junior's codes,
            400 with ``ERROR_CODE['2072']`` when the junior is deleted or inactive,
            400 with ``ERROR_CODE['2038']`` when the action is unknown, the data is
            invalid, the task is missing, or its status is ``str(NUMBER['five'])`` /
            ``str(NUMBER['six'])``,
            400 with ``str(exc)`` for any exception raised while handling the request.
        """
        try:
            guardian = Guardian.objects.filter(user__email=self.request.user).last()
            # the task must belong to both this guardian and the given junior
            task = JuniorTask.objects.filter(
                id=self.request.data.get('task_id'),
                guardian=guardian,
                junior=self.request.data.get('junior_id'),
            ).last()

            if task and guardian.guardian_code not in task.junior.guardian_code:
                return custom_error_response(ERROR_CODE['2084'], response_status=status.HTTP_400_BAD_REQUEST)
            if task and (task.junior.is_deleted or not task.junior.is_active):
                return custom_error_response(ERROR_CODE['2072'], response_status=status.HTTP_400_BAD_REQUEST)

            # use ApproveTaskSerializer serializer
            serializer = ApproveTaskSerializer(context={"guardian_code": guardian.guardian_code,
                                                        "task_instance": task,
                                                        "action": str(request.data['action']),
                                                        "junior": self.request.data['junior_id']},
                                               data=request.data)
            blocked_statuses = [str(NUMBER['five']), str(NUMBER['six'])]

            # Pick the success message for the requested action; any other
            # action value ends in the generic error below.
            requested_action = str(request.data['action'])
            if requested_action == str(NUMBER['one']):
                success_key = '3025'
            elif requested_action == str(NUMBER['two']):
                success_key = '3026'
            else:
                success_key = None

            if (success_key is not None and serializer.is_valid()
                    and task and task.task_status not in blocked_statuses):
                # save serializer
                serializer.save()
                return custom_response(SUCCESS_CODE[success_key], response_status=status.HTTP_200_OK)
            return custom_error_response(ERROR_CODE['2038'], response_status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)
#
class GuardianListAPIView(viewsets.ModelViewSet):
    """List the guardian relationships of the authenticated junior (GET only)."""
    serializer_class = GuardianDetailListSerializer
    permission_classes = [IsAuthenticated]
    http_method_names = ('get',)

    def list(self, request, *args, **kwargs):
        """Return the junior-guardian relationships of the requesting user.

        No Params

        Returns:
            200 with the serialized relationships when at least one exists,
            200 with ``{"status": GUARDIAN_CODE_STATUS[1][0]}`` when none exists,
            400 with ``str(exc)`` for any exception raised while handling the request.
        """
        try:
            relationships = JuniorGuardianRelationship.objects.filter(junior__auth__email=self.request.user)
            if not relationships:
                return custom_response({"status": GUARDIAN_CODE_STATUS[1][0]}, response_status=status.HTTP_200_OK)
            # use GuardianDetailListSerializer serializer
            relationship_data = GuardianDetailListSerializer(relationships, many=True).data
            return custom_response(None, relationship_data, response_status=status.HTTP_200_OK)
        except Exception as e:
            return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)