【问题标题】:pyaudio and pynput: recording while a key is being pressed/held downpyaudio 和 pynput:在按下/按住键时录制
【发布时间】:2017-12-07 06:24:27
【问题描述】:

我目前正在尝试制作一个简单的脚本,该脚本会在按下某个键时进行记录。我需要生成一些数据,所以脚本的目的是用一个句子提示终端,当说话者按下一个键时,脚本将开始录音,说话者在这个时候读句子。当句子被说出时,如果按键释放停止录音并因此创建音频文件..

这是我目前拥有的:

from pynput import keyboard
import time
import pyaudio
import wave

CHUNK = 8192
FORMAT = pyaudio.paInt16
CHANNELS = 2
RATE = 44100
RECORD_SECONDS = 5
WAVE_OUTPUT_FILENAME = "output.wav"

p = pyaudio.PyAudio()
frames = []

def callback(in_data, frame_count, time_info, status):
    return (in_data, pyaudio.paContinue)

class MyListener(keyboard.Listener):
    def __init__(self):
        super(MyListener, self).__init__(self.on_press, self.on_release)
        self.key_pressed = None

        self.stream = p.open(format=FORMAT,
                             channels=CHANNELS,
                             rate=RATE,
                             input=True,
                             frames_per_buffer=CHUNK,
                             stream_callback = self.callback)
        print self.stream.is_active()

    def on_press(self, key):
        if key == keyboard.Key.cmd_l:
            self.key_pressed = True

    def on_release(self, key):
        if key == keyboard.Key.cmd_l:
            self.key_pressed = False

    def callback(self,in_data, frame_count, time_info, status):
        if self.key_pressed == True:
            return (in_data, pyaudio.paContinue)
        elif self.key_pressed == False:
            return (in_data, pyaudio.paComplete)
        else:
            return (in_data,pyaudio.paAbort)


listener = MyListener()
listener.start()
started = False

while True:
    time.sleep(0.1)
    if listener.key_pressed == True and started == False:
        started = True
        listener.stream.start_stream()
        print "start Stream"

    elif listener.key_pressed == False and started == True:
        print "Something coocked"
        listener.stream.stop_stream()
        listener.stream.close()
        p.terminate()

        wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(frames))
        wf.close()

        started = False

我似乎遇到了帧不断下降的问题,并且没有记录任何内容。我使用回调函数进行了实现,因为我认为键盘线程可能已经阻塞了它,所以为了确定..但似乎没有任何记录,因为我不断收到IOerror...我是什么做错了吗?

【问题讨论】:

    标签: python python-2.7 keyboard-events pyaudio


    【解决方案1】:

    这似乎至少在 Windows、Python 3.5 上有效。初始代码的最大问题是: 它做了一个while循环,它几乎阻塞了所有东西, 框架未附加到框架列表中。回调现在执行此操作。

    from pynput import keyboard
    import time
    import pyaudio
    import wave
    import sched
    import sys
    
    CHUNK = 8192
    FORMAT = pyaudio.paInt16
    CHANNELS = 2
    RATE = 44100
    RECORD_SECONDS = 5
    WAVE_OUTPUT_FILENAME = "output.wav"
    
    p = pyaudio.PyAudio()
    frames = []
    
    def callback(in_data, frame_count, time_info, status):
        frames.append(in_data)
        return (in_data, pyaudio.paContinue)
    
    class MyListener(keyboard.Listener):
        def __init__(self):
            super(MyListener, self).__init__(self.on_press, self.on_release)
            self.key_pressed = None
            self.wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
            self.wf.setnchannels(CHANNELS)
            self.wf.setsampwidth(p.get_sample_size(FORMAT))
            self.wf.setframerate(RATE)
        def on_press(self, key):
            if key.char == 'r':
                self.key_pressed = True
            return True
    
        def on_release(self, key):
            if key.char == 'r':
                self.key_pressed = False
            return True
    
    
    listener = MyListener()
    listener.start()
    started = False
    stream = None
    
    def recorder():
        global started, p, stream, frames
    
        if listener.key_pressed and not started:
            # Start the recording
            try:
                stream = p.open(format=FORMAT,
                                 channels=CHANNELS,
                                 rate=RATE,
                                 input=True,
                                 frames_per_buffer=CHUNK,
                                 stream_callback = callback)
                print("Stream active:", stream.is_active())
                started = True
                print("start Stream")
            except:
                raise
    
        elif not listener.key_pressed and started:
            print("Stop recording")
            stream.stop_stream()
            stream.close()
            p.terminate()
            listener.wf.writeframes(b''.join(frames))
            listener.wf.close()
            print "You should have a wav file in the current directory"
            sys.exit()
        # Reschedule the recorder function in 100 ms.
        task.enter(0.1, 1, recorder, ())
    
    
    print "Press and hold the 'r' key to begin recording"
    print "Release the 'r' key to end recording"
    task = sched.scheduler(time.time, time.sleep)
    task.enter(0.1, 1, recorder, ())
    task.run()
    

    【讨论】:

      【解决方案2】:

      只需像下面这样重写回调。

      def callback(self,in_data, frame_count, time_info, status):
          print("callback")
          if self.key_pressed == True:
              #stream_queue.put(in_data)
              print("record")
              frames.append(in_data)
              return (in_data, pyaudio.paContinue)
      
          elif self.key_pressed == False:
              #stream_queue.put(in_data)
              frames.append(in_data)
              return (in_data, pyaudio.paComplete)
      
          else:
              print("not record")
              return (in_data,pyaudio.paContinue)
      

      然后它会工作。

      【讨论】:

        【解决方案3】:

        以下适用于我的 Arch Linux 机器。 启示:pyaudio and pynput: recording while a key is being pressed/held down,https://gist.github.com/sloria/5693955中的先前回答

        # Inspirations: https://stackoverflow.com/questions/44894796/pyaudio-and-pynput-recording-while-a-key-is-being-pressed-held-down, https://gist.github.com/sloria/5693955
        import logging
        import sched
        import time
        import wave
        
        import pyaudio
        from pynput import keyboard
        
        
        for handler in logging.root.handlers[:]:
            logging.root.removeHandler(handler)
        logging.basicConfig(
            level=logging.DEBUG,
            format="%(levelname)s:%(asctime)s:%(module)s:%(lineno)d %(message)s"
        )
        
        
        class KeyPressTriggeredRecorder(object):
            '''Helps record audio during the duration of key-presses.
            Records in mono by default.
        
            Example usage:
                recorder.KeyPressTriggeredRecorder("test.wav").record()
            '''
        
            def __init__(self, trigger_key=keyboard.Key.space, channels=1, rate=44100, frames_per_buffer=1024):
                self.trigger_key = trigger_key
                self.key_pressed = False
                self.recording_started = False
                self.recording_stopped = False
                self.channels = channels
                self.rate = rate
                self.frames_per_buffer = frames_per_buffer
                self.key_listener = keyboard.Listener(self._on_press, self._on_release)
                self.task_scheduler = sched.scheduler(time.time, time.sleep)
        
            def reset(self):
                self.key_pressed = False
                self.recording_started = False
                self.recording_stopped = False
        
            def _on_press(self, key):
                # logging.info(key)
                if key == self.trigger_key:
                    self.key_pressed = True
                return True
        
            def _on_release(self, key):
                # logging.info(key)
                if key == self.trigger_key:
                    self.key_pressed = False
                    # Close listener
                    return False
                return True
        
            def record(self, fname):
                logging.info("Waiting for any key")
                self.reset()
                self.key_listener.start()
                recording_file = RecordingFile(
                    fname=fname, mode='wb', channels=self.channels, rate=self.rate,
                    frames_per_buffer=self.frames_per_buffer)
                def keychek_loop():
                    if self.key_pressed and not self.recording_started:
                        logging.info("Speak while you keep the key pressed.")
                        recording_file.start_recording()
                        self.recording_started = True
                    elif not self.key_pressed and self.recording_started:
                        recording_file.stop_recording()
                        self.recording_stopped = True
                    if not self.recording_stopped:
                        self.task_scheduler.enter(delay=.1, priority=1, action=keychek_loop)
                        self.task_scheduler.run()
                keychek_loop()
        
        
        class RecordingFile(object):
            """"Type of object corresponding to a particular recording.
        
            See :py:class:KeyPressTriggeredRecorder for example usage.
            """
            def __init__(self, fname, mode, channels,
                         rate, frames_per_buffer):
                self.fname = fname
                self.mode = mode
                self.channels = channels
                self.rate = rate
                self.frames_per_buffer = frames_per_buffer
                self._pa = pyaudio.PyAudio()
                self.chosen_device_index = -1
                for x in range(0,self._pa.get_device_count()):
                    info = self._pa.get_device_info_by_index(x)
                    # logging.info(self._pa.get_device_info_by_index(x))
                    if info["name"] == "pulse":
                        self.chosen_device_index = info["index"]
                        # logging.debug("Chosen index: %d", self.chosen_device_index)     
                self.wavefile = self._prepare_file(self.fname, self.mode)
                self._stream = None
        
            def __enter__(self):
                return self
        
            def __exit__(self, exception, value, traceback):
                self.close()
        
            def record(self, duration):
                # Use a stream with no callback function in blocking mode
                self._stream = self._pa.open(format=pyaudio.paInt16,
                                             channels=self.channels,
                                             rate=self.rate,
                                             input_device_index=self.chosen_device_index,
                                             input=True,
                                             frames_per_buffer=self.frames_per_buffer)
                for _ in range(int(self.rate / self.frames_per_buffer * duration)):
                    audio = self._stream.read(self.frames_per_buffer)
                    self.wavefile.writeframes(audio)
                return None
        
            def start_recording(self):
                # Use a stream with a callback in non-blocking mode
                # logging.info("Starting recording")
                self._stream = self._pa.open(format=pyaudio.paInt16,
                                             channels=self.channels,
                                             rate=self.rate,
                                             input=True,
                                             frames_per_buffer=self.frames_per_buffer,
                                             stream_callback=self._get_callback())
                self._stream.start_stream()
                return self
        
            def stop_recording(self):
                self._stream.stop_stream()
                return self
        
            def _get_callback(self):
                def callback(in_data, frame_count, time_info, status):
                    self.wavefile.writeframes(in_data)
                    return in_data, pyaudio.paContinue
                return callback
        
        
            def close(self):
                self._stream.close()
                self._pa.terminate()
                self.wavefile.close()
        
            def _prepare_file(self, fname, mode='wb'):
                import os
                os.makedirs(os.path.dirname(fname), exist_ok=True)
                wavefile = wave.open(fname, mode)
                wavefile.setnchannels(self.channels)
                wavefile.setsampwidth(self._pa.get_sample_size(pyaudio.paInt16))
                wavefile.setframerate(self.rate)
                return wavefile
        
        recorder.KeyPressTriggeredRecorder().record(FILEPATH)
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-01-08
          • 2020-11-03
          • 2013-06-07
          • 1970-01-01
          • 2019-10-06
          • 1970-01-01
          相关资源
          最近更新 更多