mqtt_client包提供了一个ROS节点或ROS 2组件节点,使连接的基于ROS的设备或机器人能够通过MQTT协议使用MQTT代理交换ROS消息。这适用于任意ROS消息类型。mqtt_client还可以与运行在非ROS设备上的MQTT客户端交换原始消息。
[!重要]
本仓库由亚琛工业大学汽车工程研究所(ika)开源维护。
V2X通信是我们车辆智能与自动驾驶领域众多研究主题之一。
如果您想了解更多关于我们如何支持您的自动驾驶或机器人项目,欢迎联系我们!
:email: opensource@ika.rwth-aachen.de
mqtt_client包作为官方ROS / ROS 2包发布,可以通过包管理器轻松安装。
sudo apt update sudo apt install ros-$ROS_DISTRO-mqtt-client
如果您想从源代码安装mqtt_client,只需将此仓库克隆到您的ROS工作空间中。所有在ROS package.xml
中列出的依赖项都可以使用rosdep安装。
# mqtt_client$ rosdep install -r --ignore-src --from-paths . # ROS # workspace$ catkin build -DCMAKE_BUILD_TYPE=Release mqtt_client # ROS 2 # workspace$ colcon build --packages-up-to mqtt_client --cmake-args -DCMAKE_BUILD_TYPE=Release
mqtt_client也可以作为Docker镜像使用,通过docker-ros容器化。
# ROS docker run --rm ghcr.io/ika-rwth-aachen/mqtt_client:ros # ROS 2 docker run --rm ghcr.io/ika-rwth-aachen/mqtt_client:ros2
mqtt_client可以轻松集成到现有的基于ROS的系统中。以下首先提供了在单台机器上测试mqtt_client的快速入门指南。然后,在更复杂的应用中如何启动和配置它的更多细节将被呈现。
按照以下步骤快速启动一个工作的mqtt_client,它通过MQTT代理将ROS消息发送给自己。
假设MQTT代理(如Mosquitto)正在localhost:1883
上运行。
对于此演示,您可以使用Docker轻松启动带有默认配置的Mosquitto。
docker run --rm --network host --name mosquitto eclipse-mosquitto
有关设置自己的代理的更高级说明,请查看我们在Docker中运行启用了身份验证和加密的MQTT代理的说明这里。
mqtt_client最好使用ROS参数yaml文件进行配置。下面显示的配置(另请参见params.yaml
/ params.ros2.yaml
)允许如下交换消息:
/ping/ros
上接收的ROS消息被发送到代理的MQTT主题pingpong/ros
上;pingpong/ros
接收的MQTT消息在本地ROS主题/pong/ros
上发布;/ping/primitive
上接收的原始ROS消息作为原始(字符串)消息发送到代理的MQTT主题pingpong/primitive
上;pingpong/primitive
接收的MQTT消息作为原始ROS消息在本地ROS主 题/pong/primitive
上发布。broker: host: localhost port: 1883 bridge: ros2mqtt: - ros_topic: /ping/ros mqtt_topic: pingpong/ros - ros_topic: /ping/primitive mqtt_topic: pingpong/primitive primitive: true mqtt2ros: - mqtt_topic: pingpong/ros ros_topic: /pong/ros - mqtt_topic: pingpong/primitive ros_topic: /pong/primitive primitive: true
构建ROS工作空间后,使用预配置的演示参数通过roslaunch启动mqtt_client节点,应该会产生以下输出。
# ROS roslaunch mqtt_client standalone.launch # ROS 2 ros2 launch mqtt_client standalone.launch.ros2.xml
[ WARN] [1665575657.358869079]: 参数'broker/tls/enabled'未设置,默认为'0' [ WARN] [1665575657.359798329]: 参数'client/id'未设置,默认为'' [ WARN] [1665575657.359810889]: 客户端ID为空时无法启用客户端缓冲 [ WARN] [1665575657.360300703]: 参数'client/clean_session'未设置,默认为'1' [ WARN] [1665575657.360576344]: 参数'client/keep_alive_interval'未设置,默认为'60.000000' [ WARN] [1665575657.360847295]: 参数'client/max_inflight'未设置,默认为'65535'
[ INFO] [1665575657.361281461]: 正在将ROS主题'/ping/ros'桥接到MQTT主题'pingpong/ros' [ INFO] [1665575657.361303380]: 正在将原始ROS主题'/ping/primitive'桥接到MQTT主题'pingpong/primitive' [ INFO] [1665575657.361352809]: 正在将MQTT主题'pingpong/ros'桥接到ROS主题'/pong/ros' [ INFO] [1665575657.361370558]: 正在将MQTT主题'pingpong/primitive'桥接到原始ROS主题'/pong/primitive' [ INFO] [1665575657.362153083]: 正在连接代理'tcp://localhost:1883'... [ INFO] [1665575657.462622065]: 已连接到代理'tcp://localhost:1883'
请注意,*mqtt_client*成功连接到代理并回显了正在桥接的ROS/MQTT主题。为了测试*mqtt_client*与自身及其他MQTT客户端之间的通信,请打开五个新终端。
为了测试*mqtt_clients*之间的通信,在ROS主题`/ping/ros`上发布任何ROS消息,并等待ROS主题`/pong/ros`上的响应。
```bash
# 第1个终端:向/ping发布ROS消息
# ROS
rostopic pub -r 1 /ping/ros std_msgs/String "Hello MQTT"
# ROS 2
ros2 topic pub /ping/ros std_msgs/msg/String "{data: \"Hello MQTT\"}"
# 第2个终端:监听/pong上的ROS消息 # ROS rostopic echo /pong/ros # ROS 2 ros2 topic echo /pong/ros
为了测试mqtt_client与其他MQTT客户端之间的通信,在ROS主题/ping/primitive
上发布原始ROS消息,直接在MQTT主题pingpong/primitive
上发布原始MQTT消息,并等待ROS主题/pong/primitive
上的响应。请注意,您需要使用不同的配置文件重新启动ROS 2的mqtt_client。
# ROS 2 # mqtt_client$ ros2 launch mqtt_client standalone.launch.ros2.xml params_file:=$(ros2 pkg prefix mqtt_client)/share/mqtt_client/config/params.ros2.primitive.yaml
# 第3个终端:向/ping/primitive发布原始ROS消息 # ROS rostopic pub -r 1 /ping/primitive std_msgs/Int32 42 # ROS2 ros2 topic pub /ping/primitive std_msgs/msg/Int32 "{data: 42}"
# 第4个终端:监听/pong/primitive上的原始ROS消息 # ROS rostopic echo /pong/primitive # ROS2 ros2 topic echo /pong/primitive
# 第5个终端:使用mosquitto_pub直接向pingpong/primitive发布原始MQTT消息 docker run --rm --network host eclipse-mosquitto mosquitto_pub -h localhost -t "pingpong/primitive" --repeat 20 --repeat-delay 1 -m 69
如果一切正常,第二个终端应以1Hz的频率打印消息,而第四个终端应以1Hz的频率打印两个不同的消息。
您可以使用以下命令启动mqtt_client节点:
# ROS roslaunch mqtt_client standalone.launch # ROS 2 ros2 launch mqtt_client standalone.launch.ros2.xml
这将自动加载提供的演示params.yaml
/ params.ros2.yaml
。如果您想加载自定义配置文件,只需传递params_file
。
# ROS roslaunch mqtt_client standalone.launch params_file:="</PATH/TO/PARAMS.YAML>" # ROS 2 ros2 launch mqtt_client standalone.launch.ros2.xml params_file:="</PATH/TO/PARAMS.YAML>"
为了充分利用mqtt_client作为ROS节点/ROS 2组件的优势,将节点/组件加载到您自己的节点管理器/组件容器中。
以下列出了mqtt_client支持的所有可用ROS参数及其默认值(在[]
中)。
broker: host: # [localhost] 运行MQTT代理的机器的IP地址或主机名 port: # [1883] MQTT代理监听的端口 user: # 用于向代理进行身份验证的用户名(如果为空,将尝试匿名连接) pass: # 用于向代理进行身份验证的密码 tls: enabled: # [false] 是否通过SSL/TLS连接 ca_certificate: # [/etc/ssl/certs/ca-certificates.crt] 客户端信任的CA证书文件(相对于ROS_HOME)
client: id: # 用于标识客户端的唯一ID字符串(代理可能允许空ID并自动生成一个) buffer: size: # [0] 未连接到代理时桥接缓冲的最大消息数(仅当客户端ID不为空时可用) directory: # [buffer] 未连接到代理时用于缓冲消息的目录(相对于ROS_HOME) last_will: topic: # 用于此客户端遗嘱消息的主题(如果未指定,则无遗嘱) message: # [offline] 遗嘱消息 qos: # [0] 遗嘱消息的QoS值 retained: # [false] 是否保留遗嘱消息 clean_session: # [true] 是否为此客户端使用清理会话 keep_alive_interval: # [60.0] 保活间隔(秒) max_inflight: # [65535] 最大同时发送的消息数 tls: certificate: # 客户端证书文件(仅当代理要求客户端证书时需要;相对于ROS_HOME) key: # 客户端私钥文件(相对于ROS_HOME) password: # 客户端私钥密码 version: # TLS版本(https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305) verify: # 验证客户端是否应进行连接后检查 alpn_protos: # ALPN协议列表(https://www.openssl.org/docs/man1.1.1/man3/SSL_CTX_set_alpn_protos.html)
bridge: ros2mqtt: # 指定将哪些ROS主题映射到哪些MQTT主题的数组 - ros_topic: # 其消息被转换为MQTT消息的ROS主题 mqtt_topic: # 相应的ROS消息被发送到代理的MQTT主题 primitive: # [false] 是否作为原始消息发布 inject_timestamp: # [false] 是否在ROS2MQTT负载中附加时间戳(用于接收端的延迟计算) advanced: ros: queue_size: # [1] ROS订阅者队列大小 mqtt: qos: # [0] MQTT QoS值 retained: # [false] 是否保留MQTT消息 mqtt2ros: # 指定将哪些MQTT主题映射到哪些ROS主题的数组 - mqtt_topic: # 从代理接收消息的MQTT主题 ros_topic: # 相应的MQTT消息被发布的ROS主题 primitive: # [false] 是否作为原始消息发布(如果来自非ROS MQTT客户端) advanced: mqtt: qos: # [0] MQTT QoS值 ros: queue_size: # [1] ROS发布者队列大小 latched: # [false] 是否锁存ROS消息 ##### ROS 2 ```yaml bridge: ros2mqtt: # 指定将哪些ROS主题映射到哪些MQTT主题的对象 ros_topics: # 指定要桥接的ROS主题的数组 - {{ ros_topic_name }} # 应该被桥接的ROS主题,对应YAML中的子对象 {{ ros_topic_name }}: mqtt_topic: # 相应的ROS消息发送到代理的MQTT主题 primitive: # [false] 是否作为原始消息发布 ros_type: # [*空*] 如果设置,将使用提供的ROS消息类型。如果为空,则通过发布者自动推断类型 inject_timestamp: # [false] 是否在ROS2MQTT负载中附加时间戳(用于接收端计算延迟) advanced: ros: queue_size: # [1] ROS订阅者队列大小 qos: reliability: # [auto] "auto"、"system_default"、"reliable"、"best_effort"之一。如果为auto,则通过发布者自动确定QoS durability: # [auto] "auto"、"system_default"、"volatile"、"transient_local"之一。如果为auto,则通过发布者自动确定QoS mqtt: qos: # [0] MQTT QoS值 retained: # [false] 是否保留MQTT消息 mqtt2ros: # 指定将哪些MQTT主题映射到哪些ROS主题的对象 mqtt_topics: # 指定要桥接的ROS主题的数组 - {{ mqtt_topic_name }} # 应该被桥接的MQTT主题,对应YAML中的子对象 {{ mqtt_topic_name }}: ros_topic: # 相应的MQTT消息发布的ROS主 题 ros_type: # [*空*] 如果设置,将使用提供的ROS消息类型。如果为空,则通过MQTT消息自动推断类型 primitive: # [false] 是否作为原始消息发布(如果来自非ROS MQTT客户端) advanced: mqtt: qos: # [0] MQTT QoS值 ros: queue_size: # [1] ROS发布者队列大小 latched: # [false] 是否锁存ROS消息 qos: reliability: # [system_default] "system_default"、"reliable"、"best_effort"之一 durability: # [system_default] "system_default"、"volatile"、"transient_local"之一
如快速入门所示,mqtt_client不仅可以与其他mqtt_clients交换任意ROS消息,还可以与其他非mqtt_client MQTT客户端交换原始消息数据。这允许基于ROS的设备与不基于ROS的设备交换原始消息。primitive
参数可以为ROS到MQTT(bridge/ros2mqtt
)和MQTT到ROS(bridge/mqtt2ros
)传输设置。
如果将ROS到MQTT传输配置为primitive
,并且ROS消息类型是支持的原始ROS消息类型之一,则原始数据将作为字符串发布。支持的原始ROS消息类型有std_msgs/String
、std_msgs/Bool
、std_msgs/Char
、std_msgs/UInt8
、std_msgs/UInt16
、std_msgs/UInt32
、std_msgs/UInt64
、std_msgs/Int8
、std_msgs/Int16
、std_msgs/Int32
、std_msgs/Int64
、std_msgs/Float32
、std_msgs/Float32
。
如果将MQTT到ROS传输配置为primitive
,则MQTT消息将被解释并发布为原始数据类型(如果可能)。消息按以下顺序探测:bool
(std_msgs/Bool
)、int
(std_msgs/Int32
)、float
(std_msgs/Float32
)、string
(std_msgs/String
)。
mqtt_client提供内置功能,用于测量通过MQTT代理将ROS消息传输回ROS的延迟。注意,此功能仅适用于非原始消息(参见原始消息)。为此,发送客户端将当前时间戳注入MQTT消息中。接收客户端随后可以计算消息接收时间与注入时间戳之间的延迟。请注意,这仅在发送和接收机器上的时钟同步程度内准确。
为了将当前时间戳注入传出的MQTT消息中,必须为相应的bridge/ros2mqtt
条目设置inject_timestamp
参数。接收的mqtt_client随后会自动将测量的延迟(以秒为单位)作为ROS std_msgs/Float64
消息发布在主题/<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>
上。
这些延迟可以使用rostopic echo轻松打印
# ROS rostopic echo --clear /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data # ROS 2 ros2 topic echo /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data
或使用rqt_plot绘制:
# ROS rosrun rqt_plot rqt_plot /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data # ROS 2 ros2 run rqt_plot rqt_plot /<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>/data
这个简短的包摘要按照ROS Wiki风格指南记录了该包。
mqtt_client/MqttClient
使连接的基于ROS的设备或机器人能够通过MQTT代理使用MQTT协议交换ROS消息。
<bridge/ros2mqtt[*]/ros_topic>
(topic_tools/ShapeShifter
)
ROS主题,其消息被转换为MQTT消息并发送到MQTT代理。可以有任意ROS消息类型。<bridge/mqtt2ros[*]/ros_topic>
(topic_tools/ShapeShifter
)
从MQTT代理接收到的MQTT消息在此ROS主题上发布。可以有任意ROS消息类型。~/latencies/<bridge/mqtt2ros[*]/ros_topic>
(std_msgs/Float64
)
如果接收到的消息中注入了时间戳(参见延迟计算),则在此处发布传输到 <bridge/mqtt2ros[*]/ros_topic>
的消息的延迟测量结果。~is_connected
(mqtt_client/srv/IsConnected
)
返回客户端是否已连接到MQTT代理。参见配置。
mqtt_client/MqttClient
使连接的基于ROS的设备或机器人能够通过MQTT代理使用MQTT协议交换ROS消息。
<bridge/ros2mqtt/ros_topic>
(rclcpp::SerializedMessage
)
将其消息转换为MQTT消息并发送到MQTT代理的ROS主题。可以是任意ROS消息类型。<bridge/mqtt2ros/ros_topic>
(rclcpp::SerializedMessage
)
在此ROS主题上发布从MQTT代理接收到的MQTT消息。可以是任意ROS消息类型。~/latencies/<bridge/mqtt2ros/ros_topic>
(std_msgs/Float64
)
如果接收到的消息中注入了时间戳(参见延迟计算),则在此处发布传输到 <bridge/mqtt2ros/ros_topic>
的消息的延迟测量结果。~/is_connected
(mqtt_client/srv/IsConnected
)
返回客户端是否已连接到MQTT代理。
~/new_ros2mqtt_bridge
(mqtt_client/srv/NewRos2MqttBridge
)
返回是否创建了新的ROS -> MQTT桥接。
~/new_mqtt2ros_bridge
(mqtt_client/srv/NewMqtt2RosBridge
)
返回是否创建了新的MQTT -> ROS桥接。
参见配置。
mqtt_client 能够将任意消息类型的ROS消息桥接到MQTT代理。为此,它需要使用在运行时才确定形状的通用ROS订阅者和发布者。
这些通用的ROS订阅者和发布者通过topic_tools::ShapeShifter实现。对于在bridge/ros2mqtt/
下指定的每对ros_topic
和mqtt_topic
,都会设置一个具有以下回调签名的ROS订阅者:
void ros2mqtt(topic_tools::ShapeShifter::ConstPtr&, std::string&)
在回调内部,使用ros::serialization对在ros_topic
上接收到的通用消息进行序列化。然后,序列化后的形式就可以在指定的mqtt_topic
上发送到MQTT代理了。
在检索MQTT消息时,它会作为ROS消息在ROS网络上重新发布。为此,使用topic_tools::ShapeShifter::morph使ShapeShifter发布者采用特定ROS消息类型的形状。
但是,有关ROS消息类型的必要元信息只能在发布mqtt_client的ROS订阅者回调中通过调用topic_tools::ShapeShifter::getMD5Sum、topic_tools::ShapeShifter::getDataType和topic_tools::ShapeShifter::getMessageDefinition来提取。这些属性被包装在自定义类型mqtt_client::RosMsgType的ROS消息中,使用ros::serialization进行序列化,并通过MQTT代理在特殊主题上共享。
当mqtt_client接收到这样的ROS消息类型元信息时,它会使用topic_tools::ShapeShifter::morph配置相应的ROS ShapeShifter发布者。
mqtt_client还提供了测量通过MQTT代理将ROS消息传输回ROS的延迟的功能。为此,发送客户端将当前时间戳注入MQTT消息中。接收客户端随后可以计算消息接收时间与注入时间戳之间的延迟。有关是否注入时间戳的信息也包含在之前发送的自定义mqtt_client::RosMsgType消息中。实际的std::vector<uint8>
消息负载采用以下形式之一:
[... 序列化的时间戳 ... | ... 序列化的ROS消息 ...] [... 序列化的ROS消息 ...]
总结一下,数据流如下:
<ros2mqtt_ros_topic>
上接收到任意类型的ROS消息,并传递给通用回调
RosMsgType