协程

why 协程

协程的两大特点:

  1. 占用的资源更少;
  2. 所有的切换和调度都发生在用户态。

协程是一种轻量级、用户态的执行单元。拥有自己的寄存器上下文和栈。协程调度切换时,把寄存器上下文和栈保存到其他地方。在切回来的时候再恢复回来。

线程:抢占式多任务。

协程:协作式多任务。

不管是进程还是线程,阻塞、切换调度的时候都需要陷入系统调用,由CPU跑调度程序,再由调度程序决定跑哪一个进程(线程)。而且由于抢占式调度无法确定执行顺序,所以需要考虑同步问题。而协程没有这些问题。协作式的任务,是由用户自己出让CPU时间片来让其他任务得到调度的。

关于微信使用协程进行改造:

https://www.infoq.cn/article/CplusStyleCorourtine-At-Wechat/

go中的协程

package main

import (
        "fmt"
        "time"
        "runtime"
)

func sleep(s string) {
        var i int = 0 
        for {
                i += 1
                fmt.Println(i)
                fmt.Println(s)
                time.Sleep(time.Second)
        }   
}

func main() {
        fmt.Println(runtime.GOMAXPROCS(0))
        go sleep("Hello")
        go sleep("World")
        time.Sleep(1000 * time.Second)
}

go中的time.sleep不会使线程休眠,而是使协程休眠。其实两个协程是运行在一个线程上的,同构这句: fmt.Println(runtime.GOMAXPROCS(0)) 可以看到当前协程所运行的线程的个数是1。

他们之间的关系是这样的:

A、B、C、D四个协程,都属于同一个线程。A进入sleep的时候,把A的上下文环境保存起来,然后这个线程转到B上执行。这个转过去本质上就是RSP寄存器(栈指针寄存器,保存当前栈顶)切换,具体可类比线程间的切换。GO的是现实每个协程都有自己的独立栈空间。

所以协程最重要的功能是 在休眠的时候线程不休眠,保存上下文,让线程去执行其他的协程

java中的协程 —— Fiber

例子

public class CoroutineTest {
    public static void main(String[] args) {
        Fiber fiberA = new FiberTask("hello");
        Fiber fiberB = new FiberTask("world");
        fiberA.start();
        fiberB.start();
        try {
            fiberA.join();
            fiberB.join();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}

class FiberTask extends Fiber<Integer> {
    private String msg;

    public FiberTask(String msg) {
        this.msg = msg;
    }

    @Override
    protected Integer run() throws SuspendExecution, InterruptedException {
        for (int i = 0; i < 5; i++) {
            System.out.println(msg);
            Fiber.park(1000, TimeUnit.MILLISECONDS);
        }
        return 0;
    }
}

使用gradle去执行

使用协程提升IO效率

阻塞式的代码,因为调用一个IO接口后,线程就会被阻塞住,不能去处理其他IO请求。所以一个服务端能支持多少个客户端连接,往往取决于服务端所能创建的线程数量。而线程是很好内存资源的。

如果用协程,可以让内存资源降下来,只有线程的十分之一甚至是几十分之一。

协程的休眠和唤醒都是发生在用户态的。也就是说应用程序开发者要自己负责协程的休眠和唤醒。

一些常用的http client:

https://dzone.com/articles/high-concurrency-http-clients-on-the-jvm

Quasar

Quasar提供了高性能轻量级的线程,提供了类似Go的channel。

需要添加下面的包:

  • Core (必须) co.paralleluniverse:quasar-core:0.7.5[:jdk8] (对于 JDK 8,需要增加jdk8 classifier)
  • Actor co.paralleluniverse:quasar-actors:0.7.5
  • Clustering co.paralleluniverse:quasar-galaxy:0.7.5
  • Reactive Stream co.paralleluniverse:quasar-reactive-streams:0.7.5
  • Kotlin co.paralleluniverse:quasar-kotlin:0.7.5

Quasar fiber依赖java instrumentation修改你的代码。

Quasar最主要的奉献是提供了轻量级线程的实现,叫做fiber(纤程)。它们不是被操作系统管理的,是由一个或多个ForkJoinPool调度。一个idle fiber只占400字节内存,切换的时候占用更少的CPU。应用中可以有上百万的fiber。

Fiber只有在你的代码经常会被等待其他fiber阻塞的时候,才应该使用。对于CPU密集型的代码,很少遇到阻塞的情况,就应该首选thread。

例子

new Fiber<V>() {
  @Override
  protected V run() throws SuspendExecution, InterruptedException {
        // your code
    }
}.start();

Quasar原理

Fiber调度器FiberScheduler是一个高效的、work-stealing、多线程的调度器。默认的调度器是FiberForkJoinSchedule,但是可以使用自己的线程池去调度。

当一个类被加载时,Quasar的instrumentation模块搜索suspendable方法,每一个suspendable方法 f 通过下面的方法instrument:

  1. 搜索对其他suspendable方法( g )的调用,在那些方法前后插入一些代码,用于保存和恢复fiber栈本地变量的状态,记录这个暂停点。在这个“suspendable function chain”的最后,有一个Fiber.park调用,暂停这个fiber,扔出SuspendExecution异常。
  2. g block的时候,SuspendExecution异常会被Fiber捕获。当Fiber被唤醒(用unpack),方法 f 会被调用,执行记录显示它被block在 g 的调用上,所以程序会立即跳到 f 调用 g 的那一行,调用它。最终我们会到达暂停点,然后继续执行。当 g 返回时, f 中插入的代码会恢复 f 的本地变量。
public class Helloworld {
    static void m1() throws SuspendExecution, InterruptedException {
        String m = "m1";
        System.out.println("m1 begin");
        m = m2();
        m = m3();
        System.out.println("m1 end");
        System.out.println(m);
    }
    static String m2() throws SuspendExecution, InterruptedException {
        return "m2";
    }
    static String m3() throws SuspendExecution, InterruptedException {
        return "m3";
    }
    static public void main(String[] args) throws ExecutionException, InterruptedException {
        new Fiber<Void>("Caller", new SuspendableRunnable() {
            @Override
            public void run() throws SuspendExecution, InterruptedException {
                m1();
            }
        }).start();
    }
}
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章