【问题标题】:How to save static/class variables in Django(如何在 Django 中保存静态/类变量)
【发布时间】:2021-08-23 22:52:04
【问题描述】:

由于 Django 动态加载其代码,我目前面临无法使用类变量的问题。

我的意图:我想运行几个线程,一遍又一遍地重复做同样的事情。但是,线程应该只存在一次。这就是我将它们收集在字典中的原因。

问题:字典被一次又一次地重置,因为 Django 不断地重新加载代码。

现在的问题是:我怎样才能巧妙地解决这个问题?

这是带有线程和抽象父类的 Wrapper 模块。如果你想要一个特定的任务,你必须从这里继承:

class TaskThread(Thread):
    """Thread that calls ``task`` again and again, ``repeat_time`` apart.

    An optional one-off ``delay_time`` is slept through before the first
    cycle. Each cycle records the expected time of the next call, sleeps for
    ``repeat_time`` and then invokes ``task``.
    """

    def __init__(self, delay_time: timedelta = None, repeat_time: timedelta = None, task: Callable = None, thread_name: str = None, daemon=None, *args, **kwargs):
        """Validate and store the schedule.

        :param delay_time: optional pause before the first cycle starts.
        :param repeat_time: pause before every call of ``task`` (required).
        :param task: zero-argument callable to run each cycle (required).
        :param thread_name: forwarded to ``Thread`` as ``name``.
        :param daemon: forwarded to ``Thread``.
        :param args: forwarded to ``Thread`` as ``args``.
        :param kwargs: forwarded to ``Thread`` as ``kwargs``.
        :raises ValueError: if ``repeat_time`` or ``task`` is missing.
        """
        super().__init__(name=thread_name, daemon=daemon, args=args, kwargs=kwargs)
        if repeat_time is None:
            raise ValueError('No repeat interval given')
        if task is None:
            raise ValueError('No task given')
        self.sleep_time = delay_time
        self.repeat_time = repeat_time
        self.task = task
        self.next_execution: Optional[datetime.datetime] = None

    def run(self):
        """Thread entry point: honour the initial delay, then loop forever."""
        if self.sleep_time:
            time.sleep(self.sleep_time.total_seconds())
            # The initial delay is one-shot; drop it once it has been served.
            del self.sleep_time
        self.execute_task()

    def execute_task(self):
        """Run the wait-then-call cycle until ``task`` raises.

        Implemented as a loop rather than by re-entering itself: a recursive
        call per cycle adds a stack frame that is never released, which ends
        in ``RecursionError`` once the interpreter's recursion limit is hit.
        An exception raised by ``task`` propagates to the caller unchanged.
        """
        while True:
            self.next_execution = self.__calculate_next_execution__()
            time.sleep(self.repeat_time.total_seconds())
            self.task()

    def get_next_execution(self) -> datetime.datetime:
        """Return the timestamp stored by the current cycle (``None`` before the first one)."""
        return self.next_execution

    def __calculate_next_execution__(self) -> datetime.datetime:
        """Return the current time plus one repeat interval."""
        current_time = timezone.now()
        return current_time + self.repeat_time


# Per-process registry of task threads, looked up by the task class name.
REPEATING_TASKS: Dict[str, TaskThread] = {}


class RepeatingTasks(ABC):
    """Abstract base for a job that runs repeatedly in one thread per subclass.

    Subclasses must provide ``task`` and ``task_name``; the scheduling hooks
    (``start_immediately``, ``repeat_time``, ``start_first_time_clock``,
    ``start_first_time_delay``) may be overridden to change the timing.
    Threads are tracked in ``REPEATING_TASKS`` under the subclass name.
    """

    # Zone used to interpret a first-start clock time that carries no tzinfo.
    TIME_ZONE = pytz.timezone(LOCAL_TIME_ZONE)

    @classmethod
    def start(cls):
        """Create and start the thread for this class unless one is registered."""
        print('NAME: ', cls.__name__)
        print('DICT: ', REPEATING_TASKS)
        if not REPEATING_TASKS.get(cls.__name__):
            print(f'NEW THREAD')
            if not cls.start_immediately():
                delay = cls.__calculate_first_start_time__(cls.start_first_time_clock(), cls.start_first_time_delay())
                task_thread = TaskThread(delay_time=delay, repeat_time=cls.repeat_time(), task=cls.task, thread_name=cls.task_name(), daemon=True)
            else:
                task_thread = TaskThread(repeat_time=cls.repeat_time(), task=cls.task, thread_name=cls.task_name(), daemon=True)
            # Register before starting so a later start() sees the entry.
            REPEATING_TASKS[cls.__name__] = task_thread
            task_thread.start()
        else:
            print(f'What do we say to the god of death? NOT TODAY!!!')

    @classmethod
    @abstractmethod
    def task(cls):
        """The work performed on every cycle."""
        pass

    @classmethod
    @abstractmethod
    def task_name(cls) -> str:
        """Name given to the worker thread."""
        pass

    @classmethod
    def start_immediately(cls) -> bool:
        """Return True to skip the initial delay (the default)."""
        return True

    @classmethod
    def repeat_time(cls) -> timedelta:
        """Interval between runs; the default is a zero-length interval."""
        return timedelta(weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)

    @classmethod
    def start_first_time_clock(cls) -> datetime.time:
        """Wall-clock time of the first start; the default is midnight."""
        return datetime.time(hour=0, minute=0, second=0, microsecond=0)

    @classmethod
    def start_first_time_delay(cls) -> timedelta:
        """Extra delay added on top of the first start time; zero by default."""
        return timedelta(weeks=0, days=0, hours=0, minutes=0, seconds=0, milliseconds=0, microseconds=0)

    @classmethod
    def next_execution(cls) -> datetime.datetime:
        """Return the registered thread's ``next_execution`` value.

        :raises ValueError: if no thread is registered for this class.
        """
        if REPEATING_TASKS.get(cls.__name__):
            return REPEATING_TASKS.get(cls.__name__).next_execution
        else:
            raise ValueError(f'Thread ({cls.task_name()}) does not exist. It may never have started.')

    @classmethod
    def __calculate_first_start_time__(cls, start_time_clock: datetime.time, start_time_date: timedelta) -> timedelta:
        """Return how long to wait until the first run.

        The result is the time left until the next occurrence of
        ``start_time_clock`` plus ``start_time_date``. The clock time is
        interpreted in its own tzinfo when it has one, otherwise in
        ``TIME_ZONE``.

        :param start_time_clock: wall-clock time of day for the first run.
        :param start_time_date: additional offset added to the result.
        """
        # NOTE(review): the comparison below needs timezone.now() to return an
        # aware datetime (Django with USE_TZ enabled) - confirm the setting.
        current_time = timezone.now()
        if start_time_clock.tzinfo:
            zone = pytz.timezone(str(start_time_clock.tzinfo))
        else:
            zone = cls.TIME_ZONE
        # "Today" must be the calendar date as seen in the target zone. Using
        # the date of the unconverted current time can select the wrong day
        # near midnight, which yields a negative or a more-than-one-day delay.
        local_now = current_time.astimezone(zone)
        naive_start = datetime.datetime.combine(local_now.date(), start_time_clock.replace(tzinfo=None))
        # Localize the complete wall-clock time so pytz picks the UTC offset
        # valid at that moment, not the one valid at midnight.
        start_thread_time = zone.localize(naive_start)
        if current_time > start_thread_time:
            # Already passed today: same wall-clock time tomorrow, localized
            # again so a DST change in between is accounted for.
            start_thread_time = zone.localize(naive_start + timedelta(days=1))
        return (start_thread_time - current_time) + start_time_date

这是一个有任务的子类:

class GarbageCollectorDB(RepeatingTasks):
    """Repeating task that deletes users who never verified their e-mail address."""

    NAME = 'GarbageCollectorDB'
    # Scheduling constants handed out by the hook methods below.
    START_IMMEDIATELY = False
    START_TIME = time(hour=12)
    START_DELAY = timedelta(days=3)
    REPEATING_TIME = timedelta(days=7)

    @classmethod
    def task_name(cls) -> str:
        """Return the configured task name."""
        return cls.NAME

    @classmethod
    def start_immediately(cls) -> bool:
        """Return the configured start-immediately flag."""
        return cls.START_IMMEDIATELY

    @classmethod
    def repeat_time(cls) -> timedelta:
        """Return the configured interval between runs."""
        return cls.REPEATING_TIME

    @classmethod
    def start_first_time_clock(cls) -> time:
        """Return the configured clock time of the first run."""
        return cls.START_TIME

    @classmethod
    def start_first_time_delay(cls) -> timedelta:
        """Return the configured extra delay before the first run."""
        return cls.START_DELAY

    @classmethod
    def task(cls):
        """Run one clean-up pass."""
        cls.__delete_all_unverified_users__()

    @classmethod
    def __delete_all_unverified_users__(cls):
        """Delete each user whose e-mail is unverified and whose age exceeds the token lifetime."""
        now = timezone.now()
        grace_period = timedelta(seconds=EMAIL_TOKEN_LIFE_IN_SEC)
        for account in User.objects.all():
            if account.verified_email:
                continue
            account_age = now - account.date_joined
            if account_age > grace_period:
                account.delete()

这是加载的类:

class UserConfig(AppConfig):
    """Configuration of the ``user`` app: connects signal receivers and optionally starts the clean-up task."""

    default_auto_field = 'django.db.models.BigAutoField'
    name = 'user'

    def ready(self):
        """Connect the profile signal receivers, then start ``GarbageCollectorDB`` if enabled."""
        from user.models import User, Profile
        from user.signals import __create_profile__, __save_profile__, __delete_profile_picture__

        # (signal, receiver, sender) triples, connected in this order.
        wiring = (
            (post_save, __create_profile__, User),
            (post_save, __save_profile__, User),
            (post_delete, __delete_profile_picture__, Profile),
        )
        for signal, receiver, sender in wiring:
            signal.connect(receiver, sender=sender)

        if not CLEAN_DB_FROM_INVALID_USERS:
            return
        # NOTE(review): under the development autoreloader this method
        # presumably runs in more than one process, each starting its own
        # thread - confirm whether a guard (e.g. on RUN_MAIN) is wanted.
        from user.service import GarbageCollectorDB
        GarbageCollectorDB.start()

这里的问题:字典(“REPEATING_TASKS”)一次又一次地被清空。因此我有多个线程,它们的作用相同。

【问题讨论】:

    标签: python django multithreading


    【解决方案1】:

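    好的,我找到了解决方案:在启动线程之前检查 os.environ.get('RUN_MAIN')。使用 runserver 的自动重载时,Django 会启动两个进程,每个进程都会各自加载代码(因此各有一份独立的字典),而只有实际运行服务的子进程中 RUN_MAIN 才为 'true'。详见: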

    https://stackoverflow.com/questions/6791911/execute-code-when-django-starts-once-only

    【讨论】:

      猜你喜欢
      • 2010-12-06
      • 1970-01-01
      • 1970-01-01
      • 2021-10-20
      • 2020-11-17
      • 1970-01-01
      • 2013-08-05
      • 1970-01-01
      • 2018-06-23
      相关资源
      最近更新 更多