【Question Title】: How to create partitions with a schedule in Dagster?
【Posted】: 2021-08-27 21:35:10
【Question】:

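I'm trying to create partitions in Dagster that will let me run backfills. The documentation has an example, but it partitions by day of the week (which I was able to replicate). I'm trying to create partitions by date instead.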

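from datetime import datetime, timedelta
from typing import List, Union

from dagster import Partition, PartitionSetDefinition, ScheduleExecutionContext

DATE_FORMAT = "%Y-%m-%d"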
BACKFILL_DATE = "2021-04-01"
TODAY = datetime.today()


def get_number_of_days():
    backfill_date_obj = datetime.strptime(BACKFILL_DATE, DATE_FORMAT)
    delta = TODAY - backfill_date_obj

    return delta


def get_date_partitions():
    return [
        Partition(
            [
                datetime.strftime(TODAY - timedelta(days=x), DATE_FORMAT)
                for x in range(get_number_of_days().days)
            ]
        )
    ]


def run_config_for_date_partition(partition):
    date = partition.value
    return {"solids": {"data_to_process": {"config": {"date": date}}}}


# ----------------------------------------------------------------------
date_partition_set = PartitionSetDefinition(
    name="date_partition_set",
    pipeline_name="my_pipeline",
    partition_fn=get_date_partitions,
    run_config_fn_for_partition=run_config_for_date_partition,
)
# EXAMPLE CODE FROM DAGSTER DOCS.
# def weekday_partition_selector(
#     ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
# ) -> Union[Partition, List[Partition]]:
#     """Maps a schedule execution time to the corresponding partition or list
#     of partitions that
#     should be executed at that time"""
#     partitions = partition_set.get_partitions(ctx.scheduled_execution_time)
#     weekday = ctx.scheduled_execution_time.weekday() if ctx.scheduled_execution_time else 0
#     return partitions[weekday]

# My attempt. I don't want to partition by weekday name, just by date.
# Instead of returning the result of partition_set.get_partitions(), I think I need
# to do something else with it, but I'm not sure what.
def daily_partition_selector(
    ctx: ScheduleExecutionContext, partition_set: PartitionSetDefinition
) -> Union[Partition, List[Partition]]:
    return partition_set.get_partitions(ctx.scheduled_execution_time)

my_schedule = date_partition_set.create_schedule_definition(
    "my_schedule",
    "15 8 * * *",
    partition_selector=daily_partition_selector,
    execution_timezone="UTC",
)

The Dagster UI currently lumps all of the dates together in the Partitions section.

[Screenshot: Actual Results]

[Screenshot: Expected Results]

What am I missing to get the expected results?
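A likely cause of the lumping is that `get_date_partitions` returns a list containing a single `Partition` whose value is the entire list of date strings, so Dagster sees exactly one partition; `daily_partition_selector` likewise returns every partition instead of one. The plain-Python sketch below (no Dagster imports) builds one key per day and picks a single key for a given execution time. In the question's code the corresponding change would be roughly `return [Partition(d) for d in keys]` in `get_date_partitions`, plus returning one `Partition` from the selector. The helper names `daily_partition_keys` and `select_partition_key` are hypothetical, and the "process the previous full day" rule is an assumption, not something taken from the question.

```python
from datetime import date, datetime, timedelta
from typing import List

DATE_FORMAT = "%Y-%m-%d"


def daily_partition_keys(start: date, end: date) -> List[str]:
    """Return one "YYYY-MM-DD" key per day from start up to (not including) end.

    Each key is meant to become its own partition, instead of all dates
    being packed into the value of a single partition.
    """
    n_days = (end - start).days
    return [(start + timedelta(days=i)).strftime(DATE_FORMAT) for i in range(n_days)]


def select_partition_key(keys: List[str], scheduled_time: datetime) -> str:
    """Pick the single key that a scheduled run should process.

    Assumption (hypothetical convention): a run fired on day N processes
    the previous full day, N - 1.
    """
    target = (scheduled_time.date() - timedelta(days=1)).strftime(DATE_FORMAT)
    if target not in keys:
        raise ValueError(f"no partition for {target}")
    return target


keys = daily_partition_keys(date(2021, 4, 1), date(2021, 4, 5))
print(keys)  # ['2021-04-01', '2021-04-02', '2021-04-03', '2021-04-04']
print(select_partition_key(keys, datetime(2021, 4, 4, 8, 15)))  # 2021-04-03
```

Keeping one key per partition is what lets a backfill select individual dates; the selector then returns exactly one of them rather than the full list.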

【Discussion】:

    Tags: python dagster


    【Answer 1】:

    After talking with the folks at Dagster, I was pointed to this documentation: https://docs.dagster.io/concepts/partitions-schedules-sensors/schedules#partition-based-schedules

    It turned out to be quite simple. I ended up with:

    from datetime import datetime, time

    from dagster import daily_schedule

    @daily_schedule(
        pipeline_name="my_pipeline",
        start_date=datetime(2021, 4, 1),
        execution_time=time(hour=8, minute=15),
        execution_timezone="UTC",
    )
    def my_schedule(date):
        return {
            "solids": {
                "data_to_process": {
                    "config": {
                        "date": date.strftime("%Y-%m-%d")
                    }
                }
            }
        }
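Per the partition-based schedules documentation linked above, `@daily_schedule` roughly works by creating one partition per day starting at `start_date` and calling the decorated function with each partition's date to build that run's config; this is what produces a separate entry per date in the UI. The plain-Python sketch below (no Dagster imports) mimics that mapping for a short date range, reusing the run-config shape from the answer. The names `run_config_for_date` and `backfill_run_configs` are hypothetical and not part of Dagster's API.

```python
from datetime import date, timedelta
from typing import Dict, List


def run_config_for_date(day: date) -> Dict:
    """Same run-config shape that my_schedule returns for one partition date."""
    return {"solids": {"data_to_process": {"config": {"date": day.strftime("%Y-%m-%d")}}}}


def backfill_run_configs(start: date, end: date) -> List[Dict]:
    """One run config per day from start up to (not including) end.

    This imitates, outside of Dagster, what backfilling a daily partition
    set amounts to: each date is its own partition and gets its own run.
    """
    return [run_config_for_date(start + timedelta(days=i)) for i in range((end - start).days)]


configs = backfill_run_configs(date(2021, 4, 1), date(2021, 4, 4))
for cfg in configs:
    print(cfg["solids"]["data_to_process"]["config"]["date"])
# 2021-04-01
# 2021-04-02
# 2021-04-03
```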
    

    【Discussion】:
