【发布时间】:2019-08-18 20:52:33
【问题描述】:
我有一个有 4 个主题的 kafka 制作人。当我根据 CSV 的类型上传 CSV 时,我会根据其主题将其发送给消费者。如果我第一次上传它会很好,但是一旦我上传另一个 CSV,它也会上传以前的数据,然后它会放入新的 CSV 数据。
我是 kafka 的新手,所以我无法找到有关此问题的适当解决方案。我尝试搜索偏移量但无法实现它。我尝试将数组重置为 null,但是在新文件到来时重置为 null 后,它们中既有以前的数据也有新的数据。
Producer.js 代码
var kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client(),
producer = new HighLevelProducer(client),
fs = require('fs'),
consumer = new HighLevelConsumer(client, [{ topic: 'csvDealData', partition: 0 }, { topic: 'csvAssetData', partition: 0 }, { topic: 'csvPricingData', partition: 0 }, { topic: 'csvRedeemData', partition: 0 }], { autoCommit: false });
var payloads;
var async = require('async');
console.log("STARTING PRODUCER");
var config = require("./config.json")
var http = require('http');
var express = require('express');
var app = express();
var port = '9094';
let tempCSVArray = [];
var server = http.createServer(app).listen(port);
server.timeout = 24000;
var totalDataLength = 0;
var tempIndex;
// var offset = new kafka.Offset(client)
// offset.fetchLatestOffsets([topic], (err, offsets) => {
// if (err) {
// console.log(`error fetching latest offsets ${err}`)
// return
// }
// var latest = 1
// Object.keys(offsets[topic]).forEach( o => {
// latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
// })
// console.log(" topic ::: "+topic);
// console.log(" offsets ::: "+offsets);
// console.log(" latest ::: "+latest);
// consumer.setOffset(topic, 0, latest-1)
// });
var io = require('socket.io').listen(server, function () {
console.log("Connected To Invoke Server.... ");
});
io.on('connection', function (socket) {
socket.on('csvDataFromUI', function (data) {
producer.on('error', function (err) { });
tempCSVArray.push(data.dataArr);
// here we are getting all the rows from CSV and we wait for the end line to come once we recived it we create and array and then send it tp and async function
if (data.isEnd) {
totalDataLength = tempCSVArray.length
console.log(" \n length of data send to invoke function is ::::: " + totalDataLength + " \n dataArray value :::: " + JSON.stringify(tempCSVArray));
csvInvoke(tempCSVArray);
}
});
})
function csvInvoke(tempCSVArray) {
async.eachOfSeries(tempCSVArray, (a, index, asyncCallback) => {
a = a[0];
tempIndex = index;
let csvType = a.CsvType;
if (csvType === "DealCaptureUpload") {
var message = a;
var originator = a.RepoData[0].Party[0].ParticipantID.trim();
var collection = a.Collection;
console.log(" originator :::: " + originator);
console.log("\nCollection: " + collection);
if (a.RepoData[0].Trade[0].TransactionStatus == "NEW") {
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "NEW", "DealPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
} else if (a.RepoData[0].Trade[0].TransactionStatus == "CANCEL") {
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "CANCEL", "PartyPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
}
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + tempIndex + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 6000);
} else if (csvType === "AssetIssuanceUpload") {
var message = a.Record[0];
var originator = a.Party.trim();
var collection = a.Collection;
console.log(" \n originator :::: " + originator);
console.log("\n Collection: " + collection);
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "OwnershipPrivateCC", "fcn": "invokeInternal", "Invokeargs": ["Creation", collection, message], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvAssetData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + tempIndex + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 500);
} else if (csvType === "PricingDataUpload") {
var message = a.Record[0];
var originator = a.Party.trim();
console.log(" \n originator :::: " + originator);
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DataCC", "fcn": "invoke", "Invokeargs": ["DataSetup", message], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvPricingData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + index + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 500);
} else if (csvType === "RedeemDataUpload") {
var message = a.Record[0];
var originator = a.Party.trim();
var collection = a.Collection;
console.log(" \n originator :::: " + originator);
console.log("\n Collection: " + collection);
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "PrivateCC", "fcn": "invoke", "Invokeargs": ["invokeWithdrawal", collection, message, "OwnershipPrivateCC"], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvRedeemData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + index + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 2000);
}
if (index === totalDataLength - 1) {
tempCSVArray = [];
//a = [];
payloads = [];
console.log(" We are flushing the tempCSVArray ::::");
console.log("\n final tempCSVArray ::: " + JSON.stringify(tempCSVArray) + " final a :::::: final payloads ::::: " + JSON.stringify(payloads))
}
client.refreshMetadata(['csvDealData', 'csvAssetData', 'csvPricingData', 'csvRedeemData'], (err) => {
if (err) {
console.warn('Error refreshing kafka metadata', err);
}
});
}, function (err) {
if (err) console.error(err.message);
console.warn('Error refreshing kafka metadata', err);
});
}
消费者守则
var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;
var argv = require('optimist').argv;
var client = new Client('localhost:2181');
var topics = [{ topic: 'csvDealData' }, { topic: 'csvAssetData' }, { topic: 'csvPricingData' }, { topic: 'csvRedeemData' }];
var options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };
var consumer = new HighLevelConsumer(client, topics, options);
console.log("STARTING CONSUMER");
var hfcSocket = require("socket.io-client");
var invoke1 = hfcSocket.connect('http://10.21.134.17:5001');
var invoke2 = hfcSocket.connect('http://10.21.134.17:5004');
var invoke3 = hfcSocket.connect('http://10.21.134.17:5005');
var invoke4 = hfcSocket.connect('http://10.21.134.17:5002');
var invoke5 = hfcSocket.connect('http://10.21.134.17:5006');
var invoke6 = hfcSocket.connect('http://10.21.134.17:5003');
var invoke7 = hfcSocket.connect('http://10.21.134.17:5007');
var invoke8 = hfcSocket.connect('http://10.21.134.17:5008');
consumer.on('message', function (message) {
console.log(" message in consumer :::: " + JSON.stringify(message));
if (message.topic == "csvDealData") {
console.log(" Message ::: " + JSON.stringify(message));
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvDealData', message)
else if (originator == "Org2")
invoke2.emit('csvDealData', message);
else if (originator == "Org3")
invoke6.emit('csvDealData', message);
else if (originator == "Org4")
invoke5.emit('csvDealData', message);
else if (originator == "Org5")
invoke3.emit('csvDealData', message);
else if (originator == "Org6")
invoke4.emit('csvDealData', message);
else if (originator == "Org7")
invoke7.emit('csvDealData', message);
else if (originator == "Org8")
invoke8.emit('csvDealData', message);
} else if (message.topic == "csvAssetData") {
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvAssetData', message)
else if (originator == "Org2")
invoke2.emit('csvAssetData', message);
else if (originator == "Org3")
invoke6.emit('csvAssetData', message);
else if (originator == "Org4")
invoke5.emit('csvAssetData', message);
else if (originator == "Org5")
invoke3.emit('csvAssetData', message);
else if (originator == "Org6")
invoke4.emit('csvAssetData', message);
else if (originator == "Org7")
invoke7.emit('csvAssetData', message);
else if (originator == "Org8")
invoke8.emit('csvAssetData', message);
} else if (message.topic == "csvPricingData") {
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvPricingData', message)
else if (originator == "Org2")
invoke2.emit('csvPricingData', message);
else if (originator == "Org3")
invoke6.emit('csvPricingData', message);
else if (originator == "Org4")
invoke5.emit('csvPricingData', message);
else if (originator == "Org5")
invoke3.emit('csvPricingData', message);
else if (originator == "Org6")
invoke4.emit('csvPricingData', message);
else if (originator == "Org7")
invoke7.emit('csvPricingData', message);
else if (originator == "Org8")
invoke8.emit('csvPricingData', message);
} else if (message.topic == "csvRedeemData") {
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvRedeemData', message)
else if (originator == "Org2")
invoke2.emit('csvRedeemData', message);
else if (originator == "Org3")
invoke6.emit('csvRedeemData', message);
else if (originator == "Org4")
invoke5.emit('csvRedeemData', message);
else if (originator == "Org5")
invoke3.emit('csvRedeemData', message);
else if (originator == "Org6")
invoke4.emit('csvRedeemData', message);
else if (originator == "Org7")
invoke7.emit('csvRedeemData', message);
else if (originator == "Org8")
invoke8.emit('csvRedeemData', message);
}
});
consumer.on('error', function (err) {
console.log('error', err);
});
我期待如果用户已经上传了一个 CSV 并上传了另一种类型的 CSV,它不应该妨碍以前 CSV 的数据,然后它也应该在不调用以前的数据的情况下正确发送新的 CSV。
【问题讨论】:
标签: javascript arrays apache-kafka kafka-producer-api