【问题标题】:Spring Cloud Data Flow : Asynchronous DeploymentPartitionHanlderSpring Cloud 数据流:异步部署PartitionHanlder
【发布时间】:2021-09-08 00:46:00
【问题描述】:

TL;DR

我使用this example 构建了一个简单的应用程序,该应用程序使用 Spring Batch(远程分区)和 Spring Cloud 数据流在 Kubernetes 上部署 worker pod。

查看在 Kubernetes 上创建的“partitionedJob”pod 的日志,我看到工作步骤(pod)正在按顺序启动。启动一个工作 pod 所需的时间大约为 10-15 秒(有时会高达 2 分钟,如下所示)。因此,worker pod 会以 10-15 秒的间隔一个接一个地启动。


日志:

[info 2021/06/26 14:30:29.089 UTC <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{maxWorkers=40, chunkSize=5000, run.id=13, batch.worker-app=docker://docker-myhost.artifactrepository.net/my-project/myjob:0.1, grideSize=40}]

[info 2021/06/26 14:30:29.155 UTC <main> tid=0x1] The job execution id 26 was run within the task execution 235

[info 2021/06/26 14:30:29.184 UTC <main> tid=0x1] Executing step: [masterStep]

2021-06-26 14:30:29 INFO  AuditRecordPartitioner:51 - Creating partitions. [gridSize=40]

[info 2021/06/26 14:32:41.128 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:34:51.560 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:36:39.464 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:38:34.203 UTC <main> tid=0x1] Using Docker entry point style: exec

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker image: docker-myhost.artifactrepository.net/myproject/myjob:0.1

[info 2021/06/26 14:40:44.544 UTC <main> tid=0x1] Using Docker entry point style: exec

在 Kubernetes 上创建 40 个 Pod 大约需要 7-8 分钟。 (有时这个数字高达 20 分钟)理想的做法是一次性异步启动所有分区步骤(工作 pod)。

问题:我们如何配置 Spring Cloud Data Flow /Spring Batch 以异步/并行而不是顺序启动 worker pod(分区步骤)?如果 SCDF 确实是一口气创建了 40 个分区,为什么实际上 master 作业正在以非常慢的速度逐个创建这些分区? (如日志中所示)。我不认为这是一个基础设施问题,因为我可以使用 Task DSL 快速启动任务

相关代码:

@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class BatchApplication {

    public static void main(String[] args) {
        SpringApplication.run(BatchApplication.class, args);
    }
}


/**
 * 
 * Main job controller
 * 
 * 
 */
@Profile("master")
@Configuration
public class MasterConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MasterConfiguration.class);

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory) {
        LOGGER.info("Creating job...");
        SimpleJobBuilder jobBuilder = jobBuilderFactory.get("job").start(masterStep(null, null, null));

        jobBuilder.incrementer(new RunIdIncrementer());

        return jobBuilder.build();
    }

    @Bean
    public Step masterStep(StepBuilderFactory stepBuilderFactory, Partitioner partitioner,
            PartitionHandler partitionHandler) {
        LOGGER.info("Creating masterStep");
        return stepBuilderFactory.get("masterStep").partitioner("workerStep", partitioner)
                .partitionHandler(partitionHandler).build();
    }

    @Bean
    public DeployerPartitionHandler partitionHandler(@Value("${spring.profiles.active}") String activeProfile,
            @Value("${batch.worker-app}") String resourceLocation,
            @Value("${spring.application.name}") String applicationName, ApplicationContext context,
            TaskLauncher taskLauncher, JobExplorer jobExplorer, ResourceLoaderResolver resolver) {
        ResourceLoader resourceLoader = resolver.get(resourceLocation);
        Resource resource = resourceLoader.getResource(resourceLocation);
        DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
                "workerStep");

        List<String> commandLineArgs = new ArrayList<>();
        commandLineArgs.add("--spring.profiles.active=" + activeProfile.replace("master", "worker"));
        commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
        commandLineArgs.add("--spring.batch.initializer.enabled=false");

        commandLineArgs.addAll(Arrays.stream(applicationArguments.getSourceArgs()).filter(
                x -> !x.startsWith("--spring.profiles.active=") && !x.startsWith("--spring.cloud.task.executionid="))
                .collect(Collectors.toList()));
        commandLineArgs.addAll(applicationArguments.getNonOptionArgs());

        partitionHandler.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
        partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());

        List<String> nonOptionArgs = applicationArguments.getNonOptionArgs();

        partitionHandler.setMaxWorkers(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 3)));
        partitionHandler.setGridSize(Integer.valueOf(getNonOptionArgValue(nonOptionArgs, 4)));
        partitionHandler.setApplicationName(applicationName);

        return partitionHandler;
    }

    @Bean("auditRecordPartitioner")
    public Partitioner auditRecordPartitioner() {
        
        return new AuditRecordPartitioner<>());
    }
    
    private String getNonOptionArgValue(List<String> nonOptionArgs, int index)  {
        return nonOptionArgs.get(index).split("=")[1];
    }
}


@Profile("worker")
@Configuration
public class WorkerConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(WorkerConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private ApplicationArguments applicationArguments;

    @Bean
    public DeployerStepExecutionHandler stepExecutionHandler(ApplicationContext context, JobExplorer jobExplorer,
            JobRepository jobRepository) {
        LOGGER.info("stepExecutionHandler...");
        return new DeployerStepExecutionHandler(context, jobExplorer, jobRepository);
    }

    @Bean
    public Step workerStep(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("workerStep").tasklet(workerTasklet(null)).build();
    }

    @Bean
    @StepScope
    public WorkerTasklet workerTasklet(@Value("#{stepExecutionContext['key']}") String key) {
        return new WorkerTasklet(key);
    }

    
}

注意,我将 gridSize 和 maxWorkers 作为输入参数传递给主步骤(在启动任务时从 SCDF UI)。

【问题讨论】:

    标签: java spring-boot spring-batch spring-cloud-dataflow spring-cloud-deployer-kubernetes


    【解决方案1】:

    出于演示目的,示例将最大工作人员数设置为 2 here。因此,对于您的 40 个分区,只有两个工作人员将并行启动,这让您认为您的分区正在按顺序处理。

    您需要更新示例(或使其可配置)并根据需要增加并发工作人员的数量。

    【讨论】:

    • 感谢您的回答。我使用 max-workers 为 40 而不是 2。创建所有这些需要 7-8 分钟的原因是因为主作业在 10-15 秒的间隙后按顺序创建子分区。到生成第 40 个 pod 请求时,已经过去了 7-8 分钟。这在进行实际处理时会损失 7-8 分钟。我想启动 100 个工作 pod,所以这很容易需要 15 分钟,直到创建第 100 个 pod。理想的是能够一次性发送 pod 创建请求(即异步而不是顺序)。
    • because the master job is creating child partitions sequentially after a gap of 10-15 seconds:不应该这样。是否使用与示例相同的代码? master 立即创建GRID_SIZE 分区。正如您所提到的,我看不出它可以在哪里按顺序创建它们。请分享您的minimal example 以便能够有效地帮助您,因为我认为我们谈论的不是同一件事。
    • 我花了很多精力为我的问题添加更多细节。 (代码以及来自 k8 的实际日志)。令人惊讶的是,我可以看到每个 pod 创建请求之间需要 2 分钟,而不是我之前观察到的 10-15 秒。如果 SCDF 确实是一口气创建了 40 个分区,那为什么实际上 master 作业正在以非常慢的速度一个一个地创建这些分区? (如日志中所示)。我不认为这是一个基础设施问题,因为我可以使用 Task DSL 快速启动任务
    • 感谢您的更新。似乎工人在这里按顺序启动:github.com/spring-cloud/spring-cloud-task/blob/…。如果你为你的 Kubernetes 集群提供足够的资源来一次启动 40 个 worker,这应该会很快,否则你将受到 k8s 本身的限制(即没有足够的资源来一次启动所有 worker,k8s 将等待 worker 完成后再启动新任务)。
    • 感谢您的回复。 Kubernetes 集群是一个企业集群,因此可能需要几个月的时间,企业才会同意提供更多资源。我尝试了另一种方法使用 - http://dataflow.spring.io/docs/feature-guides/batch/java-dsl 一次启动 40 个任务,我可以看到多达 20 个正在创建的 pod k8 上的秒数。这证明了 k8 集群确实有能力一次性创建至少 20 个 Pod。使用远程分区时,我无法利用这种并行性。从今天起我有什么办法可以做到这一点?
    【解决方案2】:

    正如 Mahmoud Ben Hassine 在 cmets 中提到的,工人是 launched sequentially

    private void launchWorkers(Set<StepExecution> candidates,
                Set<StepExecution> executed) {
            for (StepExecution execution : candidates) {
                if (this.currentWorkers < this.maxWorkers || this.maxWorkers < 0) {
                    launchWorker(execution);
                    this.currentWorkers++;
    
                    executed.add(execution);
                }
            }
        }
    

    正如 cmets 中提到的Glen Renfro,已为此创建了一个issue。如果有解决方案可用于异步启动工作人员,则此答案将被更新。

    【讨论】:

      猜你喜欢
      • 2016-10-01
      • 2023-03-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-21
      • 1970-01-01
      • 2020-08-30
      • 1970-01-01
      相关资源
      最近更新 更多