""" web_admin user_management views file """ # django imports from rest_framework.viewsets import GenericViewSet, mixins from rest_framework.filters import OrderingFilter, SearchFilter from rest_framework import status from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated, AllowAny from django.contrib.auth import get_user_model from django.db.models import Q # local imports from account.utils import custom_response, custom_error_response from base.constants import USER_TYPE from base.messages import SUCCESS_CODE, ERROR_CODE from web_admin.permission import AdminPermission from web_admin.serializers.user_management_serializer import (UserManagementListSerializer, UserManagementDetailSerializer, GuardianSerializer, JuniorSerializer) USER = get_user_model() class UserManagementViewSet(GenericViewSet, mixins.ListModelMixin, mixins.RetrieveModelMixin, mixins.UpdateModelMixin): """ api to manage (list, view, edit) user """ serializer_class = UserManagementListSerializer permission_classes = [IsAuthenticated, AdminPermission] queryset = USER.objects.filter( (Q(junior_profile__is_verified=True) | Q(guardian_profile__is_verified=True)), is_superuser=False).prefetch_related('guardian_profile', 'junior_profile' ).exclude(junior_profile__isnull=True, guardian_profile__isnull=True).order_by('-date_joined') filter_backends = (SearchFilter,) search_fields = ['first_name', 'last_name'] http_method_names = ['get', 'post', 'patch'] def get_queryset(self): if self.request.query_params.get('user_type') == dict(USER_TYPE).get('2'): queryset = self.queryset.filter(junior_profile__isnull=True) elif self.request.query_params.get('user_type') == dict(USER_TYPE).get('1'): queryset = self.queryset.filter(guardian_profile__isnull=True) else: queryset = self.queryset queryset = self.filter_queryset(queryset) return queryset def list(self, request, *args, **kwargs): """ api method to list all the user :param request: user_type {'guardian' for Guardian list, 'junior' for Junior list} :return: """ queryset = self.get_queryset() paginator = self.pagination_class() paginated_queryset = paginator.paginate_queryset(queryset, request) serializer = self.serializer_class(paginated_queryset, many=True) return custom_response(None, data=serializer.data, count=queryset.count()) def retrieve(self, request, *args, **kwargs): """ to get details of single user :param request: user_id along with user_type {'guardian' for Guardian, 'junior' for Junior} mandatory :return: user details """ if self.request.query_params.get('user_type') not in [dict(USER_TYPE).get('1'), dict(USER_TYPE).get('2')]: return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST) queryset = self.queryset if self.request.query_params.get('user_type') == dict(USER_TYPE).get('2'): queryset = queryset.filter(guardian_profile__user__id=kwargs['pk']) elif self.request.query_params.get('user_type') == dict(USER_TYPE).get('1'): queryset = queryset.filter(junior_profile__auth__id=kwargs['pk']) serializer = UserManagementDetailSerializer(queryset, many=True) return custom_response(None, data=serializer.data) def partial_update(self, request, *args, **kwargs): """ api method to update user detail (email, phone number) :param request: user_id along with user_type {'guardian' for Guardian, 'junior' for Junior} mandatory :return: success message """ if self.request.query_params.get('user_type') not in [dict(USER_TYPE).get('1'), dict(USER_TYPE).get('2')]: return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST) queryset = self.queryset if self.request.query_params.get('user_type') == dict(USER_TYPE).get('2'): user_obj = queryset.filter(guardian_profile__user__id=kwargs['pk']).first() serializer = GuardianSerializer(user_obj.guardian_profile.all().first(), request.data, context={'user_id': kwargs['pk']}) elif self.request.query_params.get('user_type') == dict(USER_TYPE).get('1'): user_obj = queryset.filter(junior_profile__auth__id=kwargs['pk']).first() serializer = JuniorSerializer(user_obj.junior_profile.all().first(), request.data, context={'user_id': kwargs['pk']}) serializer.is_valid(raise_exception=True) serializer.save() return custom_response(SUCCESS_CODE['3037']) @action(methods=['get'], url_name='change-status', url_path='change-status', detail=True) def change_status(self, request, *args, **kwargs): """ api to change user status (mark as active or inactive) :param request: user_id along with user_type {'guardian' for Guardian, 'junior' for Junior} mandatory :return: success message """ if self.request.query_params.get('user_type') not in [dict(USER_TYPE).get('1'), dict(USER_TYPE).get('2')]: return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST) queryset = self.queryset if self.request.query_params.get('user_type') == dict(USER_TYPE).get('2'): user_obj = queryset.filter(guardian_profile__user__id=kwargs['pk']).first() obj = user_obj.guardian_profile.all().first() obj.is_active = False if obj.is_active else True obj.save() elif self.request.query_params.get('user_type') == dict(USER_TYPE).get('1'): user_obj = queryset.filter(junior_profile__auth__id=kwargs['pk']).first() obj = user_obj.junior_profile.all().first() obj.is_active = False if obj.is_active else True obj.save() return custom_response(SUCCESS_CODE['3038'])