""" notifications utils file """ # third party imports from fcm_django.models import FCMDevice from celery import shared_task from firebase_admin.messaging import Message, Notification as FirebaseNotification # django imports from django.contrib.auth import get_user_model from django.db.models import Q # local imports from account.models import UserNotification from account.utils import get_user_full_name from base.constants import GUARDIAN, JUNIOR from guardian.models import Guardian, JuniorTask from junior.models import Junior from notifications.constants import NOTIFICATION_DICT from notifications.models import Notification User = get_user_model() def register_fcm_token(user_id, registration_id, device_id, device_type): """ used to register the fcm device token""" device, _ = FCMDevice.objects.update_or_create(device_id=device_id, defaults={'user_id': user_id, 'type': device_type, 'active': True, 'registration_id': registration_id}) return device def remove_fcm_token(user_id: int, access_token: str, registration_id) -> None: """ remove access_token and registration_token """ try: # remove fcm token for this device FCMDevice.objects.filter(user_id=user_id).delete() except Exception as e: print(e) def get_from_user_details(from_user_id, from_user_type): """ used to get from user details """ from_user = None from_user_name = None from_user_image = None if from_user_id: if from_user_type == GUARDIAN: guardian = Guardian.objects.filter(user_id=from_user_id).select_related('user').first() from_user = guardian.user from_user_name = get_user_full_name(from_user) from_user_image = guardian.image elif from_user_type == JUNIOR: junior = Junior.objects.filter(auth_id=from_user_id).select_related('auth').first() from_user = junior.auth from_user_name = get_user_full_name(from_user) from_user_image = junior.image return from_user_name, from_user_image, from_user def get_notification_data(notification_type, from_user_id, from_user_type, to_user_id, extra_data): """ get notification and push data :param from_user_type: GUARDIAN or JUNIOR :param notification_type: notification_type :param from_user_id: from_user obj :param to_user_id: to_user obj :param extra_data: any extra data provided :return: notification and push data """ push_data = NOTIFICATION_DICT[notification_type].copy() notification_data = push_data.copy() task_name = None if 'task_id' in extra_data: task = JuniorTask.objects.filter(id=extra_data.get('task_id')).first() task_name = task.task_name extra_data['task_name'] = task_name extra_data['task_image'] = task.image if task.image else task.default_image from_user_name, from_user_image, from_user = get_from_user_details(from_user_id, from_user_type) push_data['body'] = push_data['body'].format(from_user=from_user_name, task_name=task_name) notification_data['body'] = notification_data['body'].format(from_user=from_user_name, task_name=task_name) notification_data['from_user'] = from_user_name notification_data['from_user_image'] = from_user_image notification_data.update(extra_data) to_user = User.objects.get(id=to_user_id) return notification_data, push_data, from_user, to_user @shared_task() def send_notification(notification_type, from_user_id, from_user_type, to_user_id, extra_data): """ used to send the push for the given notification type """ notification_data, push_data, from_user, to_user = get_notification_data(notification_type, from_user_id, from_user_type, to_user_id, extra_data) user_notification_type = UserNotification.objects.filter(user=to_user).first() notification_data.update({'badge': Notification.objects.filter(notification_to=to_user, is_read=False).count()}) Notification.objects.create(notification_type=notification_type, notification_from=from_user, notification_to=to_user, data=notification_data) if user_notification_type and user_notification_type.push_notification: send_push(to_user, push_data) def send_push(user, data): """ used to send push notification to specific user """ user.fcmdevice_set.filter(active=True).send_message( Message(notification=FirebaseNotification(data['title'], data['body']), data=data) ) def send_multiple_push(queryset, data): """ used to send same notification to multiple users """ FCMDevice.objects.filter(user__in=queryset, active=True).send_message( Message(notification=FirebaseNotification(data['title'], data['body']), data=data) ) @shared_task() def send_notification_multiple_user(notification_type, from_user_id, from_user_type, extra_data: dict = {}): """ used to send notification to multiple user for the given notification type """ to_user_list = User.objects.filter(junior_profile__is_verified=True, is_superuser=False ).exclude(junior_profile__isnull=True, guardian_profile__isnull=True) push_data = NOTIFICATION_DICT[notification_type].copy() notification_data = push_data.copy() points = extra_data.get('points', None) from_user_name, from_user_image, from_user = get_from_user_details(from_user_id, from_user_type) push_data['body'] = push_data['body'].format(from_user=from_user_name, points=points) notification_data['body'] = notification_data['body'].format(from_user=from_user_name, points=points) notification_data['from_user'] = from_user_name notification_data['from_user_image'] = from_user_image notification_list = [] for user in to_user_list: notification_list.append(Notification(notification_type=notification_type, notification_to=user, notification_from=from_user, data=notification_data)) Notification.objects.bulk_create(notification_list) to_user_list = to_user_list.filter(user_notification__push_notification=True) send_multiple_push(to_user_list, push_data) @shared_task() def send_notification_to_guardian(notification_type, from_user_id, to_user_id, extra_data): """ :param notification_type: :param from_user_id: :param to_user_id: :param extra_data: :return: """ if from_user_id: from_user = Junior.objects.filter(auth_id=from_user_id).first() extra_data['from_user_image'] = from_user.image send_notification(notification_type, from_user_id, to_user_id, extra_data) @shared_task() def send_notification_to_junior(notification_type, from_user_id, to_user_id, extra_data): """ :param notification_type: :param from_user_id: :param to_user_id: :param extra_data: :return: """ if from_user_id: from_user = Guardian.objects.filter(user_id=from_user_id).first() extra_data['from_user_image'] = from_user.image send_notification(notification_type, from_user_id, to_user_id, extra_data)