Files
zod-backend/guardian/utils.py
2023-07-25 19:13:25 +05:30

101 lines
3.8 KiB
Python

"""Utiles file of guardian"""
"""Django import"""
import oss2
"""Import setting"""
from django.conf import settings
import logging
import requests
from django.core.exceptions import ObjectDoesNotExist
"""Import tempfile"""
import tempfile
# Import date time module's function
from datetime import datetime, time
# Import the number constants and the time API URL
from base.constants import NUMBER, time_url
# Import Junior's model
from junior.models import Junior, JuniorPoints
# Import guardian's model
from .models import JuniorTask
# Import app from celery
from zod_bank.celery import app
# upload_image_to_alibaba: upload an image to Alibaba Cloud OSS.
#   First save the image in a temporary file, then upload that file to the
#   configured bucket and build the image URL.
# real_time: fetch the current time from a remote API instead of depending
#   on the system time.
# convert_timedelta_into_datetime: convert a timedelta into a time object.
# update_referral_points: update a junior's total points and referral points.
#   If anyone uses the referral code of a junior, that junior earns 5 points.
def upload_image_to_alibaba(image, filename):
    """Upload an image to the Alibaba Cloud OSS bucket and return its URL.

    The image is first written to a temporary file on disk. That file is
    closed before the upload starts, so all bytes are flushed, and it is
    removed once the upload has finished or failed.

    :param image: object whose ``read()`` result is written to the temporary file
    :param filename: object key under which the file is stored in the bucket
    :return: URL built from the bucket name, the endpoint and ``filename``
        with spaces replaced by ``%20``
    """
    # Local import: only needed to clean up the temporary file.
    import os

    # Save the image object to a temporary file. Leaving the ``with`` block
    # closes the file, so its content is fully on disk before OSS reads it
    # back by name.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        temp_file.write(image.read())

    try:
        # Authenticate and bind to the configured bucket.
        auth = oss2.Auth(settings.ALIYUN_OSS_ACCESS_KEY_ID, settings.ALIYUN_OSS_ACCESS_KEY_SECRET)
        bucket = oss2.Bucket(auth, settings.ALIYUN_OSS_ENDPOINT, settings.ALIYUN_OSS_BUCKET_NAME)
        # Upload the temporary file to Alibaba OSS
        bucket.put_object_from_file(filename, temp_file.name)
    finally:
        # delete=False means the file is never removed automatically.
        os.remove(temp_file.name)

    # Spaces are not valid in a URL path; encode them.
    new_filename = filename.replace(' ', '%20')
    return f"https://{settings.ALIYUN_OSS_BUCKET_NAME}.{settings.ALIYUN_OSS_ENDPOINT}/{new_filename}"
def real_time():
    """Fetch the current time from the remote time API at ``time_url``.

    :return: naive ``datetime`` parsed from the ``datetime`` field of the
        JSON response (timezone information is dropped), or ``None`` when
        the request fails or the API does not answer with HTTP 200
    """
    # Bound the request so an unresponsive API cannot block the caller forever.
    timeout_seconds = 10
    try:
        response = requests.get(time_url, timeout=timeout_seconds)
    except requests.RequestException as exc:
        # Network failures are reported the same way as a bad status: None.
        logging.error("Could not fetch time: %s", exc)
        return None

    if response.status_code != 200:
        logging.error("Could not fetch time")
        return None

    time_str = response.json()['datetime']
    # Older Python versions reject a trailing 'Z' in fromisoformat();
    # rewrite it as an explicit UTC offset before parsing.
    return datetime.fromisoformat(time_str.replace('Z', '+00:00')).replace(tzinfo=None)
def convert_timedelta_into_datetime(time_difference):
    """Build a ``time`` object from a timedelta.

    Only ``time_difference.seconds`` and ``time_difference.microseconds``
    are read; the ``days`` component is not used.

    :param time_difference: timedelta to convert
    :return: ``datetime.time`` holding the hour, minute, second and
        microsecond components
    """
    sixty = NUMBER['sixty']
    total_seconds = time_difference.seconds
    # One divmod yields the count of whole minutes and the leftover seconds.
    whole_minutes, secs = divmod(total_seconds, sixty)
    return time(
        total_seconds // NUMBER['thirty_six_hundred'],
        whole_minutes % sixty,
        secs,
        time_difference.microseconds,
    )
def update_referral_points(referral_code, referral_code_used):
    """Credit referral points to the junior whose referral code was used.

    The last junior whose ``referral_code`` equals ``referral_code_used``
    receives ``NUMBER['five']`` on both ``total_points`` and
    ``referral_points``. A ``JuniorPoints`` row is created first if that
    junior has none.

    :param referral_code: code to compare against; the matched junior is
        skipped when their own code equals it (presumably the caller's own
        code, so nobody is credited for using their own code; confirm)
    :param referral_code_used: referral code that was entered; nothing
        happens when it is empty or matches no junior
    """
    # An empty or None code would otherwise be handed to the ORM filter and
    # could match a junior whose referral_code is blank or NULL.
    if not referral_code_used:
        return

    junior = Junior.objects.filter(referral_code=referral_code_used).last()
    if not junior or junior.referral_code == referral_code:
        return

    # Create the points row if this junior does not have one yet.
    junior_points, _ = JuniorPoints.objects.get_or_create(junior=junior)
    # Update junior total points and referral points.
    junior_points.total_points += NUMBER['five']
    junior_points.referral_points += NUMBER['five']
    junior_points.save()
@app.task
def update_expired_task_status(data=None):
    """
    Update the status of tasks whose due date is in the past.

    Every ``JuniorTask`` with ``due_date`` before today and ``task_status``
    equal to ``str(NUMBER['one'])`` or ``str(NUMBER['two'])`` gets its
    ``task_status`` set to ``str(NUMBER['six'])``.

    :param data: not used by this task
    """
    try:
        eligible_statuses = [str(NUMBER[key]) for key in ('one', 'two')]
        today = datetime.today().date()
        past_due_tasks = JuniorTask.objects.filter(
            due_date__lt=today,
            task_status__in=eligible_statuses,
        )
        # Single bulk UPDATE; no model instances are loaded.
        past_due_tasks.update(task_status=str(NUMBER['six']))
    except ObjectDoesNotExist as error:
        logging.error(str(error))