【问题标题】:unique jobs with kue for node.jskue for node.js 的独特作业
【发布时间】:2012-01-27 11:17:31
【问题描述】:

如果系统中已有相同的作业,我希望jobs.create 失败。有什么办法可以实现吗?

我需要每 24 小时运行一次相同的作业,但有些作业可能需要超过 24 小时,因此我需要确保在添加之前该作业尚未在系统中(活动、排队或失败)它。

更新: 好的,我将简化问题以便能够在这里进行解释。 恐怕我有一个分析服务,我必须每天向我的用户发送一次报告。有时完成这些报告(只是少数情况,但有可能)需要几个小时甚至超过一天。

我需要一种方法来了解哪些是当前正在运行的作业,以避免重复作业。我在 ´´´´kue´´´´ API 中找不到任何东西来了解当前正在运行哪些作业。我还需要在需要更多工作时触发某种事件,然后致电我的getMoreJobs 生产者。

也许我的方法是错误的,如果是这样,请告诉我一个更好的方法来解决我的问题。

这是我的简化代码:

var kue = require('kue'),   
    cluster = require('cluster'),
    numCPUs = require('os').cpus().length;

numCPUs = CONFIG.sync.workers || numCPUs; 

var jobs = kue.createQueue();

if (cluster.isMaster) {
    console.log('Starting master pid:' + process.pid);
    jobs.on('job complete', function(id){
    kue.Job.get(id, function(err, job){
        if (err || !job) return;
        job.remove(function(err){
            if (err) throw err;
            console.log('removed completed job #%d', job.id);
        });
    });

    function getMoreJobs() {
        console.log('looking for more jobs...');
        getOutdateReports(function (err, reports) {
            if (err) return setTimeout(getMoreJobs, 5 * 60 * 60 * 1000);

            reports.forEach(function(report) {
                jobs.create('reports', {
                    id: report.id,
                    title: report.name,
                    params: report.params
                }).attempts(5).save();
            });

            setTimeout(getMoreJobs, 60 * 60 * 1000);
        });
    }

    //Create the jobs
    getMoreJobs();

    console.log('Starting ', numCPUs, ' workers');
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    cluster.on('death', function(worker) {
        console.log('worker pid:' + worker.pid + ' died!'.bold.red);
    });

} else {
    //Process the jobs
    console.log('Starting worker pid:' + process.pid);
    jobs.process('reports', 20, function(job, done){
        //completing my work here
        veryHardWorkGeneratingReports(function(err) {
            if (err) return done(err);
            return done();
        });
    });
}

【问题讨论】:

  • 需要更多信息,代码什么的...
  • @Teemu 我已经更新了我的问题,谢谢!

标签: javascript node.js parallel-processing


【解决方案1】:

您的一个问题的答案是,Kue 将它从 redis 队列中弹出的作业置于“活动”状态,除非您寻找它们,否则您永远不会得到它们。

另一个问题的答案是,您的分布式工作队列是消费者,而不是任务的生产者。像你一样混合它们是可以的,但是,这是一个泥泞的范例。我对 Kue 所做的是为 kue 的 json api 做一个包装器,以便可以从系统中的任何位置将作业放入队列中。由于您似乎需要铲除工作,我建议编写一个单独的生产者应用程序,除了获取外部工作​​并将它们放入您的 Kue 工作队列之外什么都不做。它可以监视工作队列,以了解何时作业运行不足并加载批处理,或者,我会做的是使其尽可能快地铲入作业,并汇总消费者应用程序的多个实例来处理负载更快。

重申:您的关注点分离在这里不是很好。您应该有一个完全独立于您的任务消费者应用程序的任务生产者。这为您提供了更大的灵活性、易于扩展(只需在另一台机器上启动另一个消费者,您就可以扩展!)以及代码管理的整体易用性。如果可能,您还应该允许为您“寻找”这些任务的任何人访问您的 Kue 服务器的 JSON api,而不是出去寻找它们。作业生产者可以使用 Kue 安排自己的任务。

【讨论】:

  • (是的,我知道我在回答一个老问题,但我很乐意看到 SE 的问答尽可能包含正确答案)
【解决方案2】:

https//github.com/LearnBoost/kue

在 json.js 脚本中检查第 64-112 行。在那里,您会找到返回包含作业的对象的方法,这些方法也使用类型、状态或 id-range 进行过滤。 (jobRange(), jobStateRange(), jobTypeRange().)

向下滚动主页到 JSON API - 部分,您将找到返回对象的示例。

关于如何调用和使用那些方法你比我清楚得多。

jobs.create() 会失败,如果你传递一个未知的关键字。我会创建一个函数来检查forEach-loop 中的当前作业,并返回一个关键字。然后在jobs.create() -parameters 中调用这个函数而不是文字关键字。

通过 json.js 中的这些方法获得的信息也可以帮助您创建“moreJobToDo”事件。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-12-26
    • 1970-01-01
    • 2016-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-01-18
    • 1970-01-01
    相关资源
    最近更新 更多