Files
zod-backend/notifications/utils.py
2023-08-16 16:40:34 +05:30

122 lines
4.5 KiB
Python

"""
notifications utils file
"""
# third party imports
from fcm_django.models import FCMDevice
from celery import shared_task
from firebase_admin.messaging import Message, Notification as FirebaseNotification
# django imports
from django.contrib.auth import get_user_model
from account.models import UserNotification
from account.utils import get_user_full_name
from guardian.models import Guardian
from junior.models import Junior
from notifications.constants import NOTIFICATION_DICT
from notifications.models import Notification
# local imports
User = get_user_model()
def register_fcm_token(user_id, registration_id, device_id, device_type):
    """
    Create or refresh the FCM device row identified by ``device_id``.

    :param user_id: id of the user the device is attached to
    :param registration_id: FCM registration token stored on the device row
    :param device_id: lookup key used to find an existing device row
    :param device_type: value stored in the device's ``type`` field
    :return: the created or updated FCMDevice instance
    """
    device_fields = {
        'user_id': user_id,
        'type': device_type,
        'active': True,
        'registration_id': registration_id,
    }
    # update_or_create returns (instance, created); only the instance is returned
    fcm_device = FCMDevice.objects.update_or_create(device_id=device_id, defaults=device_fields)[0]
    return fcm_device
def remove_fcm_token(user_id: int, access_token: str, registration_id) -> None:
    """
    Remove the FCM device registration of a user (best effort).

    When ``registration_id`` is provided only the matching device row of the
    user is deleted, so the user's other devices keep receiving pushes. When it
    is empty/None every device row of the user is deleted.

    Any failure is logged and swallowed so that the calling flow is never
    interrupted by push-token cleanup.

    :param user_id: id of the user whose device registration is removed
    :param access_token: not used by this function; kept for caller compatibility
    :param registration_id: FCM registration token of the device to remove
    :return: None
    """
    import logging

    try:
        devices = FCMDevice.objects.filter(user_id=user_id)
        if registration_id:
            # restrict the delete to the device that is being signed out
            devices = devices.filter(registration_id=registration_id)
        devices.delete()
    except Exception:
        # deliberate best-effort: report with traceback instead of print()
        logging.getLogger(__name__).exception(
            "failed to remove fcm token for user_id=%s", user_id)
def get_basic_detail(from_user_id, to_user_id):
    """
    Load the sender and recipient user objects by id.

    :param from_user_id: id of the sender; a falsy value means "no sender"
    :param to_user_id: id of the recipient
    :return: tuple ``(from_user, to_user)``; ``from_user`` is None when
             ``from_user_id`` is falsy
    """
    sender = None
    if from_user_id:
        sender = User.objects.get(id=from_user_id)
    recipient = User.objects.get(id=to_user_id)
    return sender, recipient
def get_notification_data(notification_type, from_user, to_user, extra_data):
    """
    Build the stored-notification payload and the push payload.

    Both payloads start as copies of the ``NOTIFICATION_DICT`` entry for the
    given type. When a sender is present, the ``body`` of both is formatted
    with the sender's full name.

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user: sender user obj, or a falsy value when there is no sender
    :param to_user: recipient user obj
    :param extra_data: mapping merged into the stored-notification payload only
    :return: tuple ``(notification_data, push_data)``
    """
    template = NOTIFICATION_DICT[notification_type]
    push_payload = template.copy()
    stored_payload = template.copy()
    stored_payload['to_user'] = get_user_full_name(to_user)

    if from_user:
        sender_name = get_user_full_name(from_user)
        # the same template body is filled in for both payloads
        for payload in (push_payload, stored_payload):
            payload['body'] = payload['body'].format(from_user=sender_name)
        stored_payload['from_user'] = sender_name

    # extra data is attached to the stored notification, not to the push
    stored_payload.update(extra_data)
    return stored_payload, push_payload
def send_notification(notification_type, from_user, to_user, extra_data):
    """
    Store a notification for ``to_user`` and push it when the user allows it.

    The notification row is always created. A push is sent only when the
    recipient has a ``UserNotification`` settings row with
    ``push_notification`` enabled; a recipient without a settings row gets the
    stored notification but no push (previously this case raised
    AttributeError after the row had been created).

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user: sender user obj or None
    :param to_user: recipient user obj
    :param extra_data: mapping merged into the stored notification data
    :return: None
    """
    notification_data, push_data = get_notification_data(notification_type, from_user, to_user, extra_data)
    user_notification_type = UserNotification.objects.filter(user=to_user).first()
    # badge is the unread count taken before the new notification is stored
    unread_count = Notification.objects.filter(notification_to=to_user, is_read=False).count()
    notification_data.update({'badge': unread_count})
    Notification.objects.create(notification_type=notification_type, notification_from=from_user,
                                notification_to=to_user, data=notification_data)
    # .first() returns None when the user has no notification settings row
    if user_notification_type is not None and user_notification_type.push_notification:
        send_push(to_user, push_data)
def send_push(user, data):
    """
    Send a push message to every active FCM device of the given user.

    :param user: user obj whose ``fcmdevice_set`` is targeted
    :param data: mapping with at least ``title`` and ``body``; the whole
                 mapping is also sent as the message's data payload
    """
    active_devices = user.fcmdevice_set.filter(active=True)
    firebase_notification = FirebaseNotification(data['title'], data['body'])
    push_message = Message(notification=firebase_notification, data=data)
    active_devices.send_message(push_message)
@shared_task()
def send_notification_to_guardian(notification_type, from_user_id, to_user_id, extra_data):
    """
    Celery task: notify a guardian, optionally on behalf of a junior.

    The sender (when ``from_user_id`` is truthy) is looked up as a Junior by
    ``auth_id`` and the recipient as a Guardian by ``user_id``. Their images
    are added to the extra data under ``from_user_image`` / ``to_user_image``.

    If either profile cannot be found the task logs a warning and returns
    without sending, instead of failing with AttributeError on None.

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user_id: auth user id of the sending junior, or falsy for no sender
    :param to_user_id: user id of the receiving guardian
    :param extra_data: mapping of extra notification data (None is treated as empty)
    :return: None
    """
    import logging

    logger = logging.getLogger(__name__)
    # work on a copy so the caller's dict is not mutated; tolerate None
    extra_data = dict(extra_data or {})

    from_user = None
    if from_user_id:
        junior = Junior.objects.filter(auth_id=from_user_id).select_related('auth').first()
        if junior is None:
            logger.warning("notification %s skipped: junior with auth_id=%s not found",
                           notification_type, from_user_id)
            return
        extra_data['from_user_image'] = junior.image
        from_user = junior.auth

    guardian = Guardian.objects.filter(user_id=to_user_id).select_related('user').first()
    if guardian is None:
        logger.warning("notification %s skipped: guardian with user_id=%s not found",
                       notification_type, to_user_id)
        return
    extra_data['to_user_image'] = guardian.image
    send_notification(notification_type, from_user, guardian.user, extra_data)
@shared_task()
def send_notification_to_junior(notification_type, from_user_id, to_user_id, extra_data):
    """
    Celery task: notify a junior, optionally on behalf of a guardian.

    The sender (when ``from_user_id`` is truthy) is looked up as a Guardian by
    ``user_id`` and the recipient as a Junior by ``auth_id``. Their images are
    added to the extra data under ``from_user_image`` / ``to_user_image``.

    If either profile cannot be found the task logs a warning and returns
    without sending, instead of failing with AttributeError on None.

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user_id: user id of the sending guardian, or falsy for no sender
    :param to_user_id: auth user id of the receiving junior
    :param extra_data: mapping of extra notification data (None is treated as empty)
    :return: None
    """
    import logging

    logger = logging.getLogger(__name__)
    # work on a copy so the caller's dict is not mutated; tolerate None
    extra_data = dict(extra_data or {})

    from_user = None
    if from_user_id:
        guardian = Guardian.objects.filter(user_id=from_user_id).select_related('user').first()
        if guardian is None:
            logger.warning("notification %s skipped: guardian with user_id=%s not found",
                           notification_type, from_user_id)
            return
        extra_data['from_user_image'] = guardian.image
        from_user = guardian.user

    junior = Junior.objects.filter(auth_id=to_user_id).select_related('auth').first()
    if junior is None:
        logger.warning("notification %s skipped: junior with auth_id=%s not found",
                       notification_type, to_user_id)
        return
    extra_data['to_user_image'] = junior.image
    send_notification(notification_type, from_user, junior.auth, extra_data)