Apollo如何通知/订阅主题topic

发布于 2019-09-26 作者 风铃 49次 浏览 版块 前端

导读
众所周知,Apollo是基于ROS开发的,所以其底层也是基于消息的机制进行节点通信的。但是它在ROS的基础上做了一些改动,如下: 
P2P——由于原生ROS的消息机制是通过主节点(Master)分发数据来实现的,这样一个强中心化的结构始终存在一个Master意外导致系统奔溃的隐患,为了解决这个问题,Apollo使用了Fast-RTPS,用P2P的方式抛弃了主节点实现通信。
Protobuf——原生支持Google的Protobuf,完美的解决了原来ROS的MD5验证导致消息不能后向兼容的问题。
共享内存——使用共享内存传输,效率更高。
上面简单的介绍了一些Apollo和ROS的渊源,如果想详细了解可以看Apollo-ROS官方详细文档。鉴于与ROS的关系,那么就不得不说如何在Apollo里面利用topic进行通信了。虽然出自ROS,但是Apollo在ROS的基础上进行了大量的封装和改动,所以,较原生的ROS,在advertise/subscribe(通知/订阅)topic有了较大的区别。
本文我将按照操作步骤、代码分析、流程图的步骤简述一下Apollo的topic使用方法,如果你只想知道如何使用,那么看前面的使用方法即可,如果你想刨根问题,那么请耐心的看完,结合上下文一起观看,效果更好。还有,代码的注释不能放过,有一些上下文联系的地方,我可能放在代码注释里面来解释了。
使用方法
订阅
订阅已配置好的Topic

实现回调函数

void ZuoTestSubnode::ImgTestCallback(const sensor_msgs::Image &msg){
  AINFO << "ImgTestCallback";
  //-- do sth
}
在InitInternal()内将回调函数添加到对应Topic的回调函数队列,如下:

bool ZuoTestSubnode::InitInternal(){
  AdapterManager::AddImageShortCallback(&ZuoTestSubnode::ImgTestCallback, this);
}
订阅未配置好的Topic

如果用户需要新增Topic,那就需要重新配置,而配置,本质上就是调用subscribe/advertise函数,然后生成空的回调函数队列和消息发布句柄。这些是Apollo对ROS做的封装,主要目的是为了方便使用,以及管理众多的topic,这部分主要是在adapter_manager.h、message_manager.h等实现的。具体代码后面再讲,先看步骤:

在adapter_gflags.cc和adapter_gflags.h里面添加新topic。如下:

//-- adapter_gflags.h

//-- Zuo added on 2018-04-15 for testPublishSubnode
DECLARE_string(zuo_test_topic);
//-- adapter_gflags.cc

//-- Zuo added on 2018-04-15 for testPublishSubnode
DEFINE_string(zuo_test_topic, "/apollo/zuo/zuo_test", "Zuo added for testPublishMsg");
在adapter.conf文件内添加一个config标签,这个conf文件是用来配置对应的topic。而config中的type,是用来在后文的有限状态机中,进入我们新添的topic分支。

config {
type: ZUO_TEST
mode: PUBLISH_ONLY
message_history_limit: 5
}
在adapter_manager.h里面增加对ZuoTest的适配:

class AdapterManager {
public:
......
//-- Zuo added on 2018-04-15 for testPublishSubnode
//-- 这里的ZuoTest可以理解为对topic取了个别名。
REGISTER_ADAPTER(ZuoTest);
};
在adapter_manager.cc的Init()函数的有限状态机内添加一个分支,如下:

void AdapterManager::Init(const AdapterManagerConfig &configs) {
......
  for (const auto &config : configs.config()) {
      switch (config.type()) {
        ......
        case AdapterConfig::ZUO_TEST:
        EnableZuoTest(FLAGS_zuo_test_topic, config);
        break;
          default:
          AERROR << "Unknown adapter config type!";
          break;
      }
  }
}
在adapter_config.proto里面增加ZUO_TEST标签:(不然上述的有限状态机分支就无法进入)

message AdapterConfig {
enum MessageType {
    ......
  ZUO_TEST = 45;
}
在message_adapters.h里面增加对应消息的别名:

using ZuoTestAdapter = Adapter<sensor_msgs::Image>;
1
剩下的就和上述已配置好的Topic的使用方法相同了,我就不赘述了。

通知
通知已配置好的Topic

如果是已经配置好的topic,那就很简单,如下:

//-- 新建msg
PerceptionObstacles obstacles;
//-- publish msg
common::adapter::AdapterManager::PublishPerceptionObstacles(obstacles);
通知未配置好的Topic

其实在订阅未配置好的Topic里面的配置方法,是通用的,和这里的配置方法一样,我就不赘述了,在配置好以后参考上一步的通知方法即可。
代码分析
上一篇How_to_add_a_subnode已经讲到了如何添加一个subnode,但是还没有说到如何在这个subnode里面subscribe/publish(订阅/发布)消息,下面将根据上述的操作步骤讲解在Apollo里面为什么要这么用topic。

在说Apollo前,还是带一下ROS的通信机制,也方便我们后续的理解,详情可以了解ROS Wiki 

原生的ROS其实是如上图一个消息传递流程,简而言之就是,一个节点发,一个节点收,中间是由Master进行转发,具体操作如下:

//-- Zuo added on 2018-04-13
void chatterCallback(const std_msgs::String::ConstPtr& msg)  
{
    //-- do sth
    printf("msg's data = %s", msg.data);
}
int main(int argc, char **argv)  
{
    ros::init(argc, argv, "listener");
    ros::NodeHandle n;

    //-- 向Topic发布消息
    //-- 这里是告诉Master,我需要在'TopicfullName'上发布一个消息。
    //-- @param_0 Topic名字
    //-- @param_1 消息发布队列大小
    //-- 返回的句柄需要保存,用来调用publish函数
    ros::Publisher Zuo_pub = n.advertise<std_msgs::String>("TopicfullName", 1000);
    std_msgs::String msg;
    msg.data = "Hello World";
    //-- 用户根据返回的句柄调用publish函数,向对应的topic发送消息
    Zuo_pub.publish(msg);

    //-- 订阅一个Topic
    //-- 如果`TopicfullName`接收到消息,就会触发这里的回调函数chatterCallback()
    ros::Subscriber sub = n.subscribe("TopicfullName", 1000, chatterCallback);
    ros::spin();//ros::spin()进入自循环,循环获取事件
    return 0;
}
注意:上面的两个函数advertise/subscribe的方式我们可以多留意一下,之后在Apollo的代码讲解里面也会看到类似的方式。

在Apollo里面,为了降低subnode在通信过程中对Master节点的依赖,将上图的机制设计成两个subnode直接类似P2P通信,相当于对原来的网络做了一个去中心化的改变。但是这里我们不用管,对于用户来说,我们的使用方式没变,还是通过调用advertise/subscribe两个接口来进行节点通信。

诚然,在Apollo里面我们可以直接调用advertise/subscribe,但是可以用,不代表应该用。看代码我们会发现,在Apollo底层(adapter_manager.h)的确是调用了advertise/subscribe,但是在其之上还封装了几层,它将对不同Topic的advertise/subscribe使用封装成了对应的带有Topic名字标签的函数,例如:

common::adapter::AdapterManager::PublishPerceptionObstacles(obstacles);
1
上述这种本质上就是ROS里面的publish()函数,但是Apollo对publish()函数封装后,使得每个不同模块的publish都带有各自不同的标签,每一个PublishXXX()只能发送对应类型的Msg,也就是说,原生的ROS下,在大量的发送/订阅的场景下,需要用户管理大量的句柄(这里可以参考上述ROS的简例)。

上面是Publish,下面看看Apollo对subscribe是怎么封装的:
 AdapterManager::AddImageShortCallback(&ZuoTestSubnode::ZuoImgTestCallback,this);
1
Apollo将ROS的回调函数封装成一个函数队列,上面的函数就是往这个函数队列里添加回调函数。那么,这个函数在哪里定义的呢?如果直接搜索,是找不到的。这里Apollo用了一个技巧。AdapterManager::AddImageShortCallback()是在adapter_manager里面用宏展开和##拼接结合的方法而成。通过调用REGISTER_ADAPTER(name)来实现(REGISTER_ADAPTER的调用参考操作步骤里面的适配操作)。可以看到adapter_manager.h:#90开始,定义了三个不同参数的static void Add##name##Callback()函数。如下:
  static void Add##name##Callback(name##Adapter::Callback callback) {          \
    CHECK(instance()->name##_)                                                 \
        << "Initialize adapter before setting callback";                       \
    instance()->name##_->AddCallback(callback);                                \
  }                                                                            \
  template <class T>                                                           \
  static void Add##name##Callback(                                             \
      void (T::*fp)(const name##Adapter::DataType &data), T *obj) {            \
    Add##name##Callback(std::bind(fp, obj, std::placeholders::_1));            \
  }                                                                            \
  template <class T>                                                           \
  static void Add##name##Callback(                                             \
      void (T::*fp)(const name##Adapter::DataType &data)) {                    \
    Add##name##Callback(fp);                                                   \
  }  
我们先不管函数具体作用,但是从上述代码我们可以看到,后两个函数最终也是调用了第一个函数,也就是
  static void Add##name##Callback(name##Adapter::Callback callback) {          \
1
接着跟进去,我们看到这个函数实际上就是调用了:
    instance()->name##_->AddCallback(callback);                                \
1
再跟进,可以看到在adapter.h里:
  /**
   * @brief registers the provided callback function to the adapter,
   * so that the callback function will be called once right after the
   * message hits the adapter.
   * @param callback the callback with signature void(const D &).
   */  
  void AddCallback(Callback callback) {
    receive_callbacks_.push_back(callback);
  } 
​ receive_callbacks_的定义如下:

  /// User defined function when receiving a message
  std::vector<Callback> receive_callbacks_;
其实到这里,Apollo订阅的封装实现很明显了,其在adapter_manager.h通过宏展开和拼接生成带有不同名字标签的函数群,这些函数群包含一系列topic相关操作的功能,如订阅,压入回调函数队列等等,然后在adapter_manager.cc里面通过有限状态机根据读入的config文件(参考dag_config_path变量)配置需要启动的subnode。这里调用的Enable##name(topicName,config)实现了对不同topic的订阅以及生成通知句柄,然后就可以调用Add##name##Callback来订阅指定的topic,以及调用Publish##name向指定topic发送消息。

那么Apollo这样绕了一圈将advertise/subscribe封装起来,有什么用呢?原因有如下两点:

当系统里面的相似类型多了,变量的管理会变得复杂起来,那么用这种带有名字标签的命名方式,可以让变量名能够做到顾名思义。
本质上,不同的subscribe和publish函数都是一些相同代码,Apollo通过##拼接和宏展开结合的方式,提高了这一块的代码复用率,也方便整个框架后续的延伸扩展。
还有一个要注意,name##Adapter是不同的消息格式,在message_adapters.h定义的别名,这是为了配合整套方案而将不同的消息都别名成了nameAdapter的形式。

using PerceptionObstaclesAdapter = Adapter<perception::PerceptionObstacles>;
1
类似上面的这样取别名,这里的perception::PerceptionObstacles是在perception_obstacle.pb.h的文件内定义的了,这个和protobuf有关,暂且不说,这是下一章How_to_add_a_new_msg的内容。

流程图
下面是整个topic配置的整体流程图,最终呈现在用户面前的是两句Publish##Name()和Add##Name##Callbackc()。 

--------------------- 
作者:Zuo丶 
来源:CSDN 
原文:https://blog.csdn.net/u012423865/article/details/80024870?utm_source=copy 
版权声明:本文为博主原创文章,转载请附上博文链接!

收藏
暂无回复