【问题标题】:Listening to the Stream created from List in Dart在 Dart 中监听从 List 创建的 Stream
【发布时间】:2014-02-21 23:20:13
【问题描述】:

我修改了教程https://www.dartlang.org/docs/tutorials/streams/中的小例子,在订阅后添加了项目:

import 'dart:async';

main() {
  var data = new List<int>();
  var stream = new Stream.fromIterable(data);  // create the stream

  // subscribe to the streams events
  stream.listen((value) {       //
    print("Received: $value");  // onData handler
  });                           //
  data.add(1);
}

运行这个程序后,我得到了:

Uncaught Error: Concurrent modification during iteration: _GrowableList len:1.
Stack Trace: 
#0      ListIterator.moveNext (dart:_collection-dev/iterable.dart:315)
#1      _IterablePendingEvents.handleNext (dart:async/stream_impl.dart:532)
#2      _PendingEvents.schedule.<anonymous closure> (dart:async/stream_impl.dart:661)
#3      _asyncRunCallback (dart:async/schedule_microtask.dart:18)
#4      _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:11)
#5      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:151)
#6      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:166)
#7      _RawReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:93)

在添加侦听器之前输入data.add(1) 可以正常工作。

我检查了documentation about Stream 并没有发现我做错了什么。我原以为听众会在最好的情况下被解雇,而在最坏的情况下不会被解雇,但也不例外。

这是预期的行为吗?如果是,请描述原因。

【问题讨论】:

  • 能否在添加前锁定“数据”对象,添加后释放?
  • 锁是什么意思?据我所知,Dart 执行是单线程的,不需要进行任何线程同步。你能指点关于锁定/解锁的手册吗?
  • 对不起。是的,你不能:$。但是计时器事件将异步运行,并且 .add 在 listen() 调用之后立即运行。因此它们将同时运行(即在异步调用使用迭代器时更改数组)。

标签: dart dart-async


【解决方案1】:

异常来自您在迭代时尝试修改列表。这是 Dart (*) 中未指定的行为,使用的实现只是选择抛出异常。虽然它被Stream.fromIterable 中发生的异步事情混淆了,但它基本上与您尝试这样做一样:

var data = [1,2,3];
for(var d in data) {
    print(d);
    data.add(d+10);
}

如果您将data.add 包装在另一个异步调用中,例如使用Timer.run(() =&gt; data.add(2)),它将“工作”。我的意思是它不会抛出异常。

Received: 2 仍然不会被打印。该流将仅发送在调用 new Stream.fromIterable 时已经在列表中的元素。之后,流被关闭(onDone 将被调用),并且对原始列表的修改不会发送给您的侦听器。

(*) 来源:SDK 1.1.3 中的iterator.dart -- “如果迭代的对象在迭代过程中发生变化,则行为未指定。”为什么 api.dartlang.org 上的文字不同,我无法理解。

编辑

回答评论中的问题:一种方法是使用StreamController

// or new StreamController<int>.broadcast(), if you want to listen to the stream more than once
StreamController s = new StreamController<int>();
// produce periodic errors
new Timer.periodic(new Duration(seconds: 5), (Timer t) {
    s.isClosed ? t.cancel() : s.addError("I AM ERROR");
});
// add some elements before subscribing
s.add(6);
s.add(9);
// this will close the stream eventually
new Timer(new Duration(seconds: 20), () => s.close());
// start listening to the stream
s.stream.listen((v) => print(v), 
        onError: (err) => print("An error occured: $err"), 
        onDone: () => print("The stream was closed"));
// add another element before the next event loop iteration
Timer.run(() => s.add(4711));
// periodically add an element
new Timer.periodic(new Duration(seconds: 3), (Timer t) {
    s.isClosed ? t.cancel() : s.add(0);
});
// one more (will be sent before 4711)
s.add(4);

【讨论】:

  • 你是对的,迭代器不支持异步操作,因此无法通知 fromIterable 后面添加的元素。
  • 您能提出任何替代方案吗?我想要一个流,我可以在其中“发送”数据以便订阅者可以收听?我只从dart:html 中找到抽象的CustomStream... 这个任务听起来很简单,我预计“流”应该支持这个“开箱即用”。
  • @ValentynShybanov 用替代方法编辑答案。
  • @MarioP 太棒了!这正是我正在寻找的。 StreamController一定要加到教程里
【解决方案2】:

列表在迭代时无法修改。 您的示例需要一个没有此限制的可迭代对象(例如自定义实现)。

【讨论】:

  • 但是 Dart 的执行不是顺序的吗?没有用于同步的原语,只有用于异步操作的 async 模块,而不是关于并行性。我预计有两种可能性:侦听器不会被解雇,因为它将被添加到“下一个刻度”或将被解雇。但是这种行为看起来像是并行发生的事情,并且无法控制。
  • 我猜 stream.fromIterable() 从列表中获取了一个迭代器,如果在迭代器尚未发布时它正在被修改,则列表抛出。不过,我必须仔细看看 iterable 是如何实现的。
  • 是的,这就是原因 - 在fromIterable 期间,它获得了迭代器,并且在下一次滴答时它试图在该迭代器上移动,但是在离开main 的上下文之前修改了源列表。我仍然认为世界“并发”不是对错误的最佳描述,因为没有并发执行......
猜你喜欢
  • 1970-01-01
  • 2022-07-09
  • 2021-07-22
  • 1970-01-01
  • 1970-01-01
  • 2021-06-04
  • 2021-09-18
  • 2014-01-10
  • 1970-01-01
相关资源
最近更新 更多