Files
zod-backend/notifications/utils.py
abutalib-kiwi 4002d1e561 celery settings
2023-07-19 17:48:27 +05:30

72 lines
2.7 KiB
Python

"""
notifications utils file
"""
# python imports
import logging
# third party imports
from fcm_django.models import FCMDevice
from celery import shared_task
from firebase_admin.messaging import Message, Notification as FirebaseNotification
# django imports
from django.contrib.auth import get_user_model
# local imports
from account.models import UserNotification
from notifications.constants import NOTIFICATION_DICT
from notifications.models import Notification
# user model class returned by django's get_user_model(); used for the lookups in get_basic_detail
User = get_user_model()
def register_fcm_token(user_id, registration_id, device_id, device_type):
    """
    Create or refresh the FCM device row identified by ``device_id``.

    :param user_id: id of the user that owns the device
    :param registration_id: FCM registration token issued to the device
    :param device_id: identifier used to look up an existing device row
    :param device_type: value stored in the device's ``type`` field
    :return: the created or updated ``FCMDevice`` instance
    """
    # values written on both the create and the update path
    device_fields = {
        'user_id': user_id,
        'type': device_type,
        'active': True,
        'registration_id': registration_id,
    }
    fcm_device, _created = FCMDevice.objects.update_or_create(
        device_id=device_id,
        defaults=device_fields,
    )
    return fcm_device
def remove_fcm_token(user_id: int, access_token: str, registration_id) -> None:
    """
    Remove the FCM device registration of a user.

    :param user_id: id of the user whose device row(s) are removed
    :param access_token: accepted for interface compatibility; not used here
    :param registration_id: FCM registration token of the device to remove;
        when empty or None every device of the user is removed
    :return: None. Failures are logged and not re-raised (best-effort cleanup).
    """
    try:
        devices = FCMDevice.objects.filter(user_id=user_id)
        if registration_id:
            # remove the token for this device only, so the user's other
            # devices keep receiving pushes
            devices = devices.filter(registration_id=registration_id)
        devices.delete()
    except Exception:
        # deliberate best-effort: the caller must not fail because of token
        # cleanup, but the error is recorded with its traceback
        logging.getLogger(__name__).exception(
            "could not remove fcm token for user %s", user_id)
def get_basic_detail(notification_type, from_user_id, to_user_id):
    """
    Look up the notification entry and the users involved.

    :param notification_type: key into ``NOTIFICATION_DICT``
    :param from_user_id: id of the sending user; a falsy value means no sender
    :param to_user_id: id of the receiving user
    :return: tuple ``(notification_data, from_user, to_user)``; ``from_user``
        is None when ``from_user_id`` is falsy
    """
    # entry lookup stays first so an unknown type fails before any query runs
    template = NOTIFICATION_DICT[notification_type]
    sender = None
    if from_user_id:
        sender = User.objects.get(id=from_user_id)
    recipient = User.objects.get(id=to_user_id)
    return template, sender, recipient
@shared_task()
def send_notification(notification_type, from_user_id, to_user_id, extra_data):
    """
    Celery task: store a notification and push it to the recipient's devices.

    :param notification_type: key into ``NOTIFICATION_DICT``
    :param from_user_id: id of the sending user, or a falsy value for none
    :param to_user_id: id of the receiving user
    :param extra_data: currently not used by this task
    :return: None
    """
    notification_data, from_user, to_user = get_basic_detail(
        notification_type, from_user_id, to_user_id)
    logger = logging.getLogger(__name__)
    logger.debug("sending notification %s to user %s", notification_type, to_user_id)
    user_notification_type = UserNotification.objects.filter(user=to_user).first()
    # work on a copy: the entry returned by get_basic_detail is the shared
    # module-level NOTIFICATION_DICT value, and both the badge update below
    # and send_push modify the dict they are given
    data = dict(notification_data)
    Notification.objects.create(notification_type=notification_type, notification_from=from_user,
                                notification_to=to_user, data=data)
    if user_notification_type is None:
        # NOTE(review): no UserNotification row for this user; treated as
        # "push not enabled" instead of failing the task - confirm
        logger.warning("no notification settings for user %s, push skipped", to_user_id)
        return
    if user_notification_type.push_notification:
        # badge is the recipient's unread count, including the row created above
        unread_count = Notification.objects.filter(notification_to=to_user, is_read=False).count()
        data.update({'badge': unread_count})
        send_push(to_user, data)
def send_push(user, data):
    """
    Send a push notification to every active FCM device of ``user``.

    :param user: user whose ``fcmdevice_set`` is targeted
    :param data: mapping with ``'title'`` and ``'body'`` for the visible
        notification and an optional ``'data'`` entry used as message payload
    :return: None
    :raises KeyError: when ``data`` has no ``'title'`` or ``'body'``
    """
    # read instead of pop: the caller's dict may be a shared notification
    # entry and must keep its 'data' key for later sends
    payload = data.get('data')
    message = Message(
        notification=FirebaseNotification(title=data['title'], body=data['body']),
        data=payload,
    )
    user.fcmdevice_set.filter(active=True).send_message(message)