Introduction to Apollo Cyber Component

Recently I have been learning the Apollo project, an open source project for autonomous driving. At the beginning I hoped to focus only on the perception module, but I soon realized that this is not possible without touching the bottom layer, the Cyber RT framework. At the time of writing, the official documents in the project do not say much about the code structure and the process chain. This blog is my summary of both (version 8.0). I split it into the following parts:
- Important concept items in Cyber component
- Initialization of a Cyber component and data receiving/processing inside
- Data transmit in Cyber framework
- Perception module break down

Important concept items in Cyber component

The official document CyberRT Terms defines the important terms of the Cyber framework. For people who are not familiar with ROS, like me, those definitions still give little clue. I drew the picture below to show the basic structure of the Component class. The basic Component class doesn't have a writer member, but almost all perception components, such as the camera detection component and the lidar detection component, declare a writer because they need to output their results to other components.

[Figure: basic structure of the Component class]

As the picture shows, a component can have multiple readers, and each reader uses its receiver to subscribe to one message type. The same rule applies to writers: a writer uses its transmitter to send out messages. A specific message type is transmitted or received with the channel id as its identifier, which is stored in the role_attr member variable of the transmitter or receiver, as you can see in the picture.

The Component class encapsulates the data processing of a module. It also gives developers two interfaces, Init() and Proc(), to customize initialization and processing; they are invoked inside the Initialize() and Process() member functions.
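
The split between Initialize()/Process() and Init()/Proc() can be pictured as a template-method pattern. The following is only a minimal sketch of that idea, not Apollo's code: ToyComponentBase, ToyPerception, the int message type and the call log are hypothetical names made up for illustration.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for the framework side: it owns the fixed call
// sequence and delegates the customizable parts to virtual hooks.
class ToyComponentBase {
 public:
  virtual ~ToyComponentBase() = default;

  // Framework entry point: records the call, then runs the user hook.
  bool Initialize() {
    log_.push_back("Initialize");
    initialized_ = Init();
    return initialized_;
  }

  // Framework entry point: refuses to run before a successful Initialize().
  bool Process(int msg) {
    if (!initialized_) return false;
    log_.push_back("Process");
    return Proc(msg);
  }

  const std::vector<std::string>& log() const { return log_; }

 protected:
  std::vector<std::string> log_;

 private:
  virtual bool Init() = 0;         // user-customized initialization
  virtual bool Proc(int msg) = 0;  // user-customized processing
  bool initialized_ = false;
};

// Hypothetical user component: only the two hooks are written by the user.
class ToyPerception : public ToyComponentBase {
 public:
  int sum() const { return sum_; }

 private:
  bool Init() override {
    log_.push_back("Init");
    return true;
  }
  bool Proc(int msg) override {
    log_.push_back("Proc");
    sum_ += msg;
    return true;
  }
  int sum_ = 0;
};
```

In this sketch the framework methods always run before the user hooks, and Process() does nothing until Initialize() has succeeded.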

Initialization of a Cyber component and data receiving/processing inside

Apollo provides two ways to launch a module: mainboard and cyber_launch. Example commands:
- mainboard -d *.dag
- cyber_launch start *.launch

In the perception module, the loading order of a module is:
launch file -> DAG file -> Component
A launch file can include several DAG files, which means it can start multiple modules. A DAG file is the config file of the module topology; it lists the components that belong to the module.

As cyber_launch invokes mainboard to start a module, here I break down the process of mainboard. From mainboard.cc we can see:

    mainboard
    -> ModuleController::Init()
    -> ModuleController::LoadAll()
    -> for loop of ModuleController::LoadModule(DagConfig)
    -> ClassLoaderManager::CreateClassObj(Component_name)
    -> Component::Initialize(Component_config)

In Component.h, the class template Component has five variants (the primary template plus its specializations), which handle up to four message types (denoted M0, M1, M2, M3). Here we take the two message type case (M0, M1) as the example to analyze the initialization process.
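
How one class template can cover several message counts can be sketched with default template arguments and specializations. This is a simplified two-slot illustration under my own assumptions, not the code in Component.h: NullType, ToyComponent, Image and PointCloud are hypothetical names here, and the real class has more slots and real logic.

```cpp
#include <cassert>

// Placeholder type that marks an unused message slot (hypothetical).
struct NullType {};

// Primary template: both message slots are used.
template <typename M0 = NullType, typename M1 = NullType>
struct ToyComponent {
  static int MessageCount() { return 2; }
};

// Partial specialization: only the first slot is used.
template <typename M0>
struct ToyComponent<M0, NullType> {
  static int MessageCount() { return 1; }
};

// Full specialization: no message slot is used.
template <>
struct ToyComponent<NullType, NullType> {
  static int MessageCount() { return 0; }
};

// Hypothetical message types for the usage below.
struct Image {};
struct PointCloud {};
```

With these definitions, ToyComponent<>, ToyComponent<Image> and ToyComponent<Image, PointCloud> each select a different variant at compile time.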

Before diving into the code, an important concept, RunMode, needs to be clarified. RunMode is a simple enum with two values:
- MODE_SIMULATION
- MODE_REALITY

The default mode is set to MODE_REALITY in the cyber.pb.conf file. MODE_SIMULATION is mainly used in test code.
The initialization and the data processing differ considerably between the two modes. Here is the breakdown for each mode:

I. MODE_REALITY

The initialization of a component under this mode has the following steps:
1. Create a node node_
2. Execute the user-customized Init() function
3. Load the config for the readers
4. Create reader1 for M1 and reader0 for M0, and put them all into readers_
4.1 Initialize the readers with reader_ptr->Init()
5. Define the function func, which invokes the data processing function for the messages M0, M1
6. Create a DataVisitor<M0, M1> dv with the reader configs
7. Create a RoutineFactory factory = croutine::CreateRoutineFactory<M0, M1>(func, dv). It actually defines the coroutine function that is used to create the coroutine in the next step
8. Create a Task (coroutine) with sched->CreateTask(factory, node_->Name())

Some of the steps above (4.1, 5, 7, 8) need a deeper breakdown to understand the whole process.

In 4.1, look into the steps of Reader<MessageT>::Init() :
4.1.1 Define a lambda function func as follows:

    if (reader_func_ != nullptr) {
      func = [this](const std::shared_ptr<MessageT>& msg) {
        this->Enqueue(msg);
        this->reader_func_(msg);
      };
    } else {
      func = [this](const std::shared_ptr<MessageT>& msg) {
        this->Enqueue(msg);
      };
    }

4.1.2 Create a DataVisitor<MessageT> dv with the specific message type
4.1.3 Create a RoutineFactory factory = croutine::CreateRoutineFactory<MessageT>(func, dv)
4.1.4 Create a Task sched->CreateTask(factory, croutine_name_)
4.1.5 Create a receiver receiver_ with the message type config
4.1.6 Add this receiver to the channel topology as a listener of the corresponding reader

Notice that steps 4.1.1 to 4.1.4 are very similar to steps 5 to 8. They look like repeated steps, but they are not. In our case (M0, M1), three coroutines (the CRoutine class in the project) are created in total. The coroutine created directly in Component::Initialize() is named node_->Name(), while the other two coroutines, created in Reader::Init(), are named role_attr_.node_name() + "_" + role_attr_.channel_name(). Looking at the func definition in 4.1.1, all readers under MODE_REALITY are created without reader_func_, so func takes the else branch (this->Enqueue(msg)). Enqueue(msg) executes blocker_->Publish(msg), which writes msg into the message queue published_msg_queue_ of blocker_.

In 5, a lambda function func is defined as follows:

    auto func = [self](const std::shared_ptr<M0>& msg0,
                       const std::shared_ptr<M1>& msg1) {
      auto ptr = self.lock();
      if (ptr) {
        ptr->Process(msg0, msg1);
      } else {
        AERROR << "Component object has been destroyed.";
      }
    };

In 7, a coroutine function is defined as the lambda function below. There are 5 function templates of CreateRoutineFactory to handle the different numbers of message types. For the coroutine created directly in Component::Initialize() (we name it cr here), the function f below is the func defined in step 5. For the coroutines created in Reader::Init() (we name them cr_M0 and cr_M1 here), the function f below is the func defined in step 4.1.1. So coroutine cr is in charge of fetching and processing the incoming messages (M0 and M1), coroutine cr_M0 fetches each incoming M0 message and pushes it into the message queue, and coroutine cr_M1 does the same for M1.

    factory.create_routine = [=]() {
      return [=]() {
        std::shared_ptr<M0> msg0;
        std::shared_ptr<M1> msg1;
        for (;;) {
          CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
          if (dv->TryFetch(msg0, msg1)) {
            f(msg0, msg1);
            CRoutine::Yield(RoutineState::READY);
          } else {
            CRoutine::Yield();
          }
        }
      };
    };

By now we know all the coroutines created during the component initialization and the jobs they do. The next step is to understand the event triggering process.

In 8, after the coroutine is created and dispatched (the dispatch policy is defined in the implementations of the SchedulerClassic and SchedulerChoreography classes), an important callback function is registered to the DataVisitor object dv:

    visitor->RegisterNotifyCallback([this, task_id]() {
      if (cyber_unlikely(stop_.load())) {
        return;
      }
      this->NotifyProcessor(task_id);
    });

From the definition, it seems to wake up the coroutine, and it should be triggered when new data/messages arrive. Indeed it is, but the process is not so straightforward.
In step 4.1.5, a receiver is created with the lambda function below as input. This lambda function is assigned to msg_listener_, a member variable of the Receiver class. Another member function, OnNewMessage(), listens for new message data; when new message data arrives, it executes the callback function msg_listener_. I will show the loop of message transmitting and receiving in the next section; here we focus only on the receiver side.

    CreateReceiver<MessageT>(
        role_attr, [](const std::shared_ptr<MessageT>& msg,
                      const transport::MessageInfo& msg_info,
                      const proto::RoleAttributes& reader_attr) {
          (void)msg_info;
          (void)reader_attr;
          PerfEventCache::Instance()->AddTransportEvent(
              TransPerf::DISPATCH, reader_attr.channel_id(),
              msg_info.seq_num());
          data::DataDispatcher<MessageT>::Instance()->Dispatch(
              reader_attr.channel_id(), msg);
          PerfEventCache::Instance()->AddTransportEvent(
              TransPerf::NOTIFY, reader_attr.channel_id(),
              msg_info.seq_num());
        });

The trigger process when receiving new message data is:

    OnNewMessage(msg, msg_info)
    -> msg_listener_ func
    -> DataDispatcher<MessageT>::Instance()->Dispatch(reader_attr.channel_id(), msg)
    -> buffer->Fill(msg) + notifier_->Notify(channel_id)
    -> notifier->callback()

To understand the details of the trigger process above, it helps to look at these classes: DataVisitor, DataDispatcher, DataNotifier and ChannelBuffer. The DataDispatcher and DataNotifier classes are typical singletons. In short, when new M0 message data msg0 is received, it is written into the ChannelBuffer buffer_m0_. notifier->callback() then calls exactly the callback function registered to the DataVisitor object dv in step 8, this->NotifyProcessor(task_id), which means it does indeed "wake up" the coroutine cr_M0 to execute the coroutine function defined in step 7. The same process applies when new M1 message data is received.

For the coroutine cr we named before, TryFetch(msg0, msg1) first fuses the new M0 and M1 message data before the data is processed: data_fusion_->Fusion(&next_msg_index_, m0, m1). In summary, all receivers push newly arrived data into the corresponding channel buffers, and the main coroutine fetches data from all buffers and processes it.
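
The fill, notify and fetch sequence described above can be imitated with a small toy. This is a sketch under my own assumptions and not the Cyber implementation: ToyDispatcher, RunDemo and the string messages are hypothetical, there are no coroutines or singletons, only the M0 channel registers a wake-up callback, and the "fusion" simply pairs the oldest unread M0 with the newest M1.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using ChannelId = std::uint64_t;
using Msg = std::shared_ptr<std::string>;

// Toy stand-in for the dispatcher + notifier pair: one buffer and one list
// of wake-up callbacks per channel id.
class ToyDispatcher {
 public:
  void RegisterNotifyCallback(ChannelId id, std::function<void()> cb) {
    callbacks_[id].push_back(std::move(cb));
  }

  // Imitates "fill the buffer, then notify the channel".
  void Dispatch(ChannelId id, const Msg& msg) {
    buffers_[id].push_back(msg);
    for (const auto& cb : callbacks_[id]) cb();
  }

  // Succeeds only when both channels hold data. Consumes the oldest M0 and
  // reads (without consuming) the newest M1.
  bool TryFetch(ChannelId id0, ChannelId id1, Msg* m0, Msg* m1) {
    auto& b0 = buffers_[id0];
    auto& b1 = buffers_[id1];
    if (b0.empty() || b1.empty()) return false;
    *m0 = b0.front();
    b0.pop_front();
    *m1 = b1.back();
    return true;
  }

 private:
  std::map<ChannelId, std::deque<Msg>> buffers_;
  std::map<ChannelId, std::vector<std::function<void()>>> callbacks_;
};

// Runs three arrivals and returns the pairs the processing step would see.
std::vector<std::string> RunDemo() {
  ToyDispatcher dispatcher;
  std::vector<std::string> processed;
  const ChannelId kM0 = 1;
  const ChannelId kM1 = 2;

  // Stand-in for the main coroutine: every wake-up tries to fetch a pair.
  auto wake_up = [&]() {
    Msg m0, m1;
    if (dispatcher.TryFetch(kM0, kM1, &m0, &m1)) {
      processed.push_back(*m0 + "+" + *m1);
    }
  };
  dispatcher.RegisterNotifyCallback(kM0, wake_up);

  dispatcher.Dispatch(kM0, std::make_shared<std::string>("img0"));    // no M1 yet
  dispatcher.Dispatch(kM1, std::make_shared<std::string>("cloud0"));  // no callback on M1
  dispatcher.Dispatch(kM0, std::make_shared<std::string>("img1"));    // a pair is available
  return processed;
}
```

In the demo, the first M0 arrival wakes the processing step but finds no M1 data, so nothing is processed until the second M0 arrival, which pairs the buffered img0 with cloud0.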

II. MODE_SIMULATION

The initialization of a component under this mode has the following steps:
1. Create a node node_
2. Execute the user-customized Init() function
3. Load the config for the readers
4. Create reader1 for M1 without a reader function
4.1 Initialize the reader with reader_ptr->Init()
5. Define the function func, which invokes the data processing function for the messages M0, M1
6. Create reader0 for M0 with the reader function func defined in the previous step, and put all readers into readers_
6.1 Initialize the readers with reader_ptr->Init()

In MODE_REALITY, the readers are Reader class objects; in MODE_SIMULATION, the readers are IntraReader class objects, whose initialization Init() is quite different:

    BlockerManager::Instance()->Subscribe<MessageT>(
        this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
        this->role_attr_.node_name(),
        std::bind(&IntraReader<MessageT>::OnMessage, this, std::placeholders::_1));

Similar to OnNewMessage() invoking msg_listener_ in MODE_REALITY, here OnMessage() invokes msg_callback_. But msg_callback_ is not defined when a receiver is created, as it is in MODE_REALITY. There is an important difference to note: MODE_SIMULATION uses the reader and writer classes IntraReader and IntraWriter and has no transmitter and receiver level like MODE_REALITY, which means it handles messages with Blocker instead of a transmitter and a receiver. OnMessage() and msg_callback_ are direct members of the IntraReader class, whereas in MODE_REALITY OnNewMessage() and msg_listener_ sit on the receiver level. Also, no coroutines or DataVisitor objects are created.

The msg_callback_ is defined in step 5 as shown below. It is used to initialize reader0 only; reader1 is initialized without msg_callback_. In the three message type case (M0, M1, M2), reader1 and reader2 are initialized without msg_callback_. Because the function below does the real data processing for all new message data (M0 msg0 as input, M1 msg1 fetched from blocker1), it doesn't need to be repeated in every reader. The next section covers how message data is transmitted into Blocker.

    auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
        config.readers(1).channel());

    auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
      auto ptr = self.lock();
      if (ptr) {
        if (!blocker1->IsPublishedEmpty()) {
          auto msg1 = blocker1->GetLatestPublishedPtr();
          ptr->Process(msg0, msg1);
        }
      } else {
        AERROR << "Component object has been destroyed.";
      }
    };

Data transmit in Cyber framework

Here "data transmit" is the counterpart of "data receiving/processing" in the section above, and "data" means the message data.
Apollo provides the command cyber_recorder play to play back a record file. Together with mainboard or cyber_launch start, it simulates a whole data transmit -> receive -> process loop, so it is a good entry point for understanding the data transmit process.

1. Break down of cyber_recorder play

First is the break down of cyber_recorder/main.cc :

    cyber_recorder command="play" in main.cc
    -> player.Init()
    -> producer_->Init()
    -> player.Start()
    -> producer_->Start() + consumer_->Start()

There are 5 classes used in this process: Player, PlayTask, PlayTaskBuffer, PlayTaskProducer and PlayTaskConsumer. First, look at PlayTaskProducer::Init(). It executes the following two functions:
- ReadRecordInfo(): Create recorder_reader object(s) to read the channel message infos from record file(s), save them into MessageTypeMap msg_types_.
- CreateWriters(): Create a node node_, then for loop with msg_types_ to create writers for all message types, put them into WriterMap writers_.

Then we look at PlayTaskProducer::Start(). It will directly run a thread:

    produce_th_.reset(new std::thread(&PlayTaskProducer::ThreadFunc, this));

ThreadFunc does the following:
- Calculate the number of data transmissions from the start/end time and the time interval of the record, and put it in a container object record_viewer.
- Loop over record_viewer, creating a PlayTask and pushing it into task_buffer_ each time. For each item, it uses the channel name to find the corresponding writer in writers_, and creates the PlayTask from this writer and the raw message.

Notice that class Player has member variables:
- ConsumerPtr consumer_
- ProducerPtr producer_
- TaskBufferPtr task_buffer_: a shared pointer shared among the Player, PlayTaskProducer and PlayTaskConsumer classes.

PlayTaskConsumer::Start() runs a ThreadFunc similar to the one in PlayTaskProducer::Start(). It executes:

    task = task_buffer_->Front() -> task->Play() -> task_buffer_->PopFront()

So the producer_ object keeps pushing new PlayTask objects into task_buffer_, while consumer_ keeps taking the front PlayTask, executing PlayTask::Play(), and popping it from task_buffer_ after the execution has finished.

The only thing left is PlayTask::Play(). It does the core job of transmitting data: writer_->Write(msg_).
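
The producer, buffer and consumer roles can be sketched in a few lines. This is a simplified illustration under my own assumptions, not the cyber_recorder code: ToyPlayTask, ToyTaskBuffer and ReplayDemo are hypothetical, the "writer" is just a function that records what it was asked to send, and the real producer and consumer run in separate threads, which this single-threaded sketch leaves out.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for PlayTask: holds a "writer" and the raw message.
class ToyPlayTask {
 public:
  using Writer = std::function<void(const std::string&)>;

  ToyPlayTask(Writer writer, std::string msg)
      : writer_(std::move(writer)), msg_(std::move(msg)) {}

  // Plays the role of writer_->Write(msg_).
  void Play() const { writer_(msg_); }

 private:
  Writer writer_;
  std::string msg_;
};

// Hypothetical stand-in for the task buffer shared by producer and consumer.
class ToyTaskBuffer {
 public:
  void Push(std::shared_ptr<ToyPlayTask> task) {
    tasks_.push_back(std::move(task));
  }

  // Returns the oldest task, or an empty pointer when the buffer is empty.
  std::shared_ptr<ToyPlayTask> Front() const {
    if (tasks_.empty()) return std::shared_ptr<ToyPlayTask>();
    return tasks_.front();
  }

  void PopFront() {
    if (!tasks_.empty()) tasks_.pop_front();
  }

 private:
  std::deque<std::shared_ptr<ToyPlayTask>> tasks_;
};

// Replays three recorded messages and returns what the writer "sent".
std::vector<std::string> ReplayDemo() {
  std::vector<std::string> sent;
  auto writer = [&sent](const std::string& msg) { sent.push_back(msg); };
  auto buffer = std::make_shared<ToyTaskBuffer>();

  // Producer side: one task per recorded message.
  const std::vector<std::string> record = {"frame0", "frame1", "frame2"};
  for (const auto& raw : record) {
    buffer->Push(std::make_shared<ToyPlayTask>(writer, raw));
  }

  // Consumer side: Front() -> Play() -> PopFront() until the buffer is empty.
  while (auto task = buffer->Front()) {
    task->Play();
    buffer->PopFront();
  }
  return sent;
}
```

The messages come out in the order they were recorded, because the consumer always plays the front task before popping it.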

2. Data transmit process

Before going into Writer::Write(), it's necessary to know how this writer is initialized. NodeChannelImpl::CreateWriter() shows that the writer is a Writer class object under MODE_REALITY and an IntraWriter class object under MODE_SIMULATION.

I. MODE_REALITY

Writer::Init() does the following:
1. Create a Transmitter object transmitter_
2. Add this transmitter to the channel topology (so the receivers can subscribe to this channel's messages)

In step 1, according to the mode value, CreateTransmitter() creates transmitter_ from one of the following classes:
- IntraTransmitter
- ShmTransmitter
- RtpsTransmitter
- HybridTransmitter (default)

Notice that this mode has nothing to do with RunMode. It describes the relationship between communicating nodes under MODE_REALITY, which has the transmitter and receiver level. Four modes are defined: INTRA means communication within one process, SHM means communication between processes on the same host, RTPS means communication across hosts, and HYBRID means a mix of those three modes.
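
One way to read HYBRID is as a per-peer choice among the other three modes. The sketch below only illustrates that reading; it is my assumption and is not taken from the HybridTransmitter code. Endpoint, ToyMode and ChooseMode are hypothetical names.

```cpp
#include <cassert>
#include <string>

// Hypothetical description of where a node lives.
struct Endpoint {
  std::string host;
  int process_id;
};

// The three concrete modes named in the text.
enum class ToyMode { INTRA, SHM, RTPS };

// Picks the narrowest mode that can reach the peer.
ToyMode ChooseMode(const Endpoint& self, const Endpoint& peer) {
  if (self.host != peer.host) {
    return ToyMode::RTPS;  // across hosts
  }
  if (self.process_id != peer.process_id) {
    return ToyMode::SHM;  // same host, different processes
  }
  return ToyMode::INTRA;  // same process
}
```

Under this reading, a writer talking to a reader in the same process would use INTRA, while the same writer would use SHM or RTPS for readers that live elsewhere.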

After the writer initialization, we look inside Writer::Write(). Here we take INTRA mode as the example.

    writer_->Write(msg_)
    -> transmitter_->Transmit(msg_ptr)
    -> dispatcher_->OnMessage(channel_id_, msg, msg_info)
    -> handler->Run(message, message_info)
    -> (*signals_[oppo_id])(msg, msg_info)

Don't confuse the IntraDispatcher object dispatcher_ with the DataDispatcher class instance here: the first is on the transmitting side and the second is on the receiving side. Both are singletons. Notice that dispatcher_ is a shared pointer in both the IntraTransmitter class and the IntraReceiver class.

To figure out what the last step (*signals_[oppo_id])(msg, msg_info) does, we need to go back to the receiver part. Remember the member function Receiver<M>::OnNewMessage(msg, msg_info), which is the starting point of the receiving process. This function is written into the member variable msg_listeners_ of the dispatcher_ object when the IntraReceiver object initializes, as follows:

    dispatcher_->AddListener<M>(this->attr_, std::bind(&IntraReceiver<M>::OnNewMessage, this, std::placeholders::_1, std::placeholders::_2))
    -> handler->Connect(self_attr.id(), opposite_attr.id(), listener)
    -> signals_[oppo_id]->Connect(listener)

The last step, signals_[oppo_id]->Connect(listener), makes signals_[oppo_id] a callable object bound to the listener function, so (*signals_[oppo_id])(msg, msg_info) calls the listener function OnNewMessage(msg, msg_info). This is called the signal and slot mechanism.
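
The signal and slot idea itself fits in a few lines. The following is a generic sketch of the mechanism, not the signal class used in Cyber: ToySignal, ToyReceiver, SignalDemo and the id value 42 are hypothetical, and there is no locking or disconnect support.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Minimal signal: stores slots and calls each of them when invoked.
template <typename... Args>
class ToySignal {
 public:
  using Slot = std::function<void(Args...)>;

  void Connect(Slot slot) { slots_.push_back(std::move(slot)); }

  void operator()(Args... args) const {
    for (const auto& slot : slots_) slot(args...);
  }

 private:
  std::vector<Slot> slots_;
};

// Hypothetical receiver whose OnNewMessage plays the role of the listener.
struct ToyReceiver {
  std::vector<std::string> received;

  void OnNewMessage(const std::string& msg, int seq) {
    received.push_back(std::to_string(seq) + ":" + msg);
  }
};

// Connects one listener and emits one message through the signal.
std::vector<std::string> SignalDemo() {
  using MsgSignal = ToySignal<const std::string&, int>;
  std::map<std::uint64_t, std::shared_ptr<MsgSignal>> signals;
  const std::uint64_t oppo_id = 42;  // arbitrary id of the opposite endpoint

  ToyReceiver receiver;
  signals[oppo_id] = std::make_shared<MsgSignal>();

  // Receiver side: register the listener as a slot.
  signals[oppo_id]->Connect(std::bind(&ToyReceiver::OnNewMessage, &receiver,
                                      std::placeholders::_1,
                                      std::placeholders::_2));

  // Transmitter side: invoking the signal calls every connected listener.
  (*signals[oppo_id])("hello", 7);
  return receiver.received;
}
```

The transmitter never names the receiver directly; it only invokes the signal stored under the opposite id, and the slot connected earlier does the rest.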

So this is the whole loop of data transmitting and receiving process for MODE_REALITY.

II. MODE_SIMULATION

IntraWriter::Init() creates a blocker object according to the message type and puts it into the BlockerMap blockers_ of the BlockerManager singleton instance. IntraWriter::Write() breaks down to:

    writer_->Write(msg_)
    -> blocker_manager_->Publish<MessageT>(this->role_attr_.channel_name(), msg)
    -> blocker->Publish(msg)
        -> Enqueue(msg)
            -> published_msg_queue_.push_front(msg)
        -> Notify(msg)
            -> call published_callbacks_

Remember that IntraReader::Init() does the following: it registers OnMessage() into published_callbacks_, which is a member variable of the Blocker class.

    BlockerManager::Instance()->Subscribe<MessageT>(
        this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
        this->role_attr_.node_name(),
        std::bind(&IntraReader<MessageT>::OnMessage, this, std::placeholders::_1));

So the data transmitting and receiving loop for MODE_SIMULATION is also clear.
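
The whole MODE_SIMULATION loop (publish, enqueue, notify, then M0-driven processing with the latest M1) can be imitated with a toy blocker. This is a sketch under my own assumptions rather than the Blocker class itself: ToyBlocker, SimulationDemo, the capacity handling and the int messages are hypothetical, and there is no locking.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy blocker: a bounded queue (newest message first) plus named callbacks.
template <typename T>
class ToyBlocker {
 public:
  using MsgPtr = std::shared_ptr<T>;
  using Callback = std::function<void(const MsgPtr&)>;

  explicit ToyBlocker(std::size_t capacity) : capacity_(capacity) {}

  void Subscribe(const std::string& id, Callback cb) {
    callbacks_[id] = std::move(cb);
  }

  // Publish = store the message, then call every subscribed callback.
  void Publish(const MsgPtr& msg) {
    Enqueue(msg);
    Notify(msg);
  }

  bool IsPublishedEmpty() const { return queue_.empty(); }

  // Returns the newest message, or an empty pointer if nothing was published.
  MsgPtr GetLatestPublishedPtr() const {
    return queue_.empty() ? MsgPtr() : queue_.front();
  }

 private:
  void Enqueue(const MsgPtr& msg) {
    if (capacity_ == 0) return;
    if (queue_.size() >= capacity_) queue_.pop_back();  // drop the oldest
    queue_.push_front(msg);
  }

  void Notify(const MsgPtr& msg) {
    for (const auto& entry : callbacks_) entry.second(msg);
  }

  std::size_t capacity_;
  std::deque<MsgPtr> queue_;
  std::map<std::string, Callback> callbacks_;
};

// M0 drives the processing; M1 is only read from its blocker.
std::vector<int> SimulationDemo() {
  ToyBlocker<int> blocker0(2);
  ToyBlocker<int> blocker1(2);
  std::vector<int> results;

  // Stand-in for reader0's callback: combine msg0 with the latest M1.
  blocker0.Subscribe("reader0", [&](const std::shared_ptr<int>& msg0) {
    if (!blocker1.IsPublishedEmpty()) {
      results.push_back(*msg0 + *blocker1.GetLatestPublishedPtr());
    }
  });

  blocker0.Publish(std::make_shared<int>(1));   // skipped: no M1 yet
  blocker1.Publish(std::make_shared<int>(10));  // stored only
  blocker1.Publish(std::make_shared<int>(20));  // becomes the latest M1
  blocker0.Publish(std::make_shared<int>(2));   // processed with 20
  return results;
}
```

Only the second M0 message produces a result in the demo, because processing is triggered by M0 and needs at least one published M1 to pair with.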