【问题标题】:Processing a large file in .NET在 .NET 中处理大文件
【发布时间】:2013-04-20 10:11:58
【问题描述】:

问题

我需要能够使用 C# 保存和读取一个非常大的数据结构。结构本身相当简单;它是一个非常长的数组,包含一个固定大小的简单结构体。

为了清楚起见,只是一个例子:

struct st {
UInt32 a;
UInt16 b;
//etc.
} completion ports

st[] data = new st[1024*1024*100]

我希望能够尽可能快速高效地保存这些文件并将其加载到文件中。

总体方向

到目前为止,我的想法是将数据切割成段,当然,从概念上讲,将这些段分配给任务,然后将它们异步写入文件。 FileStream.WriteAsync 似乎非常适合这个。

我的问题在于阅读,从 FileStream.ReadAsync API 看来,可以在每个结构的中间切割结果似乎是完全合理的,实际上是在一个基元的中间。当然我可以解决这个问题,但我不确定什么是最好的方法,以及我会在多大程度上干扰操作系统的缓冲机制。

最终,我计划使用MemoryStream.MemoryStream(byte[]) 从每个缓冲区创建一个 MemoryStream,并使用二进制读取器将每个缓冲区读入结构。

问题

那么解决这个问题的最佳方法是什么?我的方向好吗?有没有更好的解决方案? 代码示例和链接将不胜感激...

结论

在进行性能测试后,我发现使用 BinaryReader 读取文件或使用 FileStream.ReadAsync 使用多个读取器可以提供大致相同的性能。

Soo....这个问题毫无意义。

【问题讨论】:

标签: c# .net asynchronous io io-completion-ports


【解决方案1】:

[编辑] 我更新了这篇文章以包含一个完整的可编译示例,并解决@Daniel 在下面的 cmets 中提出的问题。因此,此代码不再使用任何“危险”方法,也没有代码分析警告。 [/编辑]

如果你的结构包含ONLY blittable 类型,有一种方法可以加快速度。

您可以使用封送处理将数据直接读取到数组中,而无需制作额外的副本,如下所示(完整的可编译示例):

using System;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace ConsoleApplication1
{
    internal class Program
    {
        struct TestStruct // Mutable for brevity; real structs should be immutable.
        {
            public byte   ByteValue;
            public short  ShortValue;
            public int    IntValue;
            public long   LongValue;
            public float  FloatValue;
            public double DoubleValue;
        }

        static void Main()
        {
            var array = new TestStruct[10];

            for (byte i = 0; i < array.Length; ++i)
            {
                array[i].ByteValue   = i;
                array[i].ShortValue  = i;
                array[i].IntValue    = i;
                array[i].LongValue   = i;
                array[i].FloatValue  = i;
                array[i].DoubleValue = i;
            }

            Directory.CreateDirectory("C:\\TEST");

            using (var output = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Create))
                FastWrite(output, array, 0, array.Length);

            using (var input = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Open))
                array = FastRead<TestStruct>(input, array.Length);

            for (byte i = 0; i < array.Length; ++i)
            {
                Trace.Assert(array[i].ByteValue   == i);
                Trace.Assert(array[i].ShortValue  == i);
                Trace.Assert(array[i].IntValue    == i);
                Trace.Assert(array[i].LongValue   == i);
                Trace.Assert(array[i].FloatValue  == i);
                Trace.Assert(array[i].DoubleValue == i);
            }
        }

        /// <summary>
        /// Writes a part of an array to a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream to which to write.</param>
        /// <param name="array">The array containing the data to write.</param>
        /// <param name="offset">The offset of the start of the data in the array to write.</param>
        /// <param name="count">The number of array elements to write.</param>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static void FastWrite<T>(FileStream fs, T[] array, int offset, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));
            GCHandle gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned);

            try
            {
                uint bytesWritten;
                uint bytesToWrite = (uint)(count * sizeOfT);

                if
                (
                    !WriteFile
                    (
                        fs.SafeFileHandle,
                        new IntPtr(gcHandle.AddrOfPinnedObject().ToInt64() + (offset*sizeOfT)),
                        bytesToWrite,
                        out bytesWritten,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to write file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesWritten == bytesToWrite);
            }

            finally
            {
                gcHandle.Free();
            }
        }

        /// <summary>
        /// Reads array data from a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream from which to read.</param>
        /// <param name="count">The number of elements to read.</param>
        /// <returns>
        /// The array of elements that was read. This may be less than the number that was
        /// requested if the end of the file was reached. It may even be empty.
        /// NOTE: There may still be data left in the file, even if not all the requested
        /// elements were returned - this happens if the number of bytes remaining in the
        /// file is less than the size of the array elements.
        /// </returns>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static T[] FastRead<T>(FileStream fs, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));

            long bytesRemaining  = fs.Length - fs.Position;
            long wantedBytes     = count * sizeOfT;
            long bytesAvailable  = Math.Min(bytesRemaining, wantedBytes);
            long availableValues = bytesAvailable / sizeOfT;
            long bytesToRead     = (availableValues * sizeOfT);

            if ((bytesRemaining < wantedBytes) && ((bytesRemaining - bytesToRead) > 0))
            {
                Debug.WriteLine("Requested data exceeds available data and partial data remains in the file.", "Dmr.Common.IO.Arrays.FastRead(fs,count)");
            }

            T[] result = new T[availableValues];

            if (availableValues == 0)
                return result;

            GCHandle gcHandle = GCHandle.Alloc(result, GCHandleType.Pinned);

            try
            {
                uint bytesRead;

                if
                (
                    !ReadFile
                    (
                        fs.SafeFileHandle,
                        gcHandle.AddrOfPinnedObject(),
                        (uint)bytesToRead,
                        out bytesRead,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to read file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesRead == bytesToRead);
            }

            finally
            {
                gcHandle.Free();
            }

            return result;
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool WriteFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToWrite,
            out uint             lpNumberOfBytesWritten,
            IntPtr               lpOverlapped
        );

        /// <summary>See the Windows API documentation for details.</summary>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool ReadFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToRead,
            out uint             lpNumberOfBytesRead,
            IntPtr               lpOverlapped
        );
    }
}

然后您可以创建一个BlockingCollection 来存储传入的数据并使用一个线程来填充它并使用一个单独的线程来使用它。

将数据读入队列的线程可能如下所示:

public void ReadIntoQueue<T>(FileStream fs, BlockingCollection<T[]> queue, int blockSize) where T: struct
{
    while (true)
    {
        var data = FastRead<T>(fs, blockSize);

        if (data.Length == 0)
        {
            queue.CompleteAdding();
            break;
        }

        queue.Add(data);
    }
}

消费线程会像这样从队列中删除东西:

public void ProcessDataFromQueue<T>(BlockingCollection<T[]> queue) where T : struct
{
    foreach (var array in queue.GetConsumingEnumerable())
    {
        // Do something with 'array'
    }
}

【讨论】:

  • 使用不安全的代码直接填充或读取数据不是更快更简单吗?
  • @AK_ 如果您被允许使用不安全代码,那么可能是这样,但是这种方法可以避免不安全代码在项目中传播。一旦一个程序集使用它,您就必须在所有直接或间接使用它的项目上启用不安全代码 - 这对于某些站点(例如我们的站点)是不可接受的。做起来也可能没有你想象的那么容易!
  • @MatthewWatson 同意最后一点;我花了 3 次才把我的例子(下)弄对了,而且我对使用原始内存时的序列化非常熟悉......
  • @AK_ 这并不比意外指定超出范围的数组索引或尝试读取文件流末尾之外的内容更危险。使用我发布的代码不可能破坏内存或任何东西。编组器会抛出异常。
  • @AK_ 一如既往,这是一个权衡。你的问题是“尽可能快速和高效” - 我会回答 为此 unsafe / memcpy 方法可能(你需要衡量)将是最好的你可以做。但是,如果您真的讨厌unsafe 的想法,那么您就不能这样做,在这种情况下,使用编组器是一个很好的折衷方案。无论哪种情况,坦率地说,我认为为此使用并发任务没有任何好处,而在这里我是作为一个执行 lot IO、lot 序列化的人发言,以及大量并发。
【解决方案2】:

您最大的瓶颈将是 IO,它必须以对文件的独占访问权限执行。实际的字节处理速度会很快 - 您可以直接将其写入文件(注意FileStream 本身有一个缓冲区,或者您可以使用BufferedStream 添加一个额外的层)而不是您可以通过序列化内存中的不同部分,然后将每个内存部分分别复制到流中。

我的建议:只需在单个线程中写入数据。坦率地说,我不确定我什至会打扰async(提示:异步代码增加开销),尤其是在缓冲区跟上的情况下。我也不会使用BiaryWriter / BinaryReader - 我只是直接写。您可以做的一个棘手的事情是使用一些unsafe 代码来复制块中的数据,以避免甚至不得不查看单个对象,但这是更难的事情......我会尝试做一个例子.

这是一个读/写的例子,首先注意性能:

Write: 2012ms
Read: 1089ms
File: 838,860,804 bytes

代码:

[DllImport("msvcrt.dll", EntryPoint = "memcpy", CallingConvention = CallingConvention.Cdecl, SetLastError = false)]
public static extern IntPtr memcpy(IntPtr dest, IntPtr src, UIntPtr count);

unsafe static st[] Read(string path)
{
    using (var file = File.OpenRead(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (byte* bufferPtr = buffer)
        {
            Fill(file, buffer, 0, 4);
            int len = ((int*)bufferPtr)[0];

            st[] result = new st[len];
            fixed (st* dataPtr = result)
            {
                st* rawPtr = dataPtr;
                IntPtr source= new IntPtr(bufferPtr);
                while (len >= BLOCK_SIZE)
                {
                    Fill(file, buffer, 0, buffer.Length);
                    memcpy(new IntPtr(rawPtr), source, bufferLen);
                    len -= BLOCK_SIZE;
                    rawPtr += BLOCK_SIZE;
                }
                if (len > 0)
                {
                    Fill(file, buffer, 0, len * size);
                    memcpy(new IntPtr(rawPtr), source, new UIntPtr((uint)(len * size)));
                }
            }
            return result;
        }
    }


}
static void Fill(Stream source, byte[] buffer, int offset, int count)
{
    int read;
    while (count > 0 && (read = source.Read(buffer, offset, count)) > 0)
    {
        offset += read;
        count -= read;
    }
    if (count > 0) throw new EndOfStreamException();
}

unsafe static void Write(st[] data, string path)
{
    using (var file = File.Create(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        int len = data.Length;
        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (st* dataPtr = data)
        fixed (byte* bufferPtr = buffer)
        {
            // write the number of elements
            ((int*)bufferPtr)[0] = data.Length;
            file.Write(buffer, 0, 4);

            st* rawPtr = dataPtr;
            IntPtr destination = new IntPtr(bufferPtr);
            // write complete blocks of BLOCK_SIZE
            while (len >= BLOCK_SIZE)
            {
                memcpy(destination, new IntPtr(rawPtr), bufferLen);
                len -= BLOCK_SIZE;
                rawPtr += BLOCK_SIZE;
                file.Write(buffer, 0, buffer.Length);
            }
            if (len > 0)
            {   // write an incomplete block, if necessary
                memcpy(destination, new IntPtr(rawPtr), new UIntPtr((uint)(len * size)));
                file.Write(buffer, 0, len * size);
            }
        }
    }
}

【讨论】:

  • 我认为在单线程中编写也可以,我更关心阅读......为什么我需要锁定任何东西? FileStream.SomethingAsync 不使用 Completion 端口吗?
  • @AK_ 一个文件只能在一个位置;您需要知道您正在读取/写入文件的连续部分;添加了完整的读/写实现,顺便说一句
  • @AK_ 哎呀,有一些错误...现在希望已修复
  • @MarcGravell 实际上我不需要知道我正在读取(或写入)文件的连续部分。我只需要知道我读取的缓冲区的位置,以及我的数据中的相应位置...
  • @AK_ 再说一遍,你错过了我的关键点:不管你怎么做,你的主要开销是 IO。在上面,我字面意思只是在做 IO 和 memcpy - 后者非常快。将其分割/分割成并发任务并没有任何好处无论如何,因为没有工作要做。通过添加并发可以实现的只是添加: 查找时间;需要多个缓冲区;在读/写时需要独占访问(是的,你需要这个)......所有开销。
【解决方案3】:

据我所知,读取或写入文件的最快方法是单个只进进程。否则除了强制读/写之外,磁盘还必须在文件中来回移动。

当然这并不意味着您不能处理多个并发线程中的数据。

如果段足够大,磁盘移动的开销可能不会引起注意。

【讨论】:

  • "做对了",每个段的处理都只是一个原始的 memcpy ......没什么可做的,所以不需要同时做
猜你喜欢
  • 1970-01-01
  • 2012-05-10
  • 2010-12-08
  • 2014-06-23
  • 2012-11-01
  • 1970-01-01
  • 2016-12-13
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多