【发布时间】:2021-08-23 22:52:04
【问题描述】:
由于 Django 动态加载其代码,我目前面临无法使用类变量的问题。
我的意图:我想运行几个线程,让它们一遍又一遍地重复执行同一项任务。但是,每个线程应该只存在一个实例。这就是我将它们收集在字典中的原因。
问题:字典被一次又一次地重置,因为 Django 不断地重新加载代码。
现在的问题是:我怎样才能巧妙地解决这个问题?
这是带有线程和抽象父类的 Wrapper 模块。如果你想要一个特定的任务,你必须从这里继承:
class TaskThread(Thread):
    """Thread that calls ``task`` over and over, waiting ``repeat_time`` before each call.

    An optional ``delay_time`` is slept once before the first cycle starts.
    """

    def __init__(self, delay_time: timedelta = None, repeat_time: timedelta = None, task: Callable = None,
                 thread_name: str = None, daemon=None, *args, **kwargs):
        """Configure the thread without starting it.

        :param delay_time: one-off wait before the first cycle; falsy means no wait.
        :param repeat_time: wait before every call of ``task``; required.
        :param task: zero-argument callable to execute; required.
        :param thread_name: forwarded to ``Thread`` as ``name``.
        :param daemon: forwarded to ``Thread`` as ``daemon``.
        :raises ValueError: if ``repeat_time`` or ``task`` is ``None``.
        """
        super().__init__(name=thread_name, daemon=daemon, args=args, kwargs=kwargs)
        self.sleep_time = delay_time
        if repeat_time is None:
            raise ValueError('No repeat interval given')
        if task is None:
            raise ValueError('No task given')
        self.repeat_time = repeat_time
        self.task = task
        # Filled in at the start of every cycle by execute_task().
        self.next_execution: Optional[datetime.datetime] = None

    def run(self):
        """Thread entry point: honour the initial delay, then loop forever."""
        if self.sleep_time:
            time.sleep(self.sleep_time.total_seconds())
        # The initial delay is only needed once; drop the attribute afterwards.
        del self.sleep_time
        self.execute_task()

    def execute_task(self):
        """Run the wait/execute cycle until ``task`` raises.

        This is a plain loop on purpose: a self-recursive implementation adds
        one stack frame per repetition and eventually dies with
        ``RecursionError``. An exception raised by ``task`` propagates and ends
        the loop.
        """
        while True:
            self.next_execution = self.__calculate_next_execution__()
            time.sleep(self.repeat_time.total_seconds())
            self.task()

    def get_next_execution(self) -> datetime.datetime:
        """Return the value stored in ``next_execution`` (``None`` before the first cycle)."""
        return self.next_execution

    def __calculate_next_execution__(self) -> datetime.datetime:
        """Return the current time plus ``repeat_time``.

        NOTE(review): ``timezone`` is presumably ``django.utils.timezone`` -
        confirm against the module's imports.
        """
        return timezone.now() + self.repeat_time
# Registry of started task threads, keyed by the name of the task class.
# It lives in module memory, so it is only shared within a single process.
REPEATING_TASKS: Dict[str, TaskThread] = {}


class RepeatingTasks(ABC):
    """Base class for periodic background jobs executed by a ``TaskThread``.

    Subclasses must implement ``task`` and ``task_name`` and may override the
    scheduling hooks (``start_immediately``, ``repeat_time``,
    ``start_first_time_clock``, ``start_first_time_delay``).
    """

    # Zone used for the first-start clock time when that time carries no tzinfo.
    TIME_ZONE = pytz.timezone(LOCAL_TIME_ZONE)

    @classmethod
    def start(cls):
        """Create and start the daemon thread for this task unless one is registered already."""
        print('NAME: ', cls.__name__)
        print('DICT: ', REPEATING_TASKS)
        if REPEATING_TASKS.get(cls.__name__):
            print('What do we say to the god of death? NOT TODAY!!!')
            return
        print('NEW THREAD')
        # ``None`` means "no initial delay", which is TaskThread's default.
        delay = None
        if not cls.start_immediately():
            delay = cls.__calculate_first_start_time__(cls.start_first_time_clock(), cls.start_first_time_delay())
        task_thread = TaskThread(delay_time=delay, repeat_time=cls.repeat_time(), task=cls.task,
                                 thread_name=cls.task_name(), daemon=True)
        REPEATING_TASKS[cls.__name__] = task_thread
        task_thread.start()

    @classmethod
    @abstractmethod
    def task(cls):
        """Do the actual work; called once per cycle by the thread."""
        pass

    @classmethod
    @abstractmethod
    def task_name(cls) -> str:
        """Return the name given to the thread."""
        pass

    @classmethod
    def start_immediately(cls) -> bool:
        """Return ``True`` to skip the initial delay (the default)."""
        return True

    @classmethod
    def repeat_time(cls) -> timedelta:
        """Return the wait between runs; the default is a zero interval."""
        return timedelta(weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)

    @classmethod
    def start_first_time_clock(cls) -> datetime.time:
        """Return the wall-clock time of the first run; the default is midnight."""
        return datetime.time(hour=0, minute=0, second=0, microsecond=0)

    @classmethod
    def start_first_time_delay(cls) -> timedelta:
        """Return an extra delay added on top of the first-run clock time; the default is zero."""
        return timedelta(weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)

    @classmethod
    def next_execution(cls) -> Optional[datetime.datetime]:
        """Return the ``next_execution`` value of this task's registered thread.

        :raises ValueError: if no thread is registered for this class.
        """
        task_thread = REPEATING_TASKS.get(cls.__name__)
        if not task_thread:
            raise ValueError(f'Thread ({cls.task_name()}) does not exist. It may never have started.')
        return task_thread.next_execution

    @classmethod
    def __calculate_first_start_time__(cls, start_time_clock: datetime.time, start_time_date: timedelta) -> timedelta:
        """Return how long to wait until the first run.

        The result is the time from now until the next occurrence of
        ``start_time_clock`` plus ``start_time_date``.

        :param start_time_clock: wall-clock time of the first run; its tzinfo,
            if set, selects the zone, otherwise ``TIME_ZONE`` is used.
        :param start_time_date: additional delay added to the result.
        """
        if start_time_clock.tzinfo:
            zone = pytz.timezone(str(start_time_clock.tzinfo))
        else:
            zone = cls.TIME_ZONE
        # Take "today" from the target zone: the calendar date of the current
        # instant can differ between zones, which would shift the result by a day.
        now_local = timezone.now().astimezone(zone)
        naive_start = datetime.datetime.combine(now_local.date(), start_time_clock.replace(tzinfo=None))
        # Localize the final wall-clock time so the offset valid at that moment is used.
        start_thread_time = zone.localize(naive_start)
        if now_local > start_thread_time:
            # Today's slot has passed; re-localize tomorrow's wall-clock time so
            # a DST change in between does not skew the delay.
            start_thread_time = zone.localize(naive_start + timedelta(days=1))
        return (start_thread_time - now_local) + start_time_date
这是一个有任务的子类:
class GarbageCollectorDB(RepeatingTasks):
    """Recurring job that deletes users whose e-mail address was never verified.

    The scheduling hooks simply expose the class constants below.
    """

    NAME = 'GarbageCollectorDB'
    START_IMMEDIATELY = False
    START_TIME = time(hour=12)
    START_DELAY = timedelta(days=3)
    REPEATING_TIME = timedelta(days=7)

    @classmethod
    def task_name(cls) -> str:
        """Return ``NAME``."""
        return cls.NAME

    @classmethod
    def start_immediately(cls) -> bool:
        """Return ``START_IMMEDIATELY``."""
        return cls.START_IMMEDIATELY

    @classmethod
    def repeat_time(cls) -> timedelta:
        """Return ``REPEATING_TIME``."""
        return cls.REPEATING_TIME

    @classmethod
    def start_first_time_clock(cls) -> time:
        """Return ``START_TIME``."""
        return cls.START_TIME

    @classmethod
    def start_first_time_delay(cls) -> timedelta:
        """Return ``START_DELAY``."""
        return cls.START_DELAY

    @classmethod
    def task(cls):
        """Run one clean-up pass."""
        cls.__delete_all_unverified_users__()

    @classmethod
    def __delete_all_unverified_users__(cls):
        """Delete every unverified user who joined more than the token lifetime ago."""
        now = timezone.now()
        token_lifetime = timedelta(seconds=EMAIL_TOKEN_LIFE_IN_SEC)
        for candidate in User.objects.all():
            if candidate.verified_email:
                continue
            account_age = now - candidate.date_joined
            if account_age > token_lifetime:
                candidate.delete()
这是在应用启动时加载并启动该任务的类:
class UserConfig(AppConfig):
    """App configuration for the ``user`` app."""

    name = 'user'
    default_auto_field = 'django.db.models.BigAutoField'

    def ready(self):
        """Connect the profile signal handlers and optionally start the clean-up task.

        NOTE(review): with the development server's autoreloader, ready() is
        presumably executed in two separate processes, each with its own
        module state - confirm whether that explains duplicate task threads.
        """
        # Imported here rather than at module level, as in the original code.
        from user.models import Profile, User
        from user.signals import (
            __create_profile__,
            __delete_profile_picture__,
            __save_profile__,
        )

        # Connection order is kept: create first, then save.
        for handler in (__create_profile__, __save_profile__):
            post_save.connect(handler, sender=User)
        post_delete.connect(__delete_profile_picture__, sender=Profile)

        if not CLEAN_DB_FROM_INVALID_USERS:
            return
        from user.service import GarbageCollectorDB
        GarbageCollectorDB.start()
这里的问题:字典(“REPEATING_TASKS”)一次又一次地被清空。因此我有多个线程,它们的作用相同。
【问题讨论】:
标签: python django multithreading