mirror of
https://github.com/HamzaSha1/zod-backend.git
synced 2025-07-11 15:48:13 +00:00
122 lines
4.5 KiB
Python
122 lines
4.5 KiB
Python
"""
|
|
notifications utils file
|
|
"""
|
|
# third party imports
|
|
from fcm_django.models import FCMDevice
|
|
from celery import shared_task
|
|
from firebase_admin.messaging import Message, Notification as FirebaseNotification
|
|
|
|
# django imports
|
|
from django.contrib.auth import get_user_model
|
|
|
|
from account.models import UserNotification
|
|
from account.utils import get_user_full_name
|
|
from guardian.models import Guardian
|
|
from junior.models import Junior
|
|
from notifications.constants import NOTIFICATION_DICT
|
|
from notifications.models import Notification
|
|
|
|
# local imports
|
|
|
|
|
|
User = get_user_model()
|
|
|
|
|
|
def register_fcm_token(user_id, registration_id, device_id, device_type):
|
|
""" used to register the fcm device token"""
|
|
device, _ = FCMDevice.objects.update_or_create(device_id=device_id,
|
|
defaults={'user_id': user_id, 'type': device_type,
|
|
'active': True,
|
|
'registration_id': registration_id})
|
|
return device
|
|
|
|
|
|
def remove_fcm_token(user_id: int, access_token: str, registration_id) -> None:
|
|
"""
|
|
remove access_token and registration_token
|
|
"""
|
|
try:
|
|
# remove fcm token for this device
|
|
FCMDevice.objects.filter(user_id=user_id).delete()
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
|
|
def get_basic_detail(from_user_id, to_user_id):
|
|
"""
|
|
used to get the basic details
|
|
"""
|
|
from_user = User.objects.get(id=from_user_id) if from_user_id else None
|
|
to_user = User.objects.get(id=to_user_id)
|
|
return from_user, to_user
|
|
|
|
|
|
def get_notification_data(notification_type, from_user, to_user, extra_data):
|
|
"""
|
|
get notification and push data
|
|
:param notification_type: notification_type
|
|
:param from_user: from_user obj
|
|
:param to_user: to_user obj
|
|
:param extra_data: any extra data provided
|
|
:return: notification and push data
|
|
"""
|
|
push_data = NOTIFICATION_DICT[notification_type].copy()
|
|
notification_data = push_data.copy()
|
|
notification_data['to_user'] = get_user_full_name(to_user)
|
|
if from_user:
|
|
from_user_name = get_user_full_name(from_user)
|
|
push_data['body'] = push_data['body'].format(from_user=from_user_name)
|
|
notification_data['body'] = notification_data['body'].format(from_user=from_user_name)
|
|
notification_data['from_user'] = from_user_name
|
|
|
|
notification_data.update(extra_data)
|
|
|
|
return notification_data, push_data
|
|
|
|
|
|
def send_notification(notification_type, from_user, to_user, extra_data):
|
|
"""
|
|
used to send the push for the given notification type
|
|
"""
|
|
# (from_user, to_user) = get_basic_detail(from_user_id, to_user_id)
|
|
notification_data, push_data = get_notification_data(notification_type, from_user, to_user, extra_data)
|
|
user_notification_type = UserNotification.objects.filter(user=to_user).first()
|
|
notification_data.update({'badge': Notification.objects.filter(notification_to=to_user, is_read=False).count()})
|
|
Notification.objects.create(notification_type=notification_type, notification_from=from_user,
|
|
notification_to=to_user, data=notification_data)
|
|
if user_notification_type.push_notification:
|
|
send_push(to_user, push_data)
|
|
|
|
|
|
def send_push(user, data):
|
|
""" used to send push notification to specific user """
|
|
user.fcmdevice_set.filter(active=True).send_message(
|
|
Message(notification=FirebaseNotification(data['title'], data['body']), data=data)
|
|
)
|
|
|
|
|
|
@shared_task()
|
|
def send_notification_to_guardian(notification_type, from_user_id, to_user_id, extra_data):
|
|
from_user = None
|
|
if from_user_id:
|
|
from_user = Junior.objects.filter(auth_id=from_user_id).select_related('auth').first()
|
|
extra_data['from_user_image'] = from_user.image
|
|
from_user = from_user.auth
|
|
to_user = Guardian.objects.filter(user_id=to_user_id).select_related('user').first()
|
|
extra_data['to_user_image'] = to_user.image
|
|
send_notification(notification_type, from_user, to_user.user, extra_data)
|
|
|
|
|
|
@shared_task()
|
|
def send_notification_to_junior(notification_type, from_user_id, to_user_id, extra_data):
|
|
from_user = None
|
|
if from_user_id:
|
|
from_user = Guardian.objects.filter(user_id=from_user_id).select_related('user').first()
|
|
extra_data['from_user_image'] = from_user.image
|
|
from_user = from_user.user
|
|
to_user = Junior.objects.filter(auth_id=to_user_id).select_related('auth').first()
|
|
extra_data['to_user_image'] = to_user.image
|
|
send_notification(notification_type, from_user, to_user.auth, extra_data)
|
|
|
|
|