Files
zod-backend/notifications/views.py
2023-08-07 17:06:31 +05:30

72 lines
3.0 KiB
Python

"""
notifications views file
"""
# django imports
from django.db.models import Q
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework import viewsets, status, views
# local imports
from account.utils import custom_response, custom_error_response
from base.messages import SUCCESS_CODE, ERROR_CODE
from notifications.constants import TEST_NOTIFICATION
from notifications.serializers import RegisterDevice, NotificationListSerailizer, ReadNotificationSerializer
from notifications.utils import send_notification
from notifications.models import Notification
class NotificationViewSet(viewsets.GenericViewSet):
    """Notification endpoints: device registration, test push and listing."""
    serializer_class = RegisterDevice
    permission_classes = [IsAuthenticated]

    @action(methods=['post'], detail=False, url_path='device', url_name='device', serializer_class=RegisterDevice)
    def fcm_registration(self, request):
        """
        Validate the posted device data and save the FCM token.

        The user id is read from the auth token payload and handed to the
        serializer through its context. Invalid data raises a validation
        error (``raise_exception=True``).
        """
        serializer_cls = self.get_serializer_class()
        device_serializer = serializer_cls(
            data=request.data,
            context={'user_id': request.auth.payload['user_id']},
        )
        device_serializer.is_valid(raise_exception=True)
        device_serializer.save()
        return custom_response(SUCCESS_CODE["3000"])

    @action(methods=['get'], detail=False, url_path='test', url_name='test')
    def send_test_notification(self, request):
        """
        Dispatch the test notification to the requesting user.

        :return: success response with code "3000"
        """
        user_id = request.auth.payload['user_id']
        # Handed off through ``send_notification.delay``; the response does
        # not wait on, or report, the outcome of the send itself.
        send_notification.delay(TEST_NOTIFICATION, None, user_id, {})
        return custom_response(SUCCESS_CODE["3000"])

    @action(methods=['get'], detail=False, url_path='list', url_name='list',
            serializer_class=NotificationListSerailizer)
    def notification_list(self, request):
        """
        Return every notification addressed to the requesting user.

        Any exception raised while querying or serializing is reported back
        as a 400 response carrying the exception text.
        """
        try:
            user_notifications = Notification.objects.filter(notification_to=request.user)
            payload = NotificationListSerailizer(user_notifications, many=True).data
            return custom_response(None, payload, response_status=status.HTTP_200_OK)
        except Exception as exc:
            return custom_error_response(str(exc), response_status=status.HTTP_400_BAD_REQUEST)
class ReadNotification(views.APIView):
    """Update notification API: marks the user's notifications as read."""
    serializer_class = ReadNotificationSerializer
    model = Notification
    permission_classes = [IsAuthenticated]

    def put(self, request, format=None):
        """
        Mark the notifications listed in ``notification_id`` as read.

        Only notifications addressed to the requesting user are touched.

        :param request: request whose body carries ``notification_id`` --
            a list of ids, or a single id (string or int).
        :param format: unused; kept for the DRF handler signature.
        :return: success response (code "3039") when at least one row was
            matched, otherwise a 400 error response.
        """
        try:
            notification_ids = request.data.get('notification_id')
            # A bare string would be iterated character by character by
            # ``id__in`` ("12" -> ids '1' and '2'), and a bare int is not
            # iterable at all, so wrap scalars in a one-element list.
            if isinstance(notification_ids, (str, int)):
                notification_ids = [notification_ids]
            # ``update`` returns the number of matched rows, not a queryset.
            updated_count = Notification.objects.filter(
                id__in=notification_ids,
                notification_to=request.user,
            ).update(is_read=True)
            if updated_count:
                return custom_response(SUCCESS_CODE['3039'], response_status=status.HTTP_200_OK)
            # Previously this path fell through and returned None, which
            # DRF rejects because a handler must return a Response.
            return custom_error_response("No matching notification found.",
                                         response_status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            return custom_error_response(str(e), response_status=status.HTTP_400_BAD_REQUEST)