【问题标题】:How to refresh kafka queue once our data has been processed?处理完数据后如何刷新kafka队列?
【发布时间】:2019-08-18 20:52:33
【问题描述】:

我有一个有 4 个主题的 kafka 制作人。当我根据 CSV 的类型上传 CSV 时,我会根据其主题将其发送给消费者。如果我第一次上传它会很好,但是一旦我上传另一个 CSV,它也会上传以前的数据,然后它会放入新的 CSV 数据。

我是 kafka 的新手,所以我无法找到有关此问题的适当解决方案。我尝试搜索偏移量但无法实现它。我尝试将数组重置为 null,但是在新文件到来时重置为 null 后,它们中既有以前的数据也有新的数据。

Producer.js 代码

 var kafka = require('kafka-node'),
        HighLevelProducer = kafka.HighLevelProducer,
        HighLevelConsumer = kafka.HighLevelConsumer,
        client = new kafka.Client(),
        producer = new HighLevelProducer(client),
        fs = require('fs'),
        consumer = new HighLevelConsumer(client, [{ topic: 'csvDealData', partition: 0 }, { topic: 'csvAssetData', partition: 0 }, { topic: 'csvPricingData', partition: 0 }, { topic: 'csvRedeemData', partition: 0 }], { autoCommit: false });
    var payloads;

var async = require('async');
console.log("STARTING PRODUCER");
var config = require("./config.json")
var http = require('http');
var express = require('express');
var app = express();
var port = '9094';
let tempCSVArray = [];
var server = http.createServer(app).listen(port);
server.timeout = 24000;
var totalDataLength = 0;
var tempIndex;

// var offset = new kafka.Offset(client)
// offset.fetchLatestOffsets([topic], (err, offsets) => {
//     if (err) {
//         console.log(`error fetching latest offsets ${err}`)
//         return
//     }
//     var latest = 1
//     Object.keys(offsets[topic]).forEach( o => {
//         latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
//     })
//     console.log(" topic :::  "+topic);
//     console.log(" offsets :::  "+offsets);
//     console.log(" latest :::  "+latest);
//     consumer.setOffset(topic, 0, latest-1)
// });

var io = require('socket.io').listen(server, function () {
    console.log("Connected To Invoke Server.... ");
});
io.on('connection', function (socket) {
    socket.on('csvDataFromUI', function (data) {
        producer.on('error', function (err) { });
        tempCSVArray.push(data.dataArr);

        // here we are getting all the rows from CSV and we wait for the end line to come once we recived it we create and array and then send it tp and async function
        if (data.isEnd) {
            totalDataLength = tempCSVArray.length
            console.log(" \n length of data send to invoke function is ::::: " + totalDataLength + " \n  dataArray value :::: " + JSON.stringify(tempCSVArray));
            csvInvoke(tempCSVArray);
        }
    });
})

function csvInvoke(tempCSVArray) {
    async.eachOfSeries(tempCSVArray, (a, index, asyncCallback) => {
        a = a[0];
        tempIndex = index;
        let csvType = a.CsvType;
        if (csvType === "DealCaptureUpload") {
            var message = a;
            var originator = a.RepoData[0].Party[0].ParticipantID.trim();
            var collection = a.Collection;
            console.log(" originator  :::: " + originator);
            console.log("\nCollection: " + collection);
            if (a.RepoData[0].Trade[0].TransactionStatus == "NEW") {
                var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "NEW", "DealPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
                payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            } else if (a.RepoData[0].Trade[0].TransactionStatus == "CANCEL") {
                var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "CANCEL", "PartyPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
                payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            }
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + tempIndex + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 6000);
        } else if (csvType === "AssetIssuanceUpload") {
            var message = a.Record[0];
            var originator = a.Party.trim();
            var collection = a.Collection;
            console.log(" \n originator  :::: " + originator);
            console.log("\n Collection: " + collection);
            var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "OwnershipPrivateCC", "fcn": "invokeInternal", "Invokeargs": ["Creation", collection, message], "username": "adminY", "orgName": config.orgList[0][originator] };
            payloads = [{ topic: 'csvAssetData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + tempIndex + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 500);
        } else if (csvType === "PricingDataUpload") {
            var message = a.Record[0];
            var originator = a.Party.trim();
            console.log(" \n originator  :::: " + originator);
            var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DataCC", "fcn": "invoke", "Invokeargs": ["DataSetup", message], "username": "adminY", "orgName": config.orgList[0][originator] };
            payloads = [{ topic: 'csvPricingData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + index + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 500);
        } else if (csvType === "RedeemDataUpload") {
            var message = a.Record[0];
            var originator = a.Party.trim();
            var collection = a.Collection;
            console.log(" \n originator  :::: " + originator);
            console.log("\n Collection: " + collection);
            var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "PrivateCC", "fcn": "invoke", "Invokeargs": ["invokeWithdrawal", collection, message, "OwnershipPrivateCC"], "username": "adminY", "orgName": config.orgList[0][originator] };
            payloads = [{ topic: 'csvRedeemData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
            setTimeout(() => {
                producer.send(payloads, function (err, data) {
                    if (err != null)
                        console.log("Error sending payload to consumer - " + err)
                    else
                        console.log("\n index ::: " + index + "\n Payloads  ::::::::  " + JSON.stringify(payloads));
                });
                asyncCallback();
            }, 2000);
        }

        if (index === totalDataLength - 1) {
            tempCSVArray = [];
            //a = [];
            payloads = [];
            console.log(" We are flushing the tempCSVArray  ::::");
            console.log("\n final tempCSVArray  ::: " + JSON.stringify(tempCSVArray) + "    final a :::::: final  payloads ::::: " + JSON.stringify(payloads))

        }
        client.refreshMetadata(['csvDealData', 'csvAssetData', 'csvPricingData', 'csvRedeemData'], (err) => {
            if (err) {
                console.warn('Error refreshing kafka metadata', err);
            }
        });

    }, function (err) {
        if (err) console.error(err.message);
        console.warn('Error refreshing kafka metadata', err);
    });
}

消费者守则

var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;
var argv = require('optimist').argv;
var client = new Client('localhost:2181');
var topics = [{ topic: 'csvDealData' }, { topic: 'csvAssetData' }, { topic: 'csvPricingData' }, { topic: 'csvRedeemData' }];
var options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };
var consumer = new HighLevelConsumer(client, topics, options);

console.log("STARTING CONSUMER");

var hfcSocket = require("socket.io-client");
var invoke1 = hfcSocket.connect('http://10.21.134.17:5001');
var invoke2 = hfcSocket.connect('http://10.21.134.17:5004');
var invoke3 = hfcSocket.connect('http://10.21.134.17:5005');
var invoke4 = hfcSocket.connect('http://10.21.134.17:5002');
var invoke5 = hfcSocket.connect('http://10.21.134.17:5006');
var invoke6 = hfcSocket.connect('http://10.21.134.17:5003');
var invoke7 = hfcSocket.connect('http://10.21.134.17:5007');
var invoke8 = hfcSocket.connect('http://10.21.134.17:5008');

consumer.on('message', function (message) {
   console.log(" message in consumer  :::: " + JSON.stringify(message));

   if (message.topic == "csvDealData") {
      console.log(" Message ::: " + JSON.stringify(message));
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvDealData', message)
      else if (originator == "Org2") 
            invoke2.emit('csvDealData', message);
      else if (originator == "Org3")
            invoke6.emit('csvDealData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvDealData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvDealData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvDealData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvDealData', message);
      else if (originator == "Org8") 
            invoke8.emit('csvDealData', message);
   } else if (message.topic == "csvAssetData") {
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvAssetData', message)
      else if (originator == "Org2") 
            invoke2.emit('csvAssetData', message);
      else if (originator == "Org3") 
            invoke6.emit('csvAssetData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvAssetData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvAssetData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvAssetData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvAssetData', message);
      else if (originator == "Org8")
            invoke8.emit('csvAssetData', message);
   } else if (message.topic == "csvPricingData") {
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvPricingData', message)
      else if (originator == "Org2")
            invoke2.emit('csvPricingData', message);
      else if (originator == "Org3") 
            invoke6.emit('csvPricingData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvPricingData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvPricingData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvPricingData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvPricingData', message);
      else if (originator == "Org8")
            invoke8.emit('csvPricingData', message);
   } else if (message.topic == "csvRedeemData") {
      var originator = JSON.parse(message.value).originator;
      if (originator == "Org1") 
            invoke1.emit('csvRedeemData', message)
      else if (originator == "Org2") 
            invoke2.emit('csvRedeemData', message);
      else if (originator == "Org3")
            invoke6.emit('csvRedeemData', message);
      else if (originator == "Org4") 
            invoke5.emit('csvRedeemData', message);
      else if (originator == "Org5") 
            invoke3.emit('csvRedeemData', message);
      else if (originator == "Org6") 
            invoke4.emit('csvRedeemData', message);
      else if (originator == "Org7") 
            invoke7.emit('csvRedeemData', message);
      else if (originator == "Org8") 
            invoke8.emit('csvRedeemData', message);
   }
});

consumer.on('error', function (err) {
   console.log('error', err);
});

我期待如果用户已经上传了一个 CSV 并上传了另一种类型的 CSV,它不应该妨碍以前 CSV 的数据,然后它也应该在不调用以前的数据的情况下正确发送新的 CSV。

my fiddle link for producer.js

link for consumer

【问题讨论】:

    标签: javascript arrays apache-kafka kafka-producer-api


    【解决方案1】:

    如果您需要将 CSV 文件中的数据导入 Kafka,请使用 Kafka Connect。 a connector 将摄取 CSV 文件。 Kafka Connect 是 Apache Kafka 的一部分,只需要简单的 JSON 配置即可使用。

    【讨论】:

    • 谢谢,但我已经能够逐行获取数据,直到卡夫卡现在我想弄清楚如何从队列中删除以前的 csv 数据,以便为其他 CSV 数据让路一旦旧的完成。
    猜你喜欢
    • 2021-01-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-12-21
    • 2017-10-12
    • 2018-04-16
    • 2023-03-26
    相关资源
    最近更新 更多