|
汽车零部件采购、销售通信录 填写你的培训需求,我们帮你找 招募汽车专业培训老师
01
读者问题
最近有读者提问,FastDDS中处于共享内存模式下的消费者异常退出,比如说segment fault,生产者该如何进行异常处理?这块共享内存该如何处理?
02
问题回答
关于FastDDS的前置知识见如下链接:
中间件之FastDDS源代码阅读(五)
中间件之FastDDS源代码阅读(四)
中间件之FastDDS源代码阅读(三)
中间件之FastDDS 源代码阅读(二)
FastDDS图片
FastDDS在使用共享内存进行通信时,FastDDS会实例化一个共享内存看门狗。共享内存看门狗会周期性检查是否消费者是否在线,如果消费者异常退出或者异常下线,则调用相关函数关闭并清除共享内存。
相关代码如下所示:
SharedMemWatchdog是共享内存看门狗,
alive_check_timeout函数和check_alive函数检查是否超时和是否存活。close_and_remove函数,remove_segment函数就是在清除共享内存。
classSegmentWrapper {public:
SegmentWrapper() { }
SegmentWrapper(std::weak_ptr<SharedMemManager> shared_mem_manager,std::shared_ptr<SharedMemSegment> segment_, SharedMemSegment::Id segment_id,conststd::string& segment_name) : shared_mem_manager_(shared_mem_manager) , segment_(segment_) , segment_id_(segment_id) , segment_name_(segment_name) { lock_file_name_ = segment_name + "_el"; update_alive_time(std::chrono::steady_clock::now()); }
std::shared_ptr<SharedMemSegment> segment(){return segment_; }
voidupdate_alive_time(conststd::chrono::steady_clock::time_point& time){ last_alive_check_time_.store(time.time_since_epoch().count()); }
/** * Singleton task, for SharedMemWatchdog, that periodically checks opened segments * to garbage collect those closed by the origin */classWatchTask :public SharedMemWatchdog::Task {public:
staticstd::shared_ptr<WatchTask>& get(){staticstd::shared_ptr<WatchTask> watch_task_instance(new WatchTask());return watch_task_instance; }
voidadd_segment(std::shared_ptr<SegmentWrapper> segment){// Add added segments to the watched setstd::lock_guard<std::mutex> lock(to_add_remove_mutex_);
to_add_.push_back(segment); }
voidremove_segment(std::shared_ptr<SegmentWrapper> segment){// Add added segments to the watched setstd::lock_guard<std::mutex> lock(to_add_remove_mutex_);
to_remove_.push_back(segment); }
virtual ~WatchTask() { shared_mem_watchdog_->remove_task(this); }
private:
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t>::iterator watched_it_;
std::mutex to_add_remove_mutex_;std::vector<std::shared_ptr<SegmentWrapper>> to_add_;std::vector<std::shared_ptr<SegmentWrapper>> to_remove_;
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyedstd::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;
WatchTask() : watched_it_(watched_segments_.end()) , shared_mem_watchdog_(SharedMemWatchdog::get()) { shared_mem_watchdog_->add_task(this); }
voidupdate_watched_segments(){// Add / remove segments to the watched mapstd::lock_guard<std::mutex> lock(to_add_remove_mutex_);
for (auto& segment : to_add_) {auto segment_it = watched_segments_.find(segment);if (segment_it != watched_segments_.end()) {// The segment already exists, just increase the references (*segment_it).second++; }else// New segment { watched_segments_.insert({segment, 1}); } }
to_add_.clear();
for (auto& segment : to_remove_) {auto segment_it = watched_segments_.find(segment);if (segment_it != watched_segments_.end()) { (*segment_it).second--;
if ((*segment_it).second == 0) { watched_segments_.erase(segment_it); } } }
to_remove_.clear(); }
voidrun(){constexpruint32_t MAX_CHECKS_PER_BATCH {100};constexprstd::chrono::milliseconds PER_BATCH_SLEEP_TIME {10};
auto now = std::chrono::steady_clock::now();
// Segments check was completed in the last runif (watched_it_ == watched_segments_.end()) {// Add / remove requested segments update_watched_segments(); watched_it_ = watched_segments_.begin(); }
autonow_t = std::chrono::steady_clock::now();// Maximum time for checking half the watchdog periodautolimit_t = now_t + SharedMemWatchdog::period() / 2;uint32_t batch_count = 0;
while (watched_it_ != watched_segments_.end() && now_t < limit_t) {auto& segment = (*watched_it_).first;// The segment has not been check for much time...if (segment->alive_check_timeout(now)) {if (!(*watched_it_).first->check_alive()) { watched_it_ = watched_segments_.erase(watched_it_); }else { watched_it_++; } }else { watched_it_++; }
// Every batch a sleep is performed to avoid high resources consumptionif (++batch_count == MAX_CHECKS_PER_BATCH) { batch_count = 0;std::this_thread::sleep_for(PER_BATCH_SLEEP_TIME);now_t = std::chrono::steady_clock::now(); } } }
};
boolcheck_alive(){ if (!RobustExclusiveLock::is_locked(lock_file_name_)) { // The segment is not locked so the origin is no longer active close_and_remove();
returnfalse; }
update_alive_time(std::chrono::steady_clock::now());
returntrue; }
boolalive_check_timeout( conststd::chrono::steady_clock::time_point& now)const{ std::chrono::steady_clock::time_point last_check_time( std::chrono::nanoseconds(last_alive_check_time_.load()));
returnstd::chrono::duration_cast<std::chrono::seconds>(now - last_check_time).count() >= ALIVE_CHECK_TIMEOUT_SECS; }
voidclose_and_remove(){ // Remove from the namespace (just in case the origin didn't do it) SharedMemSegment::remove(segment_name_.c_str());
if (auto shared_mem_manager = shared_mem_manager_.lock()) { shared_mem_manager->release_segment(segment_id_); } }
03
问题扩展
我们进一步思考,消费者异常退出,但是执行管理发现程序异常退出并重启消费者,此时共享内存还能保留吗?
答:由于消费者重新启动,此时消费者的GUID是不一样的,FastDDS系统会重新生成一段新的共享内存,用这段共享内存和数据生产者通信。
END |
|