""" web_admin analytics view file """ # python imports import datetime import io import pandas as pd import xlsxwriter # third party imports from rest_framework.viewsets import GenericViewSet from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated # django imports from django.contrib.auth import get_user_model from django.db.models import Q from django.db.models import Count from django.db.models.functions import TruncDate from django.db.models import F, Window from django.db.models.functions.window import Rank from django.http import HttpResponse # local imports from account.utils import custom_response, get_user_full_name from base.constants import PENDING, IN_PROGRESS, REJECTED, REQUESTED, COMPLETED, EXPIRED, DATE_FORMAT, TASK_STATUS from guardian.models import JuniorTask from guardian.utils import upload_excel_file_to_alibaba from junior.models import JuniorPoints from web_admin.pagination import CustomPageNumberPagination from web_admin.permission import AdminPermission from web_admin.serializers.analytics_serializer import LeaderboardSerializer, UserCSVReportSerializer from web_admin.utils import get_dates USER = get_user_model() class AnalyticsViewSet(GenericViewSet): """ analytics api view to get user report (active users, guardians and juniors counts) to get new user sign up report to get task report (completed, in-progress, requested and rejected tasks count) to get junior leaderboard and ranking """ serializer_class = None permission_classes = [IsAuthenticated, AdminPermission] def get_queryset(self): user_qs = USER.objects.filter( (Q(junior_profile__is_verified=True) | Q(guardian_profile__is_verified=True)), is_superuser=False ).prefetch_related('guardian_profile', 'junior_profile' ).exclude(junior_profile__isnull=True, guardian_profile__isnull=True).order_by('-date_joined') return user_qs @action(methods=['get'], url_name='users-count', url_path='users-count', detail=False) def total_users_count(self, request, *args, **kwargs): """ api method to get total users, guardians and juniors :param request: start_date: date format (yyyy-mm-dd) :param request: end_date: date format (yyyy-mm-dd) :return: """ start_date, end_date = get_dates(request.query_params.get('start_date'), request.query_params.get('end_date')) user_qs = self.get_queryset() queryset = user_qs.filter(date_joined__range=(start_date, (end_date + datetime.timedelta(days=1)))) data = {'total_users': queryset.count(), 'total_guardians': queryset.filter(junior_profile__isnull=True).count(), 'total_juniors': queryset.filter(guardian_profile__isnull=True).count()} return custom_response(None, data) @action(methods=['get'], url_name='new-signups', url_path='new-signups', detail=False) def new_signups(self, request, *args, **kwargs): """ api method to get new signups :param request: start_date: date format (yyyy-mm-dd) :param request: end_date: date format (yyyy-mm-dd) :return: """ start_date, end_date = get_dates(request.query_params.get('start_date'), request.query_params.get('end_date')) user_qs = self.get_queryset() signup_data = user_qs.filter(date_joined__range=[start_date, (end_date + datetime.timedelta(days=1))] ).annotate(date=TruncDate('date_joined') ).values('date').annotate(signups=Count('id')).order_by('date') return custom_response(None, signup_data) @action(methods=['get'], url_name='assign-tasks', url_path='assign-tasks', detail=False) def assign_tasks_report(self, request, *args, **kwargs): """ api method to get assign tasks count for (completed, in-progress, requested and rejected) task :param request: start_date: date format (yyyy-mm-dd) :param request: end_date: date format (yyyy-mm-dd) :return: """ start_date, end_date = get_dates(request.query_params.get('start_date'), request.query_params.get('end_date')) assign_tasks = JuniorTask.objects.filter( created_at__range=[start_date, (end_date + datetime.timedelta(days=1))] ) data = { 'task_completed': assign_tasks.filter(task_status=COMPLETED).count(), 'task_pending': assign_tasks.filter(task_status=PENDING).count(), 'task_in_progress': assign_tasks.filter(task_status=IN_PROGRESS).count(), 'task_requested': assign_tasks.filter(task_status=REQUESTED).count(), 'task_rejected': assign_tasks.filter(task_status=REJECTED).count(), 'task_expired': assign_tasks.filter(task_status=EXPIRED).count(), } return custom_response(None, data) @action(methods=['get'], url_name='junior-leaderboard', url_path='junior-leaderboard', detail=False, serializer_class=LeaderboardSerializer) def junior_leaderboard(self, request): """ to get junior leaderboard and rank :param request: :return: """ queryset = JuniorPoints.objects.filter( junior__is_verified=True ).select_related('junior', 'junior__auth').annotate(rank=Window( expression=Rank(), order_by=[F('total_points').desc(), 'junior__created_at'] )).order_by('-total_points', 'junior__created_at') paginator = CustomPageNumberPagination() paginated_queryset = paginator.paginate_queryset(queryset, request) serializer = self.serializer_class(paginated_queryset, many=True) return custom_response(None, serializer.data) @action(methods=['get'], url_name='export-excel', url_path='export-excel', detail=False) def export_excel(self, request): """ to export users count, task details and top juniors in csv/excel file :param request: start_date: date format (yyyy-mm-dd) :param request: end_date: date format (yyyy-mm-dd) :return: """ response = HttpResponse(content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') response['Content-Disposition'] = 'attachment; filename="ZOD_Bank_Analytics.xlsx"' start_date, end_date = get_dates(request.query_params.get('start_date'), request.query_params.get('end_date')) # Use BytesIO for binary data buffer = io.BytesIO() # Create an XlsxWriter Workbook object workbook = xlsxwriter.Workbook(buffer) # Add sheets sheets = ['Users', 'Assign Tasks', 'Juniors Leaderboard'] for sheet_name in sheets: worksheet = workbook.add_worksheet(name=sheet_name) # sheet 1 for Total Users if sheet_name == 'Users': user_qs = self.get_queryset() queryset = user_qs.filter(date_joined__range=(start_date, (end_date + datetime.timedelta(days=1)))) serializer = UserCSVReportSerializer(queryset, many=True) df_users = pd.DataFrame([ {'Name': user['name'], 'Email': user['email'], 'Phone Number': user['phone_number'], 'User Type': user['user_type'], 'Status': user['is_active'], 'Date Joined': user['date_joined']} for user in serializer.data ]) write_excel_worksheet(worksheet, df_users) # sheet 2 for Assign Task elif sheet_name == 'Assign Tasks': assign_tasks = JuniorTask.objects.filter( created_at__range=[start_date, (end_date + datetime.timedelta(days=1))] ).select_related('junior__auth', 'guardian__user').only('task_name', 'task_status', 'junior__auth__first_name', 'junior__auth__last_name', 'guardian__user__first_name', 'guardian__user__last_name',) df_tasks = pd.DataFrame([ {'Task Name': task.task_name, 'Assign To': get_user_full_name(task.junior.auth), 'Assign By': get_user_full_name(task.guardian.user), 'Task Status': dict(TASK_STATUS).get(task.task_status).capitalize()} for task in assign_tasks ]) write_excel_worksheet(worksheet, df_tasks) # sheet 3 for Juniors Leaderboard and rank elif sheet_name == 'Juniors Leaderboard': queryset = JuniorPoints.objects.filter( junior__is_verified=True ).select_related('junior', 'junior__auth').annotate(rank=Window( expression=Rank(), order_by=[F('total_points').desc(), 'junior__created_at'] )).order_by('-total_points', 'junior__created_at')[:15] df_leaderboard = pd.DataFrame([ { 'Name': get_user_full_name(junior.junior.auth), 'Points': junior.total_points, 'Rank': junior.rank } for junior in queryset ]) write_excel_worksheet(worksheet, df_leaderboard) # Close the workbook to save the content workbook.close() # Reset the buffer position and write the content to the response buffer.seek(0) response.write(buffer.getvalue()) buffer.close() filename = f"{'analytics'}/{'ZOD_Bank_Analytics.xlsx'}" file_link = upload_excel_file_to_alibaba(response, filename) return custom_response(None, file_link) def write_excel_worksheet(worksheet, dataframe): """ to perform write action on worksheets :param worksheet: :param dataframe: :return: worksheet """ for idx, col in enumerate(dataframe.columns): # Write header worksheet.write(0, idx, col) for row_num, row in enumerate(dataframe.values, start=1): for col_num, value in enumerate(row): worksheet.write(row_num, col_num, value) return worksheet