"""Views of Guardian""" # django imports # Import IsAuthenticated # Import viewsets and status # Import PageNumberPagination # Import User # Import timezone from rest_framework.permissions import IsAuthenticated from rest_framework import viewsets, status from rest_framework.pagination import PageNumberPagination from django.contrib.auth.models import User from base.constants import guardian_code_tuple from rest_framework.filters import SearchFilter from django.utils import timezone # Import guardian's model, # Import junior's model, # Import account's model, # Import constant from # base package, # Import messages from # base package, # Import some functions # from utils file # Import account's serializer # Import account's task # Import notification constant # Import send_notification function from .serializers import (UserSerializer, CreateGuardianSerializer, TaskSerializer, TaskDetailsSerializer, TopJuniorSerializer, ApproveJuniorSerializer, ApproveTaskSerializer, GuardianDetailListSerializer) from .models import Guardian, JuniorTask from junior.models import Junior, JuniorPoints, JuniorGuardianRelationship from account.models import UserEmailOtp, UserNotification, UserDeviceDetails from .tasks import generate_otp from account.utils import custom_response, custom_error_response, OTP_EXPIRY, send_otp_email from base.messages import ERROR_CODE, SUCCESS_CODE from base.constants import NUMBER, GUARDIAN_CODE_STATUS from .utils import upload_image_to_alibaba from notifications.constants import REGISTRATION, TASK_ASSIGNED, ASSOCIATE_APPROVED, ASSOCIATE_REJECTED from notifications.utils import send_notification_to_junior """ Define APIs """ # Define Signup API, # update guardian profile, # list of all task # list of task according to the status of the task # create task API # search task by name of the task API # top junior API, # approve junior API # approve task API # Create your views here. # create approve task API class SignupViewset(viewsets.ModelViewSet): """Signup view set""" queryset = User.objects.all() serializer_class = UserSerializer http_method_names = ('post',) def create(self, request, *args, **kwargs): """Create user profile""" device_id = request.META.get('HTTP_DEVICE_ID') if request.data['user_type'] in [str(NUMBER['one']), str(NUMBER['two'])]: serializer = UserSerializer(context=request.data['user_type'], data=request.data) if serializer.is_valid(): user = serializer.save() """Generate otp""" otp = generate_otp() # expire otp after 1 day expiry = timezone.now() + timezone.timedelta(days=1) # create user email otp object UserEmailOtp.objects.create(email=request.data['email'], otp=otp, user_type=str(request.data['user_type']), expired_at=expiry) """Send email to the register user""" send_otp_email.delay(request.data['email'], otp) UserDeviceDetails.objects.create(user=user, device_id=device_id) return custom_response(SUCCESS_CODE['3001'], response_status=status.HTTP_200_OK) return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST) else: return custom_error_response(ERROR_CODE['2028'], response_status=status.HTTP_400_BAD_REQUEST) class UpdateGuardianProfile(viewsets.ModelViewSet): """Update guardian profile""" serializer_class = CreateGuardianSerializer permission_classes = [IsAuthenticated] http_method_names = ('post',) def create(self, request, *args, **kwargs): """Create guardian profile""" try: data = request.data image = request.data.get('image') image_url = '' if image: if image and image.size == NUMBER['zero']: return custom_error_response(ERROR_CODE['2035'], response_status=status.HTTP_400_BAD_REQUEST) filename = f"images/{image.name}" # upload image on ali baba image_url = upload_image_to_alibaba(image, filename) data = {"image":image_url} serializer = CreateGuardianSerializer(context={"user":request.user, "first_name":request.data.get('first_name'), "last_name": request.data.get('last_name'), "image":image_url}, data=data) if serializer.is_valid(): """save serializer""" serializer.save() return custom_response(None, serializer.data, response_status=status.HTTP_200_OK) return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST) except Exception as e: return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST) class AllTaskListAPIView(viewsets.ModelViewSet): """Update guardian profile""" serializer_class = TaskDetailsSerializer permission_classes = [IsAuthenticated] def list(self, request, *args, **kwargs): """Create guardian profile""" queryset = JuniorTask.objects.filter(guardian__user=request.user) # use TaskDetailsSerializer serializer serializer = TaskDetailsSerializer(queryset, many=True) return custom_response(None, serializer.data, response_status=status.HTTP_200_OK) class TaskListAPIView(viewsets.ModelViewSet): """Task list Params status search page""" serializer_class = TaskDetailsSerializer permission_classes = [IsAuthenticated] filter_backends = (SearchFilter,) search_fields = ['task_name', ] pagination_class = PageNumberPagination http_method_names = ('get',) def get_queryset(self): queryset = JuniorTask.objects.filter(guardian__user=self.request.user ).prefetch_related('junior', 'junior__auth' ).order_by('due_date', 'created_at') queryset = self.filter_queryset(queryset) return queryset def list(self, request, *args, **kwargs): """Create guardian profile""" status_value = self.request.GET.get('status') queryset = self.get_queryset() if status_value and status_value != '0': queryset = queryset.filter(task_status=status_value) paginator = self.pagination_class() # use Pagination paginated_queryset = paginator.paginate_queryset(queryset, request) # use TaskDetailsSerializer serializer serializer = self.serializer_class(paginated_queryset, many=True) return custom_response(None, serializer.data, response_status=status.HTTP_200_OK) class CreateTaskAPIView(viewsets.ModelViewSet): """create task for junior""" serializer_class = TaskSerializer queryset = JuniorTask.objects.all() http_method_names = ('post', ) def create(self, request, *args, **kwargs): """ image should be in form data """ try: image = request.data['default_image'] junior = request.data['junior'] junior_id = Junior.objects.filter(id=junior).last() guardian_data = Guardian.objects.filter(user=request.user).last() if (guardian_data.guardian_code not in junior_id.guardian_code or junior_id.guardian_code_status in guardian_code_tuple): return custom_error_response(ERROR_CODE['2078'], response_status=status.HTTP_400_BAD_REQUEST) allowed_extensions = ['.jpg', '.jpeg', '.png'] if not any(extension in str(image) for extension in allowed_extensions): return custom_error_response(ERROR_CODE['2048'], response_status=status.HTTP_400_BAD_REQUEST) if not junior.isnumeric(): """junior value must be integer""" return custom_error_response(ERROR_CODE['2047'], response_status=status.HTTP_400_BAD_REQUEST) data = request.data if 'https' in str(image): image_data = image else: filename = f"images/{image}" if image and image.size == NUMBER['zero']: return custom_error_response(ERROR_CODE['2035'], response_status=status.HTTP_400_BAD_REQUEST) image_url = upload_image_to_alibaba(image, filename) image_data = image_url data.pop('default_image') # use TaskSerializer serializer serializer = TaskSerializer(context={"user":request.user, "image":image_data}, data=data) if serializer.is_valid(): # save serializer task = serializer.save() send_notification_to_junior.delay(TASK_ASSIGNED, request.auth.payload['user_id'], junior_id.auth.id, {'task_id': task.id}) return custom_response(SUCCESS_CODE['3018'], serializer.data, response_status=status.HTTP_200_OK) return custom_error_response(serializer.errors, response_status=status.HTTP_400_BAD_REQUEST) except Exception as e: return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST) class SearchTaskListAPIView(viewsets.ModelViewSet): """Filter task""" serializer_class = TaskDetailsSerializer permission_classes = [IsAuthenticated] pagination_class = PageNumberPagination http_method_names = ('get',) def get_queryset(self): """Get the queryset for the view""" title = self.request.GET.get('title') # fetch junior query junior_queryset = JuniorTask.objects.filter(guardian__user=self.request.user, task_name__icontains=title)\ .order_by('due_date', 'created_at') return junior_queryset def list(self, request, *args, **kwargs): """Filter task""" try: queryset = self.get_queryset() paginator = self.pagination_class() # use pagination paginated_queryset = paginator.paginate_queryset(queryset, request) # use TaskSerializer serializer serializer = TaskDetailsSerializer(paginated_queryset, many=True) return custom_response(None, serializer.data, response_status=status.HTTP_200_OK) except Exception as e: return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST) class TopJuniorListAPIView(viewsets.ModelViewSet): """Top juniors list No Params""" queryset = JuniorPoints.objects.all() serializer_class = TopJuniorSerializer permission_classes = [IsAuthenticated] http_method_names = ('get',) def get_serializer_context(self): # context list context = super().get_serializer_context() context.update({'view': self}) return context def list(self, request, *args, **kwargs): """Fetch junior list of those who complete their tasks""" try: junior_total_points = self.get_queryset().order_by('-total_points') # Update the position field for each JuniorPoints object for index, junior in enumerate(junior_total_points): junior.position = index + 1 junior.save() serializer = self.get_serializer(junior_total_points[:NUMBER['fifteen']], many=True) return custom_response(None, serializer.data, response_status=status.HTTP_200_OK) except Exception as e: return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST) class ApproveJuniorAPIView(viewsets.ModelViewSet): """approve junior by guardian""" serializer_class = ApproveJuniorSerializer permission_classes = [IsAuthenticated] http_method_names = ('post',) def create(self, request, *args, **kwargs): """ Use below param {"junior_id":"75", "action":"1"} """ try: guardian = Guardian.objects.filter(user__email=self.request.user).last() # fetch junior query junior_queryset = Junior.objects.filter(id=self.request.data.get('junior_id')).last() if junior_queryset and (junior_queryset.is_deleted or not junior_queryset.is_active): return custom_error_response(ERROR_CODE['2073'], response_status=status.HTTP_400_BAD_REQUEST) # action 1 is use for approve and 2 for reject if request.data['action'] == '1': # use ApproveJuniorSerializer serializer serializer = ApproveJuniorSerializer(context={"guardian_code": guardian.guardian_code, "junior": junior_queryset, "action": request.data['action']}, data=request.data) if serializer.is_valid(): # save serializer serializer.save() send_notification_to_junior.delay(ASSOCIATE_APPROVED, guardian.user.id, junior_queryset.auth.id) return custom_response(SUCCESS_CODE['3023'], serializer.data, response_status=status.HTTP_200_OK) else: junior_queryset.guardian_code = None junior_queryset.guardian_code_status = str(NUMBER['one']) junior_queryset.save() send_notification_to_junior.delay(ASSOCIATE_REJECTED, guardian.user.id, junior_queryset.auth.id) return custom_response(SUCCESS_CODE['3024'], response_status=status.HTTP_200_OK) except Exception as e: return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST) class ApproveTaskAPIView(viewsets.ModelViewSet): """approve junior by guardian""" serializer_class = ApproveTaskSerializer permission_classes = [IsAuthenticated] http_method_names = ('post',) def create(self, request, *args, **kwargs): """ Params {"junior_id":"82", "task_id":"43", "action":"1"} action 1 for approve 2 for reject """ # action 1 is use for approve and 2 for reject try: guardian = Guardian.objects.filter(user__email=self.request.user).last() # task query task_queryset = JuniorTask.objects.filter(id=self.request.data.get('task_id'), guardian=guardian, junior=self.request.data.get('junior_id')).last() if task_queryset and (task_queryset.junior.is_deleted or not task_queryset.junior.is_active): return custom_error_response(ERROR_CODE['2072'], response_status=status.HTTP_400_BAD_REQUEST) # use ApproveJuniorSerializer serializer serializer = ApproveTaskSerializer(context={"guardian_code": guardian.guardian_code, "task_instance": task_queryset, "action": str(request.data['action']), "junior": self.request.data['junior_id']}, data=request.data) unexpected_task_status = [str(NUMBER['five']), str(NUMBER['six'])] if (str(request.data['action']) == str(NUMBER['one']) and serializer.is_valid() and task_queryset and task_queryset.task_status not in unexpected_task_status): # save serializer serializer.save() return custom_response(SUCCESS_CODE['3025'], response_status=status.HTTP_200_OK) elif (str(request.data['action']) == str(NUMBER['two']) and serializer.is_valid() and task_queryset and task_queryset.task_status not in unexpected_task_status): # save serializer serializer.save() return custom_response(SUCCESS_CODE['3026'], response_status=status.HTTP_200_OK) else: return custom_error_response(ERROR_CODE['2038'], response_status=status.HTTP_400_BAD_REQUEST) except Exception as e: return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST) class GuardianListAPIView(viewsets.ModelViewSet): """Guardian list of assosicated junior""" serializer_class = GuardianDetailListSerializer permission_classes = [IsAuthenticated] http_method_names = ('get',) def list(self, request, *args, **kwargs): """ Guardian list of assosicated junior No Params""" try: guardian_data = JuniorGuardianRelationship.objects.filter(junior__auth__email=self.request.user) # fetch junior object if guardian_data: # use GuardianDetailListSerializer serializer serializer = GuardianDetailListSerializer(guardian_data, many=True) return custom_response(None, serializer.data, response_status=status.HTTP_200_OK) return custom_response({"status": GUARDIAN_CODE_STATUS[1][0]}, response_status=status.HTTP_200_OK) except Exception as e: return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)