您可以尝试使用带有r 标志的lockfile-create 命令重试指定次数,捕获CalledProcessError 并退出,-p 标志将存储进程的pid:
import os
import sys
from time import sleep
from subprocess import check_call, CalledProcessError
try:
check_call(["lockfile-create", "-q","-p", "-r", "0", "-l", "my.lock"])
except CalledProcessError as e:
print("{} is already running".format(sys.argv[0]))
print(e.returncode)
exit(1)
# main body
for i in range(10):
sleep(2)
print(1)
check_call(["rm","-f","my.lock"])
使用上面的代码运行 test.py 脚本,而脚本已经在运行,会输出以下内容:
$ python lock.py
lock.py is already running
4
选项
-q, --quiet
抑制任何输出。成功或失败将仅由退出状态指示。
-v, --verbose
启用诊断输出。
-l, --lock-name
不要将 .lock 附加到文件名。此选项适用于 lockfile-create、lockfile-remove、lockfile-touch 或 lockfile-check。
-p, --use-pid
每当创建锁文件时,将当前进程 ID (PID) 写入锁文件,并在检查锁的有效性时使用该 PID。有关详细信息,请参见 lockfile_create(3) 联机帮助页。此选项适用于 lockfile-create、lockfile-remove、lockfile-touch 和 lockfile-check。
-o, --oneshot
触摸锁并立即退出。此选项适用于 lockfile-touch 和 mail-touchlock。如果未提供,这些命令将永远运行,每分钟触摸一次锁,直到被杀死。
-r 重试计数,--retry 重试计数
尝试在放弃前锁定文件名重试次数。每次尝试将比上一次延迟稍长(以 5 秒为增量),直到重试之间的最大延迟达到一分钟。如果未指定 retry-count,则默认为 9,如果所有 9 次锁定尝试都失败,则会在 180 秒(3 分钟)后放弃。
说明
lockfile_create 函数以 NFS 安全方式创建锁定文件。
如果 flags 设置为 L_PID,那么 lockfile_create 不仅会检查现有的锁定文件,还会读取内容以查看它是否包含 ASCII 中的进程 ID。如果是这样,则锁定文件仅在该进程仍然存在时才有效。
如果锁定文件位于共享文件系统上,它可能是由远程主机上的进程创建的。因此,进程 ID 检查是无用的,不应该设置 L_PID 标志。在这种情况下,没有很好的方法来查看锁定文件是否过时。因此,如果锁定文件超过 5 分钟,它将被删除。这就是为什么提供lockfile_touch函数的原因:在持有锁的同时,需要通过调用lockfile_touch()定期(每分钟左右)刷新一次。
lockfile_check 函数检查是否已经存在有效的锁定文件,而不尝试创建新的锁定文件。
最后 lockfile_remove 函数删除了锁文件。
Algorithm
用于以原子方式(甚至在 NFS 上)创建锁文件的算法如下:
1
创建了一个独特的文件。在 printf 格式中,文件的名称是 .lk%05d%x%s。第一个参数 (%05d) 是当前进程 ID。第二个参数 (%x) 由 time(2) 返回的值的 4 个次要位组成。最后一个参数是系统主机名。
2
然后使用链接(2)创建锁定文件。链接的返回值被忽略。
3
现在锁定文件是 stat()ed。如果统计失败,我们转到第 6 步。
4
锁定文件的统计值与临时文件的统计值进行比较。如果它们相同,我们就有了锁。临时文件被删除,返回值 0(成功)给调用者。
5
检查现有的锁文件是否有效。如果无效,则删除过时的锁定文件。
6
在重试之前,我们会休眠 n 秒。 n 最初为 5 秒,但每次重试后,额外增加 5 秒,最多为 60 秒(增量退避)。然后我们进入第 2 步,直到重试次数。
redhat 上似乎有一个名为 lockfile-progs 的等效包。
在 mac 上,您可以使用 lockfile 并执行以下操作:
import os
import sys
from time import sleep
import os
from subprocess import Popen, CalledProcessError, check_call
p = Popen(["lockfile", "-r", "0", "my.lock"])
p.wait()
if p.returncode == 0:
with open("my.pid", "w") as f:
f.write(str(os.getpid()))
else:
try:
with open("my.pid") as f:
# see if process is still running or lockfile
# is left over from previous run.
r = f.read()
check_call(["kill", "-0", "{}".format(r)])
except CalledProcessError:
# remove old lock file and create new
check_call(["rm", "-f", "my.lock"])
check_call(["lockfile", "-r", "0", "my.lock"])
# update pid
with open("my.pid", "w") as out:
out.write(str(os.getpid()))
print("Deleted stale lockfile.")
else:
print("{} is already running".format(sys.argv[0]))
print(p.returncode)
exit(1)
# main body
for i in range(10):
sleep(1)
print(1)
check_call(["rm", "-f", "my.lock"])
在您的情况下,使用套接字可能会起作用:
from socket import socket, gethostname, error, SO_REUSEADDR, SOL_SOCKET
from sys import argv
import errno
sock = socket()
# Create a socket object
host = gethostname()
# /proc/sys/net/ipv4/ip_local_port_range is 32768 61000 on my Ubuntu Machine
port = 60001
# allow connection in TIME_WAIT
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
try:
sock.bind((host, port))
sock.connect((host, port))
except error as e:
# [Errno 99] Cannot assign requested address
if e.errno == errno.EADDRNOTAVAIL:
print("{} is already running".format(argv[0]))
exit(1)
# else raise the error
else:
raise e
# main body
from time import sleep
while True:
print(1)
sleep(2)
sock.close()