【问题标题】:process management with python: execute service or systemd or init.d script使用 python 进行进程管理:执行服务或 systemd 或 init.d 脚本
【发布时间】:2015-10-28 05:32:21
【问题描述】:

如何使用 python 高效、正确地管理进程。我想运行如下命令:

/etc/init.d/daemon stop
service daemon start
systemctl restart daemon

是否有任何可用的 python 模块?

我们将不胜感激。

【问题讨论】:

  • 您的意思是subprocess 模块还是supervisord 之类的东西?此外,您可以在纯 Python 中实现守护进程,例如 Python daemon and systemd service
  • 嗨,我正在寻找这样的东西:code.activestate.com/recipes/…,但适用于 linux。
  • 最简单的解决方案是使用subprocess模块运行相应的命令(例如systemctl restart <service>),尽管可能有相应的Python包装器。
  • 使用子进程不能处理 systemctl 的 stdout/stderr。我通常无法控制它在执行过程中打印的内容。 Arindam 的答案正是我想要的。

标签: python init upstart systemd process-management


【解决方案1】:

我找到了一种使用 systemd dbus 接口的方法。代码如下:

import dbus
import subprocess
import os
import sys
import time

SYSTEMD_BUSNAME = 'org.freedesktop.systemd1'
SYSTEMD_PATH = '/org/freedesktop/systemd1'
SYSTEMD_MANAGER_INTERFACE = 'org.freedesktop.systemd1.Manager'
SYSTEMD_UNIT_INTERFACE = 'org.freedesktop.systemd1.Unit'

bus = dbus.SystemBus()

proxy = bus.get_object('org.freedesktop.PolicyKit1', '/org/freedesktop/PolicyKit1/Authority')
authority = dbus.Interface(proxy, dbus_interface='org.freedesktop.PolicyKit1.Authority')
system_bus_name = bus.get_unique_name()

subject = ('system-bus-name', {'name' : system_bus_name})
action_id = 'org.freedesktop.systemd1.manage-units'
details = {}
flags = 1            # AllowUserInteraction flag
cancellation_id = '' # No cancellation id

result = authority.CheckAuthorization(subject, action_id, details, flags, cancellation_id)

if result[1] != 0:
    sys.exit("Need administrative privilege")

systemd_object = bus.get_object(SYSTEMD_BUSNAME, SYSTEMD_PATH)
systemd_manager = dbus.Interface(systemd_object, SYSTEMD_MANAGER_INTERFACE)

unit = systemd_manager.GetUnit('cups.service')
unit_object = bus.get_object(SYSTEMD_BUSNAME, unit)
#unit_interface = dbus.Interface(unit_object, SYSTEMD_UNIT_INTERFACE)

#unit_interface.Stop('replace')
systemd_manager.StartUnit('cups.service', 'replace')

while list(systemd_manager.ListJobs()):
    time.sleep(2)
    print 'there are pending jobs, lets wait for them to finish.'

prop_unit = dbus.Interface(unit_object, 'org.freedesktop.DBus.Properties')

active_state = prop_unit.Get('org.freedesktop.systemd1.Unit', 'ActiveState')

sub_state = prop_unit.Get('org.freedesktop.systemd1.Unit', 'SubState')

print active_state, sub_state

【讨论】:

  • 身份验证位对我来说仍然是个谜,但它有效!
  • 这很完美!就我而言,我想在 Django/Flask 之类的 Web 界面上管理服务,所以一开始身份验证给我带来了麻烦,但我设法通过使用 solution on AskUbuntu 解决了这个问题。希望它会帮助别人!谢谢阿林丹!
猜你喜欢
  • 2015-03-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-08-01
  • 2019-11-21
  • 2011-01-01
相关资源
最近更新 更多