Files
zod-backend/notifications/utils.py
2023-10-06 18:10:56 +05:30

166 lines
7.1 KiB
Python

"""
notifications utils file
"""
# python imports
import logging

# third party imports
from celery import shared_task
from fcm_django.models import FCMDevice
from firebase_admin.messaging import Message, Notification as FirebaseNotification

# django imports
from django.contrib.auth import get_user_model
from django.db.models import Q

# local imports
from account.models import UserNotification
from account.utils import get_user_full_name
from base.constants import GUARDIAN, JUNIOR
from guardian.models import Guardian, JuniorTask
from junior.models import Junior
from notifications.constants import NOTIFICATION_DICT
from notifications.models import Notification
User = get_user_model()
def register_fcm_token(user_id, registration_id, device_id, device_type):
    """
    Register (or refresh) the FCM device token of a user.

    Device rows already holding ``registration_id`` are deleted first; the
    device row matched by ``user_id`` is then updated, or created when none
    exists, with the supplied values and flagged as active.

    :param user_id: id of the user the device belongs to
    :param registration_id: FCM registration token to store
    :param device_id: device identifier to store
    :param device_type: value written to the device ``type`` field
    :return: the created or updated FCMDevice instance
    """
    # drop any row that already carries this registration token
    stale_devices = FCMDevice.objects.filter(registration_id=registration_id)
    stale_devices.delete()

    device_fields = {
        'device_id': device_id,
        'type': device_type,
        'active': True,
        'registration_id': registration_id,
    }
    fcm_device, _created = FCMDevice.objects.update_or_create(user_id=user_id,
                                                              defaults=device_fields)
    return fcm_device
def remove_fcm_token(user_id: int, access_token: str, registration_id) -> None:
    """
    Remove the FCM device rows registered for a user.

    Deletion is best-effort: a failure is logged with its traceback and
    swallowed, it is never propagated to the caller.

    :param user_id: id of the user whose FCM devices are deleted
    :param access_token: currently unused by this function
    :param registration_id: currently unused by this function
    :return: None
    """
    try:
        # NOTE: every device row of the user is removed, not only the one
        # matching ``registration_id``
        FCMDevice.objects.filter(user_id=user_id).delete()
    except Exception:
        # keep the best-effort contract, but record the full traceback through
        # the logging framework instead of printing to stdout
        logging.getLogger(__name__).exception('Failed to remove FCM devices for user %s', user_id)
def get_from_user_details(from_user_id, from_user_type):
    """
    Resolve the sender of a notification.

    :param from_user_id: id of the sending user; a falsy value means "no sender"
    :param from_user_type: GUARDIAN or JUNIOR, selects which profile model is queried
    :return: tuple ``(from_user_name, from_user_image, from_user)``; all three
             are None when there is no sender id, the type is neither GUARDIAN
             nor JUNIOR, or no matching profile exists
    """
    if not from_user_id:
        return None, None, None

    if from_user_type == GUARDIAN:
        profile = Guardian.objects.filter(user_id=from_user_id).select_related('user').first()
        from_user = profile.user if profile is not None else None
    elif from_user_type == JUNIOR:
        profile = Junior.objects.filter(auth_id=from_user_id).select_related('auth').first()
        from_user = profile.auth if profile is not None else None
    else:
        return None, None, None

    # .first() yields None when no profile row matches; report "no sender"
    # instead of failing with AttributeError on the missing profile
    if profile is None:
        return None, None, None

    return get_user_full_name(from_user), profile.image, from_user
def get_notification_data(notification_type, from_user_id, from_user_type, to_user_id, extra_data):
    """
    Build the stored-notification payload and the push payload.

    Both payloads start as copies of ``NOTIFICATION_DICT[notification_type]``
    and have their ``body`` template filled with the sender name, task name
    and points.

    NOTE: when ``extra_data`` contains ``task_id`` and the task exists,
    ``task_name`` and ``task_image`` are added to ``extra_data`` in place.

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user_id: id of the sending user (may be None)
    :param from_user_type: GUARDIAN or JUNIOR
    :param to_user_id: id of the receiving user (may be None)
    :param extra_data: dict of extra values merged into the notification data;
                       None is treated as an empty dict
    :return: tuple ``(notification_data, push_data, from_user, to_user)``
    :raises KeyError: if ``notification_type`` is not in NOTIFICATION_DICT
    """
    if extra_data is None:
        extra_data = {}

    push_data = NOTIFICATION_DICT[notification_type].copy()
    notification_data = push_data.copy()

    points = extra_data.get('points')
    task_name = None
    if 'task_id' in extra_data:
        task = JuniorTask.objects.filter(id=extra_data.get('task_id')).first()
        # the task may no longer exist; leave the task fields out in that case
        # instead of failing with AttributeError on None
        if task is not None:
            task_name = task.task_name
            extra_data['task_name'] = task_name
            extra_data['task_image'] = task.image if task.image else task.default_image

    from_user_name, from_user_image, from_user = get_from_user_details(from_user_id, from_user_type)

    # both payloads share the same template, so the body is formatted once
    body = push_data['body'].format(from_user=from_user_name, task_name=task_name, points=points)
    push_data['body'] = body
    notification_data['body'] = body

    # the receiver is the opposite role of the sender
    to_user_type = GUARDIAN if from_user_type == JUNIOR else JUNIOR
    push_data['to_user_type'] = to_user_type
    notification_data['to_user_type'] = to_user_type

    notification_data['from_user'] = from_user_name
    notification_data['from_user_image'] = from_user_image
    notification_data.update(extra_data)

    to_user = User.objects.filter(id=to_user_id).first()
    return notification_data, push_data, from_user, to_user
@shared_task()
def send_notification(notification_type, from_user_id, from_user_type, to_user_id, extra_data):
    """
    Store a notification for one user and push it when the user allows it.

    The notification row is matched on sender, receiver and the ``task_id``
    held in its data; it is updated when found and created otherwise. A push
    is sent only when the receiver has a UserNotification row whose
    ``push_notification`` flag is truthy.
    """
    notification_data, push_data, from_user, to_user = get_notification_data(
        notification_type, from_user_id, from_user_type, to_user_id, extra_data)
    push_settings = UserNotification.objects.filter(user=to_user).first()

    # notification create method changed on 28sep as per changes required
    lookup = {
        'data__has_key': 'task_id',
        'data__task_id': extra_data.get('task_id'),
        'notification_from': from_user,
        'notification_to': to_user,
    }
    new_values = {
        'notification_type': notification_type,
        'notification_from': from_user,
        'notification_to': to_user,
        'data': notification_data,
    }
    Notification.objects.update_or_create(defaults=new_values, **lookup)

    if not (push_settings and push_settings.push_notification):
        return
    send_push(to_user, push_data)
def send_push(user, data):
    """
    Send a push notification to the active FCM devices of one user.

    :param user: user whose ``fcmdevice_set`` is targeted
    :param data: push payload; must hold ``title``, ``body`` and
                 ``notification_type`` (the latter is converted to str in place)
    """
    data['notification_type'] = str(data['notification_type'])
    active_devices = user.fcmdevice_set.filter(active=True)
    push_content = FirebaseNotification(data['title'], data['body'])
    push_message = Message(notification=push_content, data=data)
    active_devices.send_message(push_message)
def send_multiple_push(queryset, data):
    """
    Send the same push notification to the active FCM devices of many users.

    :param queryset: users whose devices are targeted
    :param data: push payload; must hold ``title``, ``body`` and
                 ``notification_type`` (the latter is converted to str in place)
    """
    data['notification_type'] = str(data['notification_type'])
    target_devices = FCMDevice.objects.filter(user__in=queryset, active=True)
    push_content = FirebaseNotification(data['title'], data['body'])
    push_message = Message(notification=push_content, data=data)
    target_devices.send_message(push_message)
@shared_task()
def send_notification_multiple_user(notification_type, from_user_id, from_user_type,
                                    extra_data: dict = None):
    """
    Store and push one notification type for multiple users.

    Recipients are non-superusers whose junior profile is verified, excluding
    users that have neither a junior nor a guardian profile. A notification
    row is created for every recipient; the push is sent only to recipients
    whose ``user_notification.push_notification`` flag is set.

    :param notification_type: key into NOTIFICATION_DICT
    :param from_user_id: id of the sending user (may be None)
    :param from_user_type: GUARDIAN or JUNIOR
    :param extra_data: optional dict of extra values merged into the
                       notification data; defaults to an empty dict
    """
    # a fresh dict per call: a ``{}`` default would be shared between calls
    # and is handed to a helper that may mutate it
    if extra_data is None:
        extra_data = {}

    to_user_list = User.objects.filter(
        junior_profile__is_verified=True, is_superuser=False
    ).exclude(junior_profile__isnull=True, guardian_profile__isnull=True)

    notification_data, push_data, from_user, _ = get_notification_data(
        notification_type, from_user_id, from_user_type, None, extra_data)

    Notification.objects.bulk_create([
        Notification(notification_type=notification_type,
                     notification_to=user,
                     notification_from=from_user,
                     data=notification_data)
        for user in to_user_list
    ])

    # only users that enabled push notifications receive the push itself
    push_recipients = to_user_list.filter(user_notification__push_notification=True)
    send_multiple_push(push_recipients, push_data)