""" Celery Basic configuration """ # python imports import os # third party imports from celery import Celery from dotenv import load_dotenv from celery.schedules import crontab # OR, the same with increased verbosity: load_dotenv(verbose=True) env_path = os.path.join(os.path.abspath(os.path.join('.env', os.pardir)), '.env') load_dotenv(dotenv_path=env_path) os.environ.setdefault('DJANGO_SETTINGS_MODULE', os.getenv('SETTINGS')) # celery app app = Celery() app.config_from_object('django.conf:settings') # Load task modules from all registered Django apps. app.autodiscover_tasks() app.conf.beat_schedule = { "expired_task": { "task": "guardian.utils.update_expired_task_status", "schedule": crontab(minute=0, hour=0), }, 'notify_task_expiry': { 'task': 'base.tasks.notify_task_expiry', 'schedule': crontab(minute='0', hour='13'), }, 'notify_top_junior': { 'task': 'base.tasks.notify_top_junior', 'schedule': crontab(minute='0', hour='*/1'), }, } @app.task(bind=True) def debug_task(self): """ celery debug task """ print(f'Request: {self.request!r}')