【Question title】:How to copy a file in Python with a progress bar?
【Posted】:2008-11-08 07:21:53
【Question description】:

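When copying large files with shutil.copy(), you get no indication of how the operation is progressing.

I've put together something that works: it uses a simple ProgressBar class (which just returns a plain ASCII progress bar as a string) and a loop of open().read() and .write() to do the actual copying. It displays the progress bar using sys.stdout.write("\r%s\r" % (the_progress_bar)), which is a bit hackish, but it works.

You can see the code (in context) on GitHub here

Is there any built-in module that would do this better? Are there any improvements that could be made to this code?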

【Question comments】:

  • This is for copying TV episode files to a specific path ([fixed path]/$showname/$season_number/); file sizes are generally 300-700MB

Tags: python file-io


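【Solution 1】:

Two things:

  • I would make the default block size a lot larger than 512. I would start with 16384, perhaps more.
  • For modularity, it might be better to have the copy_with_prog function not output the progress bar itself, but call a callback function so the caller can decide how to display the progress.

Perhaps something like this: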

def copy_with_prog(src, dest, callback = None):
    while True:
        # copy loop stuff
        if callback:
            callback(pos, total)

prog = ProgressBar(...)
copy_with_prog(src, dest, lambda pos, total: prog.update(pos, total))
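
The outline above leaves the copy loop and the ProgressBar class unspecified. A minimal runnable sketch of the same callback idea follows; the `chunk_size` parameter and the `text_bar` helper are assumptions added for illustration and stand in for the question's ProgressBar class, which is not shown here.

```python
import os


def copy_with_prog(src, dest, callback=None, chunk_size=16384):
    """Copy src to dest in chunks, calling callback(pos, total) after each chunk."""
    total = os.path.getsize(src)
    pos = 0
    with open(src, "rb") as fin, open(dest, "wb") as fout:
        while True:
            chunk = fin.read(chunk_size)
            if not chunk:
                break
            fout.write(chunk)
            pos += len(chunk)
            if callback:
                callback(pos, total)


def text_bar(pos, total, width=20):
    """Hypothetical helper: render progress as an ASCII bar string."""
    fraction = 1.0 if total == 0 else pos / total
    filled = int(width * fraction)
    return "[%s%s] %3d%%" % ("#" * filled, "-" * (width - filled), int(fraction * 100))


# Possible usage: the caller decides how progress is displayed.
# copy_with_prog("big.bin", "copy.bin",
#                lambda pos, total: print("\r" + text_bar(pos, total), end=""))
```

Because the copy function only reports numbers, the same function could drive a terminal bar, a GUI gauge, or a log line without changes.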

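【Comments】:

    【Solution 2】:

    Overkill? Perhaps. But on almost any system (Linux, Mac, and, with a quick wxWidgets install, Windows) you can have the real deal, with pause and cancel buttons in a GUI setup. Macs ship with wxWidgets these days, and it is a common package on Linux.

    A single file is very quick (it will finish immediately and look broken), so you might consider creating a fileSet job that ticks once per file instead of once per block. Enjoy!

    -Jim Carroll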

    """
    Threaded Jobs.
    
    Any class that does a long running process can inherit
    from ThreadedJob.  This enables running as a background
    thread, progress notification, pause and cancel.  The
    time remaining is also calculated by the ThreadedJob class.
    """
    import wx.lib.newevent
    try:
        import thread             # Python 2
    except ImportError:
        import _thread as thread  # Python 3 name of the same module
    import time
    
    (RunEvent, EVT_RUN) = wx.lib.newevent.NewEvent()
    (CancelEvent, EVT_CANCEL) = wx.lib.newevent.NewEvent()
    (DoneEvent, EVT_DONE) = wx.lib.newevent.NewEvent()
    (ProgressStartEvent, EVT_PROGRESS_START) = wx.lib.newevent.NewEvent()
    (ProgressEvent, EVT_PROGRESS) = wx.lib.newevent.NewEvent()
    
    class InterruptedException(Exception):
        def __init__(self, args = None):
            self.args = args
        #
    #
    
    class ThreadedJob:
        def __init__(self):
            # tell them ten seconds at first
            self.secondsRemaining = 10.0
            self.lastTick = 0
    
            # not running yet
            self.isPaused = False
            self.isRunning = False
            self.keepGoing = True
    
        def Start(self):
            self.keepGoing = self.isRunning = True
            thread.start_new_thread(self.Run, ())
    
            self.isPaused = False
        #
    
        def Stop(self):
            self.keepGoing = False
        #
    
        def WaitUntilStopped(self):
            while self.isRunning:
                time.sleep(0.1)
                wx.SafeYield()
            #
        #
    
        def IsRunning(self):
            return self.isRunning
        #
    
        def Run(self):
            # this is overridden by the
            # concrete ThreadedJob
            print("Run was not overloaded")
            self.JobFinished()
    
            pass
        #
    
        def Pause(self):
            self.isPaused = True
            pass
        #
    
        def Continue(self):
            self.isPaused = False
            pass
        #
    
        def PossibleStoppingPoint(self):
            if not self.keepGoing:
                raise InterruptedException("process interrupted.")
            wx.SafeYield()
    
            # allow cancel while paused
            while self.isPaused:
                if not self.keepGoing:
                    raise InterruptedException("process interrupted.")
    
                # don't hog the CPU
                time.sleep(0.1)
            #
        #
    
        def SetProgressMessageWindow(self, win):
            self.win = win
        #
    
        def JobBeginning(self, totalTicks):
    
            self.lastIterationTime = time.time()
            self.totalTicks = totalTicks
    
            if hasattr(self, "win") and self.win:
                wx.PostEvent(self.win, ProgressStartEvent(total=totalTicks))
            #
        #
    
        def JobProgress(self, currentTick):
            dt = time.time() - self.lastIterationTime
            self.lastIterationTime = time.time()
            dtick = currentTick - self.lastTick
            self.lastTick = currentTick
    
            alpha = 0.92
            if currentTick > 1:
                self.secondsPerTick = dt * (1.0 - alpha) + (self.secondsPerTick * alpha)
            else:
                self.secondsPerTick = dt
            #
    
            if dtick > 0:
                self.secondsPerTick /= dtick
    
            self.secondsRemaining = self.secondsPerTick * (self.totalTicks - 1 - currentTick) + 1
    
            if hasattr(self, "win") and self.win:
                wx.PostEvent(self.win, ProgressEvent(count=currentTick))
            #
        #
    
        def SecondsRemaining(self):
            return self.secondsRemaining
        #
    
        def TimeRemaining(self):
    
            if 1: #self.secondsRemaining > 3:
                minutes = self.secondsRemaining // 60
                seconds = int(self.secondsRemaining % 60.0)
                return "%i:%02i" % (minutes, seconds)
            else:
                return "a few"
        #
    
        def JobFinished(self):
            if hasattr(self, "win") and self.win:
                wx.PostEvent(self.win, DoneEvent())
            #
    
            # flag we're done before we post the all done message
            self.isRunning = False
        #
    #
    
    class EggTimerJob(ThreadedJob):
        """ A sample Job that demonstrates the mechanisms and features of the Threaded Job"""
        def __init__(self, duration):
            self.duration = duration
            ThreadedJob.__init__(self)
        #
    
        def Run(self):
            """ This can either be run directly for synchronous use of the job,
            or started as a thread when ThreadedJob.Start() is called.
    
            It is responsible for calling JobBeginning, JobProgress, and JobFinished.
            And as often as possible, calling PossibleStoppingPoint() which will 
            sleep if the user pauses, and raise an exception if the user cancels.
            """
            self.time0 = time.time()  # wall-clock start time (time.clock() was removed in Python 3.8)
            self.JobBeginning(self.duration)
    
            try:
                for count in range(0, self.duration):
                    time.sleep(1.0)
                    self.JobProgress(count)
                    self.PossibleStoppingPoint()
                #
            except InterruptedException:
                # clean up if user stops the Job early
                print("canceled prematurely!")
            #
    
            # always signal the end of the job
            self.JobFinished()
            #
        #
    
        def __str__(self):
            """ The job progress dialog expects the job to describe its current state."""
            response = []
            if self.isPaused:
                response.append("Paused Counting")
            elif not self.isRunning:
                response.append("Will Count the seconds")
            else:
                response.append("Counting")
            #
            return " ".join(response)
        #
    #
    
    class FileCopyJob(ThreadedJob):
        """ A common file copy Job. """
    
        def __init__(self, orig_filename, copy_filename, block_size=32*1024):
    
            self.src = orig_filename
            self.dest = copy_filename
            self.block_size = block_size
            ThreadedJob.__init__(self)
        #
    
        def Run(self):
            """ This can either be run directly for synchronous use of the job,
            or started as a thread when ThreadedJob.Start() is called.
    
            It is responsible for calling JobBeginning, JobProgress, and JobFinished.
            And as often as possible, calling PossibleStoppingPoint() which will 
            sleep if the user pauses, and raise an exception if the user cancels.
            """
            self.time0 = time.time()  # wall-clock start time (time.clock() was removed in Python 3.8)
    
            try:
                source = open(self.src, 'rb')
    
                # how many blocks?
                import os
                st_size = os.stat(self.src).st_size
                # integer ceiling division so a partial last block is counted too
                num_blocks = (st_size + self.block_size - 1) // self.block_size
                current_block = 0
    
                self.JobBeginning(num_blocks)
    
                dest = open(self.dest, 'wb')
    
                while 1:
                    copy_buffer = source.read(self.block_size)
                    if copy_buffer:
                        dest.write(copy_buffer)
                        current_block += 1
                        self.JobProgress(current_block)
                        self.PossibleStoppingPoint()
                    else:
                        break
    
                source.close()
                dest.close()
    
            except InterruptedException:
                # clean up if user stops the Job early
                source.close()
                dest.close()
                # unlink / delete the file that is partially copied
                os.unlink(self.dest)
                print("canceled, dest deleted!")
            #
    
            # always signal the end of the job
            self.JobFinished()
            #
        #
    
        def __str__(self):
            """ The job progress dialog expects the job to describe its current state."""
            response = []
            if self.isPaused:
                response.append("Paused Copy")
            elif not self.isRunning:
                response.append("Will Copy a file")
            else:
                response.append("Copying")
            #
            return " ".join(response)
        #
    #
    
    class JobProgress(wx.Dialog):
        """ This dialog shows the progress of any ThreadedJob.
    
        It can be shown Modally if the main application needs to suspend
        operation, or it can be shown Modelessly for background progress
        reporting.
    
        app = wx.PySimpleApp()
        job = EggTimerJob(duration = 10)
        dlg = JobProgress(None, job)
        job.SetProgressMessageWindow(dlg)
        job.Start()
        dlg.ShowModal()
    
    
        """
        def __init__(self, parent, job):
            self.job = job
    
            wx.Dialog.__init__(self, parent, -1, "Progress", size=(350,200))
    
            # vertical box sizer
            sizeAll = wx.BoxSizer(wx.VERTICAL)
    
            # Job status text
            self.JobStatusText = wx.StaticText(self, -1, "Starting...")
            sizeAll.Add(self.JobStatusText, 0, wx.EXPAND|wx.ALL, 8)
    
            # wx.Gauge
            self.ProgressBar = wx.Gauge(self, -1, 10, wx.DefaultPosition, (250, 15))
            sizeAll.Add(self.ProgressBar, 0, wx.EXPAND|wx.ALL, 8)
    
            # horiz box sizer, and spacer to right-justify
            sizeRemaining = wx.BoxSizer(wx.HORIZONTAL)
            sizeRemaining.Add((2,2), 1, wx.EXPAND)
    
            # time remaining read-only edit
            # putting wide default text gets a reasonable initial layout.
            self.remainingText = wx.StaticText(self, -1, "???:??")
            sizeRemaining.Add(self.remainingText, 0, wx.LEFT|wx.RIGHT|wx.ALIGN_CENTER_VERTICAL, 8)
    
            # static text: remaining
            self.remainingLabel = wx.StaticText(self, -1, "remaining")
            sizeRemaining.Add(self.remainingLabel, 0, wx.ALL|wx.ALIGN_CENTER_VERTICAL, 8)
    
            # add that row to the mix
            sizeAll.Add(sizeRemaining, 1, wx.EXPAND)
    
            # horiz box sizer & spacer
            sizeButtons = wx.BoxSizer(wx.HORIZONTAL)
            sizeButtons.Add((2,2), 1, wx.EXPAND|wx.ADJUST_MINSIZE)
    
            # Pause Button
            self.PauseButton = wx.Button(self, -1, "Pause")
            sizeButtons.Add(self.PauseButton, 0, wx.ALL, 4)
            self.Bind(wx.EVT_BUTTON, self.OnPauseButton, self.PauseButton)
    
            # Cancel button
            self.CancelButton = wx.Button(self, wx.ID_CANCEL, "Cancel")
            sizeButtons.Add(self.CancelButton, 0, wx.ALL, 4)
            self.Bind(wx.EVT_BUTTON, self.OnCancel, self.CancelButton)
    
            # Add all the buttons on the bottom row to the dialog
            sizeAll.Add(sizeButtons, 0, wx.EXPAND|wx.ALL, 4)
    
            self.SetSizer(sizeAll)
            #sizeAll.Fit(self)
            sizeAll.SetSizeHints(self)
    
            # jobs tell us how they are doing
            self.Bind(EVT_PROGRESS_START, self.OnProgressStart)
            self.Bind(EVT_PROGRESS, self.OnProgress)
            self.Bind(EVT_DONE, self.OnDone)
    
            self.Layout()
        #
    
        def OnPauseButton(self, event):
            if self.job.isPaused:
                self.job.Continue()
                self.PauseButton.SetLabel("Pause")
                self.Layout()
            else:
                self.job.Pause()
                self.PauseButton.SetLabel("Resume")
                self.Layout()
            #
        #
    
        def OnCancel(self, event):
            self.job.Stop()
        #
    
        def OnProgressStart(self, event):
            self.ProgressBar.SetRange(event.total)
            self.statusUpdateTime = time.time()
        #
    
        def OnProgress(self, event):
            # update the progress bar
            self.ProgressBar.SetValue(event.count)
    
            self.remainingText.SetLabel(self.job.TimeRemaining())
    
            # update the text a max of 20 times a second
            # (time.time() is wall-clock time; time.clock() measured CPU time on Unix)
            if time.time() - self.statusUpdateTime > 0.05:
                self.JobStatusText.SetLabel(str(self.job))
                self.statusUpdateTime = time.time()
                self.Layout()
            #
        #
    
        # when a job is done
        def OnDone(self, event):
            self.ProgressBar.SetValue(0)
            self.JobStatusText.SetLabel("Finished")
            self.Destroy()
        #
    #
    
    if __name__ == "__main__":
        app = wx.PySimpleApp()
        #job = EggTimerJob(duration = 10)
        job = FileCopyJob("VeryBigFile.mp4", "/tmp/test_junk.mp4", 1024*1024*10)
        dlg = JobProgress(None, job)
        job.SetProgressMessageWindow(dlg)
        job.Start()
        dlg.ShowModal()
    #
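
The fileSet idea mentioned at the top of this answer (one tick per file rather than per block) is not part of the code above. A rough sketch in plain Python, without wx, might look like the following; `copy_file_set` and its `on_file_done` callback are hypothetical names, and in the wx version the tick would presumably call JobProgress and PossibleStoppingPoint instead.

```python
import shutil


def copy_file_set(pairs, on_file_done=None):
    """Copy each (src, dest) pair, reporting progress once per finished file.

    pairs is a list of (source_path, destination_path) tuples.
    on_file_done(done_count, total_count, dest) is an assumed callback shape.
    """
    total = len(pairs)
    for index, (src, dest) in enumerate(pairs, start=1):
        shutil.copy2(src, dest)
        if on_file_done:
            on_file_done(index, total, dest)
```

Ticking per file keeps the progress dialog meaningful when many small files finish almost instantly.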
    

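    【Comments】:

    • Heh... you forgot the # at the end of one of the while suites! Um... what are they for?
    • That's just because I really like to close my blocks... it lets my text editor paste a line and automatically re-indent it correctly. Otherwise a paste may indent too far.
    【Solution 3】:

    I made this shutil.copy() with a progress bar in a simple way, using only built-in modules. If you are using utf-8 encoding, you can get a progress bar like the second example in the gif image: [Image: progress bar examples]

    Read the comments to change the style and colors. The first and last examples do not need utf-8. You can use the command CPprogress(SOURCE, DESTINATION) exactly where you would have shutil.copy(src, dst):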

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    '''
    CPprogress(SOURCE, DESTINATION)
    
    I made this to give shutil.copy() [or shutil.copy2() in this case] a progress bar.
    
    You can use CPprogress(SOURCE, DESTINATION) just like shutil.copy(src, dst). SOURCE must be a file path and DESTINATION a file or folder path.
    
    It will give you a progress bar for each file copied. Just copy this code above the place where you want to use CPprogress(SOURCE, DESTINATION) in your code.
    
    You can easily change the look of the progress bar:
        - To keep the style and just change the colors, replace the colors values of progressCOLOR and finalCOLOR (orange code at the end of the lines).
        - To use a solid block progress bar, # -*- coding: utf-8 -*- is required. Otherwise, you will get an encoding error. Some basic terminals, like xterm, may not show the progress bar because of the utf-8 characters.
          To use this style, remove the comments #STYLE# in lines ###COLORS### - BlueCOLOR and endBLOCK.
          In def getPERCECENTprogress() remove the comments  #STYLE# AND COMMENT THE PREVIOUS line. Do the same in def CPprogress()
          If you don't want the utf-8 encoding, delete the four lines beginning with #STYLE#.
    
    NOTE: If you want to copy lots of small files, the copy process for each file is so fast 
          that all you will see is a lot of lines scrolling in your terminal window - not enough time for a 'progress'.
          In that case, I use an overall progress that shows only one progress bar for the complete job.   nzX
    '''
    import os
    import shutil
    import sys
    import threading
    import time
    
    ######## COLORS ######
    progressCOLOR = '\033[38;5;33;48;5;236m' #\033[38;5;33;48;5;236m# copy inside '' for colored progressbar| orange:#\033[38;5;208;48;5;235m
    finalCOLOR =  '\033[38;5;33;48;5;33m' #\033[38;5;33;48;5;33m# copy inside '' for colored progressbar| orange:#\033[38;5;208;48;5;208m
    #STYLE#BlueCOLOR = '\033[38;5;33m'#\033[38;5;33m# copy inside '' for colored progressbar Orange#'\033[38;5;208m'# # BG progress# #STYLE# 
    #STYLE#endBLOCK = '' # ▌ copy OR '' for none # BG progress# #STYLE# requires utf8 coding header
    ########
    
    BOLD    = '\033[1m'
    UNDERLINE = '\033[4m'
    CEND    = '\033[0m'
    
    def getPERCECENTprogress(source_path, destination_path):
        time.sleep(.24)
        if os.path.exists(destination_path):
            while os.path.getsize(source_path) != os.path.getsize(destination_path):
                sys.stdout.write('\r')
                percentagem = int((float(os.path.getsize(destination_path))/float(os.path.getsize(source_path))) * 100)
                steps = int(percentagem/5)
                copiado = int(os.path.getsize(destination_path)/1000000)# decimal MB (10**6 bytes); this matches the Thunar file manager report (Linux - Xfce), binary MiB would be 1048576
                sizzz = int(os.path.getsize(source_path)/1000000)
                sys.stdout.write(("         {:d} / {:d} Mb   ".format(copiado, sizzz)) +  (BOLD + progressCOLOR + "{:20s}".format('|'*steps) + CEND) + ("   {:d}% ".format(percentagem))) # BG progress
                #STYLE#sys.stdout.write(("         {:d} / {:d} Mb   ".format(copiado, sizzz)) +  (BOLD + BlueCOLOR + "▐" + "{:s}".format('█'*steps) + CEND) + ("{:s}".format(' '*(20-steps))+ BOLD + BlueCOLOR + endBLOCK+ CEND) +("   {:d}% ".format(percentagem))) #STYLE# # BG progress# closer to GUI but less compatible (no block bar with xterm) # requires utf8 coding header
                sys.stdout.flush()
                time.sleep(.01)
    
    def CPprogress(SOURCE, DESTINATION):
        if os.path.isdir(DESTINATION):
            dst_file = os.path.join(DESTINATION, os.path.basename(SOURCE))
        else: dst_file = DESTINATION
        print(" ")
        print(BOLD + UNDERLINE + "FROM:" + CEND + "    " + SOURCE)
        print(BOLD + UNDERLINE + "TO:" + CEND + "      " + dst_file)
        print(" ")
        threading.Thread(name='progresso', target=getPERCECENTprogress, args=(SOURCE, dst_file)).start()
        shutil.copy2(SOURCE, DESTINATION)
        time.sleep(.02)
        sys.stdout.write('\r')
        sys.stdout.write(("         {:d} / {:d} Mb   ".format((int(os.path.getsize(dst_file)/1000000)), (int(os.path.getsize(SOURCE)/1000000)))) +  (BOLD + finalCOLOR + "{:20s}".format('|'*20) + CEND) + ("   {:d}% ".format(100))) # BG progress 100%
        #STYLE#sys.stdout.write(("         {:d} / {:d} Mb   ".format((int(os.path.getsize(dst_file)/1000000)), (int(os.path.getsize(SOURCE)/1000000)))) +  (BOLD + BlueCOLOR + "▐" + "{:s}{:s}".format(('█'*20), endBLOCK) + CEND) + ("   {:d}% ".format(100))) #STYLE# # BG progress 100%# closer to GUI but less compatible (no block bar with xterm) # requires utf8 coding header
        sys.stdout.flush()
        print(" ")
        print(" ")
    
    '''
    #Ex. Copy all files from root of the source dir to destination dir
    
    folderA = '/path/to/SOURCE' # SOURCE
    folderB = '/path/to/DESTINATION' # DESTINATION
    for FILE in os.listdir(folderA):
        if not os.path.isdir(os.path.join(folderA, FILE)):
            if os.path.exists(os.path.join(folderB, FILE)): continue # as we are using shutil.copy2() that overwrites destination, this skips existing files
            CPprogress(os.path.join(folderA, FILE), folderB) # use the command as if it was shutil.copy2() but with progress
    
    
             75 / 150 Mb  ||||||||||         |  50%
    '''
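
The core technique in this answer is a helper thread that polls the size of the destination file while shutil.copy2() runs. A stripped-down Python 3 sketch of that idea is below. It differs from the answer in one respect, stated here as an assumption: the helper thread is stopped with a threading.Event rather than by comparing file sizes. The `report` callback and the `interval` parameter are illustrative names, and `dst` is assumed to be a file path, not a directory.

```python
import os
import shutil
import threading


def copy_with_polling(src, dst, report, interval=0.05):
    """Run shutil.copy2 while a helper thread reports the destination size as a percentage."""
    total = os.path.getsize(src)
    finished = threading.Event()

    def poll():
        # Event.wait returns False on timeout, so this loops until the copy is done.
        while not finished.wait(interval):
            if total and os.path.exists(dst):
                report(min(100, os.path.getsize(dst) * 100 // total))

    watcher = threading.Thread(target=poll, name="progress", daemon=True)
    watcher.start()
    try:
        shutil.copy2(src, dst)
    finally:
        finished.set()
        watcher.join()
    report(100)  # final report once the copy has completed
```

Using an event to stop the watcher means it also terminates if the copy raises, whereas a size comparison would keep looping in that case.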
    

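    【Comments】:

      【Solution 4】:

      If you want to use the Windows copy dialog with progress, you can use these:

      【Comments】:

        【Solution 5】:

        If you want an overall progress, you can use something like this (made for another script). Note that in this case, the 'threading.Thread' that calls the progress bar is placed outside the 'for' loop. Also, the measurements need to be taken differently. This is the third example (non utf-8) in the gif image from the previous answer. It adds a files 'ToGo' count: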

        #!/usr/bin/env python
        # -*- coding: utf-8 -*-
        
        '''
        Ex.
        CopyProgress('/path/to/SOURCE', '/path/to/DESTINATION')
        
        
        I think this 'copy with overall progress' is very 'plastic' and can be easily adapted.
        By default, it will RECURSIVELY copy the CONTENT of  'path/to/SOURCE' to 'path/to/DESTINATION/' keeping the directory tree.
        
        Paying attention to the comments, there are 4 main options that can be changed immediately:
        
        1 - The LOOK of the progress bar: see COLORS and the PAIR of STYLE lines in 'def getPERCECENTprogress'(inside and after the 'while' loop);
        
        2 - The DESTINATION path: to get 'path/to/DESTINATION/SOURCE_NAME' as target, comment the 2nd 'DST =' definition on the top of the 'def CopyProgress(SOURCE, DESTINATION)' function;
        
        3 - If you don't want to RECURSIVELY copy from sub-directories but just the files in the root source directory to the root of destination, you can use os.listdir() instead of os.walk(). Read the comments inside 'def CopyProgress(SOURCE, DESTINATION)' function to disable RECURSION. Be aware that the RECURSION changes(4x2) must be made in both os.walk() loops;
        
        4 - Handling destination files: if you use this in a situation where the destination filename may already exist, by default, the file is skipped and the loop will jump to the next and so on. On the other way shutil.copy2(), by default, overwrites destination file if exists. Alternatively, you can handle files that exist by overwriting or renaming (according to current date and time). To do that read the comments after 'if os.path.exists(dstFILE): continue' both in the count bytes loop and the main loop. Be aware that the changes must match in both loops (as described in comments) or the progress function will not work properly.
        
        '''
        
        import os
        import shutil
        import sys
        import threading
        import time
        
        progressCOLOR = '\033[38;5;33;48;5;236m' #BLUEgreyBG
        finalCOLOR =  '\033[48;5;33m' #BLUEBG
        # check the color codes below and paste above
        
        ###### COLORS #######
        # WHITEblueBG = '\033[38;5;15;48;5;33m'
        # BLUE = '\033[38;5;33m'
        # BLUEBG  = '\033[48;5;33m'
        # ORANGEBG = '\033[48;5;208m'
        # BLUEgreyBG = '\033[38;5;33;48;5;236m'
        # ORANGEgreyBG = '\033[38;5;208;48;5;236m' # = '\033[38;5;FOREGROUND;48;5;BACKGROUNDm' # see 'https://i.stack.imgur.com/KTSQa.png' for the 256 color codes
        # INVERT = '\033[7m'
        ###### COLORS #######
        
        BOLD    = '\033[1m'
        UNDERLINE = '\033[4m'
        CEND    = '\033[0m'
        
        FilesLeft = 0
        
        def FullFolderSize(path):
            TotalSize = 0
            if os.path.exists(path):# to be safely used # if FALSE returns 0
                for root, dirs, files in os.walk(path):
                    for file in files:
                        TotalSize += os.path.getsize(os.path.join(root, file))
            return TotalSize
        
        def getPERCECENTprogress(source_path, destination_path, bytes_to_copy):
            dstINIsize = FullFolderSize(destination_path)
            time.sleep(.25)
            print(" ")
            print(BOLD + UNDERLINE + "FROM:" + CEND + "    " + source_path)
            print(BOLD + UNDERLINE + "TO:" + CEND + "      " + destination_path)
            print(" ")
            if os.path.exists(destination_path):
                while bytes_to_copy != (FullFolderSize(destination_path)-dstINIsize):
                    sys.stdout.write('\r')
                    percentagem = int((float((FullFolderSize(destination_path)-dstINIsize))/float(bytes_to_copy)) * 100)
                    steps = int(percentagem/5)
                    copiado = '{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000))# decimal MB (10**6 bytes); this is closer to the file manager report than binary MiB (1048576 bytes)
                    sizzz = '{:,}'.format(int(bytes_to_copy/1000000))
                    sys.stdout.write(("         {:s} / {:s} Mb  ".format(copiado, sizzz)) +  (BOLD + progressCOLOR + "{:20s}".format('|'*steps) + CEND) + ("  {:d}% ".format(percentagem)) + ("  {:d} ToGo ".format(FilesLeft))) #  STYLE 1 progress default # 
                    #BOLD# sys.stdout.write(BOLD + ("        {:s} / {:s} Mb  ".format(copiado, sizzz)) +  (progressCOLOR + "{:20s}".format('|'*steps) + CEND) + BOLD + ("  {:d}% ".format(percentagem)) + ("  {:d} ToGo ".format(FilesLeft))+ CEND) # STYLE 2 progress BOLD # 
                    #classic B/W# sys.stdout.write(BOLD + ("        {:s} / {:s} Mb  ".format(copiado, sizzz)) +  ("|{:20s}|".format('|'*steps)) + ("  {:d}% ".format(percentagem)) + ("  {:d} ToGo ".format(FilesLeft))+ CEND) # STYLE 3 progress classic B/W #
                    sys.stdout.flush()
                    time.sleep(.01)
                sys.stdout.write('\r')
                time.sleep(.05)
                sys.stdout.write(("         {:s} / {:s} Mb  ".format('{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000)), '{:,}'.format(int(bytes_to_copy/1000000)))) +  (BOLD + finalCOLOR + "{:20s}".format(' '*20) + CEND) + ("  {:d}% ".format( 100)) + ("  {:s}      ".format('    ')) + "\n") #  STYLE 1 progress default # 
                #BOLD# sys.stdout.write(BOLD + ("        {:s} / {:s} Mb  ".format('{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000)), '{:,}'.format(int(bytes_to_copy/1000000)))) +  (finalCOLOR + "{:20s}".format(' '*20) + CEND) + BOLD + ("  {:d}% ".format( 100)) + ("  {:s}      ".format('    ')) + "\n" + CEND ) # STYLE 2 progress BOLD # 
                #classic B/W# sys.stdout.write(BOLD + ("        {:s} / {:s} Mb  ".format('{:,}'.format(int((FullFolderSize(destination_path)-dstINIsize)/1000000)), '{:,}'.format(int(bytes_to_copy/1000000)))) +  ("|{:20s}|".format('|'*20)) + ("  {:d}% ".format( 100)) + ("  {:s}      ".format('    ')) + "\n" + CEND ) # STYLE 3 progress classic B/W # 
                sys.stdout.flush()
                print(" ")
                print(" ")
        
        def CopyProgress(SOURCE, DESTINATION):
            global FilesLeft
            DST = os.path.join(DESTINATION, os.path.basename(SOURCE))
            # <- the line above copies the Source folder inside the Destination folder. Result Target: path/to/Destination/SOURCE_NAME
            # -> the next line overrides it and copies the CONTENT of Source into the Destination (the default). Result Target: path/to/Destination
            DST = DESTINATION # COMMENT this line out to get path/to/Destination/SOURCE_NAME as the target instead
            #
            if DST.startswith(SOURCE):
                print(" ")
                print(BOLD + UNDERLINE + "Source folder can't be changed." + CEND)
                print('Please check your target path...')
                print(" ")
                print(BOLD + '        CANCELED' + CEND)
                print(" ")
                sys.exit()
            #count bytes to copy
            Bytes2copy = 0
            for root, dirs, files in os.walk(SOURCE): # USE for filename in os.listdir(SOURCE): # if you don't want RECURSION #
                dstDIR = root.replace(SOURCE, DST, 1) # USE dstDIR = DST # if you don't want RECURSION #
                for filename in files:                # USE if not os.path.isdir(os.path.join(SOURCE, filename)): # if you don't want RECURSION #
                    dstFILE = os.path.join(dstDIR, filename)
                    if os.path.exists(dstFILE): continue # must match the main loop (after "threading.Thread")
                    #                                      To overwrite, delete dstFILE first here so the progress works properly: e.g. change continue to os.unlink(dstFILE)
                    #                                      To rename new files by adding date and time, instead of deleting and overwriting,
                    #                                      comment 'if os.path.exists(dstFILE): continue'
                    Bytes2copy += os.path.getsize(os.path.join(root, filename)) # USE os.path.getsize(os.path.join(SOURCE, filename)) # if you don't want RECURSION #
                    FilesLeft += 1
            # <- count bytes to copy
            #
            # Thread that displays the progress
            threading.Thread(name='progresso', target=getPERCECENTprogress, args=(SOURCE, DST, Bytes2copy)).start()
            # main loop
            for root, dirs, files in os.walk(SOURCE): # USE for filename in os.listdir(SOURCE): # if you don't want RECURSION #
                dstDIR = root.replace(SOURCE, DST, 1) # USE dstDIR = DST # if you don't want RECURSION #
                if not os.path.exists(dstDIR):
                    os.makedirs(dstDIR)
                for filename in files:                # USE if not os.path.isdir(os.path.join(SOURCE, filename)): # if you don't want RECURSION #
                    srcFILE = os.path.join(root, filename) # USE os.path.join(SOURCE, filename) # if you don't want RECURSION #
                    dstFILE = os.path.join(dstDIR, filename)
                    if os.path.exists(dstFILE): continue # MUST MATCH THE PREVIOUS count bytes loop 
                    #   <- <-                              this jumps to the next file without copying this file, if destination file exists. 
                    #                                      Comment to copy with rename or overwrite dstFILE
                    #
                    # RENAME part below
                    head, tail = os.path.splitext(filename)
                    count = -1
                    year = int(time.strftime("%Y"))
                    month = int(time.strftime("%m"))
                    day = int(time.strftime("%d"))
                    hour = int(time.strftime("%H"))
                    minute = int(time.strftime("%M"))
                    while os.path.exists(dstFILE):
                        count += 1
                        if count == 0:
                            dstFILE = os.path.join(dstDIR, '{:s}[{:d}.{:d}.{:d}]{:d}-{:d}{:s}'.format(head, year, month, day, hour, minute, tail))
                        else:
                            dstFILE = os.path.join(dstDIR, '{:s}[{:d}.{:d}.{:d}]{:d}-{:d}[{:d}]{:s}'.format(head, year, month, day, hour, minute, count, tail))
                    # END of RENAME part
                    shutil.copy2(srcFILE, dstFILE)
                    FilesLeft -= 1
                    #
        
        '''
        Ex.
        CopyProgress('/path/to/SOURCE', '/path/to/DESTINATION')
        '''
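
The overall-progress idea boils down to two passes: sum the bytes to copy with os.walk(), then copy while tracking the bytes done. A compact Python 3 sketch under simplifying assumptions follows. Unlike the answer's code it reports from the copy loop instead of polling from a thread, it overwrites existing destination files rather than skipping or renaming them, and it assumes the destination is not inside the source. `copy_tree_with_progress` and `report` are illustrative names.

```python
import os
import shutil


def tree_size(path):
    """Return the total size in bytes of all files below path."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total


def copy_tree_with_progress(source, destination, report):
    """Recursively copy the content of source into destination.

    report(copied_bytes, total_bytes) is called after each file.
    """
    total = tree_size(source)  # first pass: count the bytes to copy
    copied = 0
    for root, _dirs, files in os.walk(source):  # second pass: copy
        rel = os.path.relpath(root, source)
        dst_dir = destination if rel == "." else os.path.join(destination, rel)
        os.makedirs(dst_dir, exist_ok=True)
        for name in files:
            src_file = os.path.join(root, name)
            shutil.copy2(src_file, os.path.join(dst_dir, name))
            copied += os.path.getsize(src_file)
            report(copied, total)
```

Reporting per file is coarser than polling the destination size, but it needs no thread and cannot get out of step with the copy loop.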
        

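        【Comments】:

          【Solution 6】:

          Alternatively, you can use ROBOCOPY together with the os module. It will not give you a progress bar, but it will give you a percentage indicator as well as a solid summary at the end.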

          import os
          
          def robocopy(source, destination, extension=''):
              os.system("robocopy {} {} {} /xx /njh".format(source, destination, extension))
          
          # Usage example
          robocopy(r'C:\Users\Example\Downloads', r'C:\Users\Example\Desktop', '*.mov')
          

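          The example above copies all .mov files to the desktop.

          Leaving extension blank copies all files in the source folder to the destination folder.

          /xx keeps extra files and directories out of the listing

          /njh removes the job header

          For details, see the documentation: https://docs.microsoft.com/en-us/windows-server/administration/windows-commands/robocopy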
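
One caveat with the os.system() call is that paths containing spaces are split into several arguments. A hedged variant that passes an argument list through subprocess is sketched below; `build_robocopy_command` is a hypothetical helper, and the default pattern `*.*` is used here in place of the empty string, since an empty list element would be passed to robocopy as an empty argument.

```python
import subprocess


def build_robocopy_command(source, destination, pattern="*.*"):
    """Return the robocopy argument list; a list avoids quoting problems with spaces."""
    return ["robocopy", source, destination, pattern, "/xx", "/njh"]


def robocopy(source, destination, pattern="*.*"):
    """Run robocopy (Windows only) and return True when it reports success."""
    result = subprocess.run(build_robocopy_command(source, destination, pattern))
    # robocopy uses exit codes below 8 for success (for example, 1 means files were copied)
    return result.returncode < 8


# Possible usage on Windows:
# robocopy(r'C:\Users\Example\Downloads', r'C:\Users\Example\Desktop', '*.mov')
```

Checking the return code matters because robocopy returns non-zero values even for successful copies, so a plain "zero means success" test would misreport them.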

          【Comments】:

            【Solution 7】:

            Here is a simple PySide application that can copy any file from a source to a destination

            #!/usr/bin/python3
            
            import os
            import sys
            from PySide2.QtWidgets import QProgressBar, QApplication, QDialog, QMainWindow, QPushButton
            from PySide2.QtCore import QThread, Signal, Slot
            
            
            class ProgressDialog(QDialog):
                def __init__(self, parent, source, destination):
                    QDialog.__init__(self, parent)
                    
                    self.resize(400, 50)
                    
                    self.parent = parent
                    self.source = source
                    self.destination = destination
                    
                    self.prog = QProgressBar(self)
                    self.prog.setMaximum(100)
                    self.prog.setMinimum(0)
                    self.prog.setFormat("%p%")
            
                def start(self):
                    self.show()
                    self.copy()
            
                def copy(self):
                    copy_thread = CopyThread(self, self.source, self.destination)
                    copy_thread.procPartDone.connect(self.update_progress)
                    copy_thread.procDone.connect(self.finished_copy)
                    copy_thread.start()
            
                def update_progress(self, progress):
                    self.prog.setValue(progress)
            
                def finished_copy(self, state):
                    self.close()
            
            class CopyThread(QThread):
            
                procDone = Signal(bool)
                procPartDone = Signal(int)
            
                def __init__(self, parent, source: str, destination: str):
                    QThread.__init__(self, parent)
                    
                    self.source = source
                    self.destination = destination
            
                def run(self):
                    self.copy()
                    self.procDone.emit(True)
            
                def copy(self):
                    source_size = os.stat(self.source).st_size
                    copied = 0
            
                    with open(self.source, "rb") as source, open(self.destination, "wb") as target:
                        while True:
                            chunk = source.read(1024)
                            if not chunk:
                                break
            
                            target.write(chunk)
                            copied += len(chunk)
            
                            # Signal(int) expects an integer, so use floor division
                            self.procPartDone.emit(copied * 100 // source_size)
            
            
            class MainWindow(QMainWindow):
                def __init__(self, parent: object = None) -> None:
                    super().__init__(parent)
                    
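                    # Placeholder paths: replace both, and make sure the destination differs
                    # from the source, since opening the same file with "wb" would truncate it
                    self.src = "/path/to/file.ext"
                    self.dest = "/path/to/file.ext"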
            
                    self.btn = QPushButton(self)
                    self.btn.setText("Start copy")
                    self.btn.clicked.connect(self.run)
                    
                    self.setCentralWidget(self.btn)
                    
                def run(self):
                    self.prog = ProgressDialog(self, self.src, self.dest)
                    self.prog.start()
            
            
            def main():   
                app = QApplication(sys.argv)
                
                window = MainWindow()
                window.show()
                
                sys.exit(app.exec_())
            
            if __name__ == "__main__":
                main()
            

            Save this script in a file named 'main.py' and run the command:

            python3 main.py
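
The copy loop inside CopyThread.copy can also be tried without any GUI. The following is a minimal sketch under a few assumptions: the function name `copy_with_progress`, the `callback` parameter, and the 1 MiB default chunk size are illustrative choices, not part of the answer above.

```python
import os
import tempfile


def copy_with_progress(src, dst, callback=None, chunk_size=1024 * 1024):
    """Copy src to dst in chunks, reporting an integer percentage via callback."""
    total = os.stat(src).st_size
    copied = 0
    with open(src, "rb") as source, open(dst, "wb") as target:
        while True:
            chunk = source.read(chunk_size)
            if not chunk:
                break
            target.write(chunk)
            copied += len(chunk)
            if callback:
                # Floor division keeps the value usable for an int-typed signal or bar
                callback(copied * 100 // total)


if __name__ == "__main__":
    # Demo on a throwaway 3 MiB file in a temporary directory
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "src.bin")
        dst = os.path.join(tmp, "dst.bin")
        with open(src, "wb") as f:
            f.write(os.urandom(3 * 1024 * 1024))
        copy_with_progress(src, dst, lambda pct: print("\r%3d%%" % pct, end=""))
        print()
```

Passing the display logic in as a callback keeps the loop reusable: the same function could feed a console bar, or a Qt signal's emit method when run inside a thread. A larger chunk size also means far fewer progress updates than reading 1024 bytes at a time.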
            

            【Discussion】:
