高并发之观察者模式

1.Observable 类

这个类的主要作用是设计我们需要的观察值,和获取观察值的函数

public interface Observable {
    /**
     * 这里是写需要观察的变量和对于提取需要的观察函数。
     */
    enum Cycle{
        STARTED,RUNNING,DONE,ERROR
    }
    Cycle getCycle();
    void start();
    void  interrupt();
}

从这里可以看出我们需要观察的是线程的生命周期,因此有  STARTED,RUNNING,DONE,ERROR四个,同时还定义了获取状态的函数getCycle。

2.TaskLifecycle类

这个类的作用类似于一个响应器,也就是在我们需要observer的变量发生变化时,就会做出响应。

public interface TaskLifecycle<T> {
    /**
     * 这里实际上相当于观察者一旦观察到变化后进行的相应
     * @param thread
     */
    void onStart(Thread thread);

    void onRunning(Thread thread);

    void onFinish(Thread thread);

    void onError(Thread thread);

    class EmptyTaskLisfcycle<T> implements TaskLifecycle {

        @Override
        public void onStart(Thread thread) {

        }

        @Override
        public void onRunning(Thread thread) {

        }

        @Override
        public void onFinish(Thread thread) {

        }

        @Override
        public void onError(Thread thread) {

        }
    }
}

从这里可以看出一个很有趣的东西,在这里设计了一个 EmptyTaskLisfcycle的类,这个类的作用就是在于可以重写 EmptyTaskLisfcycle,从而实现自定义,而且可以实现自定义。

3.Task类

这个类主要的作用是实现业务逻辑。

@FunctionalInterface
public interface Task<T> {
    T call();
}

4. ObservableThread类

这个类是Thread的实现类,首先这个类需要实现Observable接口。

public class ObservableThread <T> extends Thread implements Observable {}

同时它还有三个成员

private final TaskLifecycle<T> lifecycle;
private final Task<T> task;
private Cycle cycle;

task是用于实现业务逻辑,因此一定是需要的。而生命周期的响应是可有可无的,因此提供两个构造函数,当lifecycle没有的时候,实现一个空的lifecycle

   public ObservableThread(Task<T> task) {
        //一个emptyTaskLisfcycle的实现
        this(new TaskLifecycle.EmptyTaskLisfcycle<>(), task);
    }
    public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) {
        super();
        if (task == null) {
            throw new IllegalArgumentException("Ths task is required");
        }
        this.lifecycle = lifecycle;
        this.task = task;
    }

而我们要实现一个cycle状态变化,需要一个update函数,这个函数是将各个类进行了一个交融。

private void update(Cycle cycle, T result, Exception e) {
        this.cycle = cycle;
        if (lifecycle == null) {
            return;
        }
        try {
            switch (cycle) {
                case STARTED:
                    this.lifecycle.onStart(currentThread());
                    break;
                case RUNNING:
                    this.lifecycle.onRunning(currentThread());
                    break;
                case DONE:
                    this.lifecycle.onFinish(currentThread());
                    break;
                case ERROR:
                    this.lifecycle.onError(currentThread());
                    break;
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            if (cycle == Cycle.ERROR) {
                throw ex;
            }
        }

    }

首先update 将成员变量的cycle改变,然后判断成员的cycle变成了什么,利用switch,执行onStart,onRunning,onFinish等进行回调。

最后将是Thread的run函数,run函数起到的作用便是跟踪变量,而不是执行业务逻辑,所有的业务逻辑放在Task里面。

@Override
    public void run() {
        System.out.println("Run函数执行了");
        this.update(Cycle.STARTED, null, null);
        try {
            this.update(Cycle.RUNNING, null, null);
            T result = this.task.call();
            this.update(Cycle.DONE, result, null);
            System.out.println("run函数结束了");
        } catch (Exception e) {
            this.update(Cycle.ERROR, null, e);
        }
    }

5. 测试类

public class ObservableTest {
    public static void main(String[] args) {
        Task task = new Task() {

            @Override
            public Object call() {
                System.out.println("Task 执行了");
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();

                }
                System.out.println("finished it ");
                return null;
            }
        };
        final TaskLifecycle<String> lifecycle = new TaskLifecycle.EmptyTaskLisfcycle<String >(){
            @Override
            public void onFinish(Thread thread) {
                System.out.println("Thread "+Thread.currentThread().getName()+" is finished");
            }
        };
        Observable observableThread = new ObservableThread(lifecycle,task);

        observableThread.start();
    }
}

参考资料:

1. 深入理解JAVA虚拟机

2. Java高并发编程详解(多线程和架构设计)汪文君

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章