【问题标题】:Why event listener "pause" execution of async function to start new execution?为什么事件侦听器“暂停”执行异步函数以开始新的执行?
【发布时间】:2022-10-15 03:06:02
【问题描述】:

如果第二次发出相同的事件,为什么 Node.js 事件侦听器会“暂停”异步函数的执行以开始第二次执行?第二个问题:如何完成第一次执行,然后开始第二次执行?

即,如果在 Node.js 中启动此代码:

import { EventEmitter } from "events";

let event = new EventEmitter();

event.on("myEvent", async function () {
  console.log("Start");
  await new Promise((resolve, reject) => {
    setTimeout(() => {
      resolve(console.log("Do job"));
    }, 1000);
  });
  console.log("Finish");
});

event.emit("myEvent"); // first emit
event.emit("myEvent"); // second emit

然后我得到这样的结果:

Start
Start
Do job
Finish
Do job
Finish

但是我想看到这个:

Start
Do job
Finish
Start
Do job
Finish

更新 下面我放了包含所描述问题的真实代码

const web3 = new Web3(
  new Web3.providers.WebsocketProvider(
    "wss://eth-mainnet.g.alchemy.com/v2/<API-KEY>"
  )
);
let walletAddress = "0x123";

let options = {
  topics: [web3.utils.sha3("Transfer(address,address,uint256)")],
};

let subscription = web3.eth.subscribe("logs", options);

subscription.on("data", async (event) => {
  if (event.topics.length == 3) {
    let transaction = decodeTransaction(event); //just using web3.eth.abi.decodeLog(...)
    if (
      transaction.from === walletAddress ||
      transaction.to === walletAddress
    ) {
      const contract = new web3.eth.Contract(abi, event.address);
      let coinSymbol = await contract.methods.symbol().call(); //<-- The issue starts here
      await redisClient.hSet(
        walletAddress,
        coinSymbol,
        transaction.value
      );
    }
  }
});

【问题讨论】:

  • 那么,您添加的代码到底有什么问题?而且,如果它与collectData() 有关,请也显示该代码。
  • collectData() 只是一个调用另一个异步函数的函数。为了更清楚,我将 collectData() 替换为实际调用的异步函数。据我了解,由于这个函数是异步的,它会在 promise 解析时暂停执行。此时事件侦听器接收到新事件。由于之前的事件还没有完成执行,新的事件以错误的方式执行(因为从数据库中读取了不正确的数据)。
  • 也许您想在处理功能周围使用async-lock 之类的东西。
  • 但我不确定这甚至是你的问题,因为它看起来好像你正在尝试跟踪余额,但是你实际上做的是为每个钱包保存每个硬币的最后一个已知交易价值(不考虑传入或传出方向)。 - 即便如此,使用符号作为密钥可能不是最好的主意,应该是合约地址 - 原因是我可能会通过部署带有符号 USDT 的合约来搞砸你的数据库(这不是真正的 USDT当然是合约)并以这种方式向某人发送 1,000,000 "USDT"。

标签: node.js asynchronous async-await web3js emitter


【解决方案1】:

这里的关键是async 函数不会阻塞解释器,EventEmitter 事件不会等待async 事件处理程序解决它们的承诺。

所以,这就是发生的事情:

  1. 第一个event.emit() 被调用。这会同步触发 myEvent 处理函数被调用。
  2. 该函数执行。在输出start 后,它会命中await。这会导致它暂停函数的进一步执行,并立即将承诺返回给调用者。这会导致第一个event.emit(...) 被完成,因为eventEmitter 对象不是promise-aware - 它根本不注意您的事件处理函数返回的promise。
  3. 第二个event.emit() 被调用。这会同步触发 myEvent 处理函数被调用。
  4. 该函数执行。输出start 后,它命中await。这会导致它暂停函数的进一步执行,并立即将承诺返回给调用者。这会导致第二个event.emit(...) 完成。

    因此,这就是您的输出以以下开头的原因:

    Start
    Start
    

    然后,稍后(在setTimeout() 触发之后),console.log("Do job") 输出承诺得到解决,这导致await 得到满足,并且函数在await 之后恢复执行。然后输出Finish

    所以,此时,第一个计时器已经触发,你有:

    Start
    Start
    Do job
    Finish
    

    然后第二个setTimeout() 触发并执行相同的操作,然后您就拥有了完整的输出:

    Start
    Start
    Do job
    Finish
    Do job
    Finish
    

    这里的关键是 EventEmitter 类对于它的事件处理程序来说不是承诺感知的。它不注意您的 async 函数返回的承诺,因此在允许其余代码继续执行之前不会“等待”它解决。


    如果您在subscription.on('data', ...) 代码中尝试执行的操作是序列化每个事件的处理,以便在开始处理下一个事件之前完成处理一个事件,那么您可以将事件排队并仅在前一个已经完成。如果在您仍在处理前一个事件时有一个新事件到达,它只会被放入队列中并一直留在那里,直到前一个事件完成处理。

    以下是该代码的外观:

    const eventQueue = [];
    let eventInProgress = false;
    
    async function processEvent(event) {
        try {
            eventInProgress = true;
            if (event.topics.length == 3) {
                let transaction = decodeTransaction(event); //just using web3.eth.abi.decodeLog(...)
                if (
                    transaction.from === walletAddress ||
                    transaction.to === walletAddress
                ) {
                    const contract = new web3.eth.Contract(abi, event.address);
                    let coinSymbol = await contract.methods.symbol().call(); //<-- The issue starts here
                    await redisClient.hSet(
                        walletAddress,
                        coinSymbol,
                        transaction.value
                    );
                }
            }
        } catch (e) {
            // have to decide what to do with rejections from either of the await statements
            console.log(e);
        } finally {
            eventInProgress = false;
            // if there are more events to process, then process the oldest one
            if (eventQueue.length) {
                processEvent(eventQueue.shift());
            }
        }
    }
    
    subscription.on("data", (event) => {
        // serialize the processing of events
        if (eventInProgress) {
            eventQueue.push(event);
        } else {
            processEvent(event);
        }
    });
    

【讨论】:

  • 谢谢@jfriend00!任何适用的解决方法?
  • @VolodymyrNabok - 好吧,您不能使用 EventEmitter 对包含异步操作的事件处理程序进行排序。 EventEmitter 只是不这样做。如果您在这里展示了说明实际操作的真实代码,以便我们可以看到您尝试排序的内容,那么我们可以就如何解决实际问题提出实际建议。这就是为什么在您的问题中显示真实代码和真正问题通常对您有利,而不是一个化妆示例,因为我们可以在您这样做时更好地帮助您。
  • 当然,你是对的,我已经用新的 sn-p 更新了我的问题。
  • @VolodymyrNabok - 我在答案的末尾添加了一种方法,用于序列化您的 subscription.on("data", ...) 事件的处理,以便您在开始使用简单队列处理下一个事件之前完成处理一个。
猜你喜欢
  • 1970-01-01
  • 2023-02-18
  • 2018-04-15
  • 2011-11-29
  • 2020-04-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-04-22
相关资源
最近更新 更多