我必须跳过你的其他问题才能理解你在这里实际问的是什么。不过,您的一般情况似乎归结为几件事:
- 如何最好地定期运行任务。
- 如何避免在更新时从提要中添加重复数据
所以基本上这里最好的办法是管理通过 MongoDB“upserts”加载到集合中的提要数据,这应该只在某些内容不存在时创建新文档。不过,要做到这一点,您需要稍微处理从提要接收的内容,或者主要只是使用默认的_id 作为提要中的唯一标识符。
以下是节点中一些助手的基本流程:
var async = require('async'),
time = require('time'),
CronJob = require('cron').CronJob,
mongoose = require('mongoose'),
Schema = mongoose.Schema,
FeedParser = require('feedparser'),
request = require('request');
mongoose.connect('mongodb://localhost/test');
var feedSchema = new Schema({
_id: String
},{ strict: false });
var Feed = mongoose.model('Feed',feedSchema);
var job = new CronJob({
cronTime: '0 0-59 * * * *',
onTick: function() {
var req = request('https://itunes.apple.com/us/rss/customerreviews/id=662900426/sortBy=mostRecent/xml'),
feedparser = new FeedParser();
var bulk = Feed.collection.initializeUnorderedBulkOp();
req.on('error',function(err) {
throw err;
});
req.on('response',function(res) {
var stream = this;
if (res.statusCode != 200) {
return this.emit('error', new Error('Bad status code'));
} else {
console.log("res OK");
}
stream.pipe(feedparser);
});
feedparser.on('error',function(err) {
throw err;
});
feedparser.on('readable',function() {
var stream = this,
meta = this.meta,
item;
while ( item = stream.read() ) {
item._id = item.guid;
delete item.guid;
bulk.find({ _id: item._id }).upsert().updateOne({ "$set": item });
}
});
feedparser.on('end',function() {
console.log('at end');
bulk.execute(function(err,response) {
// Shouldn't be one as errors should be in the response
// but just in case there was a problem connecting the op
if (err) throw err;
// Just dumping the response for demo purposes
console.log( JSON.stringify( response, undefined, 4 ) );
});
});
},
start: true
});
mongoose.connection.on('open',function(err,db) {
job.start();
});
我首先提到的一些事情。此处的 Schema 定义使用strict:false,主要是因为我不想指定所有字段但让猫鼬为我处理。虽然_id 有一个定义为“String”,但这是因为您要从提要数据中使用的“id”的类型是正确的。
一般情况下,使用该节点模块在“cron”作业中设置。这会设置一个定期“作业”以按指定的计划运行。我在这里使用的时间是每分钟,只是为了演示。
其他部分实现“feedparser”模块功能,其中对内容提出请求,然后通过 feedparser 将其处理成您可以使用的数据。你可以选择在外部设置该部分,但只是在此处的“工作”定义中作为示例。
为了将数据放入 MongoDB,我在这里使用批量操作 API。您不必这样做,但它确实通过您稍后获得的写入响应更清楚地了解正在发生的事情。否则,指定“upsert”的一般猫鼬方法都可以,例如.findByIdAndUpdate()。
这发生在解析器流可读时触发的事件中。每个.read() 请求都会从提要中返回当前的“项目”。为了让一切变得愉快,我们将“guid”字段从原始字段数据更改为_id 字段。然后你只需设置“upsert”请求。在批量操作中,这只是将请求“排队”到此处。
最后,批量操作被执行,并因此发送到服务器。在这里,我们检查响应以了解实际发生的情况。
在“作业”的定义之外,仅当与数据库的连接可用时才包含在“启动”作业中。一般来说,这是一个很好的做法,但如果使用 mongoose 模型方法进行“upserts”,那么 mongoose 应该“排队”操作,直到建立连接为止。
现在发生的情况是该作业应该在启动时触发,因为它是这样定义的,并且每分钟该作业将再次运行,请求提要数据并“更新”它。在第一次运行时,空白集合上的写入响应的实际输出将是这样的:
{
"ok": 1,
"writeErrors": [],
"writeConcernErrors": [],
"nInserted": 0,
"nUpserted": 51,
"nMatched": 0,
"nModified": 0,
"nRemoved": 0,
"upserted": [
{
"index": 0,
"_id": "https://itunes.apple.com/us/app/cox-contour-for-ipad/id662900426?mt=8&uo=2"
},
{
"index": 1,
"_id": "1024220540"
},
{
"index": 2,
"_id": "1023922797"
},
{
"index": 3,
"_id": "1023784213"
},
{
"index": 4,
"_id": "1023592061"
}
]
}
等等,无论在提要中返回多少项目,因为这些项目是新插入到集合中的。但是当下一个“tick”运行时:
{
"ok": 1,
"writeErrors": [],
"writeConcernErrors": [],
"nInserted": 0,
"nUpserted": 0,
"nMatched": 51,
"nModified": 0,
"nRemoved": 0,
"upserted": []
}
由于没有任何新内容,也没有实际更改,它只是报告项目是“匹配的”,实际上并没有做任何其他事情来“修改”或“插入”。只要如图所示使用 $set 运算符,MongoDB 通常就足够聪明了。
如果提要中的数据确实发生了某些变化,则在数据不同的情况下将被“修改”,在提要中出现新项目的情况下将被“更新”。
根据需要进行更改,但有一种方法可以定期进行设置,并且避免在决定是否插入之前检查数据库中的每个项目是否存在。