""" web_admin tasks file """ import datetime # third party imports from celery import shared_task from templated_email import send_templated_mail # django imports from django.conf import settings # local imports from base.constants import PENDING, IN_PROGRESS, JUNIOR from guardian.models import JuniorTask from notifications.constants import PENDING_TASK_EXPIRING, IN_PROGRESS_TASK_EXPIRING from notifications.utils import send_notification @shared_task def send_email_otp(email, verification_code): """ used to send otp on email :param email: e-mail :param verification_code: otp """ from_email = settings.EMAIL_FROM_ADDRESS recipient_list = [email] send_templated_mail( template_name='email_reset_verification.email', from_email=from_email, recipient_list=recipient_list, context={ 'verification_code': verification_code } ) return True @shared_task def send_mail(recipient_list, template, context: dict = None): """ used to send otp on email :param context: :param recipient_list: e-mail list :param template: email template """ if context is None: context = {} from_email = settings.EMAIL_FROM_ADDRESS recipient_list = recipient_list send_templated_mail( template_name=template, from_email=from_email, recipient_list=recipient_list, context=context ) return True @shared_task() def notify_task_expiry(): """ task to send notification for those task which expiring soon :return: """ all_pending_tasks = JuniorTask.objects.filter( task_status__in=[PENDING, IN_PROGRESS], due_date__range=[datetime.datetime.now().date(), (datetime.datetime.now().date() + datetime.timedelta(days=1))]) if pending_tasks := all_pending_tasks.filter(task_status=PENDING): for task in pending_tasks: send_notification(PENDING_TASK_EXPIRING, None, None, task.junior.auth.id, {'task_id': task.id}) if in_progress_tasks := all_pending_tasks.filter(task_status=IN_PROGRESS): for task in in_progress_tasks: send_notification(IN_PROGRESS_TASK_EXPIRING, task.junior.auth.id, JUNIOR, task.guardian.user.id, {'task_id': task.id}) return True