Using another microphone instead of Respeaker

Hi @dsquid,

Good question! Some previous work has used the SpeechRecognition Python package; see this forum post for more on working with that package.

Since that forum post is a bit old, and any ROS code in the old post is ROS1, here’s an isolated tutorial for using a different USB microphone in ROS2; I hope that it helps with your integration!

Set up USB Microphone

To test, I plugged my Samson Condenser Microphone into a USB port on the base of a Stretch RE3. You can change the input device in Ubuntu sound settings as shown below; you may have to adjust the gain to get suitable performance with your Rode mic.
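If you would rather pick the microphone in code than through the sound settings, the SpeechRecognition package can list the audio devices it sees and open one by index. Here is a minimal sketch, assuming PyAudio is installed alongside SpeechRecognition; `find_device_index`, `open_microphone`, and the "samson" keyword are placeholder names of mine, so substitute part of whatever name your mic reports.

```python
from typing import List, Optional


def find_device_index(names: List[str], keyword: str) -> Optional[int]:
    """Return the index of the first device whose name contains keyword (case-insensitive)."""
    for index, name in enumerate(names):
        if name is not None and keyword.lower() in name.lower():
            return index
    return None


def open_microphone(keyword: str):
    """Return a Microphone for the first matching device, or the default if none matches."""
    # Imported here so the helper above can be used without audio hardware.
    import speech_recognition as sr

    names = sr.Microphone.list_microphone_names()
    index = find_device_index(names, keyword)
    # device_index=None tells SpeechRecognition to use the default input device.
    return sr.Microphone(device_index=index)


# Hypothetical usage on the robot ("samson" is a placeholder keyword):
#     with open_microphone("samson") as source:
#         audio_clip = recognizer.record(source, duration=3.0)
```

Falling back to the default device keeps the behavior the same as the sound-settings approach when no name matches.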

ROS2 Node

I whipped up a ROS2 node that is a simplified adaptation of the “color listening” ROS1 node linked in the previous forum post (link to that code here). This ROS2 node records a short clip from your system’s default microphone and uses the SpeechRecognition package to transcribe it.

Hopefully this code can serve as a useful reference, or as a starting point for building your application.

Example Usage

  1. Copy this node into your ROS2 project and add it to your setup.py file.

  2. In a terminal, run ros2 run your_package node_name.

  3. Say something into your mic, and the output should be something like:

[INFO] [1722017398.853921312] [transcriber_node]: Processing Audio...
[INFO] [1722017399.414619283] [transcriber_node]: recognized text: what's up squid
[INFO] [1722017399.418863376] [transcriber_node]: Transcriber done.
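For step 1, registering the node in a Python ROS2 package usually means adding a console script entry point in setup.py. Here is a sketch of the relevant fragment, assuming the node is saved as your_package/transcriber.py (that file name is hypothetical; your_package and node_name are the placeholders from step 2).

```python
# Fragment of setup.py: pass this dict as the entry_points argument of setup().
entry_points = {
    "console_scripts": [
        # "<executable name> = <package>.<module>:<function>"
        "node_name = your_package.transcriber:main",
    ],
}
```

After editing setup.py, rebuild with colcon build and re-source your workspace before calling ros2 run.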

Node Code

import threading

import rclpy
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node

# Speech Recognition
import speech_recognition as sr
from speech_recognition.audio import AudioData

class Transcriber(Node):
    def __init__(self):
        super().__init__("transcriber_node")

        # Initialize speech recognizer
        self.recognizer = sr.Recognizer()

    def _predict_text(self, audio_clip: AudioData) -> str:
        """
        Predicts text contained in an audio snippet.

        Parameters
        ----------
        audio_clip : AudioData
            Audio data output from the SpeechRecognition recognizer object

        Returns
        -------
        str or None
            English text contained in the audio data, or None if recognition failed
        """

        self.get_logger().info("Processing Audio...")
        try:
            # recognize_google sends the clip to Google's web speech API; other recognize_* backends can be swapped in here
            return self.recognizer.recognize_google(audio_clip)
        except sr.UnknownValueError:
            self.get_logger().warn("Speech recognizer could not understand audio")
            return None
        except sr.RequestError as e:
            self.get_logger().error("Speech recognition error; {0}".format(e))
            return None

    def start_recording(self, recording_length_s: float = 3.0) -> str:
        """
        Triggers an audio recording and returns text contained in the recording.

        Parameters
        ----------
        recording_length_s : float
            Number of seconds to record for

        Returns
        -------
        str or None
            English text contained in the recording, or None if recognition failed
        """

        with sr.Microphone() as source:
            audio_clip = self.recognizer.record(source, duration=recording_length_s)
            text_string = self._predict_text(audio_clip)
            self.get_logger().info("recognized text: {}".format(text_string))
            return text_string

    def run(self):
        """
        Main method for node.
        """

        # you could put a loop here
        self.start_recording()
        self.get_logger().info("Transcriber done.")

def main():
    rclpy.init()
    node = Transcriber()
    executor = MultiThreadedExecutor(num_threads=4)

    # Spin in the background since recording audio will block the main thread
    spin_thread = threading.Thread(
        target=rclpy.spin,
        args=(node,),
        kwargs={"executor": executor},
        daemon=True,
    )
    spin_thread.start()

    # Run node
    try:
        node.run()
    except KeyboardInterrupt:
        pass

    # Terminate this node
    node.destroy_node()
    rclpy.shutdown()

    # Wait for the background spin thread to exit after shutdown
    spin_thread.join()


if __name__ == '__main__':
    main()
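On the "you could put a loop here" comment in run(): one way to do that is to keep recording clips until a stop word is heard. Here is a rough sketch with the recording step passed in as a callable, so the loop can be tried without a mic or ROS; `listen_until`, the "stop" word, and `max_clips` are all names I made up for illustration.

```python
from typing import Callable, List, Optional


def listen_until(
    record_fn: Callable[[], Optional[str]],
    stop_word: str = "stop",
    max_clips: int = 10,
) -> List[str]:
    """Call record_fn repeatedly and collect transcripts until stop_word is heard."""
    transcripts: List[str] = []
    for _ in range(max_clips):
        text = record_fn()
        if text is None:
            # Recognition failed for this clip; move on to the next one.
            continue
        if stop_word.lower() in text.lower().split():
            # The stop word itself is not added to the transcripts.
            break
        transcripts.append(text)
    return transcripts
```

Inside the node, run() could then call listen_until(self.start_recording) in place of the single self.start_recording() call. The max_clips cap is there so the node still exits if the stop word is never recognized.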
