Introduction to Apollo Cyber Component
Recently I have been learning Apollo, an open source project for autonomous driving. At the beginning I hoped to focus only on the perception module, but I soon realized that this is not possible without touching the bottom layer: the Cyber RT framework. At the time of writing, the official documents in the project say little about the code structure and the processing chain. This blog is my summary of them (version 8.0). It is divided into the following parts:
- Important concept items in Cyber component
- Initialization of a Cyber component and data receiving/processing inside
- Data transmit in Cyber framework
- Perception module break down
Important concept items in Cyber component
The official CyberRT Terms document gives definitions of the important terms in the Cyber framework. For people who, like me, are not familiar with ROS, those definitions alone still give little clue. I drew a picture below to show the basic structure of the `Component` class. The basic `Component` class doesn't have a `writer` member, but almost all perception components, such as the camera detection component and the lidar detection component, declare a `writer` because they need to output their processing results to other components.

As the picture shows, a component can have multiple readers, and each reader uses its receiver to subscribe to a message type. The same rule applies to writers: a writer uses its transmitter to send out messages. Transmitting or receiving a specific message type uses the channel id as the identifier, which is stored in the `role_attr` member variable of a transmitter or a receiver, as you can see in the picture.

The `Component` class encapsulates the data processing of a module, and also provides developers with the interfaces `Init()` and `Proc()` to customize the initialization and the processing. They are invoked inside the `Initialize()` and `Process()` member functions.
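The relationship between `Initialize()`/`Process()` and the user hooks `Init()`/`Proc()` can be sketched as a template-method pattern. This is a simplified illustration and not the real Cyber RT class: `SketchComponent`, `CountingComponent` and the `int` message type are hypothetical names chosen for the example.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical, heavily simplified stand-in for Cyber RT's Component<M0>.
// The framework-facing entry points are non-virtual; user code only
// overrides the Init()/Proc() hooks.
template <typename M0>
class SketchComponent {
 public:
  virtual ~SketchComponent() = default;

  // Called once by the framework when the component is loaded.
  bool Initialize() {
    initialized_ = Init();
    return initialized_;
  }

  // Called by the framework for every message; forwards to the user hook.
  bool Process(const std::shared_ptr<M0>& msg) {
    assert(msg != nullptr);
    if (!initialized_) {
      return false;  // ignore data until Init() has succeeded
    }
    return Proc(msg);
  }

 private:
  virtual bool Init() = 0;
  virtual bool Proc(const std::shared_ptr<M0>& msg) = 0;

  bool initialized_ = false;
};

// Example user component: records every integer message it receives.
class CountingComponent : public SketchComponent<int> {
 public:
  const std::vector<int>& seen() const { return seen_; }

 private:
  bool Init() override { return true; }
  bool Proc(const std::shared_ptr<int>& msg) override {
    seen_.push_back(*msg);
    return true;
  }

  std::vector<int> seen_;
};
```

A caller would construct `CountingComponent`, call `Initialize()` once, and then call `Process()` for each message; messages passed before `Initialize()` are rejected in this sketch.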
Initialization of a Cyber component and data receiving/processing inside
Apollo provides two ways to launch a module: `mainboard` and `cyber_launch`. Example commands:
- mainboard -d *.dag
- cyber_launch start *.launch

In the perception module, the loading order of a module is:

launch file -> DAG file -> Component

A launch file can include several DAG files, which means it can start multiple modules. A DAG file is the config file of the module topology; it lists the components that are put into this module.

As `cyber_launch` invokes `mainboard` to start a module, here I will break down the process of `mainboard`. From `mainboard.cc` we can see:

```
mainboard
 -> ModuleController::Init()
 -> ModuleController::LoadAll()
 -> for loop of ModuleController::LoadModule(DagConfig)
 -> ClassLoaderManager::CreateClassObj(Component_name)
 -> Component::Initialize(Component_config)
```
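The last two steps of this chain (create the component object from its class name, then call `Initialize()` on it) follow a common factory-registry pattern. The sketch below only illustrates that pattern under simplifying assumptions: `SketchRegistry`, `SketchBase`, `DemoComponent` and `LoadComponent` are hypothetical, the config is a plain string, and the real class loader additionally deals with shared libraries.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical base class standing in for the component base type.
class SketchBase {
 public:
  virtual ~SketchBase() = default;
  virtual bool Initialize(const std::string& config) = 0;
};

// Hypothetical registry mapping a class name (as written in a DAG file)
// to a factory function that creates the object.
class SketchRegistry {
 public:
  using Factory = std::function<std::shared_ptr<SketchBase>()>;

  void Register(const std::string& class_name, Factory factory) {
    factories_[class_name] = std::move(factory);
  }

  // Returns nullptr when the class name is unknown.
  std::shared_ptr<SketchBase> CreateClassObj(
      const std::string& class_name) const {
    auto it = factories_.find(class_name);
    if (it == factories_.end()) {
      return nullptr;
    }
    return it->second();
  }

 private:
  std::map<std::string, Factory> factories_;
};

// Example component: initialization succeeds for any non-empty config.
class DemoComponent : public SketchBase {
 public:
  bool Initialize(const std::string& config) override {
    config_ = config;
    return !config_.empty();
  }

 private:
  std::string config_;
};

// Mimics the load step: create the object by name, then initialize it.
inline std::shared_ptr<SketchBase> LoadComponent(
    const SketchRegistry& registry, const std::string& class_name,
    const std::string& config) {
  auto obj = registry.CreateClassObj(class_name);
  if (obj == nullptr || !obj->Initialize(config)) {
    return nullptr;
  }
  return obj;
}
```

After registering a factory under the name "DemoComponent", `LoadComponent(registry, "DemoComponent", "demo.conf")` returns an initialized object, while an unknown name or a failing `Initialize()` yields `nullptr`.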
In `component.h` there are five variants of the class template `Component` (the primary template plus four specializations), which together cover zero to four message types (denoted as M0, M1, M2, M3). Here we take the case of two message types (M0, M1) as the example to analyze the initialization process.

Before diving into the code, an important concept, `RunMode`, needs to be clarified first. `RunMode` is a simple enum with two values:
- `MODE_SIMULATION`
- `MODE_REALITY`

The default mode is set to `MODE_REALITY` in the `cyber.pb.conf` file. `MODE_SIMULATION` is mainly used in test code.

The initialization and the data processing are quite different in those two modes. Here is the break down for each mode:
I. MODE_REALITY
The initialization of a component under this mode has the following steps:
1. Create a node `node_`
2. Execute the user customized `Init()` function
3. Load the config for the readers
4. Create `reader1` for M1 and `reader0` for M0, and put them all into `readers_`
   4.1 Initialize the readers with `reader_ptr->Init()`
5. Define the function `func`, which invokes the data processing function for the messages M0 and M1
6. Create a `DataVisitor<M0, M1>` `dv` with the reader configs
7. Create a `RoutineFactory` `factory = croutine::CreateRoutineFactory<M0, M1>(func, dv)`. It actually defines the coroutine function that is used to create the coroutine in the next step
8. Create a task (coroutine) with `sched->CreateTask(factory, node_->Name())`

We need to break down some of the steps above (4.1, 5, 7, 8) even deeper to understand the whole process.
In 4.1, look into the steps of `Reader<MessageT>::Init()`:

4.1.1 Define a lambda function `func` as follows:

```cpp
if (reader_func_ != nullptr) {
  func = [this](const std::shared_ptr<MessageT>& msg) {
    this->Enqueue(msg);
    this->reader_func_(msg);
  };
} else {
  func = [this](const std::shared_ptr<MessageT>& msg) {
    this->Enqueue(msg);
  };
}
```

4.1.2 Create a `DataVisitor<MessageT>` `dv` with the specific message type
4.1.3 Create a `RoutineFactory` `factory = croutine::CreateRoutineFactory<MessageT>(func, dv)`
4.1.4 Create a task with `sched->CreateTask(factory, croutine_name_)`
4.1.5 Create a receiver `receiver_` with the message type config
4.1.6 Add this receiver into the channel topology as a listener of the corresponding reader
Notice that steps 4.1.1 to 4.1.4 are very similar to steps 5 to 8. They look like repeated steps, but they are not. In our case (M0, M1), three coroutines (represented by the `CRoutine` class in the project) are created in total. The coroutine created directly in `Component::Initialize()` is named `node_->Name()`, while the other two coroutines, created in `Reader::Init()`, are named `role_attr_.node_name() + "_" + role_attr_.channel_name()`. Looking at the definition of `func` in 4.1.1, all readers under `MODE_REALITY` are created without `reader_func_`, so the `else` branch (`this->Enqueue(msg)`) is taken. `Enqueue(msg)` executes `blocker_->Publish(msg)`, which writes `msg` into the message queue `published_msg_queue_` of `blocker_`.
In 5, a lambda function `func` is defined as follows:

```cpp
auto func = [self](const std::shared_ptr<M0>& msg0,
                   const std::shared_ptr<M1>& msg1) {
  auto ptr = self.lock();
  if (ptr) {
    ptr->Process(msg0, msg1);
  } else {
    AERROR << "Component object has been destroyed.";
  }
};
```

In 7, `CreateRoutineFactory` has several overloads to handle multiple message types. For the coroutine created directly in `Component::Initialize()` (we name it `cr` here), the function `f` in the code below is the `func` defined in step 5. For the coroutines created in `Reader::Init()` (we name them `cr_M0` and `cr_M1` here), the function `f` is the `func` defined in step 4.1.1. So coroutine `cr` is in charge of fetching and processing the incoming messages (of types M0 and M1), while coroutine `cr_M0` fetches each incoming M0 message and pushes it into the message queue; the same holds for coroutine `cr_M1`.

```cpp
factory.create_routine = [=]() {
  return [=]() {
    std::shared_ptr<M0> msg0;
    std::shared_ptr<M1> msg1;
    for (;;) {
      CRoutine::GetCurrentRoutine()->set_state(RoutineState::DATA_WAIT);
      if (dv->TryFetch(msg0, msg1)) {
        f(msg0, msg1);
        CRoutine::Yield(RoutineState::READY);
      } else {
        CRoutine::Yield();
      }
    }
  };
};
```
In 8, after the coroutine is created and dispatched (the dispatch policy is defined in the implementation of the `SchedulerClassic` and `SchedulerChoreography` classes), an important callback function is registered to the `DataVisitor` object `dv`:

```cpp
visitor->RegisterNotifyCallback([this, task_id]() {
  if (cyber_unlikely(stop_.load())) {
    return;
  }
  this->NotifyProcessor(task_id);
});
```
In step 4.1.5, a receiver is created with the lambda function below as input. This lambda function is assigned to `msg_listener_`, a member variable of the `Receiver` class. Another member function, `OnNewMessage()`, is the listener for new message data: when new message data comes, it executes the callback function `msg_listener_`. I will show the loop of message transmitting and receiving in the next section; here we only focus on the receiver side.

```cpp
CreateReceiver<MessageT>(
    role_attr, [](const std::shared_ptr<MessageT>& msg,
                  const transport::MessageInfo& msg_info,
                  const proto::RoleAttributes& reader_attr) {
      (void)msg_info;
      (void)reader_attr;
      PerfEventCache::Instance()->AddTransportEvent(
          TransPerf::DISPATCH, reader_attr.channel_id(),
          msg_info.seq_num());
      data::DataDispatcher<MessageT>::Instance()->Dispatch(
          reader_attr.channel_id(), msg);
      PerfEventCache::Instance()->AddTransportEvent(
          TransPerf::NOTIFY, reader_attr.channel_id(),
          msg_info.seq_num());
    });
```

The call chain on the receiving side is:

```
OnNewMessage(msg, msg_info)
 -> msg_listener_ func
 -> DataDispatcher<MessageT>::Instance()->Dispatch(reader_attr.channel_id(), msg)
 -> buffer->Fill(msg) + notifier_->Notify(channel_id)
 -> notifier->callback()
```

Several classes are involved here: `DataVisitor`, `DataDispatcher`, `DataNotifier` and `ChannelBuffer`. The `DataDispatcher` and `DataNotifier` classes are typical singletons. In short, when new M0 message data `msg0` is received, it is written into the `ChannelBuffer` `buffer_m0_`. `notifier->callback()` then calls exactly the callback function registered to the `DataVisitor` object `dv` in step 8, `this->NotifyProcessor(task_id)`, which means it does indeed "wake up" the coroutine `cr_M0` to execute the coroutine function defined in step 7. The same process applies when new M1 message data is received. For the coroutine `cr` we named before, its `TryFetch(msg0, msg1)` first fuses the new M0 and M1 message data before processing them: `data_fusion_->Fusion(&next_msg_index_, m0, m1)`. In summary, all receivers push newly arrived data into the corresponding channel buffers, and the main coroutine fetches the data from all buffers and processes them.
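The receive path (a receiver fills a channel buffer and notifies, the notification wakes the main routine, and the main routine fetches M0 together with M1) can be mimicked in a single-threaded sketch. All names here (`SketchChannelBuffer`, `SketchVisitor`, `RunReceiveDemo`) are hypothetical simplifications; the real classes are backed by coroutines and lock-protected caches. The fusion policy shown, where M0 drives the processing and the latest M1 is attached to it, is a simplifying assumption for the example.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a ChannelBuffer: keeps the received messages.
template <typename M>
class SketchChannelBuffer {
 public:
  void Fill(std::shared_ptr<M> msg) { queue_.push_back(std::move(msg)); }
  bool Empty() const { return queue_.empty(); }
  std::shared_ptr<M> Latest() const {
    assert(!queue_.empty());
    return queue_.back();
  }
  std::shared_ptr<M> PopOldest() {
    assert(!queue_.empty());
    auto msg = queue_.front();
    queue_.pop_front();
    return msg;
  }

 private:
  std::deque<std::shared_ptr<M>> queue_;
};

// Hypothetical stand-in for DataVisitor<M0, M1>.
template <typename M0, typename M1>
class SketchVisitor {
 public:
  using NotifyCallback = std::function<void()>;

  void RegisterNotifyCallback(NotifyCallback cb) { notify_ = std::move(cb); }

  // Receiver side for M0: store the message, then notify the routine.
  void DispatchM0(std::shared_ptr<M0> msg) {
    buffer_m0_.Fill(std::move(msg));
    if (notify_) notify_();
  }
  // Receiver side for M1: only store the message (assumed policy).
  void DispatchM1(std::shared_ptr<M1> msg) { buffer_m1_.Fill(std::move(msg)); }

  // Routine side: succeeds only when an M0 and at least one M1 exist.
  bool TryFetch(std::shared_ptr<M0>& m0, std::shared_ptr<M1>& m1) {
    if (buffer_m0_.Empty() || buffer_m1_.Empty()) return false;
    m0 = buffer_m0_.PopOldest();
    m1 = buffer_m1_.Latest();
    return true;
  }

 private:
  SketchChannelBuffer<M0> buffer_m0_;
  SketchChannelBuffer<M1> buffer_m1_;
  NotifyCallback notify_;
};

// Runs the pipeline and returns what the "main routine" processed.
inline std::vector<std::string> RunReceiveDemo() {
  SketchVisitor<int, std::string> dv;
  std::vector<std::string> processed;

  // Plays the role of waking the routine: run the routine body once.
  dv.RegisterNotifyCallback([&dv, &processed]() {
    std::shared_ptr<int> m0;
    std::shared_ptr<std::string> m1;
    if (dv.TryFetch(m0, m1)) {
      processed.push_back(std::to_string(*m0) + ":" + *m1);
    }
  });

  dv.DispatchM1(std::make_shared<std::string>("a"));
  dv.DispatchM0(std::make_shared<int>(1));  // processed with "a"
  dv.DispatchM1(std::make_shared<std::string>("b"));
  dv.DispatchM1(std::make_shared<std::string>("c"));
  dv.DispatchM0(std::make_shared<int>(2));  // processed with the latest M1
  return processed;
}
```

The sketch keeps the two roles apart in the same way as the text: the dispatch functions only store data and notify, and all processing happens in the callback that stands in for the main coroutine.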
II. MODE_SIMULATION
The initialization of a component under this mode has the following steps:
1. Create a node `node_`
2. Execute the user customized `Init()` function
3. Load the config for the readers
4. Create `reader1` for M1 without a reader function
   4.1 Initialize the reader with `reader_ptr->Init()`
5. Define the function `func`, which invokes the data processing function for the messages M0 and M1
6. Create `reader0` for M0 with the reader function `func` defined in the last step, and put all readers into `readers_`
   6.1 Initialize the readers with `reader_ptr->Init()`
In `MODE_REALITY`, readers are `Reader` class objects; in `MODE_SIMULATION`, readers are `IntraReader` class objects, whose initialization `Init()` is quite different:

```cpp
BlockerManager::Instance()->Subscribe<MessageT>(
    this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
    this->role_attr_.node_name(),
    std::bind(&IntraReader<MessageT>::OnMessage, this, std::placeholders::_1));
```

Where `OnNewMessage()` invokes `msg_listener_` in `MODE_REALITY`, here `OnMessage()` invokes `msg_callback_`. But unlike in `MODE_REALITY`, `msg_callback_` is not defined when a receiver is created. There is an important difference to note: `MODE_SIMULATION` uses the reader and writer classes `IntraReader` and `IntraWriter`, and it doesn't have the transmitter and receiver level of `MODE_REALITY`. This means it doesn't use transmitters and receivers to handle messages but uses `Blocker` instead. `OnMessage()` and `msg_callback_` are direct members of the `IntraReader` class, whereas in `MODE_REALITY`, `OnNewMessage()` and `msg_listener_` are on the receiver level. Also, no coroutines and no `DataVisitor` objects are created.
`msg_callback_` is defined in step 5 as shown below. It is used to initialize `reader0` only; `reader1` is initialized without `msg_callback_`. In the case of three message types (M0, M1, M2), `reader1` and `reader2` are initialized without `msg_callback_`. Because the function below does the actual data processing for all new message data (the M0 message `msg0` as input, the M1 message `msg1` obtained from `blocker1`), it doesn't need to be repeated in every reader. The next section talks about the process of transmitting message data into `Blocker`.

```cpp
auto blocker1 = blocker::BlockerManager::Instance()->GetBlocker<M1>(
    config.readers(1).channel());
auto func = [self, blocker1](const std::shared_ptr<M0>& msg0) {
  auto ptr = self.lock();
  if (ptr) {
    if (!blocker1->IsPublishedEmpty()) {
      auto msg1 = blocker1->GetLatestPublishedPtr();
      ptr->Process(msg0, msg1);
    }
  } else {
    AERROR << "Component object has been destroyed.";
  }
};
```
Data transmit in Cyber framework
Here "data transmit" is the counterpart of "data receiving/processing" in the section above: both refer to the message data.
Apollo provides the command `cyber_recorder play` to play back a record file. Together with `mainboard` or `cyber_launch start`, it simulates a whole data transmit -> receive -> process loop, so it is a good entry point for understanding the data transmit process.
1. Break down of cyber_recorder play
First is the break down of `cyber_recorder/main.cc`:

```
cyber_recorder command="play" in main.cc
 -> player.Init()
 -> producer_->Init()
 -> player.Start()
 -> producer_->Start() + consumer_->Start()
```

The classes involved are `Player`, `PlayTask`, `PlayTaskBuffer`, `PlayTaskProducer` and `PlayTaskConsumer`. First, look at `PlayTaskProducer::Init()`. It executes the following two functions:
- `ReadRecordInfo()`: Create `recorder_reader` object(s) to read the channel message info from the record file(s), and save it into `MessageTypeMap msg_types_`.
- `CreateWriters()`: Create a node `node_`, then loop over `msg_types_` to create writers for all message types, and put them into `WriterMap writers_`.
Then we look at `PlayTaskProducer::Start()`. It directly starts a thread:

```cpp
produce_th_.reset(new std::thread(&PlayTaskProducer::ThreadFunc, this));
```

`ThreadFunc` does the following:
- Calculate the number of data transmissions according to the start/end time and the time interval of the record, and put it in a container object `record_viewer`;
- Loop over `record_viewer` to create a `PlayTask` and push it into `task_buffer_` each time. For each item, it uses the corresponding channel name to find the corresponding writer in `writers_`, and uses this writer and the raw message to create a `PlayTask`.

Notice that class `Player` has the member variables:
- `ConsumerPtr consumer_`
- `ProducerPtr producer_`
- `TaskBufferPtr task_buffer_`: a shared pointer which is shared among the `Player`, `PlayTaskProducer` and `PlayTaskConsumer` classes.

In `PlayTaskConsumer::Start()` we can find a `ThreadFunc` similar to the one in `PlayTaskProducer::Start()`. It executes:

```
task = task_buffer_->Front() -> task->Play() -> task_buffer_->PopFront()
```

The `producer_` object keeps pushing new `PlayTask` objects to `task_buffer_`, while `consumer_` keeps pulling a `PlayTask`, executing `PlayTask::Play()`, and popping it from `task_buffer_` after the execution has finished.

The only thing left is `PlayTask::Play()`. It executes the core job of transmitting data: `writer_->Write(msg_)`.
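The producer/consumer hand-off through a shared task buffer can be sketched with two standard threads. This is an illustration only: `SketchTaskBuffer` and `RunPlayback` are hypothetical, tasks are plain `std::function` objects instead of `PlayTask`, and the blocking `Pop()` merges the `Front()`/`PopFront()` pair for brevity.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical thread-safe buffer shared by the producer and the consumer.
class SketchTaskBuffer {
 public:
  using Task = std::function<void()>;

  void Push(Task task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(task));
    }
    cv_.notify_one();
  }

  // Marks the end of production so that the consumer can terminate.
  void Close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    cv_.notify_all();
  }

  // Blocks until a task is available; returns false once closed and drained.
  bool Pop(Task& task) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return closed_ || !tasks_.empty(); });
    if (tasks_.empty()) return false;
    task = std::move(tasks_.front());
    tasks_.pop_front();
    return true;
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<Task> tasks_;
  bool closed_ = false;
};

// Produces one task per message and "plays" them in order on another thread.
inline std::vector<int> RunPlayback(const std::vector<int>& messages) {
  auto buffer = std::make_shared<SketchTaskBuffer>();
  std::vector<int> written;  // only modified by the consumer thread

  std::thread producer([buffer, &messages, &written] {
    for (int msg : messages) {
      // The task stands in for PlayTask::Play(), i.e. writer_->Write(msg_).
      buffer->Push([msg, &written] { written.push_back(msg); });
    }
    buffer->Close();
  });

  std::thread consumer([buffer] {
    SketchTaskBuffer::Task task;
    while (buffer->Pop(task)) {
      task();
    }
  });

  producer.join();
  consumer.join();
  return written;
}
```

Because the buffer is a FIFO queue with a single consumer, the tasks run in the order in which the producer pushed them, which is what a playback of a record needs.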
2. Data transmit process
Before going into `Writer::Write()`, it's necessary to know how this writer is initialized. In `NodeChannelImpl::CreateWriter()` we can find that the writer is a `Writer` class object under `MODE_REALITY`, and an `IntraWriter` class object under `MODE_SIMULATION`.
I. MODE_REALITY
In `Writer::Init()`, it does the following:
1. Create a `Transmitter` object `transmitter_`
2. Add this transmitter into the channel topology (so the receivers can subscribe to this channel message)

In step 1, according to the `mode` value, `CreateTransmitter()` creates `transmitter_` from one of the following classes:
- IntraTransmitter
- ShmTransmitter
- RtpsTransmitter
- HybridTransmitter (default)

Notice that this `mode` has nothing to do with `RunMode`; it describes the relationship between the communicating nodes under `MODE_REALITY`, which has the transmitter and receiver level. There are 4 modes defined: INTRA means communicating within one process, SHM means communicating between processes on the same host, RTPS means communicating across hosts, and HYBRID means a mix of those 3 modes.
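Choosing the transmitter class from the mode value is a plain factory. The sketch below uses hypothetical types (`SketchMode`, `SketchTransmitter` and its subclasses) that only report their kind; it does not reproduce the real `CreateTransmitter()` signature.

```cpp
#include <memory>
#include <string>

// Hypothetical mirror of the four communication modes described above.
enum class SketchMode { kIntra, kShm, kRtps, kHybrid };

class SketchTransmitter {
 public:
  virtual ~SketchTransmitter() = default;
  virtual std::string Kind() const = 0;
};

class SketchIntraTransmitter : public SketchTransmitter {
 public:
  std::string Kind() const override { return "intra"; }
};

class SketchShmTransmitter : public SketchTransmitter {
 public:
  std::string Kind() const override { return "shm"; }
};

class SketchRtpsTransmitter : public SketchTransmitter {
 public:
  std::string Kind() const override { return "rtps"; }
};

class SketchHybridTransmitter : public SketchTransmitter {
 public:
  std::string Kind() const override { return "hybrid"; }
};

// Picks the concrete class from the mode; hybrid is the default.
inline std::unique_ptr<SketchTransmitter> CreateSketchTransmitter(
    SketchMode mode = SketchMode::kHybrid) {
  switch (mode) {
    case SketchMode::kIntra:
      return std::make_unique<SketchIntraTransmitter>();
    case SketchMode::kShm:
      return std::make_unique<SketchShmTransmitter>();
    case SketchMode::kRtps:
      return std::make_unique<SketchRtpsTransmitter>();
    case SketchMode::kHybrid:
      break;
  }
  return std::make_unique<SketchHybridTransmitter>();
}
```

The writer only holds a pointer to the base class, so the rest of its code does not need to know which communication mode is in use.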
After the writer initialization, we go to `Writer::Write()` to look inside. Here we take the `INTRA` mode as the example case.

```
writer_->Write(msg_)
 -> transmitter_->Transmit(msg_ptr)
 -> dispatcher_->OnMessage(channel_id_, msg, msg_info)
 -> handler->Run(message, message_info)
 -> (*signals_[oppo_id])(msg, msg_info)
```

Do not confuse the `IntraDispatcher` object `dispatcher_` with the `DataDispatcher` class instance here: the first one is on the transmitting side, the second one is on the receiving side, and both of them are singletons. Notice that `dispatcher_` is a shared pointer in both the `IntraTransmitter` class and the `IntraReceiver` class.
To figure out what the last step `(*signals_[oppo_id])(msg, msg_info)` does, we need to go back to the receiver part. Remember the member function `Receiver<M>::OnNewMessage(msg, msg_info)`, which is the starting point of the receiving process. This function is written into the member variable `msg_listeners_` of the `dispatcher_` object when the `IntraReceiver` object is initialized, as follows:

```cpp
dispatcher_->AddListener<M>(
    this->attr_,
    std::bind(&IntraReceiver<M>::OnNewMessage, this,
              std::placeholders::_1, std::placeholders::_2));
```

The last step of adding the listener, `signals_[oppo_id]->Connect(listener)`, makes `signals_[oppo_id]` a callable object bound to the listener function, so `(*signals_[oppo_id])(msg, msg_info)` calls the listener function `OnNewMessage(msg, msg_info)`. This is called the signal and slot mechanism.
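A minimal sketch of the signal and slot idea is shown below. It is not the Cyber RT implementation: `SketchSignal`, `SketchReceiver` and `RunSignalDemo` are hypothetical, the message is a plain string, and the integer id `42` is an arbitrary value chosen for the example.

```cpp
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal signal: stores slots and calls them when invoked.
template <typename... Args>
class SketchSignal {
 public:
  using Slot = std::function<void(Args...)>;

  void Connect(Slot slot) { slots_.push_back(std::move(slot)); }

  // Invoking the signal calls every connected slot with the same arguments.
  void operator()(Args... args) const {
    for (const auto& slot : slots_) {
      slot(args...);
    }
  }

 private:
  std::vector<Slot> slots_;
};

// Hypothetical receiver whose OnNewMessage() is connected as a slot.
class SketchReceiver {
 public:
  void OnNewMessage(const std::string& msg, int seq_num) {
    received_.push_back(std::to_string(seq_num) + ":" + msg);
  }
  const std::vector<std::string>& received() const { return received_; }

 private:
  std::vector<std::string> received_;
};

// Keeps one signal per sender id, similar in spirit to signals_[oppo_id].
inline std::vector<std::string> RunSignalDemo() {
  using Signal = SketchSignal<const std::string&, int>;
  std::map<int, std::shared_ptr<Signal>> signals;
  SketchReceiver receiver;
  const int oppo_id = 42;  // arbitrary id chosen for the example

  signals[oppo_id] = std::make_shared<Signal>();
  signals[oppo_id]->Connect(std::bind(&SketchReceiver::OnNewMessage, &receiver,
                                      std::placeholders::_1,
                                      std::placeholders::_2));

  (*signals[oppo_id])("hello", 1);  // ends up in receiver.OnNewMessage()
  (*signals[oppo_id])("world", 2);
  return receiver.received();
}
```

The sender never names the receiver: it only invokes the signal stored under an id, and whatever was connected to that signal gets called.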
So this is the whole loop of the data transmitting and receiving process for `MODE_REALITY`.
II. MODE_SIMULATION
In `IntraWriter::Init()`, it creates a `blocker` object according to the message type, and puts it into `BlockerMap blockers_` in the `BlockerManager` singleton instance. `IntraWriter::Write()` can be broken down to:

```
writer_->Write(msg_)
 -> blocker_manager_->Publish<MessageT>(this->role_attr_.channel_name(), msg)
 -> blocker->Publish(msg)
 -> Enqueue(msg)
 -> published_msg_queue_.push_front(msg)
 -> Notify(msg)
 -> call published_callbacks_
```

Recall that `IntraReader::Init()` does the following: it registers `OnMessage()` into `published_callbacks_`, which is a member variable of the `Blocker` class.

```cpp
BlockerManager::Instance()->Subscribe<MessageT>(
    this->role_attr_.channel_name(), this->role_attr_.qos_profile().depth(),
    this->role_attr_.node_name(),
    std::bind(&IntraReader<MessageT>::OnMessage, this, std::placeholders::_1));
```

So the whole loop of data transmitting and receiving for `MODE_SIMULATION` is also clear.
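Putting the two sides together, the `MODE_SIMULATION` loop is a queue plus a set of callbacks. The sketch below is a simplified, single-threaded illustration: `SketchBlocker` and `RunBlockerDemo` are hypothetical, and the capacity handling (drop the oldest message when the queue is full) is an assumption made for the example.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for a Blocker of one message type.
template <typename M>
class SketchBlocker {
 public:
  using MessagePtr = std::shared_ptr<M>;
  using Callback = std::function<void(const MessagePtr&)>;

  explicit SketchBlocker(std::size_t capacity) : capacity_(capacity) {
    assert(capacity_ > 0);
  }

  // Reader side: register a callback under the reader's id.
  void Subscribe(const std::string& callback_id, Callback callback) {
    published_callbacks_[callback_id] = std::move(callback);
  }

  // Writer side: Publish = Enqueue + Notify.
  void Publish(const MessagePtr& msg) {
    Enqueue(msg);
    Notify(msg);
  }

  bool IsPublishedEmpty() const { return published_msg_queue_.empty(); }
  MessagePtr GetLatestPublishedPtr() const {
    assert(!published_msg_queue_.empty());
    return published_msg_queue_.front();
  }

 private:
  void Enqueue(const MessagePtr& msg) {
    published_msg_queue_.push_front(msg);  // newest message sits at the front
    while (published_msg_queue_.size() > capacity_) {
      published_msg_queue_.pop_back();  // assumed policy: drop the oldest
    }
  }

  void Notify(const MessagePtr& msg) {
    for (const auto& item : published_callbacks_) {
      item.second(msg);
    }
  }

  std::size_t capacity_;
  std::deque<MessagePtr> published_msg_queue_;
  std::map<std::string, Callback> published_callbacks_;
};

// M0 (int) triggers processing; the latest M1 (string) is read from its
// blocker, like the callback given to reader0 in the component.
inline std::vector<std::string> RunBlockerDemo() {
  SketchBlocker<int> blocker0(2);
  SketchBlocker<std::string> blocker1(2);
  std::vector<std::string> processed;

  blocker0.Subscribe("reader0", [&](const std::shared_ptr<int>& msg0) {
    if (!blocker1.IsPublishedEmpty()) {
      processed.push_back(std::to_string(*msg0) + ":" +
                          *blocker1.GetLatestPublishedPtr());
    }
  });

  blocker0.Publish(std::make_shared<int>(1));  // skipped: no M1 published yet
  blocker1.Publish(std::make_shared<std::string>("a"));
  blocker1.Publish(std::make_shared<std::string>("b"));
  blocker0.Publish(std::make_shared<int>(2));  // processed with the latest M1
  return processed;
}
```

No coroutine or scheduler appears anywhere in this path: the writer's call to `Publish()` runs the reader's callback directly, which matches the description of `MODE_SIMULATION` given earlier.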