跳轉到

ROS2

1. ROS2 基礎介紹

推薦資源:ROS2 及 ROS Porting 自學筆記

Tip

ROS2 的教學資源其實非常多,社群也很廣大。
會鼓勵讀者多多上網查資料,這篇只會列出最核心的部分。
多看網路資源能更理解上下文。


1-1. ROS 簡介

ROS(Robot Operating System)是一套為機器人開發所設計的中介軟體架構,
提供常用工具與功能,例如通訊、硬體抽象、模組化架構等。

ROS2 每個發行版(distro)都對應特定的 Ubuntu 版本,
建議選用 LTS(Long Term Support)版本以獲得較長的維護與穩定性。

ROS2 Distro 支援 Ubuntu 版本 LTS 支援期限
Foxy 20.04 (Focal, LTS) 2023/05
Galactic 20.04 (Focal, LTS) 2022/11
Humble 22.04 (Jammy, LTS) 2027/05 (LTS)
Iron 22.04 (Jammy, LTS) 2024/11
Jazzy 24.04 (Noble, LTS) 2029/05 (LTS)

ROS Distro 選擇

開發專案優先選擇 ROS2 LTS 版本(如 Humble、Jazzy)及對應的 Ubuntu LTS 版本,以確保長期支援與相容性。

ROS1 vs. ROS2

ROS1 雖然廣泛使用,但在可靠性、安全性、即時性、多平台支援等方面有明顯限制。
ROS2 則是針對這些缺點進行重構,並導入了工業界標準的 DDS 架構。

ROS1 與 ROS2 的差異比較:

特性 ROS1 ROS2
通訊機制 基於 TCP/UDP 自訂協定 基於 DDS 標準
多平台支援 僅支援 Linux 為主 支援 Linux、Windows、macOS
實時性支援 不支援 可透過 DDS 設定實現基本實時性
安全性 幾乎無內建安全性 DDS 提供加密與存取控制機制
模組啟動與控制 不支援 Lifecycle 支援 Lifecycle 與狀態機

雖然 ROS2 支援 Linux 以外的 OS, 但還是盡量跑在 Linux OS。


1-2. ROS2 核心概念

核心概念 說明
Node ROS2 中的最小運作單元,負責特定任務的程式(如感測、控制、通訊等)。
Topic 非同步資料流通道,Publisher 發布、Subscriber 訂閱,適合持續性資料傳遞。
Service 同步請求/回應通訊,Client 發送請求、Server 回傳結果,適合短暫性任務。
Action 適合需回報進度或可中斷的長時間任務(如導航),支援目標、取消、狀態回報。
Publisher/Subscriber 一對多、多對多的訊息發布與接收機制,實現資料分發。
Message 定義節點間通訊的資料結構(如 std_msgs/String),可自訂型別以傳遞複雜資料。
tf2 座標轉換系統 管理多個座標系之間的關係與轉換,常用於機器人運動學。
Parameter 可於執行時動態設定、查詢、調整的參數,支援節點行為彈性化。
Lifecycle 節點生命週期管理機制,便於控制啟動、暫停、重啟等狀態,提升系統穩定性。

1-3. ROS2 Middleware

  • DDS (Data Distribution Service)

    • ROS2 的預設中介系統(例如 Fast DDS、Cyclone DDS 等)
    • 支援 QoS(Quality of Service)設定,如可靠性、延遲、保留歷史等
    • 分散式架構,不需中央 Master 管理
  • RMW(ROS Middleware Interface)層

    • 抽象層,提供 ROS2 與 DDS 的介接
    • 允許不同 DDS 實作(如 Fast DDS、RTI Connext)切換而不更改上層程式碼

1-4. 常見通訊機制

ROS2 提供三種主要的節點間通訊機制:

  • Publish-Subscribe
  • Service-Client
  • Action

它們各自適用於不同的應用場景。

機制 架構與機制 資料流特性 回應方式 適用情境
Pub-Sub 節點透過 Topic 進行非同步資料傳遞,Publisher 發布、Subscriber 訂閱,一對多或多對多,資料持續流動。 非同步 無回應 感測器數據、狀態回報、即時監控(如雷達、影像)
Srv-Cli 採同步請求/回應模式,Client 發送請求,Server 處理並回傳結果,一次請求對應一次回應。 同步 單次回應 查詢參數、設定值、單次控制(如拍照、開關設備)
Action 適合長時間、可中斷、需回報進度的任務,Client 發送目標,Server 回報進度、結果,支援取消。 非同步 進度+結果+取消 導航、移動、複雜操作(如路徑規劃、長時間運算)

2. 環境準備

開發須知

所有的軟體都必須使用 Docker 開發,不要直接在 Host Machine 上安裝套件。


2-1. ROS2 Docker

首先是 ROS 官方提供的 Image 有以下幾種版本。

學習階段,可以使用 ros:humble-desktop-full 避免缺少依賴套件的錯誤。
但開發階段,考慮到 Docker 減肥,最好是從 ros:<distro>-base 開始安裝必要工具。

ROS2 Official

Image 名稱 內容簡介 適用情境
ros:<distro>-base 僅含最基礎的 ROS 安裝 自訂需求、最小化映像檔
ros:<distro>-core 含開發工具與編譯器 Package 開發
ros:<distro>-desktop 含 GUI 工具(如 RViz) 模擬、一般開發
ros:<distro>-desktop-full 完整版本。
含模擬器(如 Gazebo)、範例套件等
需要完整功能與模擬環境
ROS2 Official vs. osrf

除了官方 Image,也可以參考看看 osrf 的版本。

項目 ROS 官方 Docker Image (ros:) OSRF Docker Image (osrf/ros:)
維護單位 Canonical & Open Robotics(官方) OSRF(Open Source Robotics Foundation)
Docker Hub 位置 https://hub.docker.com/_/ros https://hub.docker.com/r/osrf/ros
發布頻率 穩定、對齊 ROS 發布節奏 有時會提前測試新版本
支援版本 全版本
(base、core、desktop、full)
部分以 desktop 為主,
也有 dev images
安裝方式 使用 apt 安裝 ROS 套件 有時使用 rosinstall_generator 或自建
映像大小 較小(視版本而定) 有時包含多餘 dev 工具,較大
適合情境 正式開發與部署用 測試、CI/CD、自訂構建需求
鏡像結構 基於 Ubuntu 官方映像構建 有些版本是多階段構建,偏向開發用途
文件與說明 官方 ROS Wiki、Docker Hub
說明完整
相對較少、需查 GitHub 或社群說明

Checkpoint

選擇一個 ROS2 Docker Image,並且進入到 Container 內準備開發。
為了學習方便,請在 docker compose 中 volume 空資料夾到 container 內部:~/colcon_ws:/root/colcon_ws
如果不會操作,請回頭參閱 docker-compose


2-2. ROS2 Workspace 概念

ROS2 使用工作區(workspace)來管理程式碼與套件,最基本的結構如下:

ros2_ws/                # ROS2 工作區主目錄
├── build/              # (自動生成)colcon 建構中間產物(CMake build files)
├── install/            # (自動生成)安裝後的執行檔與設定檔(source 此資料夾以使用套件)
├── log/                # (自動生成)建構過程的日誌(編譯錯誤與輸出紀錄)
└── src/                # 原始碼放置處:放置 ROS2 packages
    ├── my_robot_pkg/   # 自訂或下載的 ROS2 package
    └── example_pkg/    # 可以有多個 ROS2 packages

有些目錄是由 ROS2 的建置工具 colcon 生成的,開發中不需要使用到他們。

再往下追溯到 ROS2 Package 的內部結構,也是遵循著一定的架構。
ROS2 Package 可用 C++ 或是 Python 語言撰寫,檔案架構上會有些許不同:

cpp_robot_pkg/
├── CMakeLists.txt                  # ROS2 CMake 建構腳本,編譯與安裝規則定義
├── package.xml                     # ROS2 套件描述檔,宣告依賴、作者等資訊
├── launch/
│   └── cpp_robot.launch.py         # 使用 Python 撰寫的啟動檔,啟動節點用
├── src/
│   └── cpp_robot_node.cpp          # 節點主程式 (C++ 撰寫),通常含 main()
├── include/
│   └── cpp_robot_pkg/
│       └── cpp_robot_node.hpp      # 對應的 C++ 標頭檔,定義類別與介面
└── config/
    └── params.yaml                 # 參數檔,供節點在啟動時載入設定值
py_robot_pkg/
├── package.xml                 # ROS2 套件描述檔,宣告依賴、執行點、授權等
├── setup.py                    # Python 安裝腳本,宣告套件結構與 entry_points
├── setup.cfg                   # 補充設定檔,簡化 setup.py 的格式定義
├── resource/
│   └── py_robot_pkg            # ament 所需,用來標示為可發佈套件(空檔即可)
├── py_robot_pkg/
│   ├── __init__.py             # Python 套件初始化(可留空)
│   └── py_robot_node.py        # 節點主程式 (Python 撰寫),含 rclpy 程式碼
├── launch/
│   └── py_robot.launch.py      # 啟動檔,定義如何啟動此 Python 節點
└── config/
    └── params.yaml             # 參數設定檔,以 YAML 格式定義參數值

ROS2 編譯工具 colcon

注意以上文件中有很多是依靠 ROS2 的編譯工具 colcon 建立或編譯的產物,
因此請跟著步驟使用工具建立檔案,而不要自己徒手新增檔案,否則可能會編譯或執行錯誤。

Checkpoint

理解 Workspace 架構,並且能分辨 workspace/srcpackage/src 的差異。


3. ROS2 基礎功能實作

實作內容

從乾淨的 ROS2 Workspace 搭建不同功能的 Node,並編譯、執行。


3-1. 建立 Workspace

你已經在 Container 內了嗎?

後續操作都是在 Docker Container 內執行,不要汙染到宿主機環境!

建立 Workspace 目錄:

mkdir -p ~/colcon_ws/src
src ~/colcon_ws

3-2. 撰寫 Publisher / Subscriber Node

  1. 建立 Package,以下分為 C++ 和 Python 兩種版本:

    # :~/colcon_ws$
    cd src
    ros2 pkg create --build-type ament_cmake cpp_pubsub --dependencies rclcpp std_msgs
    
    # :~/colcon_ws$
    cd src
    ros2 pkg create --build-type ament_python py_pubsub --dependencies rclpy std_msgs
    

    ros2 package 命名規則

    命名時請遵守以下規範:

    • 必須全部小寫字母(a-z),可包含數字(0-9)與底線(_)
    • 不可包含大寫字母、破折號(-)、空格或特殊符號
    • 必須以字母開頭
    • 參考官方命名慣例,避免與現有套件重名

    e.g realsense_roskoch_ros2_wrapper

  2. 撰寫 Publisher 和 Subscriber Node 及相關配置,也提供 C++ 和 Python 版本:

    Abstract

    talker 將定時發佈 message 到 topic 上面,並且由 listener 接收該 topic。

    檔案架構:

    cpp_pubsub/
    ├── CMakeLists.txt                # CMake 編譯配置檔
    ├── package.xml                   # ROS2 套件資訊與依賴設定
    └── src/
        ├── talker.cpp                # Publisher node 主程式
        └── listener.cpp              # Subscriber node 主程式
    
    py_pubsub/
    ├── package.xml                   # ROS2 套件資訊與依賴設定
    ├── setup.py                      # Python 安裝腳本,設定 entry_points
    ├── resource/
    │   └── py_pubsub                 # 標記為 ROS2 套件(空檔即可)
    ├── py_pubsub/
    │   ├── __init__.py               # Python 套件初始化(可留空)
    │   ├── talker.py                 # Publisher node 主程式
    │   └── listener.py               # Subscriber node 主程式
    

    程式碼:

    #include "rclcpp/rclcpp.hpp"
    #include "std_msgs/msg/string.hpp"
    #include <chrono>
    
    using namespace std::chrono_literals;
    
    class Talker : public rclcpp::Node {
    public:
        Talker() : Node("talker_node") {
            publisher_ = this->create_publisher<std_msgs::msg::String>("chatter", 10);
            timer_ = this->create_wall_timer(500ms, std::bind(&Talker::timer_callback, this));
        }
    
    private:
        void timer_callback() {
            auto msg = std_msgs::msg::String();
            msg.data = "Hello, ROS2!";
            RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", msg.data.c_str());
            publisher_->publish(msg);
        }
    
        rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
        rclcpp::TimerBase::SharedPtr timer_;
    };
    
    int main(int argc, char *argv[]) {
        rclcpp::init(argc, argv);
        rclcpp::spin(std::make_shared<Talker>());
        rclcpp::shutdown();
        return 0;
    }
    
    #include "rclcpp/rclcpp.hpp"
    #include "std_msgs/msg/string.hpp"
    
    class Listener : public rclcpp::Node {
    public:
        Listener() : Node("listener_node") {
            subscription_ = this->create_subscription<std_msgs::msg::String>(
            "chatter", 10,
            [this](const std_msgs::msg::String::SharedPtr msg) {
                RCLCPP_INFO(this->get_logger(), "Received: '%s'", msg->data.c_str());
            });
        }
    
    private:
        rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
    };
    
    int main(int argc, char *argv[]) {
        rclcpp::init(argc, argv);
        rclcpp::spin(std::make_shared<Listener>());
        rclcpp::shutdown();
        return 0;
    }
    
    cmake_minimum_required(VERSION 3.8)
    project(cpp_pubsub)
    
    find_package(ament_cmake REQUIRED)
    find_package(rclcpp REQUIRED)
    find_package(std_msgs REQUIRED)
    
    add_executable(talker src/talker.cpp)
    ament_target_dependencies(talker rclcpp std_msgs)
    
    add_executable(listener src/listener.cpp)
    ament_target_dependencies(listener rclcpp std_msgs)
    
    install(TARGETS talker listener DESTINATION lib/${PROJECT_NAME})
    
    ament_package()
    
    <?xml version="1.0"?>
    <package format="3">
        <name>cpp_pubsub</name>
        <version>0.0.0</version>
        <description>C++ publisher/subscriber example</description>
        <maintainer email="your_email@example.com">Your Name</maintainer>
        <license>Apache License 2.0</license>
    
        <buildtool_depend>ament_cmake</buildtool_depend>
        <exec_depend>rclcpp</exec_depend>
        <exec_depend>std_msgs</exec_depend>
    
        <export>
            <build_type>ament_cmake</build_type>
        </export>
    </package>
    
    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String
    
    class Talker(Node):
        def __init__(self):
            super().__init__('talker_node')
            self.publisher_ = self.create_publisher(String, 'chatter', 10)
            self.timer = self.create_timer(0.5, self.timer_callback)
    
        def timer_callback(self):
            msg = String()
            msg.data = 'Hello from Python!'
            self.get_logger().info(f'Publishing: "{msg.data}"')
            self.publisher_.publish(msg)
    
    def main(args=None):
        rclpy.init(args=args)
        node = Talker()
        rclpy.spin(node)
        node.destroy_node()
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    
    import rclpy
    from rclpy.node import Node
    from std_msgs.msg import String
    
    class Listener(Node):
        def __init__(self):
            super().__init__('listener_node')
            self.subscription = self.create_subscription(
                String,
                'chatter',
                self.listener_callback,
                10)
    
        def listener_callback(self, msg):
            self.get_logger().info(f'Received: "{msg.data}"')
    
    def main(args=None):
        rclpy.init(args=args)
        node = Listener()
        rclpy.spin(node)
        node.destroy_node()
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    
    from setuptools import setup
    
    package_name = 'py_pubsub'
    
    setup(
        name=package_name,
        version='0.0.0',
        packages=[package_name],
        entry_points={
            'console_scripts': [
                'talker = py_pubsub.talker:main',
                'listener = py_pubsub.listener:main',
            ],
        },
        install_requires=['setuptools'],
        zip_safe=True,
        maintainer='your_name',
        description='Python publisher/subscriber example',
    )
    
    <?xml version="1.0"?>
    <package format="3">
        <name>py_pubsub</name>
        <version>0.0.0</version>
        <description>Python publisher/subscriber example</description>
        <maintainer email="your_email@example.com">Your Name</maintainer>
        <license>Apache License 2.0</license>
    
        <buildtool_depend>ament_python</buildtool_depend>
        <exec_depend>rclpy</exec_depend>
        <exec_depend>std_msgs</exec_depend>
    
        <export>
            <build_type>ament_python</build_type>
        </export>
    </package>
    

    Checkpoint

    1. 自行理解程式碼的涵義。
    2. 範例使用的 message type 為何? 這個 message 的結構是什麼?
  3. 使用 colcon 工具編譯工作區。

    cd ~/colcon_ws
    colcon build --packages-select cpp_pubsub --symlink-install
    
    cd ~/colcon_ws
    colcon build --packages-select py_pubsub --symlink-install
    

    編譯完成後,使用 source 安裝環境:

    source install/setup.bash
    
    colcon build 常見參數
    參數 說明
    --packages-select 只編譯特定套件。
    --symlink-install 對 Python 套件建立符號連結,開發時不需重複編譯。
    強烈推薦用在開發中。
    --packages-up-to 編譯某個套件及其所有相依套件。
    --parallel-workers N 指定同時編譯數量。
    通常與 CPU 數量相同。
    --event-handlers console_direct+ 更清晰即時顯示 log。
    推薦在 debug 編譯錯誤時使用。
    --cmake-args -DCMAKE_BUILD_TYPE=Release 傳遞給 CMake 的參數。
    例如指定 Release 模式來提升效能。
    --continue-on-error 若部分套件失敗仍繼續編譯其他套件
    (適合 batch build)。

    建構常見問題 & 建議

    1. 若套件名稱拼錯,--packages-select 會直接忽略該套件,無任何錯誤訊息。

    2. 使用 --symlink-install 後,如果你修改了 Python 檔案,不需要重新 build;
      但如果修改的是 C++ 或 interface (msg/srv/action),一定要 colcon build。

    3. 每個新 terminal 都要做 source install/setup.bash,或可加進 ~/.bashrc
      以及每次編譯完也建議都做一次 source install/setup.bash

    4. 若遇到編譯錯誤,可用 colcon build --event-handlers console_direct+ 去看 error log。

    Checkpoint

    1. 成功編譯,並看到工作區中生成的 build/, install/log/
    2. source install/setup.bash 放進 ~/.bashrc 內。
      每開啟新 terminal 輸入 ros2 pkg list | grep -E 'cpp_pubsub|py_pubsub',應能顯示出自己的套件。
  4. 執行 ROS2 Node。

    開啟兩個 Terminal,分別執行 talkerlistener

    運行 talker node。

    ros2 run cpp_pubsub talker
    
    ros2 run py_pubsub talker
    

    運行 listener node。

    ros2 run cpp_pubsub listener
    
    ros2 run py_pubsub listener
    

    一定開啟兩個終端機嗎?

    不用。須開啟兩個的原因是,目前的 ROS2 Node 都是持續運作的程序,所以運行後就無法繼續輸入指令。

    我們可以使用 & 來使當前程序進入背景執行,例如:ros2 run cpp_pubsub talker &
    這樣按下 ctrl+C 後也會持續運作,這點可以用 psjobs 來確認。
    若要終止該進程,可以使用 kill 指令,這些就留給讀者自行實踐。

    Checkpoint

    開啟兩個終端機,並且成功運行兩個 Node。

  5. topic 相關工具測試。

    指令 說明
    ros2 topic list 查看目前所有 topic 名稱
    ros2 topic echo /chatter 監看 /chatter topic 的內容
    ros2 topic info /chatter 顯示 topic 資訊(型別、pub/sub)
    ros2 topic hz /chatter 顯示訊息頻率
    ros2 topic pub /chatter std_msgs/String "data: 'hello'" 手動發送一筆訊息

    Checkpoint

    使用以上所有的 topic 指令測試。


3-4. 撰寫 Service / Client

  1. 建立 Package。

    # :~/colcon_ws$
    cd src
    ros2 pkg create --build-type ament_cmake cpp_srvcli --dependencies rclcpp example_interfaces
    
    # :~/colcon_ws$
    cd src
    ros2 pkg create --build-type ament_python py_srvcli --dependencies rclpy example_interfaces
    
  2. 撰寫 Service 和 Client Node。

    Abstract

    add_two_ints_server 是一個服務節點,會接收兩個整數,並回傳它們的總和。
    add_two_ints_client 是一個用來呼叫該服務的節點,送出請求並顯示回傳的加總結果。

    檔案架構:

    cpp_srvcli/
    ├── CMakeLists.txt                    # CMake 編譯配置檔
    ├── package.xml                       # ROS2 套件資訊與依賴設定
    └── src/
        ├── add_two_ints_client.cpp       # 用於呼叫 service 的 client node
        └── add_two_ints_server.cpp       # 提供服務的 server node
    
    py_srvcli/
    ├── package.xml                       # ROS2 套件資訊與依賴設定
    ├── setup.py                          # Python 套件安裝腳本,設定 console_scripts
    ├── setup.cfg                         # Python 安裝用的輔助設定檔(可選)
    ├── resource/
    │   └── py_srvcli                     # 用於標記為 ROS2 套件
    └── py_srvcli/
        ├── __init__.py                   # 空白即可,標記為 Python 模組
        ├── add_two_ints_client.py        # 用於呼叫 service 的 client node
        └── add_two_ints_server.py        # 提供服務的 server node
    

    程式碼:

    #include "rclcpp/rclcpp.hpp"
    #include "example_interfaces/srv/add_two_ints.hpp"
    
    using AddTwoInts = example_interfaces::srv::AddTwoInts;
    using std::placeholders::_1;
    using std::placeholders::_2;
    
    class AddTwoIntsServer : public rclcpp::Node {
    public:
        AddTwoIntsServer() : Node("add_two_ints_server") {
            service_ = create_service<AddTwoInts>(
            "add_two_ints", std::bind(&AddTwoIntsServer::handle_add, this, _1, _2));
        }
    
    private:
        void handle_add(const std::shared_ptr<AddTwoInts::Request> request,
                        std::shared_ptr<AddTwoInts::Response> response) {
            response->sum = request->a + request->b;
            RCLCPP_INFO(this->get_logger(), "Received: %ld + %ld", request->a, request->b);
        }
    
        rclcpp::Service<AddTwoInts>::SharedPtr service_;
    };
    
    int main(int argc, char **argv) {
        rclcpp::init(argc, argv);
        rclcpp::spin(std::make_shared<AddTwoIntsServer>());
        rclcpp::shutdown();
        return 0;
    }
    
    #include "rclcpp/rclcpp.hpp"
    #include "example_interfaces/srv/add_two_ints.hpp"
    
    using AddTwoInts = example_interfaces::srv::AddTwoInts;
    
    int main(int argc, char **argv) {
        rclcpp::init(argc, argv);
        auto node = rclcpp::Node::make_shared("add_two_ints_client");
        auto client = node->create_client<AddTwoInts>("add_two_ints");
    
        while (!client->wait_for_service(std::chrono::seconds(1))) {
            RCLCPP_INFO(node->get_logger(), "Waiting for service...");
        }
    
        auto request = std::make_shared<AddTwoInts::Request>();
        request->a = 3;
        request->b = 7;
    
        auto result = client->async_send_request(request);
        if (rclcpp::spin_until_future_complete(node, result) 
            rclcpp::FutureReturnCode::SUCCESS) {
            RCLCPP_INFO(node->get_logger(), "Result: %ld", result.get()->sum);
        }
    
        rclcpp::shutdown();
        return 0;
    }
    
    find_package(ament_cmake REQUIRED)
    find_package(rclcpp REQUIRED)
    find_package(example_interfaces REQUIRED)
    
    add_executable(add_two_ints_server src/add_two_ints_server.cpp)
    add_executable(add_two_ints_client src/add_two_ints_client.cpp)
    
    ament_target_dependencies(add_two_ints_server rclcpp example_interfaces)
    ament_target_dependencies(add_two_ints_client rclcpp example_interfaces)
    
    install(TARGETS
        add_two_ints_server
        add_two_ints_client
        DESTINATION lib/${PROJECT_NAME}
    )
    
    ament_package()
    
    <?xml version="1.0"?>
    <package format="3">
        <name>cpp_srvcli</name>
        <version>0.0.0</version>
        <description>C++ service/client example</description>
        <maintainer email="your_email@example.com">Your Name</maintainer>
        <license>Apache License 2.0</license>
    
        <buildtool_depend>ament_cmake</buildtool_depend>
        <build_depend>rclcpp</build_depend>
        <build_depend>example_interfaces</build_depend>
        <exec_depend>rclcpp</exec_depend>
        <exec_depend>example_interfaces</exec_depend>
    
        <export>
            <build_type>ament_cmake</build_type>
        </export>
    </package>
    
    import rclpy
    from rclpy.node import Node
    from example_interfaces.srv import AddTwoInts
    
    class AddTwoIntsServer(Node):
        def __init__(self):
            super().__init__('add_two_ints_server')
            self.srv = self.create_service(AddTwoInts, 'add_two_ints', self.handle_add)
    
        def handle_add(self, request, response):
            self.get_logger().info(f"Received request: {request.a} + {request.b}")
            response.sum = request.a + request.b
            return response
    
    def main(args=None):
        rclpy.init(args=args)
        node = AddTwoIntsServer()
        rclpy.spin(node)
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    
    import rclpy
    from rclpy.node import Node
    from example_interfaces.srv import AddTwoInts
    
    class AddTwoIntsClient(Node):
        def __init__(self):
            super().__init__('add_two_ints_client')
            self.client = self.create_client(AddTwoInts, 'add_two_ints')
            while not self.client.wait_for_service(timeout_sec=1.0):
                self.get_logger().info('Waiting for service...')
            self.send_request()
    
        def send_request(self):
            req = AddTwoInts.Request()
            req.a = 3
            req.b = 5
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future)
            if future.result():
                self.get_logger().info(f'Result: {future.result().sum}')
    
    def main(args=None):
        rclpy.init(args=args)
        node = AddTwoIntsClient()
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    
    from setuptools import setup
    package_name = 'py_srvcli'
    
    setup(
        name=package_name,
        version='0.0.0',
        packages=[package_name],
        entry_points={
            'console_scripts': [
                'add_two_ints_server = py_srvcli.add_two_ints_server:main',
                'add_two_ints_client = py_srvcli.add_two_ints_client:main',
            ],
        },
        install_requires=['setuptools'],
        zip_safe=True,
        maintainer='your_name',
        description='Example Service/Client in Python',
    )
    
    <?xml version="1.0"?>
    <package format="3">
        <name>cpp_srvcli</name>
        <version>0.0.0</version>
        <description>C++ service/client example</description>
        <maintainer email="your_email@example.com">Your Name</maintainer>
        <license>Apache License 2.0</license>
    
        <buildtool_depend>ament_cmake</buildtool_depend>
        <exec_depend>rclpy</exec_depend>
        <exec_depend>example_interfaces</exec_depend>
    
        <export>
            <build_type>ament_cmake</build_type>
        </export>
    </package>
    

    Checkpoint

    理解程式碼的涵義。

  3. 使用 colcon 工具編譯工作區。

    cd ~/colcon_ws
    colcon build --packages-select cpp_srvcli --symlink-install
    
    cd ~/colcon_ws
    colcon build --packages-select py_srvcli --symlink-install
    

    編譯完成後,使用 source 安裝環境:

    source install/setup.bash
    
  4. 執行 ROS2 Service Node。

    開啟兩個 Terminal,分別執行 add_two_ints_serveradd_two_ints_client

    運行 add_two_ints_server node。

    ros2 run cpp_srvcli add_two_ints_server
    
    ros2 run cpp_srvcli add_two_ints_client
    

    運行 add_two_ints_client node。

    ros2 run py_srvcli add_two_ints_server
    
    ros2 run py_srvcli add_two_ints_client
    

    Checkpoint

    成功運行 service 服務節點。

  5. service 相關工具測試。

    指令 說明
    ros2 service list 查看目前所有 topic 名稱
    ros2 service type /add_two_ints 查詢服務型別
    ros2 interface show example_interfaces/srv/AddTwoInts 查詢型別結構

    Checkpoint

    成功測試所有以上 service 指令。


3-5. 撰寫 Action

Action 機制

ROS 2 中的 Action 是用來處理長時間運行任務的一種通訊方式,例如導航、追蹤等。它具有三種互動類型:

  • goal:用來傳送請求開始任務
  • feedback:期間性回報狀態
  • result:最終回傳結果
  • cancel:中途取消任務 (Optional)
  1. 建立 Package:

    # :~/colcon_ws$
    cd src
    ros2 pkg create cpp_action_example --build-type ament_cmake --dependencies rclcpp action_msgs
    
    # :~/colcon_ws$
    cd src
    ros2 pkg create py_action_example --build-type ament_python --dependencies rclpy action_msgs
    
  2. 撰寫 Service 和 Client Node。

    Abstract

    add_two_ints_server 是一個服務節點,會接收兩個整數,並回傳它們的總和。
    add_two_ints_client 是一個用來呼叫該服務的節點,送出請求並顯示回傳的加總結果。

    檔案架構:

    cpp_action_example/
    ├── action/
    │   └── Fibonacci.action
    ├── src/
    │   ├── action_server.cpp
    │   └── action_client.cpp
    ├── CMakeLists.txt
    └── package.xml
    
    py_action_example/
    ├── action/
    │   └── Fibonacci.action
    ├── py_action_example/
    │   ├── __init__.py
    │   ├── action_server.py
    │   └── action_client.py
    ├── setup.py
    ├── package.xml
    └── setup.cfg
    

    程式碼:

    #include <memory>
    #include <vector>
    #include <rclcpp/rclcpp.hpp>
    #include <rclcpp_action/rclcpp_action.hpp>
    #include "cpp_action_example/action/fibonacci.hpp"
    
    class FibonacciActionServer : public rclcpp::Node{
    public:
        using Fibonacci = cpp_action_example::action::Fibonacci;
        using GoalHandleFibonacci = rclcpp_action::ServerGoalHandle<Fibonacci>;
    
        explicit FibonacciActionServer(): Node("fibonacci_action_server"){
            action_server_ = rclcpp_action::create_server<Fibonacci>(
            this,
            "fibonacci",
            std::bind(&FibonacciActionServer::handle_goal, this, std::placeholders::_1, std::placeholders::_2),
            std::bind(&FibonacciActionServer::handle_cancel, this, std::placeholders::_1),
            std::bind(&FibonacciActionServer::handle_accepted, this, std::placeholders::_1));
        }
    
    private:
        rclcpp_action::Server<Fibonacci>::SharedPtr action_server_;
    
        rclcpp_action::GoalResponse handle_goal(
            const rclcpp_action::GoalUUID &,
            std::shared_ptr<const Fibonacci::Goal> goal){
            RCLCPP_INFO(this->get_logger(), "Received goal: order = %d", goal->order);
            return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
        }
    
        rclcpp_action::CancelResponse handle_cancel(const std::shared_ptr<GoalHandleFibonacci> goal_handle){
            RCLCPP_INFO(this->get_logger(), "Received cancel request");
            return rclcpp_action::CancelResponse::ACCEPT;
        }
    
        void handle_accepted(const std::shared_ptr<GoalHandleFibonacci> goal_handle){
            std::thread{std::bind(&FibonacciActionServer::execute, this, goal_handle)}.detach();
        }
    
        void execute(const std::shared_ptr<GoalHandleFibonacci> goal_handle){
            RCLCPP_INFO(this->get_logger(), "Executing goal...");
            const auto goal = goal_handle->get_goal();
            auto feedback = std::make_shared<Fibonacci::Feedback>();
            auto result = std::make_shared<Fibonacci::Result>();
    
            feedback->partial_sequence = {0, 1};
            rclcpp::Rate rate(1);
            for (int i = 2; i < goal->order; ++i){
            if (goal_handle->is_canceling()) {
                goal_handle->canceled(result);
                RCLCPP_INFO(this->get_logger(), "Goal canceled");
                return;
            }
    
            feedback->partial_sequence.push_back(
                feedback->partial_sequence[i - 1] + feedback->partial_sequence[i - 2]);
                goal_handle->publish_feedback(feedback);
                RCLCPP_INFO(this->get_logger(), "Feedback: %ld", feedback->partial_sequence.back());
                rate.sleep();
            }
    
            result->sequence = feedback->partial_sequence;
            goal_handle->succeed(result);
            RCLCPP_INFO(this->get_logger(), "Goal succeeded");
        }
    };
    
    int main(int argc, char ** argv){
        rclcpp::init(argc, argv);
        rclcpp::spin(std::make_shared<FibonacciActionServer>());
        rclcpp::shutdown();
        return 0;
    }
    
    #include <rclcpp/rclcpp.hpp>
    #include <rclcpp_action/rclcpp_action.hpp>
    #include "cpp_action_example/action/fibonacci.hpp"
    
    class FibonacciActionClient : public rclcpp::Node{
    public:
        using Fibonacci = cpp_action_example::action::Fibonacci;
        using GoalHandleFibonacci = rclcpp_action::ClientGoalHandle<Fibonacci>;
    
        explicit FibonacciActionClient(): Node("fibonacci_action_client"){
            this->client_ptr_ = rclcpp_action::create_client<Fibonacci>(this, "fibonacci");
            this->send_goal(10);
        }
    
        void send_goal(int order){
            if (!client_ptr_->wait_for_action_server(std::chrono::seconds(5))) {
                RCLCPP_ERROR(this->get_logger(), "Action server not available after waiting");
                return;
            }
    
            auto goal_msg = Fibonacci::Goal();
            goal_msg.order = order;
    
            auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
            send_goal_options.feedback_callback = std::bind(&FibonacciActionClient::feedback_cb, this, std::placeholders::_1, std::placeholders::_2);
            send_goal_options.result_callback = std::bind(&FibonacciActionClient::result_cb, this, std::placeholders::_1);
    
            client_ptr_->async_send_goal(goal_msg, send_goal_options);
        }
    
    private:
        rclcpp_action::Client<Fibonacci>::SharedPtr client_ptr_;
    
        void feedback_cb(GoalHandleFibonacci::SharedPtr,
                        const std::shared_ptr<const Fibonacci::Feedback> feedback){
            RCLCPP_INFO(this->get_logger(), "Feedback: %ld", feedback->partial_sequence.back());
        }
    
        void result_cb(const GoalHandleFibonacci::WrappedResult & result){
            RCLCPP_INFO(this->get_logger(), "Result received.");
            for (auto number : result.result->sequence) {
                std::cout << number << " ";
            }
            std::cout << std::endl;
            rclcpp::shutdown();
        }
    };
    
    int main(int argc, char ** argv){
        rclcpp::init(argc, argv);
        auto node = std::make_shared<FibonacciActionClient>();
        rclcpp::spin(node);
        return 0;
    }
    
    find_package(ament_cmake REQUIRED)
    find_package(rclcpp REQUIRED)
    find_package(rclcpp_action REQUIRED)
    find_package(builtin_interfaces REQUIRED 
    find_package(rosidl_default_generators REQUIRED)
    
    rosidl_generate_interfaces(${PROJECT_NAME}
        "action/Fibonacci.action"
        DEPENDENCIES builtin_interfaces
    )
    
    add_executable(action_server src/action_server.cpp)
    ament_target_dependencies(action_server rclcpp rclcpp_action)
    
    add_executable(action_client src/action_client.cpp)
    ament_target_dependencies(action_client rclcpp rclcpp_action)
    
    install(TARGETS
        action_server
        action_client
        DESTINATION lib/${PROJECT_NAME}
    )
    
    ament_package()
    
    <?xml version="1.0"?>
    <package format="3">
        <name>cpp_action_example</name>
        <version>0.0.1</version>
        <description>Fibonacci action example in C++ for ROS 2</description>
    
        <maintainer email="your_email@example.com">Your Name</maintainer>
        <license>Apache License 2.0</license>
    
        <!-- 建構時所需的依賴 -->
        <buildtool_depend>ament_cmake</buildtool_depend>
    
        <build_depend>rclcpp</build_depend>
        <build_depend>rclcpp_action</build_depend>
        <build_depend>rosidl_default_generators</build_depend>
    
        <!-- 執行時所需的依賴 -->
        <exec_depend>rclcpp</exec_depend>
        <exec_depend>rclcpp_action</exec_depend>
        <exec_depend>rosidl_default_runtime</exec_depend>
    
        <!-- action 檔案支援 -->
        <member_of_group>rosidl_interface_packages</member_of_group>
    
        <export>
            <build_type>ament_cmake</build_type>
        </export>
    </package>
    
    1
    2
    3
    4
    5
    int32 order
    ---
    int32[] sequence
    ---
    int32[] partial_sequence
    
    import rclpy
    from rclpy.node import Node
    from rclpy.action import ActionServer
    from py_action_example.action import Fibonacci
    
    class FibonacciActionServer(Node):
        def __init__(self):
            super().__init__('fibonacci_action_server')
            self._action_server = ActionServer(
                self,
                Fibonacci,
                'fibonacci',
                self.execute_callback
            )
    
        async def execute_callback(self, goal_handle):
            self.get_logger().info('Executing goal...')
            result = Fibonacci.Result()
            feedback_msg = Fibonacci.Feedback()
            feedback_msg.partial_sequence = [0, 1]
    
            for i in range(2, goal_handle.request.order):
                feedback_msg.partial_sequence.append(
                    feedback_msg.partial_sequence[i - 1] + feedback_msg.partial_sequence[i - 2]
                )
                self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
                goal_handle.publish_feedback(feedback_msg)
                await rclpy.sleep(1)
    
            result.sequence = feedback_msg.partial_sequence
            goal_handle.succeed()
            return result
    
    def main(args=None):
        rclpy.init(args=args)
        node = FibonacciActionServer()
        rclpy.spin(node)
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    
    import rclpy
    from rclpy.node import Node
    from rclpy.action import ActionClient
    from py_action_example.action import Fibonacci
    
    class FibonacciActionClient(Node):
        def __init__(self):
            super().__init__('fibonacci_action_client')
            self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
    
        def send_goal(self, order):
            self._action_client.wait_for_server()
            goal_msg = Fibonacci.Goal()
            goal_msg.order = order
    
            self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback).add_done_callback(
                self.goal_response_callback
            )
    
        def goal_response_callback(self, future):
            goal_handle = future.result()
            if not goal_handle.accepted:
                self.get_logger().info('Goal rejected')
                return
    
            self.get_logger().info('Goal accepted')
            goal_handle.get_result_async().add_done_callback(self.result_callback)
    
        def feedback_callback(self, feedback_msg):
            self.get_logger().info(f'Received feedback: {feedback_msg.feedback.partial_sequence}')
    
        def result_callback(self, future):
            result = future.result().result
            self.get_logger().info(f'Result: {result.sequence}')
            rclpy.shutdown()
    
    def main(args=None):
        rclpy.init(args=args)
        action_client = FibonacciActionClient()
        action_client.send_goal(10)
        rclpy.spin(action_client)
    
    if __name__ == '__main__':
        main()
    
    from setuptools import setup
    
    package_name = 'py_action_example'
    
    setup(
        name=package_name,
        version='0.0.0',
        packages=[package_name],
        install_requires=['setuptools'],
        zip_safe=True,
        maintainer='you',
        description='Example of ROS 2 action in Python',
        entry_points={
            'console_scripts': [
                'action_server = py_action_example.action_server:main',
                'action_client = py_action_example.action_client:main',
            ],
        },
    )
    

    Checkpoint

    理解程式碼的涵義。


4. ROS Launch 功能

ROS2 Launch

ROS 2 Launch 系統提供一套高度模組化的方式,同時啟動多個節點、載入參數、命名空間分組與條件式執行等。
透過 Python Launch API,可撰寫更具彈性的啟動腳本,以支援開發與測試流程。

4-1. 基本範例

現在將先前的 Publisher-Subscriber、Service-Client、Action 範例整合進一個 Launch 檔,
方便一次啟動多個節點。

建立 launch pkg :ros2 pkg create all_launch_examples --build-type ament_python

檔案架構:

colcon_ws/
├── build/
├── install/
├── log/
├── src/
│   ├── cpp_pubsub/
│   ├── py_pubsub/
│   ├── cpp_srvcli/
│   ├── py_srvcli/
│   ├── cpp_action_example/
│   ├── py_action_example/
│   └── all_launch_examples/    # launch package
│       ├── launch/
│       │   └── all_examples_launch.py
│       ├── package.xml
│       ├── setup.py
│       ├── setup.cfg
│       └── resource/
│           └── all_launch_examples

程式碼:

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        # C++ Pub-Sub
        Node(package='cpp_pubsub', executable='talker', name='cpp_talker', output='screen'),
        Node(package='cpp_pubsub', executable='listener', name='cpp_listener', output='screen'),

        # Python Pub-Sub
        Node(package='py_pubsub', executable='talker', name='py_talker', output='screen'),
        Node(package='py_pubsub', executable='listener', name='py_listener', output='screen'),

        # C++ Service
        Node(package='cpp_srvcli', executable='add_two_ints_server', name='cpp_srv_server', output='screen'),
        Node(package='cpp_srvcli', executable='add_two_ints_client', name='cpp_srv_client', output='screen'),

        # Python Service
        Node(package='py_srvcli', executable='add_two_ints_server', name='py_srv_server', output='screen'),
        Node(package='py_srvcli', executable='add_two_ints_client', name='py_srv_client', output='screen'),

        # C++ Action
        Node(package='cpp_action_example', executable='action_server', name='cpp_action_server', output='screen'),
        Node(package='cpp_action_example', executable='action_client', name='cpp_action_client', output='screen'),

        # Python Action
        Node(package='py_action_example', executable='action_server', name='py_action_server', output='screen'),
        Node(package='py_action_example', executable='action_client', name='py_action_client', output='screen'),
    ])
from setuptools import setup

package_name = 'all_launch_examples'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/' + package_name, ['package.xml']),
        ('share/' + package_name + '/launch', ['launch/all_examples_launch.py']),
        ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='your_name',
    maintainer_email='your_email@example.com',
    description='A launch package for pub-sub, service and action examples',
    license='Apache License 2.0',
)

編譯與執行:

cd ~/colcon_ws
colcon build --packages-select all_launch_examples
source install/setup.bash

ros2 launch all_launch_examples all_examples_launch.py
如何確定 YAML 正確匯入?

使用 CLI:

ros2 param list /cpp_talker_param
ros2 param get /cpp_talker_param message_prefix

Checkpoint

成功一鍵 launch 所有 ROS2 Nodes。


4-2. 載入 YAML 參數

載入 YAML 參數

在 ROS 2 中,參數設定可以透過 YAML 檔案集中管理,使 Launch 檔更簡潔。
適用於:需要多次調整參數的開發過程,或是有多組配置情境(例如實機與模擬)的應用。

檔案架構:

all_launch_examples/
├── config/
│   └── pub_params.yaml            # 參數設定檔
├── launch/
│   ├── all_examples_launch.py
│   └── talker_with_params.py      # 使用參數的 launch
├── package.xml
├── setup.py
├── setup.cfg
└── resource/
    └── all_launch_examples

程式碼:

1
2
3
4
5
6
7
8
9
cpp_pubsub:
ros__parameters:
    message_prefix: "Hello from CPP"
    publish_frequency: 2.0

py_pubsub:
ros__parameters:
    message_prefix: "Hello from Python"
    publish_frequency: 1.0
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='cpp_pubsub',
            executable='talker',
            name='cpp_talker_param',
            parameters=['config/pub_params.yaml'],
            output='screen'
        ),
        Node(
            package='py_pubsub',
            executable='talker',
            name='py_talker_param',
            parameters=['config/pub_params.yaml'],
            output='screen'
        )
    ])
from setuptools import setup

package_name = 'all_launch_examples'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/' + package_name, ['package.xml']),
        ('share/' + package_name + '/launch', [
            'launch/all_examples_launch.py',
            'launch/talker_with_params.py'
        ]),
        ('share/' + package_name + '/config', ['config/pub_params.yaml']),
        ('share/ament_index/resource_index/packages', ['resource/' + package_name]),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='your_name',
    maintainer_email='your_email@example.com',
    description='A launch package for pub-sub, service and action examples',
    license='Apache License 2.0',
)

編譯與執行:

cd ~/colcon_ws
colcon build --packages-select all_launch_examples
source install/setup.bash

ros2 launch all_launch_examples talker_with_params.py

Checkpoint

  1. 成功載入 YAML 參數至對應節點。
  2. 如何確認 publish_frequency 是否為 yaml 中的設置頻率?

4-3. Launch Arguments

Launch Arguments 簡介

Launch Arguments 是在 ROS 2 Launch 系統中用來讓使用者在啟動時傳遞參數的機制,
使得同一份 Launch 檔可根據不同需求,動態調整節點參數、執行行為或啟動條件。
適用於需要彈性配置、環境切換或多種執行模式的場景。

實作透過 Launch Args 決定是否啟動 py_talker 節點,並且指定參數 message_prefix

from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, LogInfo
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node

def generate_launch_description():
    # 宣告兩個 launch arguments:
    #   use_py_talker: bool,決定是否啟動 py_talker
    #   prefix: 傳給 py_talker 的 message_prefix 參數
    use_py_talker_arg = DeclareLaunchArgument(
        'use_py_talker',
        default_value='true',
        description='Whether to launch py_talker node'
    )
    prefix_arg = DeclareLaunchArgument(
        'prefix',
        default_value='Hello from launch args!',
        description='Message prefix parameter for py_talker'
    )

    use_py_talker = LaunchConfiguration('use_py_talker')
    prefix = LaunchConfiguration('prefix')

    # 定義節點,但包在條件中,只有 use_py_talker 是 true 時才啟動
    py_talker_node = Node(
        package='py_pubsub',
        executable='talker',
        name='py_talker_arg',
        output='screen',
        parameters=[{'message_prefix': prefix}]
    )

    return LaunchDescription([
        use_py_talker_arg,
        prefix_arg,
        LogInfo(msg=['use_py_talker = ', use_py_talker]),
        LogInfo(msg=['prefix = ', prefix]),
        # 使用 Python 條件判斷來控制是否加入節點
        # LaunchDescription 不支援 if else 直接加入節點,所以用 Condition 或 Python API 動態控制
        py_talker_node if use_py_talker.perform({}) == 'true' else None
    ])

編譯:

cd ~/colcon_ws
colcon build --packages-select all_launch_examples
source install/setup.bash

執行:

ros2 launch all_launch_examples launch_args_example.py
ros2 launch all_launch_examples launch_args_example.py use_py_talker:=false prefix:="Custom Prefix"
Launch Args vs. YAML
項目 Launch Arguments YAML 參數檔
用途 在啟動時動態設定參數 定義靜態參數,集中管理多個參數
設定方式 CLI 或 launch 檔內
DeclareLaunchArgument
YAML 檔案撰寫 +
Node(parameters=[...])
支援型別 通常為字串,需手動轉換型別 支援字串、整數、浮點、
布林、List 等原生型別
可攜性 / 易讀性 不易重複使用,維護成本高 結構清楚,便於版本控制與重複使用
彈性 高:可搭配 CLI 傳參、條件控制、
命名空間切換
中:依據檔案切換,
彈性取決於程式內讀取邏輯
典型使用場景 輕量參數切換、條件式啟動、
debug 模式開關
模組化設定(大量參數)、
多機參數配置
可否搭配 CLI ros2 launch pkg file.py arg:=value ✅ 可讀取固定 YAML 檔案
(但 CLI 不支援動態修改)
參數作用對象 可用於控制節點是否啟動、參數值 通常是傳入節點內部的 ros__parameters

Checkpoint

學會使用 Launch Arguments。