admin管理员组文章数量:1590168
ROS queue_size和buff_size设置
- 1. 缘由
- 2. 解决方案
- 3. 解析
- 3.1 queue_size
- 3.2 buff_size
- 3.3 Publisher的消息队列
- 3.4 Subscriber的消息队列
- 4. 演示
- 4.1. 延迟
- 4.2. 实时
1. 缘由
在ROS的Subscriber的Callback回调函数中不可避免的进行一些费时的操作
机器人应用中经常出现很费时的操作,比如运行AI模型、特征匹配等等
Subscriber所订阅的消息的发布频率可能是很高的
而这些操作的运算速度肯定达不到消息发布的速度
如激光雷达数据采集可以达到10Hz,而处理激光数据频率勉强1Hz。
所以,如果要是没有取舍的对于每个消息都调用一次回调函数
那么势必会导致计算越来越不实时,很有可能当下在处理的还是几十秒以前的数据
所以,希望每次回调函数都处理当前时刻最新的一个消息
2. 解决方案
- 设置 Publisher 的 queue_size 等于 1
- 设置 Subscriber 的 queue_size(消息队列大小)等于 1
- 设置 Subscriber 的 buff_size(缓冲区大小)足够大,大于一个消息的大小
pub = rospy.Publisher('chatter', String, queue_size=1)
rospy.Subscriber("chatter", String, callback, queue_size=1, buff_size=10000000)
3. 解析
3.1 queue_size
Publisher的消息队列是为了缓存发布节点发布的消息
一旦队列中消息的数量超过了queue_size
那么最先进入队列的(最老的)消息被舍弃
Subscriber的消息队列是为了缓存节点接收到的信息
一旦回调处理的速度过慢,接收到的消息数量超过了queue_size
那么最先进入队列的(最老的)消息会被舍弃
所以,想只处理最新的消息,实际上只需要把两个queue_size都设置成1
那么系统不会缓存数据,自然处理的就是最新的消息
3.2 buff_size
这个缓冲区的大小是指消息队列使用的缓冲区物理内存空间大小
如果这个空间小于一个消息所需要的空间
比如消息是一张图片5MB,数据量超过了缓冲区1MB的大小
这个时候为了保证通信不发生错误,就会触发网络通信的保护机制,TCP的Buffer会为你缓存消息
这种机制就会导致每一帧消息都被完整的缓存下来,没有消息被丢弃
感觉上就像queue_size被设置成了无穷大
如果缓冲区设置太小,新的数据也无法及时缓存
3.3 Publisher的消息队列
Publisher的消息队列是一定要有的
因为ROS中发布节点往外发送消息是基于Topic发送,而不是直接向Subscriber订阅者发送
所以必须要有一个消息队列来存放发布的消息,以供订阅者来获取
而且这个消息队列的好处是在网络差、带宽小、延时高的时候,保证数据不容易丢失
3.4 Subscriber的消息队列
由于ROS毕竟是分布式系统,Publisher和Subscriber不一定在同一台主机上
因此消息需要通过网络来交换
但是网络的性能时好时坏
如果Subscriber没有消息队列,那么每次运行Callback函数前都要先通过网络取回消息,然后才能处理
当网络很差时,就会让系统堵塞
而有消息队列的话,Subscriber就可以一边处理队列中的消息,一边通过网络缓存新的消息
而不用每次处理消息前都要临时去读一个回来
这样就增加了系统的可靠性
4. 演示
4.1. 延迟
默认发布话题
#!/usr/bin/env python3
# coding=utf-8
import rospy # 导入rospy包 ROS的python客户端
from std_msgs.msg import String # 导入std_msgs.msg包的String消息类型
# 定义发布函数 talker
def talker():
pub = rospy.Publisher('chatter', String, queue_size=10) # 定义发布器 pub 发布chatter
rospy.init_node('talker', anonymous=True) # 初始化节点talker
rate = rospy.Rate(10) # 初始化循环频率10HZ的对象rate
# 在程序没退出的情况下
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str) # 消息打印到屏幕上
pub.publish(hello_str) # 发布消息
rate.sleep() # 休眠
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
默认订阅话题
#!/usr/bin/env python3
# coding=utf-8
import rospy # 导入rospy包 ROS的python客户端
from std_msgs.msg import String # 导入std_msgs.msg包的String消息类型
# 回调函数
def callback(data):
rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data) # 消息打印到屏幕上
# 模拟复杂运行,耗时1s
rate = rospy.Rate(1)
rate.sleep()
def listener():
rospy.init_node('listener', anonymous=True) # 初始化节点listener
rospy.Subscriber("chatter", String, callback) # 初始化订阅器 订阅chatter
rospy.spin() # 保持主进程一直循环
if __name__ == '__main__':
listener()
可以看到接受到的数据,时间差就特别大了
4.2. 实时
实时发布话题
#!/usr/bin/env python3
# coding=utf-8
import rospy # 导入rospy包 ROS的python客户端
from std_msgs.msg import String # 导入std_msgs.msg包的String消息类型
# 定义发布函数 talker
def talker():
pub = rospy.Publisher('chatter', String, queue_size=1) # 定义发布器 pub 发布chatter
rospy.init_node('talker', anonymous=True) # 初始化节点talker
rate = rospy.Rate(10) # 初始化循环频率10HZ的对象rate
# 在程序没退出的情况下
while not rospy.is_shutdown():
hello_str = "hello world %s" % rospy.get_time()
rospy.loginfo(hello_str) # 消息打印到屏幕上
pub.publish(hello_str) # 发布消息
rate.sleep() # 休眠
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
实时订阅话题
#!/usr/bin/env python3
# coding=utf-8
import rospy # 导入rospy包 ROS的python客户端
from std_msgs.msg import String # 导入std_msgs.msg包的String消息类型
# 回调函数
def callback(data):
rospy.loginfo(rospy.get_caller_id() + "I heard %s", data.data) # 消息打印到屏幕上
# 模拟复杂运行,耗时1s
rate = rospy.Rate(1)
rate.sleep()
def listener():
rospy.init_node('listener', anonymous=True) # 初始化节点listener
rospy.Subscriber("chatter", String, callback, queue_size=1, buff_size=100000) # 初始化订阅器 订阅chatter
rospy.spin() # 保持主进程一直循环
if __name__ == '__main__':
listener()
可以看到接受到的数据,时间差就相对较小
谢谢
版权声明:本文标题:ROS queue_size和buff_size设置 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/xitong/1728084369a1144813.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论