0% found this document useful (0 votes)
58 views11 pages

ROS Spinning, Threading, Queuing: Effective Use of Multi Spinner Threads, Different Queues in ROS

Download as pdf or txt
Download as pdf or txt
Download as pdf or txt
You are on page 1/ 11

23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

You have 2 free member-only stories left this month. Sign up for Medium and get an extra one

ROS Spinning, Threading, Queuing


Effective use of multi spinner threads, different queues in ROS

Kohei Otsuka
Aug 6, 2020 · 8 min read

Robot Operating System (ROS)¹ is a popular framework for developing robotics


application. One of the main facilities that ROS provides is the publisher-subscriber
communication² that is used for message-passing between components. While it is easy
to implement the publisher-subscriber, it is not obvious what is going on underneath it
and how we can tailor it to our specific problems in practice. In this post, I will talk
about the mechanisms behind the publisher-subscriber in ROS such as its execution
model, threading, queuing mechanism. To keep the discussion in practical level, I will
use simple reference programs that were derived from the original tutorial³ from ROS.
Hopefully by the end of the post, you understand those concepts and the trade-offs so
that you can decide what is the best approach for your applications.

Subscriber queue
As I said, I am going to use an example package from ROS tutorial as a base for the rest
of the discussion and make changes on it as we continue. Our example has 2 nodes,
one is called “talker” and the other one is called “listener”.

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 1/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

Talker
The talker node publishes topic “chatter” and listener node subscribes to the topic. The
talker implementation looks like below,

#include <sstream>
#include "ros/ros.h"
#include "std_msgs/String.h"

int main(int argc, char **argv) {


ros::init(argc, argv, "talker");
ros::NodeHandle n;

ros::Publisher chatter_pub =
n.advertise<std_msgs::String("chatter", 1000);

int chatter_count = 0;

ros::Timer timer = n.createTimer(ros::Duration(0.01),


[&](const ros::TimerEvent&) {
std_msgs::String msg;
std::stringstream ss;
ss << "chatter messages: " << chatter_count;
msg.data = ss.str();
ROS_INFO("%s", msg.data.c_str());
chatter_pub.publish(msg);
chatter_count++;
});

ros::spin();

return 0;
}

I changed the original sample code to use a ros::Timer here to explain about it. As
shown below, when we create a timer, it creates a new thread (Let’s call it “Timer
thread”). The timer thread keeps adding the given callback to a callback queue that is
automatically created during initialization. In this example, the given callback is
specified as a lambda⁴ function.Then, when we call ros::spin() , it keeps taking the

callback from the callback queue and executing them one by one in an infinite loop
(acting as a “spinning” thread).

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 2/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

In the given callback function(the lambda function), it creates a message and publishes
it with a counter value which is incremented every time the callback function is called.
The frequency of the timer is set to 100 Hz(0.01 seconds duration). So the timer thread
will put the callback functions to the callback queue at every 0.01 seconds. Note here
that 1000 is specified as the size of publisher queue when it advertises the topic to
publish. As shown below, the publisher queue is another queue like callback queue, but
the queue is for queuing published message which is filled every time publish()

function is called.

There is a separate thread that is responsible for taking the message from the publisher
queue and send it to subscribers of the topic if there are any. If you are calling the
publish() more quickly than the publisher thread can send the messages, the messages

start piling up in the publisher queue and if it reaches over the specified queue size (in
this case 1000), the old message starts to get overwritten by new messages.

Listener
The first version of the listener node implementation looks like below.

#include "ros/ros.h"
#include "std_msgs/String.h"
https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 3/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

void ChatterCallback(const std_msgs::String::ConstPtr& msg) {


ROS_INFO(" I heard: [%s]", msg->data.c_str());
}

int main(int argc, char **argv) {


ros::init(argc, argv, "listener");
ros::NodeHandle n;

ros::Subscriber sub = n.subscribe("chatter", 1, ChatterCallback);


ros::spin();

return 0;
}

As shown below, when the listener node is initialized, it creates a thread that is
responsible for receiving messages it has subscribed to. Each subscriber has a queue for
queuing incoming messages. Then, the corresponding callback registered to the
subscriber is put to the callback queue together with the message from the subscriber
queue.

There is a separate thread (let’s call it “Spinner thread”) that is responsible for taking
the callback from the callback queue and execute the callback one by one. Similar to
the case of the publisher queue mentioned above, if the subscriber queue receives
message more often than the spinner thread process the callbacks, the received
messages starts piling up in the subscriber queue. If it reaches over the specified queue
size, the old messages in the subscriber queue get overwritten with new received
messages.

In this example, the subscriber queue size is set to 1. This means it does not queue up
incoming messages but it keeps only the single latest message in the subscriber queue.
This approach is valid if you need to process always only the latest message with the
smallest latency (it does not need to wait until older messages in the queue are

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 4/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

processed) and if you want to save memory usage (for the subscriber queue) in
exchange for the risk of dropping messages.

It seems like an avoidable trade-off, but actually, by using multi spinner


threads(discussed in the next section), we can prevent the messages from dropping
while keeping the advantage of having small subscriber queue.

Multi Spinner threads


In ROS, we can increase the number of the spinner threads so that the listener node
can process callbacks from the callback queue more quickly as shown below. This also
means that it can process the messages in the subscriber queue faster so we can prevent
old messages to get overwritten.

To increase the number of the spinner threads, you can use


ros::MultiThreadedSpinner as shown below in the listener node.

void ChatterCallback(const std_msgs::String::ConstPtr& msg) {


ROS_INFO(" I heard: [%s]", msg->data.c_str());
std::this_thread::sleep_for(0.02s);
}

int main(int argc, char **argv) {


ros::init(argc, argv, "listener");
ros::NodeHandle n;

ros::Subscriber sub = n.subscribe("chatter", 1, ChatterCallback);

ros::MultiThreadedSpinner spinner(2);
spinner.spin();

return 0;
}

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 5/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

It is specified to use 2 spinner threads. In theory, this should prevent the messages
from dropping. However, if we execute the both talker and listener nodes again, still
the listener node is dropping some messages.

The reason is because ROS prevents a callback registered in one subscriber from being
called simultaneously by multiple threads by default. So, even though there are 2
spinner threads processing callbacks from the callback queue, they cannot execute the
ChatterCallback at the same time. One spinner thread always has to wait until the
other one finishes the execution, that’s why it did not improve the throughput of
processing callbacks.

To solve the problem, we can specify so that ChatterCallback can be called


concurrently. The implementation after the required change is shown below.

void ChatterCallback(const std_msgs::String::ConstPtr& msg) {


ROS_INFO(" I heard: [%s]", msg->data.c_str());
std::this_thread::sleep_for(0.02s);
}

int main(int argc, char **argv) {


ros::init(argc, argv, "listener");
ros::NodeHandle n;

ros::SubscribeOptions ops;
ops.template init<std_msgs::String>("chatter", 1,
ChatterCallback);

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 6/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

ops.allow_concurrent_callbacks = true;
ros::Subscriber sub1 = n.subscribe(ops);

ros::MultiThreadedSpinner spinner(2);
spinner.spin();

return 0;
}

With this change, the 2 spinner threads can execute the


ChatterCallback simultaneously, so listener node can process the messages without

dropping any messages.

Multi Callback queues


So far we talked about the situation when the listener node is subscribing to only one
topic. However, in practice, a typical node subscribes to multiple topics, so it has
multiple subscriber queues as shown below.

This could bring up new issue which we did not face when the listener node was
subscribing to only one topic. Let’s assume that the listener node subscribes to topic A
and B as shown above. And assume the message A is a critical message that has to be
processed as soon as the listener node receives it. On the other hand, the message B is
less critical message that sent more often than message A.

As shown below, if there is only one callback queue which is FIFO queue (first in, first
out), the callback for the message A cannot be processed until all three callbacks for
https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 7/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

the message B are processed. This problem can be mitigated by creating multi spinner
threads as talked before, but it cannot completely solve the issue which stems from
having a single callback queue.

To solve the problem, we can create a separate callback queue and spinner thread
which is dedicated to the message A as shown below.

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 8/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

By having the dedicated callback queue and spinner thread, the callback for the
message A will be processed without being blocked by callback for other messages that
are less critical.

The implementation looks like below,

#include <thread>
#include <ros/callback_queue.h>
#include "ros/ros.h"
#include "std_msgs/String.h"

void CallbackA(const std_msgs::String::ConstPtr& msg) {


ROS_INFO(" I heard: [%s]", msg->data.c_str());
}

void CallbackB(const std_msgs::String::ConstPtr& msg) {


ROS_INFO(" I heard: [%s]", msg->data.c_str());
}

int main(int argc, char **argv) {


ros::init(argc, argv, "listener");
ros::NodeHandle n;
ros::Subscriber sub_b = n.subscribe("MessageB", 1, CallbackB);

ros::NodeHandle n_a;

ros::CallbackQueue callback_queue_a;
n_a.setCallbackQueue(&callback_queue_a);
ros::Subscriber sub_a = n_a.subscribe("MessageA", 1, CallbackA);

std::thread spinner_thread_a([&callback_queue_a]() {
ros::SingleThreadedSpinner spinner_a;
spinner_a.spin(&callback_queue_a);
});

ros::spin();
spinner_thread_a.join();

return 0;
}

It creates a separate node handle for the message A. Then it sets a newly created
callback queue which is dedicated for the message A to the node handle. Also, it
creates a subscriber using the node handle, and eventually create a separate thread
which processes the callback queue, acting as a separate spinner thread dedicated for
the message A.

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 9/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

Summary
In this post, I explained about the concepts of the execution model, threading, queues
that are used in ROS, especially related to its publisher-subscriber pattern. We also
talked about the different possible configurations of queue size, the number of spinner
threads, the number of queue and their effects. There is no one-size-fits-all solution but
it is important to understand those concepts and the trade-offs so that you can decide
what is the best for your applications.

[1]: https://www.ros.org/

[2]:https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern

[3]:http://wiki.ros.org/ROS/Tutorials/WritingPublisherSubscriber%28c%2B%2B%2
9

[4]:https://en.cppreference.com/w/cpp/language/lambda

The link to the example code used in the post.

Sign up for Top Stories


By Level Up Coding

A monthly summary of the best stories shared in Level Up Coding Take a look.

Your email

Get this newsletter

By signing up, you will create a Medium account if you don’t already have one. Review our Privacy Policy for more information
about our privacy practices.

Programming Robotics Technology Machine Learning Software Engineering

About Help Legal

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 10/11
23/04/2021 ROS publisher queue, subscriber queue, callback queue and spinner threads tutorial | Level Up Coding

Get the Medium app

https://levelup.gitconnected.com/ros-spinning-threading-queuing-aac9c0a793f 11/11

You might also like