""" web_admin analytics view file """ # python imports import datetime import csv import io import pandas as pd import xlsxwriter # third party imports from rest_framework.viewsets import GenericViewSet from rest_framework.decorators import action from rest_framework.permissions import IsAuthenticated # django imports from django.contrib.auth import get_user_model from django.db.models import Q from django.db.models import Count from django.db.models.functions import TruncDate from django.db.models import F, Window from django.db.models.functions.window import Rank from django.http import HttpResponse # local imports from account.utils import custom_response from base.constants import PENDING, IN_PROGRESS, REJECTED, REQUESTED, COMPLETED, EXPIRED, DATE_FORMAT, TASK_STATUS from guardian.models import JuniorTask from junior.models import JuniorPoints from web_admin.pagination import CustomPageNumberPagination from web_admin.permission import AdminPermission from web_admin.serializers.analytics_serializer import LeaderboardSerializer, UserCSVReportSerializer from web_admin.serializers.user_management_serializer import UserManagementListSerializer USER = get_user_model() class AnalyticsViewSet(GenericViewSet): """ analytics api view to get user report (active users, guardians and juniors counts) to get new user sign up report to get task report (completed, in-progress, requested and rejected tasks count) to get junior leaderboard and ranking """ serializer_class = None # permission_classes = [IsAuthenticated, AdminPermission] def get_queryset(self): user_qs = USER.objects.filter( (Q(junior_profile__is_verified=True) | Q(guardian_profile__is_verified=True)), is_superuser=False ).prefetch_related('guardian_profile', 'junior_profile' ).exclude(junior_profile__isnull=True, guardian_profile__isnull=True).order_by('date_joined') return user_qs @action(methods=['get'], url_name='users-count', url_path='users-count', detail=False) def total_users_count(self, request, *args, **kwargs): """ api method to get total users, guardians and juniors :param request: start_date: date format (yyyy-mm-dd) :param request: end_date: date format (yyyy-mm-dd) :return: """ end_date = datetime.date.today() start_date = end_date - datetime.timedelta(days=6) if request.query_params.get('start_date') and request.query_params.get('end_date'): start_date = datetime.datetime.strptime(request.query_params.get('start_date'), DATE_FORMAT) end_date = datetime.datetime.strptime(request.query_params.get('end_date'), DATE_FORMAT) user_qs = self.get_queryset() queryset = user_qs.filter(date_joined__range=(start_date, (end_date + datetime.timedelta(days=1)))) data = {'total_users': queryset.count(), 'total_guardians': queryset.filter(junior_profile__isnull=True).count(), 'total_juniors': queryset.filter(guardian_profile__isnull=True).count()} return custom_response(None, data) @action(methods=['get'], url_name='new-signups', url_path='new-signups', detail=False) def new_signups(self, request, *args, **kwargs): """ api method to get new signups :param request: start_date: date format (yyyy-mm-dd) :param request: end_date: date format (yyyy-mm-dd) :return: """ end_date = datetime.date.today() start_date = end_date - datetime.timedelta(days=6) if request.query_params.get('start_date') and request.query_params.get('end_date'): start_date = datetime.datetime.strptime(request.query_params.get('start_date'), DATE_FORMAT) end_date = datetime.datetime.strptime(request.query_params.get('end_date'), DATE_FORMAT) user_qs = self.get_queryset() signup_data = user_qs.filter(date_joined__range=[start_date, (end_date + datetime.timedelta(days=1))] ).annotate(date=TruncDate('date_joined') ).values('date').annotate(signups=Count('id')).order_by('date') return custom_response(None, signup_data) @action(methods=['get'], url_name='assign-tasks', url_path='assign-tasks', detail=False) def assign_tasks_report(self, request, *args, **kwargs): """ api method to get assign tasks count for (completed, in-progress, requested and rejected) task :param request: start_date: date format (yyyy-mm-dd) :param request: end_date: date format (yyyy-mm-dd) :return: """ end_date = datetime.date.today() start_date = end_date - datetime.timedelta(days=6) if request.query_params.get('start_date') and request.query_params.get('end_date'): start_date = datetime.datetime.strptime(request.query_params.get('start_date'), DATE_FORMAT) end_date = datetime.datetime.strptime(request.query_params.get('end_date'), DATE_FORMAT) assign_tasks = JuniorTask.objects.filter( created_at__range=[start_date, (end_date + datetime.timedelta(days=1))] ).exclude(task_status__in=[PENDING, EXPIRED]) data = { 'task_completed': assign_tasks.filter(task_status=COMPLETED).count(), 'task_in_progress': assign_tasks.filter(task_status=IN_PROGRESS).count(), 'task_requested': assign_tasks.filter(task_status=REQUESTED).count(), 'task_rejected': assign_tasks.filter(task_status=REJECTED).count(), } return custom_response(None, data) @action(methods=['get'], url_name='junior-leaderboard', url_path='junior-leaderboard', detail=False, serializer_class=LeaderboardSerializer) def junior_leaderboard(self, request): """ to get junior leaderboard and rank :param request: :return: """ queryset = JuniorPoints.objects.prefetch_related('junior', 'junior__auth').annotate(rank=Window( expression=Rank(), order_by=[F('total_points').desc(), 'junior__created_at'] )).order_by('-total_points', 'junior__created_at') paginator = CustomPageNumberPagination() paginated_queryset = paginator.paginate_queryset(queryset, request) serializer = self.serializer_class(paginated_queryset, many=True) return custom_response(None, serializer.data) # @action(methods=['get'], url_name='generate-report', url_path='generate-report', detail=False) # def generate_report(self, request): # # response = HttpResponse(content_type='text/csv') # response['Content-Disposition'] = 'attachment; filename="ZOD_Bank_Analytics.csv"' # # end_date = datetime.date.today() # start_date = end_date - datetime.timedelta(days=6) # # if request.query_params.get('start_date') and request.query_params.get('end_date'): # start_date = datetime.datetime.strptime(request.query_params.get('start_date'), DATE_FORMAT) # end_date = datetime.datetime.strptime(request.query_params.get('end_date'), DATE_FORMAT) # # buffer = io.StringIO() # csv_writer = csv.writer(buffer) # # # sheet 1 for Total Users # user_qs = self.get_queryset() # queryset = user_qs.filter(date_joined__range=(start_date, (end_date + datetime.timedelta(days=1)))) # serializer = UserCSVReportSerializer(queryset, many=True) # # # writer = csv.writer(response) # csv_writer.writerow(['Users']) # csv_writer.writerow([ # 'First Name', 'Last Name', 'Email', 'Phone Number', 'User Type', 'status', 'Date Joined', # # Add more fields as needed # ]) # # for user_data in serializer.data: # row = [ # user_data['first_name'], user_data['last_name'], user_data['email'], # user_data['phone_number'], user_data['user_type'], # user_data['is_active'], user_data['date_joined'] # ] # csv_writer.writerow(row) # # response.write(buffer.getvalue()) # buffer.close() # # # sheet 2 for Assign Task # assign_tasks = JuniorTask.objects.filter( # created_at__range=[start_date, (end_date + datetime.timedelta(days=1))] # ).exclude(task_status__in=[PENDING, EXPIRED]) # buffer = io.StringIO() # csv_writer = csv.writer(buffer) # csv_writer.writerow([]) # csv_writer.writerow(['Assign Tasks']) # csv_writer.writerow([ # 'Task Name', 'Task Status' # # Add more fields as needed # ]) # # for task in assign_tasks: # row = [ # task.task_name, dict(TASK_STATUS).get(task.task_status).capitalize(), # ] # csv_writer.writerow(row) # response.write(buffer.getvalue()) # buffer.close() # # # sheet 3 for Juniors Leaderboard and rank # queryset = JuniorPoints.objects.prefetch_related('junior', 'junior__auth').annotate(rank=Window( # expression=Rank(), # order_by=[F('total_points').desc(), 'junior__created_at'] # )).order_by('-total_points', 'junior__created_at') # buffer = io.StringIO() # csv_writer = csv.writer(buffer) # csv_writer.writerow([]) # csv_writer.writerow(['Top 15 Juniors']) # csv_writer.writerow([ # 'Junior Name', 'Points', 'Rank' # # Add more fields as needed # ]) # # for junior in queryset: # row = [ # f"{junior.junior.auth.first_name}{junior.junior.auth.last_name}" if junior.junior.auth.last_name else junior.junior.auth.first_name, # junior.total_points, junior.rank # ] # csv_writer.writerow(row) # response.write(buffer.getvalue()) # buffer.close() # # return response @action(methods=['get'], url_name='generate-report', url_path='generate-report', detail=False) def generate_report(self, request): response = HttpResponse(content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet') response['Content-Disposition'] = 'attachment; filename="ZOD_Bank_Analytics.xlsx"' end_date = datetime.date.today() start_date = end_date - datetime.timedelta(days=6) if request.query_params.get('start_date') and request.query_params.get('end_date'): start_date = datetime.datetime.strptime(request.query_params.get('start_date'), DATE_FORMAT) end_date = datetime.datetime.strptime(request.query_params.get('end_date'), DATE_FORMAT) buffer = io.BytesIO() # Use BytesIO for binary data # Create an XlsxWriter Workbook object workbook = xlsxwriter.Workbook(buffer) # Add sheets sheets = ['Users', 'Assign Tasks', 'Top 15 Juniors'] for sheet_name in sheets: worksheet = workbook.add_worksheet(name=sheet_name) # sheet 1 for Total Users if sheet_name == 'Users': user_qs = self.get_queryset() queryset = user_qs.filter(date_joined__range=(start_date, (end_date + datetime.timedelta(days=1)))) serializer = UserCSVReportSerializer(queryset, many=True) df_users = pd.DataFrame(serializer.data) for idx, col in enumerate(df_users.columns): worksheet.write(0, idx, col) # Write header for row_num, row in enumerate(df_users.values, start=1): for col_num, value in enumerate(row): worksheet.write(row_num, col_num, value) # sheet 2 for Assign Task elif sheet_name == 'Assign Tasks': assign_tasks = JuniorTask.objects.filter( created_at__range=[start_date, (end_date + datetime.timedelta(days=1))] ).exclude(task_status__in=[PENDING, EXPIRED]) df_tasks = pd.DataFrame([ {'Task Name': task.task_name, 'Task Status': dict(TASK_STATUS).get(task.task_status).capitalize()} for task in assign_tasks ]) for idx, col in enumerate(df_tasks.columns): worksheet.write(0, idx, col) # Write header for row_num, row in enumerate(df_tasks.values, start=1): for col_num, value in enumerate(row): worksheet.write(row_num, col_num, value) # sheet 3 for Juniors Leaderboard and rank elif sheet_name == 'Top 15 Juniors': queryset = JuniorPoints.objects.prefetch_related('junior', 'junior__auth').annotate(rank=Window( expression=Rank(), order_by=[F('total_points').desc(), 'junior__created_at'] )).order_by('-total_points', 'junior__created_at') df_leaderboard = pd.DataFrame([ { 'Junior Name': f"{junior.junior.auth.first_name} {junior.junior.auth.last_name}" if junior.junior.auth.last_name else junior.junior.auth.first_name, 'Points': junior.total_points, 'Rank': junior.rank } for junior in queryset ]) for idx, col in enumerate(df_leaderboard.columns): worksheet.write(0, idx, col) # Write header for row_num, row in enumerate(df_leaderboard.values, start=1): for col_num, value in enumerate(row): worksheet.write(row_num, col_num, value) # Close the workbook to save the content workbook.close() # Reset the buffer position and write the content to the response buffer.seek(0) response.write(buffer.getvalue()) buffer.close() return response