admin管理员组

文章数量:1590168

ROS queue_size和buff_size设置

  • 1. 缘由
  • 2. 解决方案
  • 3. 解析
    • 3.1 queue_size
    • 3.2 buff_size
    • 3.3 Publisher的消息队列
    • 3.4 Subscriber的消息队列
  • 4. 演示
    • 4.1. 延迟
    • 4.2. 实时


1. 缘由

在ROS的Subscriber的Callback回调函数中不可避免的进行一些费时的操作
机器人应用中经常出现很费时的操作,比如运行AI模型、特征匹配等等

Subscriber所订阅的消息的发布频率可能是很高的
而这些操作的运算速度肯定达不到消息发布的速度

如激光雷达数据采集可以达到10Hz,而处理激光数据频率勉强1Hz。

所以,如果要是没有取舍的对于每个消息都调用一次回调函数
那么势必会导致计算越来越不实时,很有可能当下在处理的还是几十秒以前的数据

所以,希望每次回调函数都处理当前时刻最新的一个消息


2. 解决方案

  1. 设置 Publisher 的 queue_size 等于 1
  2. 设置 Subscriber 的 queue_size(消息队列大小)等于 1
  3. 设置 Subscriber 的 buff_size(缓冲区大小)足够大,大于一个消息的大小
pub = rospy.Publisher('chatter', String, queue_size=1)
rospy.Subscriber("chatter", String, callback, queue_size=1, buff_size=10000000)

3. 解析


3.1 queue_size

Publisher的消息队列是为了缓存发布节点发布的消息
一旦队列中消息的数量超过了queue_size
那么最先进入队列的(最老的)消息被舍弃

Subscriber的消息队列是为了缓存节点接收到的信息
一旦回调处理的速度过慢,接收到的消息数量超过了queue_size
那么最先进入队列的(最老的)消息会被舍弃

所以,想只处理最新的消息,实际上只需要把两个queue_size都设置成1
那么系统不会缓存数据,自然处理的就是最新的消息


3.2 buff_size

这个缓冲区的大小是指消息队列使用的缓冲区物理内存空间大小

如果这个空间小于一个消息所需要的空间
比如消息是一张图片5MB,数据量超过了缓冲区1MB的大小
这个时候为了保证通信不发生错误,就会触发网络通信的保护机制,TCP的Buffer会为你缓存消息
这种机制就会导致每一帧消息都被完整的缓存下来,没有消息被丢弃
感觉上就像queue_size被设置成了无穷大
如果缓冲区设置太小,新的数据也无法及时缓存


3.3 Publisher的消息队列

Publisher的消息队列是一定要有的
因为ROS中发布节点往外发送消息是基于Topic发送,而不是直接向Subscriber订阅者发送
所以必须要有一个消息队列来存放发布的消息,以供订阅者来获取
而且这个消息队列的好处是在网络差、带宽小、延时高的时候,保证数据不容易丢失


3.4 Subscriber的消息队列

由于ROS毕竟是分布式系统,Publisher和Subscriber不一定在同一台主机上
因此消息需要通过网络来交换

但是网络的性能时好时坏
如果Subscriber没有消息队列,那么每次运行Callback函数前都要先通过网络取回消息,然后才能处理
当网络很差时,就会让系统堵塞

而有消息队列的话,Subscriber就可以一边处理队列中的消息,一边通过网络缓存新的消息
而不用每次处理消息前都要临时去读一个回来
这样就增加了系统的可靠性


4. 演示


4.1. 延迟

默认发布话题

#!/usr/bin/env python3
# coding=utf-8
import rospy  # 导入rospy包 ROS的python客户端
from std_msgs.msg import String  # 导入std_msgs.msg包的String消息类型


# 定义发布函数 talker
def talker():
    pub = rospy.Publisher('chatter', String, queue_size=10)  # 定义发布器 pub 发布chatter
    rospy.init_node('talker', anonymous=True)  # 初始化节点talker
    rate = rospy.Rate(10)  # 初始化循环频率10HZ的对象rate

    # 在程序没退出的情况下
    while not rospy.is_shutdown():
        hello_str = "hello world %s" % rospy.get_time()
        rospy.loginfo(hello_str)  # 消息打印到屏幕上
        pub.publish(hello_str)  # 发布消息
        rate.sleep()  # 休眠


if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        pass

默认订阅话题

#!/usr/bin/env python3
# coding=utf-8
import rospy  # 导入rospy包 ROS的python客户端
from std_msgs.msg import String  # 导入std_msgs.msg包的String消息类型


# 回调函数
def callback(data):
    rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)  # 消息打印到屏幕上
    # 模拟复杂运行,耗时1s
    rate = rospy.Rate(1)
    rate.sleep()


def listener():
    rospy.init_node('listener', anonymous=True)  # 初始化节点listener
    rospy.Subscriber("chatter", String, callback)  # 初始化订阅器 订阅chatter
    rospy.spin()  # 保持主进程一直循环


if __name__ == '__main__':
    listener()

可以看到接受到的数据,时间差就特别大了


4.2. 实时

实时发布话题

#!/usr/bin/env python3
# coding=utf-8
import rospy  # 导入rospy包 ROS的python客户端
from std_msgs.msg import String  # 导入std_msgs.msg包的String消息类型


# 定义发布函数 talker
def talker():
    pub = rospy.Publisher('chatter', String, queue_size=1)  # 定义发布器 pub 发布chatter
    rospy.init_node('talker', anonymous=True)  # 初始化节点talker
    rate = rospy.Rate(10)  # 初始化循环频率10HZ的对象rate

    # 在程序没退出的情况下
    while not rospy.is_shutdown():
        hello_str = "hello world %s" % rospy.get_time()
        rospy.loginfo(hello_str)  # 消息打印到屏幕上
        pub.publish(hello_str)  # 发布消息
        rate.sleep()  # 休眠


if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
        pass

实时订阅话题

#!/usr/bin/env python3
# coding=utf-8
import rospy  # 导入rospy包 ROS的python客户端
from std_msgs.msg import String  # 导入std_msgs.msg包的String消息类型


# 回调函数
def callback(data):
    rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data)  # 消息打印到屏幕上
    # 模拟复杂运行,耗时1s
    rate = rospy.Rate(1)
    rate.sleep()


def listener():
    rospy.init_node('listener', anonymous=True)  # 初始化节点listener
    rospy.Subscriber("chatter", String, callback, queue_size=1, buff_size=100000)  # 初始化订阅器 订阅chatter
    rospy.spin()  # 保持主进程一直循环


if __name__ == '__main__':
    listener()

可以看到接受到的数据,时间差就相对较小


谢谢

本文标签: rosqueuesizebuffsize