【问题标题】:How to pass observable's elements in correct (real) time?如何正确(实时)传递可观察的元素?
【发布时间】:2017-02-07 00:17:19
【问题描述】:

假设我有一个 aobservable,它发出 2D 点,表示某些路线的转折点

 public static double[][] points = new double[][]{
        {0, 0},
        {0,  10},
        {1,  10},
        {1,  0},
        {0, 0}
    };

    public static Observable<double[]> routePoints() {
        return Observable
            .from(points);
    }

这意味着机器人从原点(0,0)开始,然后到矩形的左下角,然后右下角,然后返回,最后回到原点。

假设现在我想要一个 observable,它提供的点就像机器人以恒定的速度移动,比如每秒 1 个单位。

因此,第一个操作员接收到(0,0) 并按原样重新传输。

然后它接收到(0, 10) 并看到,到该点的距离为10 单位,到达该点需要10 秒。所以,新的 observable 应该发出

(0, 1)
(0, 2)
(0, 3)
(0, 4)
(0, 5)
(0, 6)
(0, 7)
(0, 8)
(0, 9)
(0, 10)

每秒一对,直到最后一个接收点到达并重新传输。然后操作员应该采取下一个点并对其执行相同的操作。

如何使用 ReactiveX 来实现这一点?

我怀疑我们应该在此处实施“bakpressure”以减慢可观察源的速度,直到随后及时重新发射所有内容?

更新

我写了下面的代码,它可以工作。它使用两个flatMaps。一个是提供额外的点,并且每个点都带有时间戳(以秒为单位),另一个 flatMap 引入了适合时间戳的延迟。

代码有效,但看起来很复杂。问题仍然存在:ReactiveX 库是否包含用于这些事情的现成工具:

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class EmitByTime {

   public static double speed = 1;

   public static double[][] points = new double[][]{
      {0, 0},
      {0,  10},
      {1,  10},
      {1,  0},
      {0, 0}
   };

   public static Observable<double[]> routePoints() {
      return Observable
         .from(points);
   }

   public static Observable<double[]> routeFeeder() {

      return routePoints()
         .flatMap(new Func1<double[], Observable<double[]>>() {

            double[] previous = null;

            @Override
            public Observable<double[]> call(double[] doubles) {
               if( previous == null) {
                  previous = new double[] {doubles[0], doubles[1], 0};
                  return Observable.just(previous);
               }
               else {
                  double dx = doubles[0] - previous[0];
                  double dy = doubles[1] - previous[1];
                  double dist = Math.sqrt( dx*dx + dy*dy );
                  dx /= dist;
                  dy /= dist;
                  double vx = dx * speed;
                  double vy = dy * speed;
                  double period = dist / speed;
                  double end = previous[2] + period;



                  double start = Math.ceil(previous[2] + 1);
                  ArrayList<double[]> sequence = new ArrayList<>();
                  for(double t = start; t < end; t+=1) {
                     double tt = (t - previous[2]);
                     double x = previous[0] + tt * vx;
                     double y = previous[1] + tt * vy;
                     sequence.add(new double[] {x, y, t});
                  }

                  previous = new double[] {doubles[0], doubles[1], end};
                  sequence.add(previous);

                  return Observable.from(sequence);
               }
            }
         })
         .flatMap(new Func1<double[], Observable<double[]>>() {

            long origin = System.currentTimeMillis();

            @Override
            public Observable<double[]> call(double[] doubles) {

               long now = System.currentTimeMillis();
               long due = Math.round(doubles[2]*1000) + origin;

               if( due <= now ) {
                  return Observable.just(doubles);
               }
               else {
                  return Observable.just(doubles).delay(due-now, TimeUnit.MILLISECONDS);
               }
            }
         })
         ;



   }

   public static void main(String[] args) throws InterruptedException {


      routeFeeder().subscribe(new Action1<double[]>() {
         @Override
         public void call(double[] doubles) {
            System.out.println(Arrays.toString(doubles));
         }
      });

      Thread.sleep(20000);
   }
}

【问题讨论】:

  • 嗯,那是戴珍珠耳环的女孩
  • 让机器人执行命令队列(在您的情况下为航点)会不会更好,只需将航点放入此队列中,让您的机器人在空闲时立即获取下一个命令准备好执行新指令了吗?

标签: java rx-java reactive-programming backpressure


【解决方案1】:

这感觉主要是一个寻路问题。

我可能有一种方法可以将您的一组航路点转换为通过它们的路线(这是路径查找位)。

类似:

public Observable<Point> calculateRoute(Observable<Point> waypoints);

航路点是您上面提到的“转折点”;

然后你可以zip 一个Observable.interval 流与你的Observable 的路由,每条路由每秒发出一次。

所以可能是这样的:

public Observable<Point> emitRoutePerSecond(Observable<Point> wayPoints) {
   return Observable.interval(1, TimeUnit.SECONDS)
     .zipWith(calculateRoute(wayPoints), (i, point) -> point);
}   

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-25
    • 1970-01-01
    • 1970-01-01
    • 2018-11-26
    • 2016-04-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多