JAVA 学习并发笔记(一)

1,010 阅读13分钟

线程:每一个任务称为一个线程(thread),它是线程控制的简称。

可以同时运行一个以上线程的程序称为多线程程序。

多进程和多线程的区别:

本质的区别在于每个进程拥有自己的一整套变量,而线程则共享数据。

共享变量使线程之间的通信比进程之间的通信更有效,更容易。

在线程中执行任务

  1. 将任务代码移到实现了 Runnable 接口的类的 run 方法中。这个接口只有一个方法

    public interface Runnable {
        public abstract void run();
    }
    

    由于 Runnable 是一个函数式接口,可以用 lambda 表达式建立一个实例:

    Runnable r = () -> { task code };
    
  2. 由 Runnable 创建一个Thread 对象:

    Thread t = new Thread(r);
    
  3. 启动线程:

    t.start();
    

也可以通过构建一个Thread类的子类定义一个线程

class MyThread extends Thread
{
    public void run()
    {
        task code
    }
}

注意: 不要调用 Thread 类或 Runnable 对象的run方法。直接调用run方法,只会执行同一个线程中的任务,而不会启动新线程。应该调用 Thread.start 方法。。该方法将创建一个执行run方法的新线程。

java.lang.Thread

  1. Thread(Runnable target) 构造一个新线程,用于调用给定目标的 run() 方法
  2. void start() 启动这个线程,将引发调用 run() 方法。这个方法将立即返回,并且新线程将并发运行
  3. void run() 调用关联 Runnable 的 run 方法

java.lang.Runnable

  • void run() 必须覆盖这个方法,并在这个方法中提供所要执行的任务指令

中断线程

当线程的run方法执行方法体中最后一条语句后,并经由执行return语句返回时,或者出现了在方法中没有捕获的异常时,线程将终止。 没有可以强制线程终止的方法。但是,interrupt 方法可以用来请求终止线程 当线程调用 interrupt 方法时,线程的中断状态将被置位。这是每一个线程都具有的 boolean 标志 每个线程都应该不时地检查这个标志,以判断线程是否被中断。

可调用静态的 Thread.currentThread 方法获得当前线程,然后调用isInterrupted 方法

while(!Thread.currentThread().isInterrupted() && more work to do){
            do more work
}

不过,如果线程被阻塞,就无法检测中断状态,这时就会抛出 InterruptedException 异常 当在一个被阻塞的线程(调用 sleepwait )上调用 interrupt 方法时,阻塞调用将会被 InterruptedException 异常中断

java.lang.Thread

  1. void interrupt() 向线程发送中断请求。线程的中断状态将被设置为true。如果该线程目前被sleep 或 wait 调用阻塞,那么,InterruptedException 异常被抛出
  2. static boolean interrupted() 测试当前线程是否被中断。注意,这是一个 静态方法。这一调用会产生副作用——它将当前线程的中断状态重置为false
  3. boolean isInterrupted() 测试线程是否被终止。这一调用不改变线程的中断状态
  4. static Thread currentThread() 返回代表当前执行线程的Thread 对象

线程状态

线程可以有如下6种状态:

  1. New (新创建)
  2. Runnable (可运行)
  3. Blocked (被阻塞)
  4. Waiting (等待)
  5. Timed waiting (计时等待)
  6. Terminated (被终止)

线程状态

新创建线程

当用 new 操作符创建一个新线程时,如 new Thread(r),该线程还没有开始运行。这意味着它的状态是new

可运行线程

一旦调用 start 方法,线程处于runnable 状态。一个可运行的线程可能正在运行也可能没有运行,这取决于操作系统给线程提供运行的时间。运行中的线程被中断,目的是为了让其他线程获得运行机会。线程调度的细节依赖于操作系统提供的服务。抢占式调度 给每一个可运行线程一个时间片来执行任务。当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程运行机会。 在任何给定时刻,一个可运行的线程可能正在运行也可能没有运行(这就是为什么将这个状态称为可运行而不是运行)

被阻塞线程和等待线程

当线程处于被阻塞或等待状态时,它暂时不活动。它不运行任何代码且小号最少的资源。知道线程调度器重新激活它。细节取决于它是怎样达到非活动状态的

  • 当一个线程试图获取一个内部的对象锁,而该锁被其他线程持有,则该线程进入阻塞状态。当所有其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态

  • 当线程等待另一个线程通知调度器一个条件时,它自己进入等待状态。在调用 Object.wait 方法或 Thread.join 方法,或者是等待 java.util.concurrent 库中的 Lock 或 Condition 时,就会出现这种情况

  • 有几个方法有一个超时参数。调用它们导致线程进入计时等待状态。这一状态将一直保持到超时期满或者接收到适当的通知。带有超时的方法有 Thread.sleep 和 Object.wait 、 Object.join 、 Lock.tryLock 以及 Condition.await 的计时版

被终止的线程

  • 因为 run 方法正常退出而自然死亡
  • 因为一个没有捕获的异常终止了 run 方法而意外死亡

同步

在大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取。如果两个线程存取相同的对象,并且每一个线程都调用了一个修改该对象状态的方法,将会发生什么呢?根据各线程访问数据的的次序,可能会产生讹误的对象。这样的情况通常被称为竞争条件

我们来模拟一个有若干账户的银行。随机地生成在这些账户之间转移在钱款的交易。每一个账户有一个线程。每一笔交易中,会从线程所服务的账户中随机转移一定数目的钱款到另一个随机账户。

账户转移的方法编写

public void transfer(int from, int to, double amount)
{
    if (accounts[from] < amount)
        return;
    System.out.println(Thread.currentThread());
    accounts[from] -= amount;
    System.out.printf("%10.2f from %d to %d", amount, from, to);
    accounts[to] += amount;
    System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}

Runnable 类的代码

Runnable r = () ->
{
    try {
        while (true){
            //bank.size 返回的是Bank类accounts数组的长度,这里设置为100,数组中的值均为1000
            int toAccount = (int)(bank.size() * Math.random());
            //MAX_AMOUNT 为 1000
            double amount = MAX_AMOUNT * Math.random();
            bank.transfer(fromAccount, toAccount, amount);
            ///DELAY 为 10
            Thread.sleep((int)(DELAY * Math.random()));
        }
    }catch (InterruptedException e){

    }
};

当这个模拟程序运行时,我们不清楚某一时刻某一个账户余额剩多少钱,不过我们唯一能确定的是,所有账户的总金额应该保持不变。应为 NACCOUNTS * INITIAL_BALANCE 所以我们在每次交易的结尾,transfer 方法重新计算总值打印出来。 ps:这个程序是个死循环,只能按 Ctrl+C 终止程序

程序运行结果如下:

非同步结果

正如结果所示,出现了错误。银行的余额应该保持在10W,才是正确的结果。但是过了一段时间之间,这个结果变了。

附上完整源码

//测试类
public class UnsynchBankTest {

    public static final int NACCOUNTS = 100;
    public static final double INITIAL_BALANCE = 1000;
    public static final double MAX_AMOUNT = 1000;
    public static final int DELAY = 10;

    public static void main(String[] args)
    {
        BankUnSynch bank = new BankUnSynch(NACCOUNTS, INITIAL_BALANCE);

        for (int i=0;i<NACCOUNTS;i++) {
            int fromAccount = i;
            Runnable r = () ->
            {
                try {
                    while (true){
                        int toAccount = (int)(bank.size() * Math.random());
                        double amount = MAX_AMOUNT * Math.random();
                        bank.transfer(fromAccount, toAccount, amount);
                        Thread.sleep((int)(DELAY * Math.random()));
                    }
                }catch (InterruptedException e){

                }
            };
            Thread t = new Thread(r);
            t.start();
        }
    }
}

//银行类
import java.util.*;

public class BankUnSynch {

    private final double[] accounts;

    public BankUnSynch(int n, double initialBalance)
    {
        accounts = new double[n];
        Arrays.fill(accounts, initialBalance);
    }

    /**
     * 转账操作
     * @param from
     * @param to
     * @param amount
     */
    public void transfer(int from, int to, double amount)
    {
        if (accounts[from] < amount)
            return;
        //获取当前线程
        System.out.println(Thread.currentThread());
        accounts[from] -= amount;
        System.out.printf("%10.2f from %d to %d", amount, from, to);
        accounts[to] += amount;
        System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
    }

    /**
     * 获取 accounts 数组的总金额
     * @return
     */
    public double getTotalBalance()
    {
        double sum = 0;
        for (double a: accounts)
            sum += a;
        return sum;
    }

    public int size()
    {
        return accounts.length;
    }
}

竞争条件详解

上面的代码运行时,其实有几个线程更新银行账户余额。一段时间之后,就出现了错误。总额要么增加了,要是变少了。当线程试图同时更新同一个账户的时候,这个问题就出现了。假设两个线程同时执行指令

accounts[to] += amount;

这个时候会发生什么呢?由于这不是原子操作。该指令可能会被处理为:

  1. 将 accounts[to] 加载到寄存器
  2. 增加 amount
  3. 将结果写回 accounts[to]

现在我们假定第一个线程执行步骤1和2,然后,它被剥夺了运行权。第二个线程被唤醒并修改了 accounts 数组中的同一项,即第二个线程执行完了这三个步骤。然后,第一个线程被唤醒并完成其第三步 这样,这一动作就擦去了第二个线程所作的更新,于是,总金额不在正确。

锁对象

如何防止上面情况的发生呢? 有两种机制防止代码块受并发访问的干扰。

  1. 锁和条件对象
  2. synchronized 关键字

加锁

使用 ReentrantLock 保护代码块。代码如下

myLock.lock();// 一个ReentrantLock 对象 
try{
	critical section
}
finally{
	myLock.unlock();
}

这样我们就可以确保任何时刻只有一个线程进入临界区。一旦一个线程封锁了锁对象,其他任何线程都无法通过lock语句。当其他线程调用lock时,它们被阻塞,直到第一个线程释放锁对象

条件对象

现在我们来回想一下业务,当账户中没有足够的余额时,我们是不是应该等待直到另一个线程向账户中注入资金?但是因为这一线程刚刚获得了对bankLock的排他性访问,因此导致了其他线程都被阻塞,这就是成了死锁。这时,我们就需要用到条件对象了。

一个锁对象可以有一个或多个相关的条件对象。可以用 newCondition 方法获得一个条件对象。习惯上给每一个条件对象命名为可以反映它所表达的条件的名字。

例如:

class Bank 
{
    private Lock bankLock;
    private Condition sufficientFunds;

    public Bank()
    {
        ....
        bankLock = new ReentrantLock();
        sufficientFunds = bankLock.newCondition();
    }
}

transfer 方法发现余额不足,调用

sufficientFunds.await();

表示当前线程现在被阻塞了,并放弃了锁。我们希望这样可以使得另一个线程可以进行增加账户余额的操作。

注意

等待获得锁的线程和调用 await 方法的线程存在本质上的不同。一旦一个线程调用 await 方法,它进入该条件的 等待集。当锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一线程调用同一条件上的 signalAll 方法时为止。

signalAll 方法激活因为这一条件而等待的所有线程。当这些线程从等待集当中移出时,它们再次成为可运行的,调度器将再次激活它们。此时,线程应该再次测试该条件。由于无法确保该条件被满足—— signalAll 方法仅仅是通知正在等待的线程:此时有可能已经满足条件,值得再次去检测该条件。 因此,对await 的调用应该采用以下的方式

while(!(ok to proceed))
	condition.await();

何时调用 signalAll 方法呢?建议在对象的状态有利于等待线程的方向改变时调用 signalAll。如:当完成转账时,就调用 signalAll 方法。

public void transfer(int from,int to, double amount) throws InterruptedException
{
    bankLock.lock();
    try
    {
        while (accounts[from] < amount)
            sufficientFunds.await();
        System.out.println(Thread.currentThread());
        accounts[from] -= amount;
        System.out.printf("%10.2f from %d to %d", amount, from, to);
        accounts[to] += amount;
        System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
        sufficientFunds.signalAll();
    }finally {
        bankLock.unlock();
    }
}

另一个方法是signal,此方法是随机解除选择等待集中某个线程的阻塞状态。这个方法存在危险,即随机选择的线程发现自己仍然不能运行,那它就会再次被阻塞,若无其他线程再次调用signal,那系统就死锁了。

附上修改后的 Bank 源码

import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Bank {

    private final double[] accounts;
    private Lock bankLock;
    private Condition sufficientFunds;

    /**
     * 初始化
     * @param n 数组长度
     * @param initialBalance
     */
    public Bank(int n ,double initialBalance)
    {
        accounts = new double[n];
        Arrays.fill(accounts, initialBalance);
        bankLock = new ReentrantLock();
        sufficientFunds = bankLock.newCondition();
    }

    /**
     * 转账操作
     * @param from
     * @param to
     * @param amount
     * @throws InterruptedException
     */
    public void transfer(int from,int to, double amount) throws InterruptedException
    {
        bankLock.lock();
        try
        {
            while (accounts[from] < amount)
                sufficientFunds.await();
            System.out.println(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf("%10.2f from %d to %d", amount, from, to);
            accounts[to] += amount;
            System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
            sufficientFunds.signalAll();
        }finally {
            bankLock.unlock();
        }
    }
    /**
     * 获取 accounts 数组的总金额
     * @return
     */
    public double getTotalBalance()
    {
        bankLock.lock();
        try{
            double sum = 0;
            for (double a: accounts)
                sum += a;
            return sum;
        }finally {
            bankLock.unlock();
        }
    }

    public int size()
    {
        return accounts.length;
    }
}

总结一下锁和条件的关键之处

  1. 锁用来保护代码片段,任何时刻只能有一个线程执行被保护的代码
  2. 锁可以管理试图进入被保护代码段的线程
  3. 锁可以拥有一个或多个相关的条件对象
  4. 每个条件对象管理那些已经进入被保护的代码段但还不能运行的线程

synchronized 关键字

Lock 和 Condition 接口为程序设计人员提供了高度的锁定控制。然而大多数情况下,并不需要那样的控制,并且可以使用一种嵌入到Java语言内部的机制。从 1.0 版本开始,Java 中的每一个对象都有一个内部锁。如果一个方法用 synchronized 关键字声明,那么对象的锁将保护整个方法。即,要调用该方法,线程必须获得内部的对象锁。 也就是说:

public synchronized void method()
{
	do something
}
//等价于
public void method()
{
	this.intrinsicLock.lock(); 
	try{
		do something
	}
	finally{
		this.intrinsicLock.unlock();
	}
}

内部对象锁只有一个相关条件。wait 方法添加一个线程到等待集中,notifyAll/notify 方法接触等待线程的阻塞状态。即,调用wait或notifyAll等价于

intrinsicCondition.await(); 
intrinsicCondition.signalAll(); 

内部锁和条件存在一些局限:

  1. 不能中断一个正在试图获得锁的线程
  2. 试图获得锁时不能设定超时
  3. 每个锁仅有单一的条件,可能是不够的

Lock 和 Condition 对象、同步方法的使用的建议:

  • 最好既不使用Lock/Condition 也不使用 synchronized 关键字。在许多情况下可以使用 java.util.concurrent 包中的一种机制,它会为你处理所有的加锁
  • 如果 synchronized 关键字适合你的程序,那么就尽量使用它。
  • 如果特别需要 Lock/Condition 结构提供的独有特性时,才使用 Lock/Condition

使用 synchronized 关键字修改源码

import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class BankSynch {

    private final double[] accounts;

    /**
     * 初始化
     * @param n 数组长度
     * @param initialBalance
     */
    public BankSynch(int n ,double initialBalance)
    {
        accounts = new double[n];
        Arrays.fill(accounts, initialBalance);
    }

    /**
     * 转账操作
     * @param from
     * @param to
     * @param amount
     * @throws InterruptedException
     */
    public synchronized void transfer(int from,int to, double amount) throws InterruptedException
    {
        System.out.println(Thread.currentThread());
        accounts[from] -= amount;
        System.out.printf("%10.2f from %d to %d", amount, from, to);
        accounts[to] += amount;
        System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
    }
    /**
     * 获取 accounts 数组的总金额
     * @return
     */
    public synchronized double getTotalBalance()
    {
        double sum = 0;
        for (double a : accounts)
            sum += a;
        return sum;
    }

    public int size()
    {
        return accounts.length;
    }
}

还有一种是客户端锁定,不过客户端锁定是非常脆弱的,通常不推荐使用,这里就不叙述了。

参考

JAVA核心技术(卷1)原书第10版