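【Question title】: Getting signals working on PulseAudio's DBus interface?
【Posted】: 2016-01-19 22:50:14
【Question】:

I'm trying to get a D-Bus signal handler called whenever the state of a sink in PulseAudio changes (for example, when it becomes inactive). Unfortunately it is never called, and frankly I have no idea why.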

import dbus
import dbus.mainloop.glib
from gi.repository import GObject


dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SessionBus()


def signal_handler(*args, **kwargs):
    print('sig: ', args, kwargs)


def connect():
    import os
    if 'PULSE_DBUS_SERVER' in os.environ:
        address = os.environ['PULSE_DBUS_SERVER']
    else:
        bus = dbus.SessionBus()
        server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1")
        address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties")

    return dbus.connection.Connection(address)


conn = connect()
core = conn.get_object(object_path='/org/pulseaudio/core1')
core.connect_to_signal('StateUpdated', signal_handler)
core.ListenForSignal('org.PulseAudio.Core1.Device.StateUpdated', dbus.Array(signature='o'), dbus_interface='org.PulseAudio.Core1')
loop = GObject.MainLoop()
loop.run()

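【Comments】:

  • At this rate I might as well write it in C. I've got to l̶o̶v̶e̶ hate D-Bus!
  • Is the problem that it fails to connect, or that everything works but the callback is never called? How do you trigger the StateUpdated signal?
  • @JoGr In theory it should fire whenever a device's state changes (i.e. audio starts or stops playing). The callback just never seems to be called.
  • Did you load module-dbus-protocol in pulseaudio?
  • I'm not sure core.connect_to_signal('StateUpdated', signal_handler) is correct; see my answer.

Tags: python dbus pulseaudio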


【Answer 1】:

In my environment both dbus and pulseaudio are running, but the address returned by the lookup does not exist:

>>> import dbus
>>> import dbus.mainloop.glib
>>> from gi.repository import GObject
>>> dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
<dbus.mainloop.NativeMainLoop object at 0x7f3c98ffd4e0>
>>> bus = dbus.SessionBus()
>>> server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1")
>>> address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties")
>>> address
dbus.String('unix:path=/run/user/1000/pulse/dbus-socket', variant_level=1)


$ dbus-monitor --address 'unix:path=/run/user/1000/pulse/dbus-socket'
Failed to open connection to unix:path=/run/user/1000/pulse/dbus-socket: Failed to connect to socket /run/user/1000/pulse/dbus-socket: No such file or directory

$ ls /run/user/1000/pulse/
cli  native  pid

I don't know whether my configuration is the default one, but it looks like the D-Bus integration simply isn't there!
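The same check can be done from Python before trying to connect. This is only a minimal sketch: the helper names `unix_socket_path` and `socket_exists` are made up for this example, and the parser ignores the percent-escaping that D-Bus addresses may contain.

```python
import os


def unix_socket_path(address):
    """Return the 'path' value of a 'unix:' D-Bus address, or None.

    Hypothetical helper: handles the simple 'transport:key=value,...' form
    (several addresses may be separated by ';'), without percent-unescaping.
    """
    for entry in str(address).split(";"):
        transport, _, params = entry.partition(":")
        if transport != "unix":
            continue
        for pair in params.split(","):
            key, _, value = pair.partition("=")
            if key == "path":
                return value
    return None


def socket_exists(address):
    """True only if the address names a unix socket path that is present on disk."""
    path = unix_socket_path(address)
    return path is not None and os.path.exists(path)
```

If `socket_exists(address)` is false for the address returned by the lookup, the server is advertising a socket that PulseAudio never created, which matches the `dbus-monitor` failure shown above.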

【Comments】:

  • So /usr/lib/pulse-7.1/modules/module-dbus-protocol.so exists but is not loaded, according to pacmd dump.
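
Whether the module is loaded can also be checked from a script. The sketch below assumes `pactl` is installed and that `pactl list short modules` prints one module per line with the module name in the second column; the function names are hypothetical.

```python
import subprocess


def module_loaded(modules_output, name="module-dbus-protocol"):
    """Scan the text printed by 'pactl list short modules' for a module name."""
    for line in modules_output.splitlines():
        fields = line.split()
        # Assumed layout: <index> <module name> [arguments...]
        if len(fields) >= 2 and fields[1] == name:
            return True
    return False


def list_modules():
    """Run pactl and return its output as text (requires a running PulseAudio)."""
    return subprocess.check_output(
        ["pactl", "list", "short", "modules"], universal_newlines=True
    )
```

On a live system, `module_loaded(list_modules())` should return False in the situation described here. Running `pactl load-module module-dbus-protocol` loads the module for the current PulseAudio session only.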

【Answer 2】:

Try this; it works for me.

import os

import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib  # replaces the legacy Python 2 'gobject' module


def pulse_bus_address():
    # Prefer the address exported in the environment, otherwise ask the
    # session bus where PulseAudio's private D-Bus server is listening.
    if 'PULSE_DBUS_SERVER' in os.environ:
        address = os.environ['PULSE_DBUS_SERVER']
    else:
        bus = dbus.SessionBus()
        server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1")
        address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties")
        print(address)

    return address


def sig_handler(state):
    print("State changed to %s" % state)
    if state == 0:
        print("Pulseaudio running.")
    elif state == 1:
        print("Pulseaudio idle.")
    elif state == 2:
        print("Pulseaudio suspended")


# set up the glib mainloop before opening any connection
DBusGMainLoop(set_as_default=True)

loop = GLib.MainLoop()

# connect directly to PulseAudio's own D-Bus server, not to the session bus
pulse_bus = dbus.connection.Connection(pulse_bus_address())
pulse_core = pulse_bus.get_object(object_path='/org/pulseaudio/core1')
# ask the server to forward StateUpdated; the empty object list means "from every object"
pulse_core.ListenForSignal('org.PulseAudio.Core1.Device.StateUpdated', dbus.Array(signature='o'), dbus_interface='org.PulseAudio.Core1')

# register on the connection, not on the core object: the devices emit the signal
pulse_bus.add_signal_receiver(sig_handler, 'StateUpdated')
loop.run()

This requires PulseAudio's default.pa to contain the following:

.ifexists module-dbus-protocol.so
load-module module-dbus-protocol
.endif
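
If you want to script that change, something like the following would add the stanza only when it is missing. It is a sketch under assumptions: `ensure_dbus_module` is a made-up helper, it works on the file's text rather than on a fixed path, and it only recognises a plain, uncommented `load-module module-dbus-protocol` line.

```python
# The stanza shown above, as it should appear in default.pa.
STANZA = (
    ".ifexists module-dbus-protocol.so\n"
    "load-module module-dbus-protocol\n"
    ".endif\n"
)


def ensure_dbus_module(config_text):
    """Return config_text with the stanza appended unless the module is already loaded there."""
    for line in config_text.splitlines():
        stripped = line.strip()
        if stripped.startswith("#"):
            continue  # a commented-out line does not load anything
        if stripped.split()[:2] == ["load-module", "module-dbus-protocol"]:
            return config_text  # already present, leave the file alone
    if config_text and not config_text.endswith("\n"):
        config_text += "\n"
    return config_text + STANZA
```

Read the file, pass its contents through `ensure_dbus_module`, write the result back, and restart PulseAudio. As far as I know, a per-user ~/.config/pulse/default.pa is read instead of /etc/pulse/default.pa rather than in addition to it, so a freshly created user file should normally start with `.include /etc/pulse/default.pa`.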

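Edit: For those wondering about @con-f-use's question regarding the application name: it turns out they answered it themselves and posted the answer here: https://askubuntu.com/questions/906160/is-there-a-way-to-detect-whether-a-skype-call-is-in-progress-dbus-pulseaudio
Stealing a chunk of @con-f-use's code and applying it to my code above gives a monitor that tracks the state and can tell you the application name, artist, title and name of whatever is playing. Cheers @con-f-use :)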

import os

import dbus
from dbus.mainloop.glib import DBusGMainLoop
from gi.repository import GLib  # replaces the legacy Python 2 'gobject' module


def pulse_bus_address():
    # Prefer the address exported in the environment, otherwise ask the
    # session bus where PulseAudio's private D-Bus server is listening.
    if 'PULSE_DBUS_SERVER' in os.environ:
        address = os.environ['PULSE_DBUS_SERVER']
    else:
        bus = dbus.SessionBus()
        server_lookup = bus.get_object("org.PulseAudio1", "/org/pulseaudio/server_lookup1")
        address = server_lookup.Get("org.PulseAudio.ServerLookup1", "Address", dbus_interface="org.freedesktop.DBus.Properties")
        print(address)

    return address


# recursively convert dbus values (including byte arrays) to plain Python values
def dbus2str(db):
    if type(db) == dbus.Struct:
        return str(tuple(dbus2str(i) for i in db))
    if type(db) == dbus.Array:
        return "".join([dbus2str(i) for i in db])
    if type(db) == dbus.Dictionary:
        return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
    if type(db) == dbus.String:
        return db + ''
    if type(db) == dbus.UInt32:
        return str(db + 0)
    if type(db) == dbus.Byte:
        return chr(db)
    if type(db) == dbus.Boolean:
        return db == True
    if type(db) == dict:
        return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
    # anything else (including None for a missing property) is shown with its type
    return "(%s:%s)" % (type(db), db)


def sig_handler(state):
    print("State changed to %s" % state)
    if state == 0:
        print("Pulseaudio running.")
    elif state == 1:
        print("Pulseaudio idle.")
    elif state == 2:
        print("Pulseaudio suspended")

    # one Properties proxy per playback stream currently known to the core
    dbus_pstreams = (
        dbus.Interface(
            pulse_bus.get_object(object_path=path),
            dbus_interface='org.freedesktop.DBus.Properties'
        ) for path in pulse_core.Get(
            'org.PulseAudio.Core1',
            'PlaybackStreams',
            dbus_interface='org.freedesktop.DBus.Properties')
        )
    pstreams = {}
    for pstream in dbus_pstreams:
        try:
            pstreams[pstream.Get('org.PulseAudio.Core1.Stream', 'Index')] = pstream
        except dbus.exceptions.DBusException:
            # the stream may have disappeared between listing and querying it
            pass
    for pstream in pstreams.values():
        plist = pstream.Get('org.PulseAudio.Core1.Stream', 'PropertyList')
        appname = dbus2str(plist.get('application.name', None))
        artist = dbus2str(plist.get('media.artist', None))
        title = dbus2str(plist.get('media.title', None))
        name = dbus2str(plist.get('media.name', None))
        print(appname, artist, title, name)


# set up the glib mainloop before opening any connection
DBusGMainLoop(set_as_default=True)

loop = GLib.MainLoop()

pulse_bus = dbus.connection.Connection(pulse_bus_address())
pulse_core = pulse_bus.get_object(object_path='/org/pulseaudio/core1')
#pulse_clients = pulse_bus.get_object(object_path='/org/pulseaudio/core1/Clients')
#print(dir(pulse_clients))
# ask the server to forward StateUpdated; the empty object list means "from every object"
pulse_core.ListenForSignal('org.PulseAudio.Core1.Device.StateUpdated', dbus.Array(signature='o'), dbus_interface='org.PulseAudio.Core1')

pulse_bus.add_signal_receiver(sig_handler, 'StateUpdated')
loop.run()
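
The handler above does not say which sink or source changed state. A possible variant, sketched here rather than tested against every PulseAudio version, uses dbus-python's `path_keyword` option so the handler also receives the object path of the emitting device. The names `STATE_NAMES`, `describe_state`, `on_state_updated` and `main` are made up for this example; the state numbers are the ones used in `sig_handler`.

```python
STATE_NAMES = {0: "running", 1: "idle", 2: "suspended"}


def describe_state(state, path=None):
    """Build a readable message from the numeric state and the emitting object path."""
    name = STATE_NAMES.get(int(state), "unknown")
    prefix = "%s: " % path if path else ""
    return "%sstate changed to %s (%d)" % (prefix, name, int(state))


def on_state_updated(state, path=None):
    # 'path' is filled in by dbus-python because of path_keyword below
    print(describe_state(state, path))


def main():
    # Imported here so the helpers above can be used without D-Bus installed.
    import os
    import dbus
    from dbus.mainloop.glib import DBusGMainLoop
    from gi.repository import GLib

    DBusGMainLoop(set_as_default=True)
    if 'PULSE_DBUS_SERVER' in os.environ:
        address = os.environ['PULSE_DBUS_SERVER']
    else:
        lookup = dbus.SessionBus().get_object(
            "org.PulseAudio1", "/org/pulseaudio/server_lookup1")
        address = lookup.Get("org.PulseAudio.ServerLookup1", "Address",
                             dbus_interface="org.freedesktop.DBus.Properties")

    conn = dbus.connection.Connection(address)
    core = conn.get_object(object_path='/org/pulseaudio/core1')
    core.ListenForSignal('org.PulseAudio.Core1.Device.StateUpdated',
                         dbus.Array(signature='o'),
                         dbus_interface='org.PulseAudio.Core1')
    # Match on the Device interface and pass the sender's object path along.
    conn.add_signal_receiver(on_state_updated,
                             signal_name='StateUpdated',
                             dbus_interface='org.PulseAudio.Core1.Device',
                             path_keyword='path')
    GLib.MainLoop().run()
```

Call `main()` on a machine where module-dbus-protocol is loaded. Each message is then prefixed with whatever object path the signal came from.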

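【Comments】:

  • For Ubuntu users this may be useful: stackoverflow.com/questions/13403314/... PulseAudio on Ubuntu does not enable dbus. To work around that: sudo echo 'load-module module-dbus-protocol' >> ~/.config/pulse/default.pa; pkill pulseaudio; pulseaudio (or /etc/pulse/default.pa for all users)
  • One more small question: if an application opens a sink or source, how do I get the application name in sig_handler()?
  • @con-f-use I saw your excellent post askubuntu.com/questions/906160/… , stole a chunk of it and added it to this post to make it more complete. I trust I have given you enough credit.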