输入规范:
- 0 到 999999 之间的行号(如果需要 1 索引,可以应用偏移量)
- 每个行号只出现一次
- 数字按升序排序(很有用,我们还是希望对它们进行排序)
当匹配数超过可能值的一半时,您的一个好主意是反转结果的含义。让我们保留这一点,并假设我们得到了一个标志和一个匹配/未命中列表。
您最初尝试将数字编码为逗号分隔的文本。这意味着对于 90% 的可能值,您需要 6 个字符 + 1 个分隔符——因此平均需要 7 个字节。但是,由于最大值为 999999,因此您实际上只需要 20 位来对每个条目进行编码。
因此,减小尺寸的第一个想法是使用二进制编码。
二进制编码
最简单的方法是写入发送的值的数量,后跟一个 32 位整数流。
一种更有效的方法是将两个 20 位值打包到每 5 个写入的字节中。如果是奇数,您只需用零填充 4 个多余的位。
这些方法可能适用于少量匹配(或未命中)。然而,需要注意的重要一点是,对于每一行,我们只需要跟踪 1 位信息——无论它是否存在。这意味着我们可以将结果编码为 1000000 位的位图。
结合这两种方法,我们可以在匹配或未命中的情况下使用位图,并在效率更高时切换到二进制编码。
范围缩小
编码排序的整数序列时使用的下一个潜在改进是使用范围缩减。
这个想法是从最大到最小对值进行编码,并随着它们变得更小而减少每个值的位数。
- 首先,我们对表示第一个值所需的位数
N 进行编码。
- 我们使用
N bits 对第一个值进行编码
- 对于以下每个值
- 使用
N bits 对值进行编码
- 如果值需要更少的位来编码,请适当减少
N
熵编码
让我们回到位图编码。基于Shannon entropy theory
最坏的情况是我们有 50% 的匹配。概率偏差越大,我们平均需要对每个条目进行编码的位数就越少。
Matches | Bits
--------+-----------
0 | 0
1 | 22
2 | 41
3 | 60
4 | 78
5 | 96
10 | 181
100 | 1474
1000 | 11408
10000 | 80794
100000 | 468996
250000 | 811279
500000 | 1000000
为此,我们需要使用可以对小数位进行编码的熵编码器,例如算术或范围编码器或某些基于 ANS 的新编码器,例如 FSE。或者,我们可以将符号组合在一起并使用霍夫曼编码。
原型和测量
我使用 Amir Said 的 FastAC 的 32 位实现编写了一个测试,它将模型限制为小数点后 4 位。
(这不是一个真正的问题,因为我们不应该将这些数据直接提供给编解码器。这只是一个演示。)
首先是一些常用代码:
typedef std::vector<uint8_t> match_symbols_t;
typedef std::vector<uint32_t> match_list_t;
typedef std::set<uint32_t> match_set_t;
typedef std::vector<uint8_t> buffer_t;
// ----------------------------------------------------------------------------
static uint32_t const NUM_VALUES(1000000);
// ============================================================================
size_t symbol_count(uint8_t bits)
{
size_t count(NUM_VALUES / bits);
if (NUM_VALUES % bits > 0) {
return count + 1;
}
return count;
}
// ----------------------------------------------------------------------------
void set_symbol(match_symbols_t& symbols, uint8_t bits, uint32_t match, bool state)
{
size_t index(match / bits);
size_t offset(match % bits);
if (state) {
symbols[index] |= 1 << offset;
} else {
symbols[index] &= ~(1 << offset);
}
}
// ----------------------------------------------------------------------------
bool get_symbol(match_symbols_t const& symbols, uint8_t bits, uint32_t match)
{
size_t index(match / bits);
size_t offset(match % bits);
return (symbols[index] & (1 << offset)) != 0;
}
// ----------------------------------------------------------------------------
match_symbols_t make_symbols(match_list_t const& matches, uint8_t bits)
{
assert((bits > 0) && (bits <= 8));
match_symbols_t symbols(symbol_count(bits), 0);
for (auto match : matches) {
set_symbol(symbols, bits, match, true);
}
return symbols;
}
// ----------------------------------------------------------------------------
match_list_t make_matches(match_symbols_t const& symbols, uint8_t bits)
{
match_list_t result;
for (uint32_t i(0); i < 1000000; ++i) {
if (get_symbol(symbols, bits, i)) {
result.push_back(i);
}
}
return result;
}
首先,更简单的变体是写入匹配数,确定匹配/未命中的概率并将其限制在支持的范围内。
然后使用这个静态概率模型简单地对位图的每个值进行编码。
class arithmetic_codec_v1
{
public:
buffer_t compress(match_list_t const& matches)
{
uint32_t match_count(static_cast<uint32_t>(matches.size()));
arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
codec.start_encoder();
// Store the number of matches (1000000 needs only 20 bits)
codec.put_bits(match_count, 20);
if (match_count > 0) {
// Initialize the model
static_bit_model model;
model.set_probability_0(get_probability_0(match_count));
// Create a bitmap and code all the bitmap entries
// NB: This is lazy and inefficient, but simple
match_symbols_t symbols = make_symbols(matches, 1);
for (auto entry : symbols) {
codec.encode(entry, model);
}
}
uint32_t compressed_size = codec.stop_encoder();
return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
}
match_list_t decompress(buffer_t& compressed)
{
arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
codec.start_decoder();
// Read number of matches (20 bits)
uint32_t match_count(codec.get_bits(20));
match_list_t result;
if (match_count > 0) {
static_bit_model model;
model.set_probability_0(get_probability_0(match_count));
result.reserve(match_count);
for (uint32_t i(0); i < NUM_VALUES; ++i) {
uint32_t entry = codec.decode(model);
if (entry == 1) {
result.push_back(i);
}
}
}
codec.stop_decoder();
return result;
}
private:
double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
{
double probability_0(double(num_values - match_count) / num_values);
// Limit probability to match FastAC limitations...
return std::max(0.0001, std::min(0.9999, probability_0));
}
};
第二种方法是根据我们编码的符号来调整模型。
在每个匹配被编码后,减少下一个匹配的概率。
一旦我们编码的所有匹配项,停止。
第二个变体的压缩效果稍微好一些,但性能开销明显。
class arithmetic_codec_v2
{
public:
buffer_t compress(match_list_t const& matches)
{
uint32_t match_count(static_cast<uint32_t>(matches.size()));
uint32_t total_count(NUM_VALUES);
arithmetic_codec codec(static_cast<uint32_t>(NUM_VALUES / 4));
codec.start_encoder();
// Store the number of matches (1000000 needs only 20 bits)
codec.put_bits(match_count, 20);
if (match_count > 0) {
static_bit_model model;
// Create a bitmap and code all the bitmap entries
// NB: This is lazy and inefficient, but simple
match_symbols_t symbols = make_symbols(matches, 1);
for (auto entry : symbols) {
model.set_probability_0(get_probability_0(match_count, total_count));
codec.encode(entry, model);
--total_count;
if (entry) {
--match_count;
}
if (match_count == 0) {
break;
}
}
}
uint32_t compressed_size = codec.stop_encoder();
return buffer_t(codec.buffer(), codec.buffer() + compressed_size);
}
match_list_t decompress(buffer_t& compressed)
{
arithmetic_codec codec(static_cast<uint32_t>(compressed.size()), &compressed[0]);
codec.start_decoder();
// Read number of matches (20 bits)
uint32_t match_count(codec.get_bits(20));
uint32_t total_count(NUM_VALUES);
match_list_t result;
if (match_count > 0) {
static_bit_model model;
result.reserve(match_count);
for (uint32_t i(0); i < NUM_VALUES; ++i) {
model.set_probability_0(get_probability_0(match_count, NUM_VALUES - i));
if (codec.decode(model) == 1) {
result.push_back(i);
--match_count;
}
if (match_count == 0) {
break;
}
}
}
codec.stop_decoder();
return result;
}
private:
double get_probability_0(uint32_t match_count, uint32_t num_values = NUM_VALUES)
{
double probability_0(double(num_values - match_count) / num_values);
// Limit probability to match FastAC limitations...
return std::max(0.0001, std::min(0.9999, probability_0));
}
};
实用方法
实际上,可能不值得设计一种新的压缩格式。
事实上,将结果写成位可能不值得,只需创建一个值为 0 或 1 的字节数组。
然后使用现有的压缩库——zlib 很常见,或者你可以尝试 lz4 或 snappy、bzip2、lzma……选择很多。
ZLib 示例
class zlib_codec
{
public:
zlib_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}
buffer_t compress(match_list_t const& matches)
{
match_symbols_t symbols(make_symbols(matches, bits_per_symbol));
z_stream defstream;
defstream.zalloc = nullptr;
defstream.zfree = nullptr;
defstream.opaque = nullptr;
deflateInit(&defstream, Z_BEST_COMPRESSION);
size_t max_compress_size = deflateBound(&defstream, static_cast<uLong>(symbols.size()));
buffer_t compressed(max_compress_size);
defstream.avail_in = static_cast<uInt>(symbols.size());
defstream.next_in = &symbols[0];
defstream.avail_out = static_cast<uInt>(max_compress_size);
defstream.next_out = &compressed[0];
deflate(&defstream, Z_FINISH);
deflateEnd(&defstream);
compressed.resize(defstream.total_out);
return compressed;
}
match_list_t decompress(buffer_t& compressed)
{
z_stream infstream;
infstream.zalloc = nullptr;
infstream.zfree = nullptr;
infstream.opaque = nullptr;
inflateInit(&infstream);
match_symbols_t symbols(symbol_count(bits_per_symbol));
infstream.avail_in = static_cast<uInt>(compressed.size());
infstream.next_in = &compressed[0];
infstream.avail_out = static_cast<uInt>(symbols.size());
infstream.next_out = &symbols[0];
inflate(&infstream, Z_FINISH);
inflateEnd(&infstream);
return make_matches(symbols, bits_per_symbol);
}
private:
uint32_t bits_per_symbol;
};
BZip2 示例
class bzip2_codec
{
public:
bzip2_codec(uint32_t bits_per_symbol) : bits_per_symbol(bits_per_symbol) {}
buffer_t compress(match_list_t const& matches)
{
match_symbols_t symbols(make_symbols(matches, bits_per_symbol));
uint32_t compressed_size = symbols.size() * 2;
buffer_t compressed(compressed_size);
int err = BZ2_bzBuffToBuffCompress((char*)&compressed[0]
, &compressed_size
, (char*)&symbols[0]
, symbols.size()
, 9
, 0
, 30);
if (err != BZ_OK) {
throw std::runtime_error("Compression error.");
}
compressed.resize(compressed_size);
return compressed;
}
match_list_t decompress(buffer_t& compressed)
{
match_symbols_t symbols(symbol_count(bits_per_symbol));
uint32_t decompressed_size = symbols.size();
int err = BZ2_bzBuffToBuffDecompress((char*)&symbols[0]
, &decompressed_size
, (char*)&compressed[0]
, compressed.size()
, 0
, 0);
if (err != BZ_OK) {
throw std::runtime_error("Compression error.");
}
if (decompressed_size != symbols.size()) {
throw std::runtime_error("Size mismatch.");
}
return make_matches(symbols, bits_per_symbol);
}
private:
uint32_t bits_per_symbol;
};
比较
代码库,包括 64 位 Visual Studio 2015 的依赖项位于 https://github.com/dan-masek/bounded_sorted_list_compression.git