Compare commits

..

9 Commits

8 changed files with 275 additions and 33 deletions

View File

@ -99,3 +99,6 @@ uritemplate==4.1.1
urllib3==1.26.16
vine==5.0.0
wcwidth==0.2.6
pandas==2.0.3
XlsxWriter==3.1.2

18
web_admin/pagination.py Normal file
View File

@ -0,0 +1,18 @@
"""
web_admin pagination file
"""
# third party imports
from rest_framework.pagination import PageNumberPagination
from base.constants import NUMBER
class CustomPageNumberPagination(PageNumberPagination):
"""
custom paginator class
"""
# Set the desired page size
page_size = NUMBER['ten']
page_size_query_param = 'page_size'
# Set a maximum page size if needed
max_page_size = NUMBER['hundred']

View File

@ -1,12 +1,24 @@
"""
web_admin analytics serializer file
"""
# third party imports
from rest_framework import serializers
# django imports
from django.contrib.auth import get_user_model
# local imports
from base.constants import USER_TYPE
from junior.models import JuniorPoints, Junior
USER = get_user_model()
class JuniorLeaderboardSerializer(serializers.ModelSerializer):
"""
junior leaderboard serializer
"""
name = serializers.SerializerMethodField()
first_name = serializers.SerializerMethodField()
last_name = serializers.SerializerMethodField()
@ -44,9 +56,75 @@ class JuniorLeaderboardSerializer(serializers.ModelSerializer):
class LeaderboardSerializer(serializers.ModelSerializer):
"""
leaderboard serializer
"""
junior = JuniorLeaderboardSerializer()
rank = serializers.IntegerField()
class Meta:
"""
meta class
"""
model = JuniorPoints
fields = ('total_points', 'rank', 'junior')
class UserCSVReportSerializer(serializers.ModelSerializer):
"""
user csv/xls report serializer
"""
phone_number = serializers.SerializerMethodField()
user_type = serializers.SerializerMethodField()
is_active = serializers.SerializerMethodField()
date_joined = serializers.SerializerMethodField()
class Meta:
"""
meta class
"""
model = USER
fields = ('first_name', 'last_name', 'email', 'phone_number', 'user_type', 'is_active', 'date_joined')
@staticmethod
def get_phone_number(obj):
"""
:param obj: user object
:return: user phone number
"""
if profile := (obj.guardian_profile.all().first() or obj.junior_profile.all().first()):
return f"+{profile.country_code}{profile.phone}" \
if profile.country_code and profile.phone else profile.phone
else:
return None
@staticmethod
def get_user_type(obj):
"""
:param obj: user object
:return: user type
"""
if obj.guardian_profile.all().first():
return dict(USER_TYPE).get('2').capitalize()
elif obj.junior_profile.all().first():
return dict(USER_TYPE).get('1').capitalize()
else:
return None
@staticmethod
def get_is_active(obj):
"""
:param obj: user object
:return: user type
"""
if profile := (obj.guardian_profile.all().first() or obj.junior_profile.all().first()):
return "Active" if profile.is_active else "Inactive"
@staticmethod
def get_date_joined(obj):
"""
:param obj: user obj
:return: formatted date
"""
date = obj.date_joined.strftime("%d %b %Y")
return date

View File

@ -240,7 +240,7 @@ class ArticleListSerializer(serializers.ModelSerializer):
"""complete all question"""
junior_article = JuniorArticle.objects.filter(article=obj).last()
if junior_article:
junior_article.is_completed
return junior_article.is_completed
return False
class ArticleQuestionSerializer(serializers.ModelSerializer):

View File

@ -5,7 +5,7 @@ web_admin user_management serializers file
from rest_framework import serializers
from django.contrib.auth import get_user_model
from base.constants import USER_TYPE
from base.constants import USER_TYPE, GUARDIAN, JUNIOR
# local imports
from base.messages import ERROR_CODE, SUCCESS_CODE
from guardian.models import Guardian
@ -210,10 +210,10 @@ class JuniorSerializer(serializers.ModelSerializer):
"""
instance.auth.email = self.validated_data.get('email', instance.auth.email)
instance.auth.username = self.validated_data.get('email', instance.auth.username)
instance.auth.save()
instance.auth.save(update_fields=['email', 'username'])
instance.country_code = validated_data.get('country_code', instance.country_code)
instance.phone = validated_data.get('phone', instance.phone)
instance.save()
instance.save(update_fields=['country_code', 'phone'])
return instance
@staticmethod
@ -265,33 +265,30 @@ class UserManagementDetailSerializer(serializers.ModelSerializer):
model = USER
fields = ('id', 'user_type', 'email', 'guardian_profile', 'junior_profile', 'associated_users')
@staticmethod
def get_user_type(obj):
def get_user_type(self, obj):
"""
:param obj: user object
:return: user type
"""
if obj.guardian_profile.all().first():
return dict(USER_TYPE).get('2')
elif obj.junior_profile.all().first():
return dict(USER_TYPE).get('1')
else:
return None
return GUARDIAN if self.context['user_type'] == GUARDIAN else JUNIOR
@staticmethod
def get_associated_users(obj):
def get_associated_users(self, obj):
"""
:param obj: user object
:return: associated user
"""
if profile := obj.guardian_profile.all().first():
if self.context['user_type'] == GUARDIAN:
profile = obj.guardian_profile.all().only('user_id', 'guardian_code').first()
if profile.guardian_code:
junior = Junior.objects.filter(guardian_code__contains=[profile.guardian_code], is_verified=True)
junior = Junior.objects.filter(guardian_code__contains=[profile.guardian_code],
is_verified=True).select_related('auth')
serializer = JuniorSerializer(junior, many=True)
return serializer.data
elif profile := obj.junior_profile.all().first():
elif self.context['user_type'] == JUNIOR:
profile = obj.junior_profile.all().only('auth_id', 'guardian_code').first()
if profile.guardian_code:
guardian = Guardian.objects.filter(guardian_code__in=profile.guardian_code, is_verified=True)
guardian = Guardian.objects.filter(guardian_code__in=profile.guardian_code,
is_verified=True).select_related('user')
serializer = GuardianSerializer(guardian, many=True)
return serializer.data
else:

View File

@ -1,22 +1,38 @@
"""
web_admin analytics view file
"""
# python imports
import datetime
import io
import pandas as pd
import xlsxwriter
import tempfile
import oss2
from django.conf import settings
# third party imports
from rest_framework.viewsets import GenericViewSet
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
# django imports
from django.contrib.auth import get_user_model
from django.db.models import Q
from django.db.models import Count
from django.db.models.functions import TruncDate
from django.db.models import F, Window
from django.db.models.functions.window import Rank
from django.http import HttpResponse
# local imports
from account.utils import custom_response
from base.constants import PENDING, IN_PROGRESS, REJECTED, REQUESTED, COMPLETED, EXPIRED, DATE_FORMAT
from base.constants import PENDING, IN_PROGRESS, REJECTED, REQUESTED, COMPLETED, EXPIRED, DATE_FORMAT, TASK_STATUS
from guardian.models import JuniorTask
from junior.models import JuniorPoints
from web_admin.serializers.analytics_serializer import LeaderboardSerializer
from web_admin.pagination import CustomPageNumberPagination
from web_admin.permission import AdminPermission
from web_admin.serializers.analytics_serializer import LeaderboardSerializer, UserCSVReportSerializer
from web_admin.serializers.user_management_serializer import UserManagementListSerializer
USER = get_user_model()
@ -30,6 +46,7 @@ class AnalyticsViewSet(GenericViewSet):
to get junior leaderboard and ranking
"""
serializer_class = None
permission_classes = [IsAuthenticated, AdminPermission]
def get_queryset(self):
user_qs = USER.objects.filter(
@ -128,7 +145,125 @@ class AnalyticsViewSet(GenericViewSet):
expression=Rank(),
order_by=[F('total_points').desc(), 'junior__created_at']
)).order_by('-total_points', 'junior__created_at')
paginator = self.pagination_class()
paginator = CustomPageNumberPagination()
paginated_queryset = paginator.paginate_queryset(queryset, request)
serializer = self.serializer_class(paginated_queryset, many=True)
return custom_response(None, serializer.data)
@action(methods=['get'], url_name='export-excel', url_path='export-excel', detail=False)
def export_excel(self, request):
"""
to export users count, task details and top juniors in csv/excel file
:param request: start_date: date format (yyyy-mm-dd)
:param request: end_date: date format (yyyy-mm-dd)
:return:
"""
response = HttpResponse(content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
response['Content-Disposition'] = 'attachment; filename="ZOD_Bank_Analytics.xlsx"'
end_date = datetime.date.today()
start_date = end_date - datetime.timedelta(days=6)
if request.query_params.get('start_date') and request.query_params.get('end_date'):
start_date = datetime.datetime.strptime(request.query_params.get('start_date'), DATE_FORMAT)
end_date = datetime.datetime.strptime(request.query_params.get('end_date'), DATE_FORMAT)
# Use BytesIO for binary data
buffer = io.BytesIO()
# Create an XlsxWriter Workbook object
workbook = xlsxwriter.Workbook(buffer)
# Add sheets
sheets = ['Users', 'Assign Tasks', 'Juniors leaderboard']
for sheet_name in sheets:
worksheet = workbook.add_worksheet(name=sheet_name)
# sheet 1 for Total Users
if sheet_name == 'Users':
user_qs = self.get_queryset()
queryset = user_qs.filter(date_joined__range=(start_date, (end_date + datetime.timedelta(days=1))))
serializer = UserCSVReportSerializer(queryset, many=True)
df_users = pd.DataFrame([
{'Name': f"{user['first_name']} {user['last_name']}",
'Email': user['email'], 'Phone Number': user['phone_number'],
'User Type': user['user_type'], 'Status': user['is_active'],
'Date Joined': user['date_joined']}
for user in serializer.data
])
for idx, col in enumerate(df_users.columns):
# Write header
worksheet.write(0, idx, col)
for row_num, row in enumerate(df_users.values, start=1):
for col_num, value in enumerate(row):
worksheet.write(row_num, col_num, value)
# sheet 2 for Assign Task
elif sheet_name == 'Assign Tasks':
assign_tasks = JuniorTask.objects.filter(
created_at__range=[start_date, (end_date + datetime.timedelta(days=1))]
).exclude(task_status__in=[PENDING, EXPIRED])
df_tasks = pd.DataFrame([
{'Task Name': task.task_name, 'Task Status': dict(TASK_STATUS).get(task.task_status).capitalize()}
for task in assign_tasks
])
for idx, col in enumerate(df_tasks.columns):
# Write header
worksheet.write(0, idx, col)
for row_num, row in enumerate(df_tasks.values, start=1):
for col_num, value in enumerate(row):
worksheet.write(row_num, col_num, value)
# sheet 3 for Juniors Leaderboard and rank
elif sheet_name == 'Juniors leaderboard':
queryset = JuniorPoints.objects.prefetch_related('junior', 'junior__auth').annotate(rank=Window(
expression=Rank(),
order_by=[F('total_points').desc(), 'junior__created_at']
)).order_by('-total_points', 'junior__created_at')
df_leaderboard = pd.DataFrame([
{
'Junior Name': f"{junior.junior.auth.first_name} {junior.junior.auth.last_name}"
if junior.junior.auth.last_name else junior.junior.auth.first_name,
'Points': junior.total_points,
'Rank': junior.rank
}
for junior in queryset
])
for idx, col in enumerate(df_leaderboard.columns):
# Write header
worksheet.write(0, idx, col)
for row_num, row in enumerate(df_leaderboard.values, start=1):
for col_num, value in enumerate(row):
worksheet.write(row_num, col_num, value)
# Close the workbook to save the content
workbook.close()
# Reset the buffer position and write the content to the response
buffer.seek(0)
response.write(buffer.getvalue())
buffer.close()
filename = f"{'analytics'}/{'ZOD_Bank_Analytics.xlsx'}"
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
"""write image in temporary file"""
temp_file.write(response.content)
"""auth of bucket"""
auth = oss2.Auth(settings.ALIYUN_OSS_ACCESS_KEY_ID, settings.ALIYUN_OSS_ACCESS_KEY_SECRET)
"""fetch bucket details"""
bucket = oss2.Bucket(auth, settings.ALIYUN_OSS_ENDPOINT, settings.ALIYUN_OSS_BUCKET_NAME)
# Upload the temporary file to Alibaba OSS
bucket.put_object_from_file(filename, temp_file.name)
"""create perfect url for image"""
new_filename = filename.replace(' ', '%20')
link = f"https://{settings.ALIYUN_OSS_BUCKET_NAME}.{settings.ALIYUN_OSS_ENDPOINT}/{new_filename}"
print(link)
return custom_response(None, link)

View File

@ -14,6 +14,8 @@ from django.db.models import Q
from account.utils import custom_response, custom_error_response
from base.constants import USER_TYPE
from base.messages import SUCCESS_CODE, ERROR_CODE
from guardian.models import Guardian
from junior.models import Junior
from web_admin.permission import AdminPermission
from web_admin.serializers.user_management_serializer import (UserManagementListSerializer,
UserManagementDetailSerializer, GuardianSerializer,
@ -70,12 +72,14 @@ class UserManagementViewSet(GenericViewSet, mixins.ListModelMixin,
"""
if self.request.query_params.get('user_type') not in [dict(USER_TYPE).get('1'), dict(USER_TYPE).get('2')]:
return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST)
queryset = self.queryset
if self.request.query_params.get('user_type') == dict(USER_TYPE).get('2'):
queryset = queryset.filter(guardian_profile__user__id=kwargs['pk'])
elif self.request.query_params.get('user_type') == dict(USER_TYPE).get('1'):
queryset = queryset.filter(junior_profile__auth__id=kwargs['pk'])
serializer = UserManagementDetailSerializer(queryset, many=True)
queryset = queryset.filter(id=kwargs['pk'])
serializer = UserManagementDetailSerializer(
queryset, many=True,
context={'user_type': self.request.query_params.get('user_type')}
)
return custom_response(None, data=serializer.data)
def partial_update(self, request, *args, **kwargs):
@ -87,15 +91,14 @@ class UserManagementViewSet(GenericViewSet, mixins.ListModelMixin,
"""
if self.request.query_params.get('user_type') not in [dict(USER_TYPE).get('1'), dict(USER_TYPE).get('2')]:
return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST)
queryset = self.queryset
if self.request.query_params.get('user_type') == dict(USER_TYPE).get('2'):
user_obj = queryset.filter(guardian_profile__user__id=kwargs['pk']).first()
serializer = GuardianSerializer(user_obj.guardian_profile.all().first(),
guardian = Guardian.objects.filter(user_id=kwargs['pk'], is_verified=True).first()
serializer = GuardianSerializer(guardian,
request.data, context={'user_id': kwargs['pk']})
elif self.request.query_params.get('user_type') == dict(USER_TYPE).get('1'):
user_obj = queryset.filter(junior_profile__auth__id=kwargs['pk']).first()
serializer = JuniorSerializer(user_obj.junior_profile.all().first(),
junior = Junior.objects.filter(auth_id=kwargs['pk'], is_verified=True).select_related('auth').first()
serializer = JuniorSerializer(junior,
request.data, context={'user_id': kwargs['pk']})
serializer.is_valid(raise_exception=True)
@ -114,12 +117,12 @@ class UserManagementViewSet(GenericViewSet, mixins.ListModelMixin,
return custom_error_response(ERROR_CODE['2067'], status.HTTP_400_BAD_REQUEST)
queryset = self.queryset
if self.request.query_params.get('user_type') == dict(USER_TYPE).get('2'):
user_obj = queryset.filter(guardian_profile__user__id=kwargs['pk']).first()
user_obj = queryset.filter(id=kwargs['pk']).first()
obj = user_obj.guardian_profile.all().first()
obj.is_active = False if obj.is_active else True
obj.save()
elif self.request.query_params.get('user_type') == dict(USER_TYPE).get('1'):
user_obj = queryset.filter(junior_profile__auth__id=kwargs['pk']).first()
user_obj = queryset.filter(id=kwargs['pk']).first()
obj = user_obj.junior_profile.all().first()
obj.is_active = False if obj.is_active else True
obj.save()

View File

@ -125,6 +125,7 @@ SIMPLE_JWT = {
# Database
# https://docs.djangoproject.com/en/3.0/ref/settings/#databases
DATABASES = {
# default db setting
'default': {
'ENGINE': 'django.contrib.gis.db.backends.postgis',
'NAME':os.getenv('DB_NAME'),
@ -177,6 +178,9 @@ AUTH_PASSWORD_VALIDATORS = [
},
]
# database query logs settings
# Allows us to check db hits
# useful to optimize db query and hit
LOGGING = {
"version": 1,
"filters": {
@ -193,6 +197,7 @@ LOGGING = {
"class": "logging.StreamHandler"
}
},
# database logger
"loggers": {
"django.db.backends": {
"level": "DEBUG",
@ -242,6 +247,7 @@ CORS_ALLOW_HEADERS = (
'x-requested-with',
)
# CORS header settings
CORS_EXPOSE_HEADERS = (
'Access-Control-Allow-Origin: *',
)
@ -297,5 +303,7 @@ STATIC_URL = 'static/'
# define static root
STATIC_ROOT = 'static'
# media url
MEDIA_URL = "/media/"
# media path
MEDIA_ROOT = os.path.join(os.path.dirname(BASE_DIR), 'media')