• 209查看
  • 0回复

[Autosar] FastDDS读者问答之消费者异常

[复制链接]


该用户从未签到

发表于 30-4-2024 19:27:08 | 显示全部楼层 |阅读模式

汽车零部件采购、销售通信录       填写你的培训需求,我们帮你找      招募汽车专业培训老师


01

读者问题

最近有读者提问,FastDDS中处于共享内存模式下的消费者异常退出,比如说segment fault,生产者该如何进行异常处理?这块共享内存该如何处理?

02

问题回答

关于FastDDS的前置知识见如下链接:

中间件之FastDDS源代码阅读(五)

中间件之FastDDS源代码阅读(四)

中间件之FastDDS源代码阅读(三)

中间件之FastDDS 源代码阅读(二)

FastDDS图片

FastDDS在使用共享内存进行通信时,FastDDS会实例化一个共享内存看门狗。共享内存看门狗会周期性检查是否消费者是否在线,如果消费者异常退出或者异常下线,则调用相关函数关闭并清除共享内存。

相关代码如下所示:

SharedMemWatchdog是共享内存看门狗,

alive_check_timeout函数和check_alive函数检查是否超时和是否存活。close_and_remove函数,remove_segment函数就是在清除共享内存。

classSegmentWrapper    {public:
        SegmentWrapper()        {        }
        SegmentWrapper(std::weak_ptr<SharedMemManager> shared_mem_manager,std::shared_ptr<SharedMemSegment> segment_,                SharedMemSegment::Id segment_id,conststd::string& segment_name)            : shared_mem_manager_(shared_mem_manager)            , segment_(segment_)            , segment_id_(segment_id)            , segment_name_(segment_name)        {            lock_file_name_ = segment_name + "_el";            update_alive_time(std::chrono::steady_clock::now());        }
std::shared_ptr<SharedMemSegment> segment(){return segment_;        }
voidupdate_alive_time(conststd::chrono::steady_clock::time_point& time){            last_alive_check_time_.store(time.time_since_epoch().count());        }
/**         * Singleton task, for SharedMemWatchdog, that periodically checks opened segments         * to garbage collect those closed by the origin         */classWatchTask :public SharedMemWatchdog::Task        {public:
staticstd::shared_ptr<WatchTask>& get(){staticstd::shared_ptr<WatchTask> watch_task_instance(new WatchTask());return watch_task_instance;            }
voidadd_segment(std::shared_ptr<SegmentWrapper> segment){// Add added segments to the watched setstd::lock_guard<std::mutex> lock(to_add_remove_mutex_);
                to_add_.push_back(segment);            }
voidremove_segment(std::shared_ptr<SegmentWrapper> segment){// Add added segments to the watched setstd::lock_guard<std::mutex> lock(to_add_remove_mutex_);
                to_remove_.push_back(segment);            }
virtual ~WatchTask()            {                shared_mem_watchdog_->remove_task(this);            }
private:
std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t> watched_segments_;std::unordered_map<std::shared_ptr<SegmentWrapper>, uint32_t>::iterator watched_it_;
std::mutex to_add_remove_mutex_;std::vector<std::shared_ptr<SegmentWrapper>> to_add_;std::vector<std::shared_ptr<SegmentWrapper>> to_remove_;
// Keep a reference to the SharedMemWatchdog so that it is not destroyed until this instance is destroyedstd::shared_ptr<SharedMemWatchdog> shared_mem_watchdog_;
            WatchTask()                : watched_it_(watched_segments_.end())                , shared_mem_watchdog_(SharedMemWatchdog::get())            {                shared_mem_watchdog_->add_task(this);            }
voidupdate_watched_segments(){// Add / remove segments to the watched mapstd::lock_guard<std::mutex> lock(to_add_remove_mutex_);
for (auto& segment : to_add_)                {auto segment_it = watched_segments_.find(segment);if (segment_it != watched_segments_.end())                    {// The segment already exists, just increase the references                        (*segment_it).second++;                    }else// New segment                    {                        watched_segments_.insert({segment, 1});                    }                }
                to_add_.clear();
for (auto& segment : to_remove_)                {auto segment_it = watched_segments_.find(segment);if (segment_it != watched_segments_.end())                    {                        (*segment_it).second--;
if ((*segment_it).second == 0)                        {                            watched_segments_.erase(segment_it);                        }                    }                }
                to_remove_.clear();            }
voidrun(){constexpruint32_t MAX_CHECKS_PER_BATCH {100};constexprstd::chrono::milliseconds PER_BATCH_SLEEP_TIME {10};
auto now = std::chrono::steady_clock::now();
// Segments check was completed in the last runif (watched_it_ == watched_segments_.end())                {// Add / remove requested segments                    update_watched_segments();                    watched_it_ = watched_segments_.begin();                }
autonow_t = std::chrono::steady_clock::now();// Maximum time for checking half the watchdog periodautolimit_t = now_t + SharedMemWatchdog::period() / 2;uint32_t batch_count = 0;
while (watched_it_ != watched_segments_.end() && now_t < limit_t)                {auto& segment = (*watched_it_).first;// The segment has not been check for much time...if (segment->alive_check_timeout(now))                    {if (!(*watched_it_).first->check_alive())                        {                            watched_it_ = watched_segments_.erase(watched_it_);                        }else                        {                            watched_it_++;                        }                    }else                    {                        watched_it_++;                    }
// Every batch a sleep is performed to avoid high resources consumptionif (++batch_count ==  MAX_CHECKS_PER_BATCH)                    {                        batch_count = 0;std::this_thread::sleep_for(PER_BATCH_SLEEP_TIME);now_t = std::chrono::steady_clock::now();                    }                }            }
        };

boolcheck_alive(){            if (!RobustExclusiveLock::is_locked(lock_file_name_))            {                // The segment is not locked so the origin is no longer active                close_and_remove();
                returnfalse;            }
            update_alive_time(std::chrono::steady_clock::now());
            returntrue;        }
        boolalive_check_timeout(                conststd::chrono::steady_clock::time_point& now)const{            std::chrono::steady_clock::time_point last_check_time(                std::chrono::nanoseconds(last_alive_check_time_.load()));
            returnstd::chrono::duration_cast<std::chrono::seconds>(now - last_check_time).count()                   >= ALIVE_CHECK_TIMEOUT_SECS;        }
        voidclose_and_remove(){            // Remove from the namespace (just in case the origin didn't do it)            SharedMemSegment::remove(segment_name_.c_str());
            if (auto shared_mem_manager = shared_mem_manager_.lock())            {                shared_mem_manager->release_segment(segment_id_);            }        }

03

问题扩展

我们进一步思考,消费者异常退出,但是执行管理发现程序异常退出并重启消费者,此时共享内存还能保留吗?

答:由于消费者重新启动,此时消费者的GUID是不一样的,FastDDS系统会重新生成一段新的共享内存,用这段共享内存和数据生产者通信。

FastDDS读者问答之消费者异常w1.jpg



END

快速发帖

您需要登录后才可以回帖 登录 | 注册

本版积分规则

QQ|手机版|小黑屋|Archiver|汽车工程师之家 ( 渝ICP备18012993号-1 )

GMT+8, 1-2-2025 07:41 , Processed in 0.284041 second(s), 32 queries .

Powered by Discuz! X3.5

© 2001-2013 Comsenz Inc.