| 
 | 
	
汽车零部件采购、销售通信录       填写你的培训需求,我们帮你找      招募汽车专业培训老师 
 
 
 
 
01 
 
读者问题 
 
最近有读者提问,FastDDS中处于共享内存模式下的消费者异常退出,比如说segment fault,生产者该如何进行异常处理?这块共享内存该如何处理? 
 
02 
 
问题回答 
 
关于FastDDS的前置知识见如下链接: 
 
中间件之FastDDS源代码阅读(五) 
 
中间件之FastDDS源代码阅读(四) 
 
中间件之FastDDS源代码阅读(三) 
 
中间件之FastDDS 源代码阅读(二) 
 
FastDDS图片 
 
FastDDS在使用共享内存进行通信时,FastDDS会实例化一个共享内存看门狗。共享内存看门狗会周期性检查是否消费者是否在线,如果消费者异常退出或者异常下线,则调用相关函数关闭并清除共享内存。 
 
相关代码如下所示: 
 
SharedMemWatchdog是共享内存看门狗, 
 
alive_check_timeout函数和check_alive函数检查是否超时和是否存活。close_and_remove函数,remove_segment函数就是在清除共享内存。 
 
classSegmentWrapper    {public: 
        SegmentWrapper()        {        } 
        SegmentWrapper(std::weak_ptr<SharedMemManager> shared_mem_manager,std::shared_ptr<SharedMemSegment> segment_,                SharedMemSegment::Id segment_id,conststd::string& segment_name)            : shared_mem_manager_(shared_mem_manager)            , segment_(segment_)            , segment_id_(segment_id)            , segment_name_(segment_name)        {            lock_file_name_ = segment_name + "_el";            update_alive_time(std::chrono::steady_clock::now());        } 
std::shared_ptr<SharedMemSegment> segment(){return segment_;        } 
voidupdate_alive_time(conststd::chrono::steady_clock::time_point& time){            last_alive_check_time_.store(time.time_since_epoch().count());        } 
/**         * Singleton task, for SharedMemWatchdog, that periodically checks opened segments         * to garbage collect those closed by the origin         */classWatchTask :public SharedMemWatchdog::Task        {public: 
staticstd::shared_ptr<WatchTask>& get(){staticstd::shared_ptr<WatchTask> watch_task_instance(new WatchTask());return watch_task_instance;            } 
voidadd_segment(std::shared_ptr<SegmentWrapper> segment){// Add added segments to the watched setstd::lock_guard<std::mutex> lock(to_add_remove_mutex_); 
                to_add_.push_back(segment);            } 
voidremove_segment(std::shared_ptr<SegmentWrapper> segment){// Add added segments to the watched setstd::lock_guard<std::mutex> lock(to_add_remove_mutex_); 
                to_remove_.push_back(segment);            } 
virtual ~WatchTask()            {                shared_mem_watchdog_->remove_task(this);            } 
private: 
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t>::iterator watched_it_; 
std::mutex to_add_remove_mutex_;std::vector<std::shared_ptr<SegmentWrapper>> to_add_;std::vector<std::shared_ptr<SegmentWrapper>> to_remove_; 
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyedstd::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_; 
            WatchTask()                : watched_it_(watched_segments_.end())                , shared_mem_watchdog_(SharedMemWatchdog::get())            {                shared_mem_watchdog_->add_task(this);            } 
voidupdate_watched_segments(){// Add / remove segments to the watched mapstd::lock_guard<std::mutex> lock(to_add_remove_mutex_); 
for (auto& segment : to_add_)                {auto segment_it = watched_segments_.find(segment);if (segment_it != watched_segments_.end())                    {// The segment already exists, just increase the references                        (*segment_it).second++;                    }else// New segment                    {                        watched_segments_.insert({segment, 1});                    }                } 
                to_add_.clear(); 
for (auto& segment : to_remove_)                {auto segment_it = watched_segments_.find(segment);if (segment_it != watched_segments_.end())                    {                        (*segment_it).second--; 
if ((*segment_it).second == 0)                        {                            watched_segments_.erase(segment_it);                        }                    }                } 
                to_remove_.clear();            } 
voidrun(){constexpruint32_t MAX_CHECKS_PER_BATCH {100};constexprstd::chrono::milliseconds PER_BATCH_SLEEP_TIME {10}; 
auto now = std::chrono::steady_clock::now(); 
// Segments check was completed in the last runif (watched_it_ == watched_segments_.end())                {// Add / remove requested segments                    update_watched_segments();                    watched_it_ = watched_segments_.begin();                } 
autonow_t = std::chrono::steady_clock::now();// Maximum time for checking half the watchdog periodautolimit_t = now_t + SharedMemWatchdog::period() / 2;uint32_t batch_count = 0; 
while (watched_it_ != watched_segments_.end() && now_t < limit_t)                {auto& segment = (*watched_it_).first;// The segment has not been check for much time...if (segment->alive_check_timeout(now))                    {if (!(*watched_it_).first->check_alive())                        {                            watched_it_ = watched_segments_.erase(watched_it_);                        }else                        {                            watched_it_++;                        }                    }else                    {                        watched_it_++;                    } 
// Every batch a sleep is performed to avoid high resources consumptionif (++batch_count ==  MAX_CHECKS_PER_BATCH)                    {                        batch_count = 0;std::this_thread::sleep_for(PER_BATCH_SLEEP_TIME);now_t = std::chrono::steady_clock::now();                    }                }            } 
        }; 
 
boolcheck_alive(){            if (!RobustExclusiveLock::is_locked(lock_file_name_))            {                // The segment is not locked so the origin is no longer active                close_and_remove(); 
                returnfalse;            } 
            update_alive_time(std::chrono::steady_clock::now()); 
            returntrue;        } 
        boolalive_check_timeout(                conststd::chrono::steady_clock::time_point& now)const{            std::chrono::steady_clock::time_point last_check_time(                std::chrono::nanoseconds(last_alive_check_time_.load())); 
            returnstd::chrono::duration_cast<std::chrono::seconds>(now - last_check_time).count()                   >= ALIVE_CHECK_TIMEOUT_SECS;        } 
        voidclose_and_remove(){            // Remove from the namespace (just in case the origin didn't do it)            SharedMemSegment::remove(segment_name_.c_str()); 
            if (auto shared_mem_manager = shared_mem_manager_.lock())            {                shared_mem_manager->release_segment(segment_id_);            }        } 
 
03 
 
问题扩展 
 
我们进一步思考,消费者异常退出,但是执行管理发现程序异常退出并重启消费者,此时共享内存还能保留吗? 
 
答:由于消费者重新启动,此时消费者的GUID是不一样的,FastDDS系统会重新生成一段新的共享内存,用这段共享内存和数据生产者通信。 
 
 
 
 
 
 
END |   
 
 
 
 |