""" notifications utils file """
# standard library imports
import copy
import logging

# third party imports
from fcm_django.models import FCMDevice
from celery import shared_task
from firebase_admin.messaging import Message, Notification as FirebaseNotification

# django imports
from django.contrib.auth import get_user_model

# local imports
from account.models import UserNotification
from notifications.constants import NOTIFICATION_DICT
from notifications.models import Notification

User = get_user_model()
logger = logging.getLogger(__name__)


def register_fcm_token(user_id, registration_id, device_id, device_type):
    """ used to register the fcm device token

    Creates the FCMDevice row for ``device_id`` or, when one already exists,
    overwrites its owner, type and registration id and marks it active.

    :param user_id: id of the user that owns the device
    :param registration_id: FCM registration token reported by the client
    :param device_id: identifier used as the lookup key for the device row
    :param device_type: value stored in the device's ``type`` field
    :return: the created or updated FCMDevice instance
    """
    device, _ = FCMDevice.objects.update_or_create(
        device_id=device_id,
        defaults={
            'user_id': user_id,
            'type': device_type,
            'active': True,
            'registration_id': registration_id,
        },
    )
    return device


def remove_fcm_token(user_id: int, access_token: str, registration_id) -> None:
    """ remove the fcm registration token(s) of a user

    When ``registration_id`` is given only the matching device of the user is
    deleted, so the user's other devices keep receiving pushes. When it is
    empty/None every device of the user is deleted.

    Best effort: failures are logged and never propagated to the caller.

    :param user_id: id of the user whose device(s) should be removed
    :param access_token: not used by this function; kept so existing callers
        keep working
    :param registration_id: FCM registration token of the device to remove,
        or a falsy value to remove all devices of the user
    """
    try:
        devices = FCMDevice.objects.filter(user_id=user_id)
        if registration_id:
            # remove fcm token for this device only
            devices = devices.filter(registration_id=registration_id)
        devices.delete()
    except Exception:  # deliberate best-effort boundary: log, don't raise
        logger.exception("could not remove fcm token(s) for user %s", user_id)


def get_basic_detail(notification_type, from_user_id, to_user_id):
    """ used to get the basic details

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user_id: id of the sending user; a falsy value means there is
        no sender and ``None`` is returned in its place
    :param to_user_id: id of the receiving user
    :return: tuple ``(notification_data, from_user, to_user)``; note that
        ``notification_data`` is the shared NOTIFICATION_DICT entry itself,
        so callers must copy it before modifying it
    :raises KeyError: when ``notification_type`` is not in NOTIFICATION_DICT
    :raises User.DoesNotExist: when a given user id does not exist
    """
    notification_data = NOTIFICATION_DICT[notification_type]
    from_user = User.objects.get(id=from_user_id) if from_user_id else None
    to_user = User.objects.get(id=to_user_id)
    return notification_data, from_user, to_user


@shared_task()
def send_notification(notification_type, from_user_id, to_user_id, extra_data):
    """ used to send the push for the given notification type

    Always stores a Notification row for the receiver; a push is sent only
    when the receiver has a UserNotification row with ``push_notification``
    enabled.

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user_id: id of the sending user (falsy for no sender)
    :param to_user_id: id of the receiving user
    :param extra_data: currently unused
    """
    # NOTE(review): extra_data is accepted but never used - confirm whether
    # it is meant to be merged into the notification payload.
    notification_data, from_user, to_user = get_basic_detail(
        notification_type, from_user_id, to_user_id)

    # Work on a private copy: the template belongs to the module-level
    # NOTIFICATION_DICT and must not be altered by a single send.
    data = copy.deepcopy(notification_data)

    Notification.objects.create(
        notification_type=notification_type,
        notification_from=from_user,
        notification_to=to_user,
        data=data,
    )

    preferences = UserNotification.objects.filter(user=to_user).first()
    if preferences is None or not preferences.push_notification:
        # no preferences row, or push disabled: nothing to send
        return

    unread_count = Notification.objects.filter(
        notification_to=to_user, is_read=False).count()
    send_push(to_user, {**data, 'badge': unread_count})


def send_push(user, data):
    """ used to send push notification to specific user

    The message goes to every active FCM device of ``user``. ``data`` is only
    read, never modified.

    :param user: user whose active devices receive the message
    :param data: dict with the keys ``'title'`` and ``'body'`` and an
        optional ``'data'`` entry that is forwarded as the message's data
        payload
    :raises KeyError: when ``'title'`` or ``'body'`` is missing
    """
    payload = data.get('data')
    message = Message(
        notification=FirebaseNotification(data['title'], data['body']),
        data=payload,
    )
    user.fcmdevice_set.filter(active=True).send_message(message)