这是我想出的解决方案;它有点复杂,但不如 ruamel 复杂,因为它完全与普通的 PyYAML API 一起工作,并且不往返 cmets(因此它不是this other question 的合适答案)。它可能总体上还没有那么健壮,因为我没有进行广泛的测试,但是对于我的用例来说似乎已经足够了,也就是说,我希望 dicts/mappings 能够拥有 cmets,既适用于整个映射,也适用于每个项目的 cmets。
我相信往返 cmets - 在这种有限的情况下 - 也可以使用类似的方法,但我没有尝试过,因为它目前不是我拥有的用例。
最后,虽然这个解决方案没有实现为列表/序列中的项目添加每个项目的评论(因为这不是我目前需要的),但它可以很容易地扩展到这样做。
首先,就像在 ruamel 中一样,我们需要一种 CommentedMapping 类,它将 cmets 与 Mapping 中的每个键相关联。有很多可能的方法。我的只有一个:
from collections.abc import Mapping, MutableMapping
class CommentedMapping(MutableMapping):
def __init__(self, d, comment=None, comments={}):
self.mapping = d
self.comment = comment
self.comments = comments
def get_comment(self, *path):
if not path:
return self.comment
# Look the key up in self (recursively) and raise a
# KeyError or other execption if such a key does not
# exist in the nested structure
sub = self.mapping
for p in path:
if isinstance(sub, CommentedMapping):
# Subvert comment copying
sub = sub.mapping[p]
else:
sub = sub[p]
comment = None
if len(path) == 1:
comment = self.comments.get(path[0])
if comment is None:
comment = self.comments.get(path)
return comment
def __getitem__(self, item):
val = self.mapping[item]
if (isinstance(val, (dict, Mapping)) and
not isinstance(val, CommentedMapping)):
comment = self.get_comment(item)
comments = {k[1:]: v for k, v in self.comments.items()
if isinstance(k, tuple) and len(k) > 1 and k[0] == item}
val = self.__class__(val, comment=comment, comments=comments)
return val
def __setitem__(self, item, value):
self.mapping[item] = value
def __delitem__(self, item):
del self.mapping[item]
for k in list(self.comments):
if k == item or (isinstance(k, tuple) and k and k[0] == item):
del self.comments[key]
def __iter__(self):
return iter(self.mapping)
def __len__(self):
return len(self.mapping)
def __repr__(self):
return f'{type(self).__name__}({self.mapping}, comment={self.comment!r}, comments={self.comments})'
这个类既有.comment 属性,所以它可以携带映射的整体注释,还有一个.comments 属性包含每个键的cmets。它还允许通过将键路径指定为元组来为嵌套字典中的键添加 cmets。例如。 comments={('c', 'd'): 'comment'} 允许在 'c' 的嵌套字典中为键 'd' 指定注释。当从CommentedMapping 获取项目时,如果项目的值是 dict/Mapping,它也会以保留其 cmets 的方式包装在 CommentedMapping 中。这对于递归调用嵌套结构的 YAML 表示器很有用。
接下来,我们需要实现一个自定义 YAML Dumper,它负责将对象序列化为 YAML 的整个过程。 Dumper 是一个复杂的类,它由四个其他类组成:Emitter、Serializer、Representer 和Resolver。其中我们只需要实现前三个; Resolvers 更关心,例如像1 这样的隐含标量如何解析为正确的类型,以及确定各种值的默认标签。这里并没有真正涉及。
首先我们实现一个解析器。解析器负责识别不同的 Python 类型,并将它们映射到本机 YAML 数据结构/表示图中相应的 节点。即,这些包括标量节点、序列节点和映射节点。例如,Representer 基类包括 Python dicts 的表示器,它将它们转换为 MappingNode(字典中的每个项目依次由一对 ScalarNodes 组成,每个键一个,一个每个值)。
为了将 cmets 附加到整个映射以及映射中的每个键,我们引入了两种新的 Node 类型,它们不是 YAML 规范的正式组成部分:
from yaml.node import Node, ScalarNode, MappingNode
class CommentedNode(Node):
"""Dummy base class for all nodes with attached comments."""
class CommentedScalarNode(ScalarNode, CommentedNode):
def __init__(self, tag, value, start_mark=None, end_mark=None, style=None,
comment=None):
super().__init__(tag, value, start_mark, end_mark, style)
self.comment = comment
class CommentedMappingNode(MappingNode, CommentedNode):
def __init__(self, tag, value, start_mark=None, end_mark=None,
flow_style=None, comment=None, comments={}):
super().__init__(tag, value, start_mark, end_mark, flow_style)
self.comment = comment
self.comments = comments
然后我们添加一个CommentedRepresenter,其中包含将CommentedMapping 表示为CommentedMappingNode 的代码。实际上,它只是重用了基类的代码来表示映射,但是将返回的MappingNode 转换为CommentedMappingNode。它还将每个密钥从ScalarNode 转换为CommentedscalarNode。我们在这里基于SafeRepresenter,因为我不需要序列化任意 Python 对象:
from yaml.representer import SafeRepresenter
class CommentedRepresenter(SafeRepresenter):
def represent_commented_mapping(self, data):
node = super().represent_dict(data)
comments = {k: data.get_comment(k) for k in data}
value = []
for k, v in node.value:
if k.value in comments:
k = CommentedScalarNode(
k.tag, k.value,
k.start_mark, k.end_mark, k.style,
comment=comments[k.value])
value.append((k, v))
node = CommentedMappingNode(
node.tag,
value,
flow_style=False, # commented dicts must be in block style
# this could be implemented differently for flow-style
# maps, but for my case I only want block-style, and
# it makes things much simpler
comment=data.get_comment(),
comments=comments
)
return node
yaml_representers = SafeRepresenter.yaml_representers.copy()
yaml_representers[CommentedMapping] = represent_commented_mapping
接下来我们需要实现Serializer 的子类。 serializer 负责遍历节点的表示图,并为每个节点输出一个或多个事件到 emitter,这是一个复杂的(有时难以遵循)状态机,它接收事件流并为每个事件输出适当的 YAML 标记(例如,有一个 MappingStartEvent,如果它是流式映射,则在接收时将输出 {,和/或添加后续输出的适当缩进级别,直到对应的MappingEndEvent。
重点是,新的序列化程序必须输出表示 cmets 的事件,以便发射器知道何时需要发出注释。这只是通过添加一个CommentEvent 并在每次在表示中遇到CommentedMappingNode 或CommentedScalarNode 时发出它们来处理:
from yaml import Event
class CommentEvent(yaml.Event):
"""
Simple stream event representing a comment to be output to the stream.
"""
def __init__(self, value, start_mark=None, end_mark=None):
super().__init__(start_mark, end_mark)
self.value = value
class CommentedSerializer(Serializer):
def serialize_node(self, node, parent, index):
if (node not in self.serialized_nodes and
isinstance(node, CommentedNode) and
not (isinstance(node, CommentedMappingNode) and
isinstance(parent, CommentedMappingNode))):
# Emit CommentEvents, but only if the current node is not a
# CommentedMappingNode nested in another CommentedMappingNode (in
# which case we would have already emitted its comment via the
# parent mapping)
self.emit(CommentEvent(node.comment))
super().serialize_node(node, parent, index)
接下来,Emitter 需要子类化以处理 CommentEvents。这可能是最棘手的部分,因为正如我所写,发射器有点复杂和脆弱,并且以难以修改状态机的方式编写(我很想更清楚地重写它,但没有时间现在)。所以我尝试了许多不同的解决方案。
这里的关键方法是Emitter.emit,它处理事件流,并调用“状态”方法,这些方法根据机器所处的状态执行一些操作,而这些状态又受到流中出现的事件的影响。一个重要的认识是,在许多情况下,流处理在等待更多事件进入时会暂停——这就是Emitter.need_more_events 方法的职责。在某些情况下,在处理当前事件之前,需要先进入更多事件。例如,在MappingStartEvent 的情况下,至少需要在流上缓冲3 个以上的事件:第一个键/值对,以及可能的下一个键。发射器在开始格式化地图之前需要知道地图中是否有一个或多个项目,并且可能还需要知道第一个键/值对的长度。在处理当前事件之前所需的事件数硬编码在 need_more_events 方法中。
问题在于,这并没有考虑到CommentEvents 现在可能出现在事件流中,这不应该影响其他事件的处理。因此Emitter.need_events 方法可以解释CommentEvents 的存在。例如。如果当前事件是 MappingStartEvent,并且缓冲了 3 个后续事件,如果其中一个是 CommentEvent,我们无法计算它,所以我们至少需要 4 个事件(如果下一个是映射中的预期事件之一)。
最后,每次在流中遇到CommentEvent,我们都会强制跳出当前的事件处理循环来处理评论的写入,然后将CommentEvent从流中弹出并继续,就好像什么都没发生一样。这是最终结果:
import textwrap
from yaml.emitter import Emitter
class CommentedEmitter(Emitter):
def need_more_events(self):
if self.events and isinstance(self.events[0], CommentEvent):
# If the next event is a comment, always break out of the event
# handling loop so that we divert it for comment handling
return True
return super().need_more_events()
def need_events(self, count):
# Hack-y: the minimal number of queued events needed to start
# a block-level event is hard-coded, and does not account for
# possible comment events, so here we increase the necessary
# count for every comment event
comments = [e for e in self.events if isinstance(e, CommentEvent)]
return super().need_events(count + min(count, len(comments)))
def emit(self, event):
if self.events and isinstance(self.events[0], CommentEvent):
# Write the comment, then pop it off the event stream and continue
# as normal
self.write_comment(self.events[0].value)
self.events.pop(0)
super().emit(event)
def write_comment(self, comment):
indent = self.indent or 0
width = self.best_width - indent - 2 # 2 for the comment prefix '# '
lines = ['# ' + line for line in wrap(comment, width)]
for line in lines:
if self.encoding:
line = line.encode(self.encoding)
self.write_indent()
self.stream.write(line)
self.write_line_break()
我还尝试了不同的方法来实现write_comment。 Emitter 基类有它自己的方法 (write_plain),它可以使用适当的缩进和换行处理将文本写入流。但是,它不够灵活,无法处理诸如 cmets 之类的东西,其中每行都需要以'# ' 之类的东西作为前缀。我尝试的一种技术是猴子修补write_indent 方法来处理这种情况,但最后它太难看了。我发现简单地使用 Python 的内置 textwrap.wrap 就足够了。
接下来,我们通过继承现有的 SafeDumper 来创建转储程序,但将我们的新类插入 MRO:
from yaml import SafeDumper
class CommentedDumper(CommentedEmitter, CommentedSerializer,
CommentedRepresenter, SafeDumper):
"""
Extension of `yaml.SafeDumper` that supports writing `CommentedMapping`s with
all comments output as YAML comments.
"""
这是一个示例用法:
>>> import yaml
>>> d = CommentedMapping({
... 'a': 1,
... 'b': 2,
... 'c': {'d': 3},
... }, comment='my commented dict', comments={
... 'a': 'a comment',
... 'b': 'b comment',
... 'c': 'long string ' * 44,
... ('c', 'd'): 'd comment'
... })
>>> print(yaml.dump(d, Dumper=CommentedDumper))
# my commented dict
# a comment
a: 1
# b comment
b: 2
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string long string long
# string long string long string long string long string long string long string
# long string long string long string long string long string
c:
# d comment
d: 3
我还没有对这个解决方案进行广泛的测试,而且它可能仍然包含错误。我会在我更多使用它并找到极端情况等时对其进行更新。