mirror of
https://github.com/HamzaSha1/zod-backend.git
synced 2025-07-15 10:05:21 +00:00
127 lines
6.3 KiB
Python
127 lines
6.3 KiB
Python
"""
|
|
web_admin user_management views file
|
|
"""
|
|
# django imports
from rest_framework.viewsets import GenericViewSet, mixins
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated, AllowAny
from django.contrib.auth import get_user_model
from django.db.models import Q
from django.http import Http404

# local imports
from account.utils import custom_response, custom_error_response
from base.constants import USER_TYPE
from base.messages import SUCCESS_CODE, ERROR_CODE
from web_admin.permission import AdminPermission
from web_admin.serializers.user_management_serializer import (UserManagementListSerializer,
                                                              UserManagementDetailSerializer, GuardianSerializer,
                                                              JuniorSerializer)
|
|
|
|
# user model resolved through django's get_user_model(); the viewset below queries it
USER = get_user_model()
|
|
|
|
|
|
class UserManagementViewSet(GenericViewSet, mixins.ListModelMixin,
                            mixins.RetrieveModelMixin, mixins.UpdateModelMixin):
    """
    api to manage (list, view, edit) user

    Works on non-superuser users that have at least one verified
    junior or guardian profile, newest first. The ``user_type`` query
    parameter selects which kind of profile an action operates on and is
    compared against the labels stored in ``USER_TYPE`` under keys '1' and '2'.
    """
    serializer_class = UserManagementListSerializer
    permission_classes = [IsAuthenticated, AdminPermission]
    queryset = USER.objects.filter(
        (Q(junior_profile__is_verified=True) | Q(guardian_profile__is_verified=True)),
        is_superuser=False).prefetch_related('guardian_profile',
                                             'junior_profile'
                                             ).exclude(junior_profile__isnull=True,
                                                       guardian_profile__isnull=True).order_by('-date_joined')
    filter_backends = (SearchFilter,)
    search_fields = ['first_name', 'last_name']
    http_method_names = ['get', 'post', 'patch']

    # (profile relation on the user, lookup used to find the user by id, serializer used for edits)
    _GUARDIAN_TARGET = ('guardian_profile', 'guardian_profile__user__id', GuardianSerializer)
    _JUNIOR_TARGET = ('junior_profile', 'junior_profile__auth__id', JuniorSerializer)

    def _requested_target(self):
        """
        map the ``user_type`` query parameter to the profile it addresses
        :return: ``_GUARDIAN_TARGET`` when user_type equals USER_TYPE['2'],
                 ``_JUNIOR_TARGET`` when it equals USER_TYPE['1'],
                 None for any other (unsupported) value
        """
        user_types = dict(USER_TYPE)
        requested = self.request.query_params.get('user_type')
        # key '2' is checked before key '1', same order as the per-action checks it replaces
        if requested == user_types.get('2'):
            return self._GUARDIAN_TARGET
        if requested == user_types.get('1'):
            return self._JUNIOR_TARGET
        return None

    def _get_profile(self, relation, lookup, user_id):
        """
        fetch the profile object of the given user
        :param relation: name of the profile relation on the user ('guardian_profile' / 'junior_profile')
        :param lookup: queryset lookup that matches the user by id through that relation
        :param user_id: id taken from the url
        :return: first related profile object
        :raises Http404: when no matching user or profile exists, so the client
                         gets a 404 instead of an AttributeError-driven 500
        """
        user_obj = self.queryset.filter(**{lookup: user_id}).first()
        if user_obj is None:
            raise Http404
        profile = getattr(user_obj, relation).all().first()
        if profile is None:
            raise Http404
        return profile

    def get_queryset(self):
        """
        base queryset narrowed by the optional ``user_type`` query parameter
        and by the configured filter backends (search on first/last name)
        :return: filtered queryset
        """
        user_types = dict(USER_TYPE)
        requested = self.request.query_params.get('user_type')
        # .all() hands every request its own clone of the class-level queryset
        queryset = self.queryset.all()
        if requested == user_types.get('2'):
            queryset = queryset.filter(junior_profile__isnull=True)
        elif requested == user_types.get('1'):
            queryset = queryset.filter(guardian_profile__isnull=True)
        return self.filter_queryset(queryset)

    def list(self, request, *args, **kwargs):
        """
        api method to list all the user
        :param request: user_type {'guardian' for Guardian list, 'junior' for Junior list}
        :return: one page of serialized users plus the total count of matching users
        """
        queryset = self.get_queryset()
        # paginate_queryset returns None when pagination is not configured / not applicable;
        # fall back to the whole queryset instead of crashing
        page = self.paginate_queryset(queryset)
        serializer = self.serializer_class(queryset if page is None else page, many=True)
        return custom_response(None, data=serializer.data, count=queryset.count())

    def retrieve(self, request, *args, **kwargs):
        """
        to get details of single user
        :param request: user_id along with
                        user_type {'guardian' for Guardian, 'junior' for Junior} mandatory
        :return: user details (serialized as a list; empty when nothing matches)
        """
        target = self._requested_target()
        if target is None:
            return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST)
        _, lookup, _ = target
        queryset = self.queryset.filter(**{lookup: kwargs['pk']})
        serializer = UserManagementDetailSerializer(queryset, many=True)
        return custom_response(None, data=serializer.data)

    def partial_update(self, request, *args, **kwargs):
        """
        api method to update user detail (email, phone number)
        :param request: user_id along with
                        user_type {'guardian' for Guardian, 'junior' for Junior} mandatory
        :return: success message
        :raises Http404: when the user / profile does not exist
        """
        target = self._requested_target()
        if target is None:
            return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST)
        relation, lookup, profile_serializer = target
        profile = self._get_profile(relation, lookup, kwargs['pk'])
        # NOTE(review): serializer is built without partial=True, so full validation applies
        # even though this is a PATCH - confirm this is intended
        serializer = profile_serializer(profile, request.data, context={'user_id': kwargs['pk']})
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return custom_response(SUCCESS_CODE['3037'])

    # NOTE(review): this action changes state on a GET request; kept as is so existing clients keep working
    @action(methods=['get'], url_name='change-status', url_path='change-status', detail=True)
    def change_status(self, request, *args, **kwargs):
        """
        api to change user status (mark as active or inactive)
        :param request: user_id along with
                        user_type {'guardian' for Guardian, 'junior' for Junior} mandatory
        :return: success message
        :raises Http404: when the user / profile does not exist
        """
        target = self._requested_target()
        if target is None:
            return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST)
        relation, lookup, _ = target
        profile = self._get_profile(relation, lookup, kwargs['pk'])
        # flip the flag: active -> inactive, anything falsy -> active
        profile.is_active = not profile.is_active
        profile.save()
        return custom_response(SUCCESS_CODE['3038'])
|