C++基于任务的异步计算模型

C++中的任务(task-based)是区别于传统的异步事件开发和多线程开发的一种更高级的开发手法,为更高级于线程的一种同步并发编程模型的抽象,他让使用者只需要关注通过特定的参数和逻辑完成特定的工作,并最终返回相应的结果,而不必关注线程的创建管理、数据传输、锁机制等底层细节性的东西,让开发者的精力可以更加集中与应用程序逻辑的实现。

C++的异步计算模型由future、promise、package_task和async这几个组件组成,他们实现的关键点是允许两个任务之间传输值,而无需显式的使用锁机制:任务执行的结果放入到一个promise中,关心需要此任务结果的角色则可以从future中提取结果,联系promise和future的是一个称之为共享状态(shared state)的对象,其构成除了任务正常执行完成通过set_value保存值、或者执行发生异常后通过set_exception保存异常信息之外,他还应该包含两个thread之间安全交换数据所需的信息、一个就绪信息表示是否可供future提取结果等部分构成。

上述结构实现了方便的进程间交换执行结果,如果使用std::thread自创线程,这种交换数据就必须使用一个共享变量来实现。

当然,Facebook的Folly库也有一个改良版的 future实现 ,现在还没能力高攀这个传说中不存在公司的高大上组件,话说Facebook的Call Chain风格还真显特色,不过其对美国联邦政府的态度真的让我唏嘘。

一、C++任务中组成部分解析

1.1 promise

promise为一个共享状态的句柄,其最有用的两个成员函数就是set_value和set_exception。promise没有拷贝操作,只支持移动操作,并且调用set_value/exception只能执行一次,否则会抛出future_error异常。

相比于传统的线程方式执行任务,一旦任务发生异常,默认情况下线程将会终止执行,从而默认导致整个程序的挂起,这里通过传递异常的方式,不仅让程序更加的稳健,而且调用者可以据此做相应的处理。

promise传递单一结果的值是被移入、移除共享状态的,所以可以快速高效的传递大尺寸对象。

1.2 package_task

package_task是标准库用于简化任务连接future和promise的类型,可以快速创建打包某个任务,其内部保存了一个任务和一个promise/future对。package_task的封装代码,会在执行任务(一个函数或者可调用对象)到return或throw的时候,自动负责将任务执行的返回值或者异常放入一个promise中,其promise的使用完全由package_task自动处理;而后通过调用valid()可以探测器内部是否包含一个未被移出过的就绪的共享状态;而get_future()可以向一个package_task发起请求,其会返回任务对应的future,用户通过这个future就可以提取任务执行的结果了。

1.3 future

future是访问share state的句柄,用于提取任务执行结果的存放的地方。通过valid()可以检测该future是否有效,而get()可以将结果移出,所以get()也只能被调用一次。

注意如果future

的值类型T为void或者一个引用,则对get()的调用具有特殊的规则限定:

(1). future

::get()不返回值,它只是简单返回到调用者,或者抛出一个异常;

(2). future

::get()返回一个T&。因为引用类型不是具体的对象,所以标准库会进行一些转换以得到一个T&结果,比如T*会解引用然后返回一个T&。

此外future还提供了wait()、wait_for()、wait_until()这几个阻塞调用来获取future的状态,其得到的结果可能是:ready(已经包含一个有效值)、timeout(阻塞直到超时)、deferred(任务被推迟执行,直到调用get())。

1.4 shared_future

shared_future是应对future的值只能被读取一次而设计的,针对需要反复读取或者多个读者需要读取结果的情况,就必须拷贝结果,让这些访问者读取副本即可。shared_future就是通过直接或者间接从具有相同类型的future中移出值来进行初始化,然后可以被反复读取的。

通过future初始化shared_future,他们两者之间的通信细节就被隐藏起来了,而且future具备的操作接口,shared_future也基本都具备。

因为shared_future 是被共享的,所以对其调用get()默认返回的类型是const T&以保证多个读者访问安全。

1.5 async

经过上面的组件,就可以完成创建任务、提取结果等各项操作,最后的一个组件就是一部任务启动器async,由他决定是否创建一个新thread、回收一个旧thread、亦或者简单地在当前thread上面运行执行任务。

template< class Function, class... Args >
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
    async( std::launch policy, Function&& f, Args&&... args );

上面的policy参数具有:launch::async和launch::deferred两种类型,前者就像是创建了一个新thread在一个独立的环境下执行任务一样,而后者只有对任务的future执行get()的时候才执行任务,既任务的执行被延后了。如果什么都不指定,则默认策略是async|deferred,由编译器的实现者根据某种信息或者参数来选择合适的策略来执行当前的任务。

二、讨论

2.1 thread和task-based之差异对比

task-based库产生的目的,就是为了简化程序员并行开发所需要的工作量和额外的细节处理。

在系统中,线程是系统宝贵的资源,意味着系统创建的线程不可能无限制的多,超出系统所能提供的线程数目后再创建线程就会抛出std::system_error的异常,即使线程执行函数不会抛出异常,你可能还是需要关注这个异常的存在,并做相应的重试处理?

过多的线程常常会降低系统的效率,一方面线程context switch会占用系统资源,所以科学的线程数和硬件并发数目比可能需要调教,而且线程如果没有CPU粘滞性在多个core上面切换,那么对CPU的缓存将是绝大的伤害,从而严重影响线程执行的性能。

在C++11标准出来之后,这些脏活累活就可以全部丢给编译器实现者去了。通过task-based创建的任务,标准库可以根据环境和系统资源决定是否创建新的线程来执行该任务,从而避免上面超过系统线程数目抛出std::system_error异常;编译器可以实现更好的调度算法和work-stealing算法,从而保证整个系统有比较好的复杂均衡,而且这些决策都是system级别的而非program级别的。

不过也有一些情况,我们还是希望使用std::thread线程库甚至pthread这样更底层的线程实现来完成我们的服务,比如:

(1). 我们需要访问一些更底层的线程实现的其他接口,比如设置线程属性、控制线程的优先级等操作;

(2). 在特定环境上的服务进程,其系统资源是可估量的,此时可以使用std::thread进行更加确定的线程资源的分配操作。

2.2 task-based之启动策略

task-based有两种启动策略可供选择:launch::async和launch::deferred。

launch::async要求任务的执行必须是异步的,比如立刻创建另外一个线程中执行,或者回收之前空闲的线程资源来执行。这里当初看到过一篇文章,不同的编译器的实现是不一样的,有的采用线程池实现,有的直接one thread per task创建新的线程。

launch::deferred要求只有在对future进行get()或者wait()调用的时候,这个任务才会被同步执行,既惰性求值。此时调用者将会阻塞等待任务完成返回,这边没有文献明确说这个任务是在当前线程中执行,还是开辟新的线程执行,但是caller会被阻塞直到future的结果ready,所以对于使用者没有差异。而且如果get()或者wait()没有被调用,那么这个任务将永远不会执行。

使用默认的调度策略,可以把上面的选择难题交给标准库去实现,但是正是因为默认启动策略的不确定性,有些局面就难以把控,比如调用线程和任务线程并行执行导致某些竞争条件,亦或者调用端和任务端有某种依赖,而导致循环等待形成一种死锁。

auto fut = std::async(f);
if(fut.wait_for(0s) == std::future_status::deferred) {
    // use wait or get trigger task run sync
} else {
    while(fut.wait_for(100ms) != std::future_status::ready) {
        // sleep wait
    }
}

上面的例子正是因为async默认启动策略的不确定性,如果不添加deferred类型的检测,而恰好此时启动策略被选成了launch::deferred,则上面的wait_for将是一个死锁,因为任务永远不能被触发执行。

在实际使用中,上面的两种启动策略也没有绝对的优劣,通常需要根据业务类型进行测试才行。还有就是在调试程序的时候可以先使用launch::deferred策略,因为我们知道这种策略任务的执行是完全串行化的,那么就可以排除问题的出现时否是并发因素导致的。

三、小例子

这里用future模拟一个网络服务端的server端,其实看上来用packaged_task创建任务还是比较容易的,但是相对来说直接用std::future和std::async可控性更大,使用起来更加的灵活。

下面的这个例子表述了最初级的服务的运用,不过接收客户端链接和服务是串行执行的,这个版本的服务端TPS只能是1,即使我们用siege测试的并发连接数再多也不行。

#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <iostream>
#include <future>
#include <unistd.h>

using namespace std;

int handle_echo(int socket, struct sockaddr inaddr, int addrlen) {
    char hbuf[NI_MAXHOST]{}, sbuf[NI_MAXSERV]{};
    char buff[4096]{};
    int ret = ::getnameinfo (&inaddr, addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),
                      NI_NUMERICHOST | NI_NUMERICSERV);
    if(ret == 0) {
        std::cerr <<"Accept Connect:" << hbuf << ", " << sbuf << endl;
        ::read(socket, buff, sizeof(buff)); // std::cerr << "[" << buff << "]" << endl;
        std::string ret_str = "HTTP/1.0 200 OK\r\nContent-Length:4\r\n\r\nGOOD";
        ::write(socket, ret_str.c_str(), ret_str.size());
        ::sleep(1); // 模拟并发性
    } else {
        std::cerr << "getnameinfo failed: " << ret << std::endl;
    }
    return ret;
}

int main(int argc, char* argv[]) {

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(7699);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int lsocket = ::socket(AF_INET, SOCK_STREAM, 0);
    ::bind(lsocket, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
    ::listen(lsocket, 50);
    std::cerr << "listening @7699 ..." << std::endl;

    while(true) {
        struct sockaddr inaddr;
        socklen_t in_len = sizeof(struct sockaddr);
        int infd = ::accept(lsocket, &inaddr, ∈_len);
        std::packaged_task<int(int, struct sockaddr, int)> task(handle_echo);
        std::future<int> result = task.get_future();
#if 0
        task(infd, inaddr, in_len);
#else
        std::thread task_td(std::move(task), infd, inaddr, in_len);
        task_td.join();
#endif
        std::cout << result.get() << std::endl;
    }
    return 0;
}

因为std::future和std::packaged_task两者都是可移动的,作为一种常见的耦合手段,可以借助容器也可以增加服务端的并发和并行计算的能力。

下面是一个简单的并行计算的模式,我们之前介绍知道std::launch::deferred启动策略是延迟后在get()调用线程中执行的,所以该策略下服务的并发性能限制于调用get()的线程数目;而std::launch::async则是另外启动线程来完成任务的,在siege并发数为10的情况下,下面代码的TPS达到了27,测试结果和上述理论是相符的!

#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <iostream>
#include <future>
#include <unistd.h>
#include <list>
using namespace std;

#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>

boost::mutex mutex_;
std::list<std::future<int>> futures_;

void collect_func() {
    bool empty = false;

    while(true) {
        if(empty) 
            ::usleep(20*1000); // 20ms
        empty = true;

        {
            boost::unique_lock<boost::mutex> lock(mutex_);
            for (auto it = futures_.begin(); it != futures_.end(); ) {
                if (it->valid()) {
                    empty = false;
                    std::cout << "result: " << it->get() << std::endl;
                    it = futures_.erase(it);
                } else {
                    ++it;
                }
            }
        }
    }
}

int handle_echo(int socket, struct sockaddr inaddr, int addrlen) {
    char hbuf[NI_MAXHOST]{}, sbuf[NI_MAXSERV]{};
    char buff[4096]{};
    int ret = ::getnameinfo (&inaddr, addrlen, hbuf, sizeof(hbuf), sbuf, sizeof(sbuf),
                      NI_NUMERICHOST | NI_NUMERICSERV);
    if(ret == 0) {
        std::cerr <<"Accept Connect:" << hbuf << ", " << sbuf << endl;
        ::read(socket, buff, sizeof(buff)); // std::cerr << "[" << buff << "]" << endl;
        std::string ret_str = "HTTP/1.0 200 OK\r\nContent-Length:4\r\n\r\nGOOD";
        ::write(socket, ret_str.c_str(), ret_str.size());
        ::sleep(1); // 模拟并发性
    } else {
        std::cerr << "getnameinfo failed: " << ret << std::endl;
    }

    return ret;
}

int main(int argc, char* argv[]) {

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(7699);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    int lsocket = ::socket(AF_INET, SOCK_STREAM, 0);
    ::bind(lsocket, (struct sockaddr *)&addr, sizeof(struct sockaddr_in));
    ::listen(lsocket, 50);
    std::cerr << "listening @7699 ..." << std::endl;

    // future result collect task
    std::thread collect(collect_func);

    while(true) {
        struct sockaddr inaddr;
        socklen_t in_len = sizeof(struct sockaddr);
        int infd = ::accept(lsocket, &inaddr, ∈_len);
        
        {
            std::future<int> async_future = std::async(std::launch::async, 
                boost::bind(handle_echo, infd, inaddr, in_len));
            //std::future<int> async_future = std::async(std::launch::deferred, 
            //  boost::bind(handle_echo, infd, inaddr, in_len));

            boost::unique_lock<boost::mutex> lock(mutex_);
            futures_.push_back(std::move(async_future));
        }
    }

    collect.join();
    return 0;
}

这里还需要额外说明一下的是,无论代码中是否显式使用到了thread,只要用到future就需要在程序 连接中增加pthread库 ,不然的话虽然编译连接程序不出错,但是在执行的时候会给你抛出system_err的异常,这个小注意点折腾了我很长时间才摸索出来!

本文完!

我来评几句
登录后评论

已发表评论数()