Boost-Asio开发的相关基础知识(三):Strand序列化执行用户回调

请参看Boost.Asio的官方文档,上面清楚的描述道:

strand可以保证严格序列化地(而不会并行地)执行event handlers,通过使用strand可以保证安全的在多线程环境下执行程序,而不需要使用者显式的使用类似mutex等方式来进行保护和同步操作。strand的可以以隐式(implicit)或者显式(explicit)的方式存在:

(1). 如果只在一个线程中调用io_service::run(),那么所有的event handlers都是在一个隐式strand中序列化执行的;

(2). 对于一个连接如果只有一个单链异步操作的时候,其不可能并发的执行event handlers,这也是隐式strand的。(文中提到了比如HTTP的单双工协议,我的理解是HTTP只会进行客户端请求、服务端返回数据这种形式,没有并行执行event handler的可能性);

(3). 可以显式实例化一个strand对象,然后所有的event handlers必须使用io_service::strand::wrap()进行包裹,或者使用该strand对象显式调用post()/dispatch()发布异步操作请求;

(4). 对于composed asynchronous operations的操作,比如async_read()或者async_read_until(),其中间会多次调用async_read_some(),为了保证对象的安全,所有的intermediate handlers都必须在同一个strand中被串行化执行。

基本上strand对象创建之后,就从另外一个层次替换了io_service的行为,因为strand提供了和io_service_极其类似的成员函数:

(1). dispatch(CompletionHandler handler); 请求strand去执行handler,strand对象保证通过该strand添加的handlers不会被并行的执行,如果这个函数是在相同strand对象对应的handler中被调用的,那么该handler将会在函数中被立即执行(s.running_in_this_thread() == true);

(2). post(CompletionHandler handler); 请求strand去执行handler并立即返回;

(3). get_io_service()

(4). running_in_this_thread(); 如果当前线程正在执行由该strand提交的handler,则返回true;

(5). wrap(Handler handler); 创建一个函数对象,当被invoke的时候会自动传递到strand的dispatch函数;

strand串行化执行的顺序问题:

在strand的文档中,对提交进取的多个handler需要串行化执行,执行的顺序有部分的保证。在以下情况下会保证asio_handler_invoke(a1, &a1)会先于执行asio_handler_invoke(b1, &b1):

(1). s.post(a)先于执行s.post(b);

(2). s.post(a)先于执行s.dispatch(b),同时后者在strand外执行的;

(3). s.dispatch(a)先于执行s.post(b),并且前者是在strand外执行的;

(4). s.dispatch(a)先于执行s.dispatch(b),同时他们都是在strand外执行的;

(5). async_op_1(…, s.wrap(a)); async_op_2(…, s.wrap(b));这两者的执行顺序没有任何的保证,而strand所能给予的保证是a1和b1不会并行执行,如果s.wrap(x1)先被执行,那么x1也会先被执行;

其实,其要诀就是:如果在strand中,那么dispatch会直接在调用函数中执行,否则按照添加到队列中的顺序来排队执行。举个例子,比如假设

autowrapped_handler1 = strand.wrap(handler1);
autowrapped_handler2 = strand.wrap(handler2);
socket.async_read_some(buffer1, wrapped_handler1); // op1
socket.async_read_some(buffer2, wrapped_handler2); // op2

由于op1先于op2启动,所以保证buffer1在stream中接收到的数据是先于buffer2的,但是wrapped_handler1和wrapped_handler2的调用顺序是没有保证的,strand做出的保证是:

(1). handler1和handler2不会并发的执行;

(2). 如果wrapped_handler1先于wrapped_handler2被执行,那么handler1先于handler2被执行,反之亦然。

strand类定义在[strand.hpp]boost::asio::io_service::strand,这个类只是个空壳,主要包括两个数据成员detail::strand_service& service_和detail::strand_service::implementation_type impl_两个成员,具体实现需要查看strand_service类的实现细节,该类有几个重要的成员变量:

(1). io_service_impl& io_service_; 构造strand的时候传递进来的io_service对象;

(2). detail::mutex mutex_; 主要用来保护下面的locked_等内部变量;

(3). bool locked_; 如果当前有其他的handler正在被执行或正在被调度执行,那么这个变量是true,如果此时有新的handler需要被加入就需要等待;

(4). op_queue

waiting_queue_; 正在等待strand但是除非等到下次strand调度,否则不应当被运行,修改时候需要mutex_保护;

(5). op_queue ready_queue_; 已经拥有了lock_,即将被运行的队列;

想必看到上面的成员之后,对strand的串行化原理会猜个八九分了,但是如我们之前所跟的,一个async_read_some()调用的话,descriptor_state和reactor_op会多次加入到io_service上面,对于这个二段式的操作,其序列化需求还是跟之前的io_service直接调度有些区别的吧。strand其提供最常用的接口包括dispatch、post、wrap,我们可以从这些函数中了解strand串行化的机制:

(1). dispatch

如果当前的线程正在执行strand,那么就直接调用这个handler:

if(call_stack<strand_impl>::contains(impl)) {
fenced_block b(fenced_block::full);
 boost_asio_handler_invoke_helpers::invoke(handler, handler);
return;
}

其中的invoke调用代码为:

usingboost::asio::asio_handler_invoke;
asio_handler_invoke(function, boost::asio::detail::addressof(context));

可见这个asio_handler_invoke就是在 strand介绍文档 中看到的,其默认操作就是直接调用用户提供的handler。关于这个asio_handler_invoke,如果要深究下去东西也很多,可以参见参考中的 When to use asio_handler_invoke? ,其主要思想是提供了一个可供记录的上下文环境context,因为比如composed operation中, intermediate handler可能会被创建零或者多次,这个状态必须在外层的wrap中保留下来才可以。不过在上面的例子中,貌似没有做什么额外的事情。

否则上面的dispatch()会调用strand_service::do_dispatch(),这里的判断就更明显了

if(can_dispatch && !impl->locked_){
 impl->locked_ = true;
 impl->mutex_.unlock();
returntrue;
}

if(impl->locked_){
 impl->waiting_queue_.push(op);
 impl->mutex_.unlock();
}
else{
 impl->locked_ = true;
 impl->mutex_.unlock();
 impl->ready_queue_.push(op);
 io_service_.post_immediate_completion(impl, false);
}

如果当前是执行strand的线程并且没有lock_,那么就返回true,效果是直接执行handler;否则如果lock_了就添加到waiting_queue_上面;再则没有lock_就添加到ready_queue_队列上面,此时通过post_immediate_completion添加到io_service_.op_queue_上面被调度执行;

如果上面返回是true,此处就是在do_dispatch()函数内部执行了,通过设置on_dispatch_exit的RAII,在此次调用完后会把waiting_queue_的handler全部移动到ready_queue_,如果ready_queue_中真的有元素,就设置lock_为ture,并将队列中的handler添加到io_service_.op_queue_上面去。

上面介绍了都是善后工作,真正的函数内调用代码为:

completion_handler<Handler>::do_complete(
 &io_service_, o, boost::system::error_code(), 0);

static void do_complete(io_service_impl* owner, operation* base,
 const boost::system::error_code& /*ec*/, std::size_t /*bytes_transferred*/)
 {
 // Take ownership of the handler object.
 completion_handler* h(static_cast<completion_handler*>(base));
 ptr p = { boost::asio::detail::addressof(h->handler_), h, h };
...
 Handler handler(BOOST_ASIO_MOVE_CAST(Handler)(h->handler_));
 p.h = boost::asio::detail::addressof(handler);
 p.reset();

 // Make the upcall if required.
 if (owner) {
 fenced_block b(fenced_block::half);
 BOOST_ASIO_HANDLER_INVOCATION_BEGIN(());
 boost_asio_handler_invoke_helpers::invoke(handler, handler);
 BOOST_ASIO_HANDLER_INVOCATION_END;
 }
 }

这里调用的时候owner不是空的,所以会调用用户提供的handler。

(2). post

post的逻辑就比上面要简单很多了,如果locked_==true,那么就直接添加到waiting_queue_,否则的话说明当前strand没有运行,就设置lock_=true并添加到ready_queue_队列上,同时添加到io_service_.op_queue_上面等待调度执行;

(3). wrap

wrap函数算是最常用的函数了,当原来所有的async_xxxx所传入的handler,都可以直接使用strand_.wrap进行包装,就可以保证在多线程环境下序列化调用安全了。其wrap成员函数使用了detail::wrapped_handler进行包装,类成员也就dispatcher_和handler_两个成员变量。

wrap(Handler handler) {
returndetail::wrapped_handler<io_service::strand, Handler,
 detail::is_continuation_if_running>(*this, handler);
}

由于strand在我们操作接口中的角色就是添加了一个wrap,基本的业务流程还是在io_service中进行的,所以这里预测,有无strand.wrap的差异也就是在需要调用的handler的期间:

当socket的IO操作完成之后,会继续调用o->complete(*this, ec, task_result);,此时会按照如下的调用链:

task_io_service_operation::complete() -> boost_asio_handler_invoke_helpers::invoke(handler, handler.handler_);
-> asio_handler_invoke(function, boost::asio::detail::addressof(context));

asio_handler_invoke(Function& function,
 wrapped_handler<Dispatcher, Handler, IsContinuation>* this_handler)
{
 this_handler->dispatcher_.dispatch(
 rewrapped_handler<Function, Handler>(
 function, this_handler->handler_));
}

这里看出了dispatcher_.dispatch(),就跟上面分析的dispatch联系起来了,也就是上面的成员函数添加真正的handler。由之前的分析知道,用户提供的handler是在实际的IO执行完成之后才会回掉的,所以可以看出这里的strand不会保护底层的IO操作,只会保护用户提供的回调handler的串行化执行。

参考

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章