【问题标题】:Update pyqtgraph plot from RxPy observable从 RxPy observable 更新 pyqtgraph 图
【发布时间】:2017-04-17 05:12:14
【问题描述】:

在以下脚本中,创建了一个绘图窗口并将values 正确传递给plot.update。但是,剧情没有更新。我做错了什么?

import sys
import time

import numpy
from numpy import pi

import rx
from rx.concurrency import QtScheduler

import pyqtgraph as pg
from pyqtgraph.Qt import QtCore, QtGui


class DemoData:

    def __init__(self, window, steps):

        def signal(t):
            omega = 5
            return numpy.sin(2*pi*omega*t) + numpy.random.random()

        self.stream = rx.Observable.range(
            0, steps
        ).map(
            signal
        ).buffer_with_count(
            window, 1
        )


class UpdatingPlot:

    def __init__(self):
        self.plot = pg.plot()
        self.curve = self.plot.plot()

    def update(self, values):
        self.curve.setData(values)


def main():

    app = QtGui.QApplication([])
    scheduler = QtScheduler(QtCore)

    window, steps = 50, 100
    data = DemoData(window, steps)
    plot = UpdatingPlot()

    data.stream.subscribe_on(scheduler=scheduler).subscribe(plot.update)

    sys.exit(app.exec_())


if __name__ == '__main__':

    main()

【问题讨论】:

    标签: python pyqtgraph rx-py


    【解决方案1】:

    这不能解决您的问题,但可能会解决您的需求。您可以使用包joystick,使用pip install joystick 安装它。它旨在提供实时和交互式绘图框架。

    这是一个使用它的例子。它创建了一个“监听”的图形框架,并将您的正弦函数显示为时间的函数;每 0.2 秒添加一个新数据点。

    import joystick as jk
    import numpy as np
    import time
    
    class DemoData(jk.Joystick):
        # initialize the infinite loop and callit decorators so they can auto-
        # register methods they decorate
        _infinite_loop = jk.deco_infinite_loop()
        _callit = jk.deco_callit()
    
        @_callit('before', 'init')
        def _init_data(self, *args, **kwargs):
            # Function automatically called at initialization, thanks to the
            # decorator
            self.tdata = np.array([])  # time x-axis
            self.ydata = np.array([])  # data y-axis
            self.omega = 5
    
        @_callit('after', 'init')
        def _build_frames(self, *args, **kwargs):
            # Function automatically called at initialization, thanks to the
            # decorator.
            # Creates a graph frame
            self.mygraph = self.add_frame(
                       jk.Graph(name="DemoData", size=(500, 500), pos=(50, 50),
                                fmt="go-", xnpts=50, freq_up=7, bgcol="w",
                                xylim=(None,None,-1.1,2.1), xlabel='t', ylabel='sine'))
    
        @_callit('before', 'start')
        def _set_t0(self):
            # initialize t0 at start-up
            self._t0 = time.time()
    
        @_infinite_loop(wait_time=0.2)
        def _get_data(self):
            # This method will automatically be called with simulation start
            # (t.start()), and looped every 0.2 in a separate thread as long as
            # the simulation runs (running == True)
            # It generates new data and pushes it to the frame.
            # concatenate data on the time x-axis
            timestamp = time.time() - self._t0
            self.tdata = jk.add_datapoint(self.tdata,
                                          timestamp,
                                          xnptsmax=self.mygraph.xnptsmax)
            new_y_data = np.sin(2*np.pi*self.omega*timestamp) + np.random.random()
            self.ydata = jk.add_datapoint(self.ydata,
                                          new_y_data,
                                          xnptsmax=self.mygraph.xnptsmax)
            # push new data to the graph
            self.mygraph.set_xydata(np.round(self.tdata, 1), self.ydata)
    
    d = DemoData()
    d.start()
    ...
    d.stop()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-31
      • 2017-04-09
      • 2017-07-02
      • 2018-09-12
      • 2020-08-04
      • 2017-09-26
      相关资源
      最近更新 更多