java核心技术-I-12-并发


线程的概念

在早期的操作系统中并没有线程的概念,进程是拥有资源和独立运行的最小单位,也是程序执行的最小单位。任务调度采用的是时间片轮转的抢占式调度方式,而进程是任务调度的最小单位,每个进程有各自独立的一块内存,使得各个进程之间内存地址相互隔离。

  后来,随着计算机的发展,对CPU的要求越来越高,进程之间的切换开销较大,已经无法满足越来越复杂的程序的要求了。于是就发明了线程,线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。一个进程可以有一个或多个线程,各个线程之间共享程序的内存空间(也就是所在进程的内存空间)。一个标准的线程由线程ID,当前指令指针PC,寄存器和堆栈组成。而进程由内存空间(代码,数据,进程空间,打开的文件)和一个或多个线程组成。

下面是一个Java中创建线程的代码:

  1. 将执行这个代码的任务放在一个类的run 方法中。注意这个类必须实现Runnable接口:

    1
    2
    3
    public interface Runnable{
    void run();
    }

    由于Runnable是一个函数式接口,因此可以用一个lambda表达式创建一个类:

    1
    Runnable r = () -> {task code};
  2. 从这个Runnable构造一个Thread对象。

    1
    var t = new Thread(r);
  3. 启动线程

    1
    t.start();

    注意:不要直接调用Thread或者Runnable对象的run方法。否则其只会在同一个线程中执行这个任务,而没有启动新的线程。

    例如:银行转账的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    Runnable r = () -> {
    try{
    for(int i = 0; i < STEPS; i++){
    double amount = MAX_AMOUNT * Math.random();
    bank.tanfor(0, 2, amount);
    Thread.sleep((int) (DELAY * Math.random()));
    }
    }catch(Interruption e){

    }
    }

    var t = new Thread(r);
    t.start();

    对于给定的步骤数,这个线程会转账一个随机金额,然后休眠一个随机时常。

    线程状态

    Java中的线程有以下6种状态:

    • New-新建
    • Runnable-可运行
    • Blocked-阻塞
    • Waiting-等待
    • Timed waiting-计时等待
    • Terminated-终止

    要确定当前进程的状态,只需调用getState方法。

    新建线程

    当使用new操作符创建一个新线程时,如new Thread(r),这个线程哈没有开始要运行。此时其状态就是新建new

    可运行线程

    一旦调用start操作,线程就处于可运行(runnbale)状态。但是一个可运行的线程可能正在运行,也可能没有运行。要由操作系统为线程提供具体的运行时间。

    一个线程开始运行,它不一定始终保持运行。因为可能不同的操作紫铜调度机制不一样,可能存在抢占式。

    阻塞和等待线程

    当线程处于阻塞或等待状态时,他暂时是不活动的。期间不运行任何代码,而且消耗最少资源。要由线程调度器重新激活这个线程。

    • 当一个线程试图获取一个内部的对象锁,而这个锁母亲被其他线程占有,该线程就会被阻塞。当其他线程都释放了这个锁,并且线程调度器允许该线程持有这个锁时,它将变成非阻塞的。
    • 当线程等待另一个线程通知调度器出现一个条件时,这个线程会进入等待状态。实际上,阻塞状态和等待状态并没有很大区别。
    • 有几个方法有超时参数,调用这写方法时会让线程进入计时等待(timed waiting)状态,这一状态会一直保持到超时期满或者接收到适当的通知时。

    下面时各种状态的转换图:

    线程状态

终止线程

线程会由于以下两个原因之一而终止:

  • run方法正常退出,线程自然终止。
  • 因为一个没有捕获的异常终止了run方法,使线程意外终止。

另外线程的stop方法也可以手动杀死一个线程。但是该方法已经被废除,最好不要使用。

线程的属性

线程的各种属性包括中断的状态、守护线程、未捕获异常的处理器以及不应使用的一些遗留特征。

终端线程

如上终止线程所说,只有run方法自然退出,或因为异常不正常退出才能终止线程。

除了已经废除的stop方法,没有办法可以强制线程终止。

不过,interrupt方法可以用来请求终止一个线程。

当对一个线程调用interrupt方法时,就会设置线程的中断状态。这是每个线程都有的boolean状态。每个线程都应该不是地检查这个标志,以判断这个线程是否被中断。

但是当线程处于阻塞时,无法检测中断状态。

没有任何语言要求被中断地线程应该被终止。中断一个线程只是要引起它地注意。被中断的线程可以决定如何响应中断。某些线程非常重要,所以应当处理这个异常,然后在继续执行下去。但是,更普遍的情况时,线程只希望将中断解释为一个中断请求。其格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Runnable r = () -> {
try{
//...
while(!Thread.currentThread().isInterrupeted() && more works to do){
//do more work
}
}
catch(InterrutedException e){
//thread was interrupted during sleep or wait
}finally{
//clean up if required
}
// existing the run method terminated the thread.
}

线程名

默认情况下,线程有容易记的名字,如Thread-2。 可以用setName方法为线程设置任何名字:

1
2
var t = new Thread(runnable);
t.setName("Web crawler");

未捕获异常的处理器

现场的run方法不能抛出任何检查型异常,但是,非检查型异常可能会导致线程终止。这种情况下,线程会死亡。

不过,对于可传播的异常,并没有任何catch子句。实际上,在线程死亡之前,异常会传递到一个用于处理未捕获异常的处理器。

这个处理器必须属于一个实现了Thread.UncaughtExceptionHandler接口的类。这个接口只有一个方法:

1
void uncaughtException(Thread t, Throwable e);

可以用setUncatchExceptionHandler方法为任何线程安装一个处理器。

也可以用Thread类的静态方法setDefaultsetUncatchExceptionHandler为所有线程安装一个默认的处理器。(这些处理器可以将异常信息保存到文件中。)

如果没有安装默认处理器,默认处理器则为null。但是,如果没有为单个线程安装处理器,那么处理器就是该线程的ThreadGroup对象。

线程组(ThreadGroup)是可以一起管理的线程的集合。默认情况下,所有的线程都属于同一个线程组,但是也可以建立其他的组。由于现在引入了更好的特性来处理线程集合,所以不建议再使用线程组。

ThreadGroup类实现了Thread.UncaughtExceptionHandler接口。它的uncatchException方法执行一下操作:

  • 如果该线程组有父线程组,那么调用父线程组的uncatchException方法。
  • 否则,如果Thread.getDefaultExceptionHandler方法返回一个非nul的处理器,则调用该处理器。
  • 否则,如果ThrowableThreadDeath的一个实例,什么也不做。
  • 否则,将线程的名字以及Throwable的轨迹输出到System.err

线程优先级

在Java程序设计语言中,每个线程都有一个优先级。其优先级有10级,从1(MIN_PRIORITY)到10(max_priority)。其默认为5(NORM_PRIORITY)。

每当线程调度器有机会选择新线程时,它会首先选择具有较高优先级的线程。但实际的线程优先级会根据操作系统的优先级。比如Windows有7个优先级。而Linux的线程则所有线程优先级都一样,所以会忽略线程优先级。

同步

在大多数实际的多线程应用中,两个或两个以上的新线程需要共同修改同一数据。而不同的调用顺序会导致不同的结果。这种情况通常被称为竞态条件(race condition)。

锁对象

有两种机制可以防止并发的访问代码块。

  • 使用synchronized关键字:会自动提供一个锁以及相关的“条件”。
  • 使用Java5引入的ReentrantLock类。

使用ReentrantLock保护代码块的基本结构如下:

1
2
3
4
5
6
7
var mylock = new ReentrantLock();			//新建锁对象,一般在其他位置
mylock.lock(); //使用该锁对象加锁
try{
//critical section
}finally{
mylock.unlock(); // 使用该锁对象解锁
}

这个结构确保任何时候只有一个线程进入临界区。一旦一个线程锁定了锁对象,其他任何使用到该锁的线程都无法通过lock语句。

要把unlock语句操作包括在finally语句中,这一点很关键。如果在临界区的代码抛出一个异常,锁必须释放。否则,其他线程将永远阻塞。

值得注意的是:

  1. 锁不能使用try-with-resources语句。其原因在于:
  • 解锁方法不是close
  • try-with-resources期望的是一个新变量且在首部声明,而锁变量不是。
  1. 重入锁(reentrant lock),每个对象都拥有唯一的锁。并且每个线程可以反复获得已拥有的锁。锁有一个持有计数(hold count)来跟踪对lock方法的嵌套调用。线程每调用一次lock后都要调用unlock来释放锁。

条件对象

通常,线程进入临界区后却发现只有满足了某个条件之后它才能执行。可以使用一个条件对象来管理那些已经获得一个锁却不能做有用工作的线程。在Java中,条件对象被称为条件变量(conditional variable)。

即当一个线程拥有锁,但是却仍然不能运行(可能缺少其它资源),那么此时这个线程应当放弃这个锁。其实就是防止产生死锁。

例如转账中,当发现账户余额不足的时候,应当放弃这个账户的锁。代码如:

1
2
3
4
5
6
7
8
class Bank{
private Condition stuffcientFunds;
//...
public Bank(){
//...
sufficientFunds = bankLock.newCondition(); //关键
}
}

此时如果transfer方法发现资金不足,他会调用:

stuffcientFunds.await();

当前线程现在暂停,并放弃锁。这就允许另一个线程来拥有这个锁,从而防止死锁。

一旦一个线程调用了await方法,他就进入这个条件的等待集(wait set)。当锁可用时,该线程也不会立即变为可运行状态。实际上,他仍然保持非活动状态,知道另一个线程在同一条件上调用signalAll方法。

即,当另外一个线程完成转账时,它应该调用

1
stuffcientFunds.signalAll();

这个调用会重新激活等待这个条件的所有线程。

此时,线程应当再次测试。不能保证现在一定满足条件-signalAll方法仅仅是通知等待的线程:现在有可能满足条件,值得再次检查条件。

通常,await调用应该放在如下形式的循环中:

1
2
while(!(process is ok))
condition.await();

注意signalAll调用不会立即激活一个等待的线程,它只是接触等待线程的阻塞,是这些线程可以在当前线程释放锁之后竞争访问对象。

另一个signal只是随机选择等待集合中的一个线程,并且解决这个线程的阻塞状态。这个方法虽然高效,但也是危险的,因为可能释放的那个线程可能仍然是不能运行的。

注意:只有当线程拥有一个条件的锁时,它才能在这个条件上调用awaitsignalAllsignal方法。

synchronized关键字

在了解synchronized关键字之前,先对锁和条件的要点做一个总结:

  • 锁用来保护代码片段,一次只能有一个线程执行被保护的代码。
  • 锁可以管理试图进入被保护代码段的线程。
  • 一个锁可以有一个或多个相关联的条件对象。
  • 每个条件对象管理那些已经进入被保护代码段但还不能运行的线程。

LockConditon接口允许程序员充分控制锁定。不过,在大多数情况下,你并不需要那样控制,完全可以使用Java语言内置的一种机制。从1.0开始,Java中的每个对象都有一个内部锁。如果一个方法声明时有synchronized方法修饰符,那么对象的锁将保护整个对象。也就是说,要调用这个方法,线程必须获得内部对象锁。

换句话说:

1
2
3
public synchronized void method(){
//method body
}

等于

1
2
3
4
5
6
7
8
public void method(){
this.intrinsicLock.lock();
try{
//method body
}finally{
this.intrnsicLock.unlock();
}
}

内部对象锁只有一个关联条件。wait方法将一个线程增加到等待集中,notifyAll/notify方法可以解除等待线程的阻塞。换句话说,调用waitnotifyAll等价于:

1
2
intrinsicCondition.await();
intrinsicCondition.signalAll();

注释:waitnotifyAll以及notify方法是Object类的final方法。Condition方法必须命名为awaitsignalAllsignal,从而不会与那些方法发生冲突。

例如,可以用Java如下实现Bank类:

1
2
3
4
5
6
7
8
9
10
11
class Bank{
private double[] accounts;

public synchronized void transfer(int from, int to, int amount) throws InterruptedException{
while(accounts[from] < amount)
wait(); //wait on intrinsic object lock`s single condition
accounts[from] -= amount;
accounts[to] += amount;
notifyAll(); //notify all the threads waiting on the condition
}
}

可以看到,使用synchronized关键字可以得到更为简洁的代码。

要理解这段代码,必须明白每个对象都有一个内部锁,并且这个锁有一个内部条件。这个锁管理试图进入synchronzed方法的线程,这个条件可以管理调用了wait的线程。

将静态方法声明为同步也可以是合法的。如果调用这样的方法,它会获得相关类对象的内部锁。并且其内置锁会被锁定。因此,没有其他线程可以调用这个类的该方法或任何其他同步静态方法。

内部锁和条件存在一些限制:

  • 不能中断一个正在尝试获得锁的线程。
  • 不能指定尝试获得锁时的超时时间。
  • 每个锁仅有一个条件可能是不够的。

那么在代码中应该使用哪种方式呢,LockCondition对象还是synchronized关键字呢?建议如下:

  • 最好既不是用LockCondition对象,也不是使用synchronized关键字。在多数情况下,可以使用java.util.concurrent包中的某种机制,他会自动处理所有的锁定。
  • 如果synchronized关键字可以满足需求,则使用synchronized关键字,这样可以减少代码量也可以减少错误。
  • 如果特别需要LockCondition结构提供的额外能力,则使用Lock/Condtion

同步块

正如前面说的,每一个Java对象都有一个锁。线程可以通过调用同步方法获得锁。还有另外一种方式获得锁:即进入一个同步块。

形式如下:

1
2
3
synchronized (obj){
//critical section
}

其内部的代码就会获得obj的锁。

监视器概念

锁和条件是实现线程同步的强大工具。但其严格的讲,并不是面向对象的。而监视器就是为解决这一点诞生的。

从Java的术语来说,监视器具有如下特征(Java中并没有如此实现):

  • 监视器是只包含私有字段的类。
  • 监视器类的每个对象有一个关联的锁。
  • 所有方法由这个锁锁定。也就是说,当调用obj.method()时,那么obj对象的锁在方法调用开始时自动获得,并且当方法调用自动释放该锁。并且由于所有字段都是私有的,就可以保证当一个线程处理一个字段时,其他的字段无法访问该字段。
  • 锁可以有任意多个相关联的条件。

Java设计者以不太严格的方式实现了监视器概念。其有3处不同于监视器概念:

  • 字段不要求是private
  • 方法不要求是synchronized
  • 内部锁对客户端是可用的。

volatile字段-可见性

volatile关键字为实例字段的同步访问提供了一种免锁机制。如果一个字段声明为volatile,那么编译器和虚拟机就知道该字段可能被另一个线程并发更新。

例如,假设一个对象有一个boolean标记为done,它的值由一个线程设置,而由另外一个线程查询,如同我们讨论的那样,可以用锁:

1
2
3
4
5
6
7
private boolean done;
public synchronized boolean isDone(){
return done;
}
public synchronized void setDone(){
done = true;
}

在这种情况下,将字段声明为volatile就很合适:

1
2
3
4
5
6
7
private volatile boolean done;
public boolean isDone(){
return done;
}
public void setDone(){
done = true;
}

编译器会插入适当的代码,以确保如果一个线程对done变量做了修改,这个修改对读取这个变量的所有其他线程都可见。

注意:volatile变量不能提供原子性。如:

1
2
3
public void flipDone(){
done = !done;
} //not atomic

不能确保反转字段中的值。不能保证读取、翻转和写入不被中断。

final变量

当然也可以使用final来确保读取,因为其只有初始化时可以赋值。

原子性

java.util.concurrent.atomic包中有很多类使用了很高效的机器级指令(没有使用锁)来保证其他操作的原子性。

例如,AtomicInteger类使用了方法incrementAndGetdecrementAntGet,它们分别以源自方式将一个整数进行自增或自减。

实例:

1
2
3
public static AtomicLong nextNumber = new AtomicLong();
//in some method
long id = nextNumber.incrementAndGet();

decrementAntGet方法以原子方式将AtomicLong自增,并返回自增后的值。也就是说:获得值,增加1,然后生产新的值的操作不会中断。所以也可以保证多个线程访问时不会出错。

这个包中有更多的方法(可以用名字看出其功能)

  • AtomicBoolean.classAtomicInteger.class
  • AtomicIntegerArray.class
  • AtomicIntegerFieldUpdater
  • classAtomicLong.class
  • AtomicLongArray.class
  • AtomicLongFieldUpdater.class
  • AtomicMarkableReference.class
  • AtomicReference.class
  • AtomicReferenceArray.class
  • AtomicReferenceFieldUpdater.class
  • AtomicStampedReference.class
  • DoubleAccumulator.class
  • DoubleAdder.class
  • LongAccumulator.class
  • LongAdder.class
  • Striped64.class

值得注意的是,如果有大量的线程要访问相同的原子值,性能就会大幅度下降,因为乐观锁更新需要太多次重试。

LongAdderLongAccumulator类解决了这个问题。LongAdder包含多个变量(加数),其总和值为当前值。可以有多个线程更新不同的加数,线程个数增加时会自动提供新的加数。通常情况下,只有当所有工作完成后才需要总和的值,这个时候用LongAdder效果就会很好。

所以,当预计会有很多线程竞争的时候,用LongAdder更高效。

而自增的时候,需要调用add方法,只有调用sum方法时,才会获得其值。

如:

1
2
3
4
5
6
7
8
9
var adder = new LongAdder();
for(...){
pool.submit(() -> {
while(...){
if(...) adder.increment();
}
})
}
long total = adder.sum();

同样也有DoubleAdderDoubleAccumulator来完成浮点数的操作。

线程局部变量

可以通过ThreadLocal为每个线程都单独提供一个变量,这个变量就不支持共享,也不会存在同步的问题。

如:

1
2
3
4
public static final ThreadLocal<SimpleDateFormat> dateFormate = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));

//...code
String dateStamp = dateFormat.get().format(new Date());

线程安全的集合

Java提供了一些线程安全的集合。用来对应常规的集合。

阻塞线程接口(BlockingQueue interface

阻塞线程用来对应常规的队列。其提供了两套API,可以在有问题的时候阻塞或者抛出错误。工作线程可以周期性的将中间结果存储在阻塞队列之中。其他工作线程移除中间结果,并进一步进行性修改。队列会自动地平衡负载。下表是阻塞队列地方法:

方法 正常动 特殊情况下地动作
add 添加一个元素 若队满,则抛出IllegalStateException异常
element 返回队头元素 若队空,则抛出NoSuchElementException异常
offer 添加一个元素并返回true 若队满,则返回false
peek 返回队头元素 若队空,则返回null
poll 移除并返回队头元素 若队空,则返回null
put 添加一个元素 若队满,则阻塞
remove 移除并返回队头元素 若队空,则抛出NoSuchElementException异常
take 移除并返回队头元素 若队空,则阻塞

阻塞队列方法分为3类(取决于队满或队空时的动作):

  1. 如果使用队列作为线程管理工具,将要使用puttake方法,这两个方法将阻塞进程。
  2. 当试图向满队添加元素或向从空队中获取元素时,add、remove和element操作将会抛出异常。
  3. offer、poll和peek方法将会给出提示而不是错误。而offer和poll还支持超时参数。

数组阻塞队列(ArrayBlockingQueue

数组阻塞队列继承自AbstractQueue并且实现了BlockingQueue

其基本实现还是数组,并且方法都是BlockingQueue接口中地方法。

同样地,与常规对应,还存在

  • LinkedBlockingQueue,其内部实现为链表,同样AbstractQueue并且实现了BlockingQueue
  • PriorityBlockingQueuePriorityQueue特性一致,不过是实现了同步。

高效映射、集和队列

java.util.concurrent包中还提供了映射、有序集合和队列地高效实现:

  • ConcurrentHashMap
  • ConcurrentSkipListMap
  • ConcurrentSkipListSet
  • ConcurrentLinkedQueue

这些集合使用复杂的算法,通过允许并发地访问数据结构地不同部分尽可能减少竞争。

注意这些集合返回的size并不一定能在常量时间内完成操作。确认这些集合的大小通常需要遍历。

并发散列集可以高效地支持大量的getter和一定数量的setter。一般认为其支持至多16个setter。当多于16个时,其他的将会被暂时阻塞。

并发散列集地批量操作

批量操作即提供方法来遍历散列集并执行一些操作。具体有3种不同的操作:

  • search(搜索):为每个键或值应用一个函数,直到函数生成一个非null的结果。然后搜索终止,返回这个函数的结果。
  • reduce(规约):组合所有建或值,这里要使用所提供的一个累加函数。
  • forEach(遍历):为所有的键或值应用一个函数。

每个操作有4个版本:

  • operationKeys: 处理键。
  • operationValues: 处理值。
  • operation: 处理键和值。
  • operationEntries: 处理Mpap.Entry对象。

对于上述操作,需要指定一个参数化阈值(parallesion threshold)。如果映射包含的元素多于这个阈值,就会并行完成批操作。

  • 如果希望批操作在一个线程中运行,可以使用阈值Long.MAX_VALUE
  • 如果希望批操作就能够可能快的完成,可以使用阈值1。

例如search方法,有以下版本:

  • U searchKeys(long threshold, BiFunction<? super K, ? extends U> f)
  • U searchValues(long threshold, BiFunction<? super K, ? extends U> f)
  • U search(long threshold, BiFunction<? super K, ? extends U> f)
  • U searchEntries(long threshold, BiFunction<? super K, ? extends U> f)

例,假设我们希望找出第一个出现次数超过1000次的单词。则需要搜索键和值:

1
2
//map:{key: word, value: appearTimes}
String result = map.search(threshold, (k, v) -> v > 1000 ? k: null);

并发集视图

并发集中并没有提供ConcurrentHashSet,但是可以用newKeySet方法会生成一个Set<k>,这实际上concurrentHasMap<K, Boolean>的一个包装类(所有的映射值都为TRUE,只不过将其视为一个Set,所以不关系其值)。

写入数组的拷贝

copyOnWriteArrayListcopyOnWriteArraySet是线程安全集合,其中所有更改器都会建立底层数组的一个副本。

并行数组算法

Arrays类提供了大量并行化运算。

  • Arrays.parallelSort方法可以对一个基本类型值或对象的数组排序。其用法于基本sort一致。
  • Arrays.paralleSetAll方法会由一个函数计算得到的值填充一个数组。
  • Arrays.parallelPrefix会使用提供的函数,并行地累积给定数组中的每个元素。

遗留的线程安全集合

从Java最开始的版本中,VectorHashtable就提供了动态数组和散列表的线程安全。不过这些类已经被认为是过时的,不应当再使用。

实际上,任何集合类都可以通过使用同步包装器(synchronized wapper)来变成线程安全的。

1
2
List<E> synchArray = Collections.synchronizedList(new ArrayList<E>());
Map<K, V> synchHashMap = Collections.synchronizedMap(new HashMap<K, V>());

结果集的方法应当使用锁加以保护,则可以提供线程安全的访问。并且确保没有任何线程通过原始的非同步方法访问数据结构。(其解决办法一般就是像代码中那样直接传入新建的集合)

不过最好通常使用java.util.concurrent包中定义的集合,而不是同步包装器。

  • 特别的,concurrenthashMap经过了精心实现,假设多个线程访问的是不同的位置,则不会发生阻塞。
  • 经常更改的数组列表是个例外。这种情况下,同步的ArrayLst要胜过CopyOnWriterArrayList

任务和线程池

线程池其实就是一组线程的集合,不过其调用是由其本身来决定的。如果程序中创建了大量的生命周期很短的线程,那么不应该把每个任务映射到一个单独的线程中,而应该使用线程池(Thread pool)。

CallableFuture

Runnable封装一个异步运行的任务,可以将其想象成一个没有参数和返回值的异步方法。

CallableRunnale类似,但是有返回值。

Callable接口是一个参数化的类型,只有一个方法call

1
2
3
public interface Callable<V>{
V call() throws Exception;
}

类型参数是返回值的类型。例如Callable<Integer>表示将返回Integer对象的异步方法。

Future保存异步计算的结果。

Future<V>接口有下面的方法:

  • V get():调用后会阻塞,直到计算完成。
  • V get(long timeout, TimeUnit unit):调用后也会阻塞,但是超时之后,会抛出一个TimeoutException
  • void cancel(boolean mayInterrupt):取消任务,如果计算任务还没开始,则不会再开始。如果已经开始,那么如果mayInterrupttrue,其将会被中断。
  • boolean isDone():查询任务是否完成。

执行Callable的一种方法是使用FutureTask,它实现了FutureRunnable接口。

例:

1
2
3
4
5
6
Callable<Integer> task = ...;
var futureTask = new FutureTask<Integer>(task);
var t = new Thread(futureTask);
t.start();
...;
Integer result = task.get(); //It`s a future

执行器

执行器(Executors)类有很多静态工厂方法,用来构造线程池。下表是一些方法:

方法 描述
newCachedThreadPool 必要时创建新线程,空闲线程会保留60秒。
newFixedThreadPool 池中包含固定数目的线程,空闲线程会一直保留。
newWorkStealingPool 一种适合“fork-join”任务的线程池,其中复杂的任务会分解为更简单的任务,空闲线程会“密取”较简单的任务。
newStringThreadExecutor 只有一个线程的池,会顺序地执行所提交的任务。
newScheduledThreadPool 用于调度执行的固定线程池。
newSingleThreadScheduledEcecutor 用户调度执行的单线程池。

newCachedThreadPool 方法会在线程池有线程的时候直接使用线程,没有的时候新建。

如果线程的生命周期很短,或者大量时间都在阻塞,那么可以使用一个缓存线程池。

为了得到最优的运行速度,并发线程数等于处理器内核数。

可以用以下方法将RunnableCallable对象提交给ExecutroService

  • Future<T> submit(Callable<T> task)
  • Future<?> submit(Runnable task)
  • Future<T> submit(Runnable task, T result)

线程池会在方便的时候尽早提交执行提交的任务。并且会返回一个Future对象,可以用来得到结果或者取消任务。

Future对象有两个方法来终止植线程池中的所有任务:

  • shutdown:这个方法会停止线程池接受新任务,当线程池中的任务都完成时,线程池就会死亡。
  • shutdownNow:线程池会取消所有尚未开始的任务。

控制任务组

执行器方法有两个执行方法:

  • invokeAny:接受一个Callable对象集合,并返回最先完成的那个任务的结果。
  • invokeAll:接受一个Callable对象集合,这个方法会阻塞,直到所有任务都完成,然后返回所有的结果集合。(这里的结果顺序是按提交的顺序)

但是当我们需要按计算结果的顺序得到结果,就可以使用ExecutorCompletionService

如:

1
2
3
var service = new ExecutorCompletionService<T>(executor);
for(Callable<T> task: tasks) service.submit(task);
processFurther(service.take().get());

fork-join

Java7引入了fork-join框架哦那个与计算密集型任务,其原理是通过递归将大任务分解为小任务到不同的线程。

其需要扩展RecursiveTask<T>类,结果会返回T类型的值。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Counter extends RecursiveTask<Integer>{
//...
protected Integer compute(){
if(to - from < THRESHOLD){
//solve problem directly
}else{
int mid = (from + to) / 2;
var first = new Counter(values, from, mid, filter);
var first = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}

可以看到整体就是个二分的思路,当小于阈值时直接计算,否则就二分。

异步计算

这里的异步计算,实际上是一种回调机制。

CompletableFuture

传统的Future对象,要获得其结果时,必须等待阻塞,直到计算完成。

CompletableFuture类实现了Future,它提供了一个回调的参数,允许将回调先传入,再结果可用时将自动调用。

如:

1
2
CompletableFuture<String> f = ...;
f.thenAccept(s -> Process the result string s);

通过这种回调的方式可以无需阻塞获得结果。

completableFuture的所有方法如下表(其用法与上述相同):

方法 参数 描述
thenApply T->U 对结果应用一个函数
thenAccept T->void 类似于thenApply,不过结果为void
thenCompose T->CompletableFuture<U> 对结果调用函数并执行返回的future
handle (T, Throwable)->U 处理结果或错误,生成一个新结果
whenComplete (T, Throwable)->void 类似于handle,不过结果为void
exceptionally Throwable->T 从错误计算一个结果
completeOnTimeout T, long, TimeUnit 如果超时,生成给定值作为结果
orTimeout long, TimeUnit 如果超时,生成一个一个TimeoutException异常
thenRun Runnable 执行Runnable,结果为void

下面还有一些方法来组个多个future

方法 参数 描述
thenCombine CompletableFuture<U>, (T, U)->V 执行两个动作并给定函数的组合结果
thenAcceptBoth CompletableFuture<U>, (T, U)->void thenCombine类似,不过结果为void
runAfterBoth CompletableFuture<?>, Runnable 两个都完成后执行runnable
applyToEdither CompletableFuture<T>, T->V 得到其中一个的结果时,传递给定的函数
acceptEither CompletableFuture<T>, T->void applyToEither类似,不过结果为void
runAfterEither CompletableFuture<?>, Runnable 其中一个完成后执行runnable
static allOf CompletableFuture<?>… 所有给定的future都完成后完成,结果为void
static anyOf CompletableFuture<?>… 任意给定的future完成后则完成,结果为void

前三个方法并发运行两个泛型类型不同的CompletableFuture,并组合结果。

后三个方法并发运行两个泛型类型相同的CompletableFuture。一旦一个完成,则传递它的结果,并忽略另外的。

后面的静态方法allOfanyOf取一组CompletableFuture,并生成CompletableFuture<Void>,然后分别在全部完成时任意一个完成时结束。但不会返回任何结果。

进程

进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。

建立一个进程

建立进程必须要指定要执行的命令,可以用List<String>,或者直接提供命令字符串。

如:

1
var builder = new ProcessBulder("gcc", "myapp.c");

工作目录

每个进程都有一个工作目录,用来解析相对的目录名。默认情况下,进程的工作与虚拟机相同,通常是启动Java程序的那个目的。也可以使用dictionary方法来 改变工作目录。

1
budiler = budiler.dictionary(path.toFile());

输入输出流

默认情况下,输入输出流可以用以下方法访问:

1
2
3
OutputStream processOut = p.getOutputStream();
InputStream processIn = p.getInputStream();
InputStream processErr = p.getErrorStream();

注意:进程的输入流是JVM的一个输出流。我们会写入这个流,而我们写的内容会成为进程的输入。即一个中继的概念。

可以指定新进程的输入、输出和错误流与JVM相同。如果在控制台运行JVM,则所有用户的输入会转发到进程。

1
builder.redirectIO();

环境变量

可以使用以下格式来设置环境变量:

1
2
3
4
Map<String, String> env = builder.environment();
env.put("LANG", "Fr_FR");
env.remove("JAVA_HOME");
Process p = builder.start();

运行一个进程

要等待进程完成,可以调用:

1
int result =process.waitFor();

或者,如果不想无限等待,可以这样做:

1
2
3
4
5
6
long delay = ...;
if(process.waitFor(delay, TimeUnit.SECONDS)){
int result = process.exitValue();
}else{
process.detroyForcibly();
}

进程句柄

可以通过进程句柄来获得进程的更多信息。有以下4中方式:

  • 给定一个Process对象p,使用p.toHandle()方法获得其句柄。
  • 给定一个long类型的操作系统进程ID,ProcessHandle.of(id)获得其句柄。
  • Process.current()是运行这个Java的虚拟机的进程句柄。
  • Process.allprocess()可以生成对当前进程可见的所有操作系统进程的Stream<ProcessHandle>

给定进程句柄,可以获得其id,父进程,子进程和后代进程:

  • long id = handle.pid();
  • Optional<ProcessHandle> parent = handle.parent();
  • Stream<ProcessHandle> children = handle.childrean();
  • Stream<ProcessHandle> descendants = handle.descendants();

还可以通过ProcessHandle.info来获取其他的信息。

也可以通过进程句柄来监视或终止进程。PorcessHandle接口有isAlivesupportsNormTerminationdestroydestroyForciblyonEixt()方法。

Powered by Hexo and Hexo-theme-hiker

Copyright © 2019 - 2024 My Wonderland All Rights Reserved.

UV : | PV :