JAVA锁机制 & 线程阻塞队列(以 ArrayBlockingQueue 为例)笔记
Liuxz一、什么是阻塞队列?
阻塞队列(BlockingQueue)是 Java 并发包中一种特殊的队列,它在普通队列的基础上增加了 阻塞功能:
- 当队列满时,生产者线程会阻塞等待(无法插入元素);
- 当队列空时,取元素的线程会阻塞等待(无法获取元素)。
这种特性使其非常适合 生产者 - 消费者模型,无需手动处理线程同步,简化了多线程协作。
二、核心特性(以代码中的 ArrayBlockingQueue 为例)
有界队列:
ArrayBlockingQueue 是 有界队列(初始化时必须指定容量,如代码中 new ArrayBlockingQueue<>(10) 表示容量为 10),一旦队列满,生产者线程会被阻塞。
阻塞插入与取出:
- 插入元素:
put() 方法在队列满时会阻塞线程,直到队列有空闲位置;
- 取出元素:
take() 方法在队列空时会阻塞线程,直到队列中有元素。
(代码中:生产者用 put() 放数据,消费者用 take() 取数据,自动实现了 “生产 - 消费” 的同步)
三、代码中的生产者 - 消费者模型
生产者(Producter):
循环生成随机整数,通过 q.put(...) 放入队列。若队列已满(达到 10 个元素),put() 会阻塞生产者线程,直到消费者取走元素后才继续放入。
消费者(Consumer):
每 200ms 循环从队列中用 q.take(...) 取元素并打印。若队列空,take() 会阻塞消费者线程,直到生产者放入新元素后才继续取出。
自动协调:
无需额外加锁(如 synchronized 或 Lock),阻塞队列内部已实现线程安全,自动协调生产者和消费者的速度(生产快了就等消费,消费快了就等生产)。
四、常见阻塞队列及特点
| 阻塞队列类型 |
特点 |
适用场景 |
ArrayBlockingQueue |
基于数组实现,有界,初始化需指定容量 |
固定大小的生产者 - 消费者模型 |
LinkedBlockingQueue |
基于链表实现,默认无界(可指定容量) |
任务队列(如线程池任务队列) |
SynchronousQueue |
无容量,放入元素后必须等待被取走 |
线程间直接传递数据 |
PriorityBlockingQueue |
支持优先级排序,无界 |
按优先级处理任务 |
五、核心方法
阻塞队列的方法分为三类(以插入和取出为例):
| 操作类型 |
满队列时的行为 |
空队列时的行为 |
示例方法 |
| 阻塞式 |
阻塞等待 |
阻塞等待 |
put(e)、take() |
| 非阻塞式 |
返回 false 或抛出异常 |
返回 null 或抛出异常 |
add(e)、remove() |
| 超时式 |
超时后返回 false |
超时后返回 null |
offer(e, timeout)、poll(timeout) |
代码中使用的 put() 和 take() 是 阻塞式方法,最适合需要等待的场景。
六、优点
- 线程安全:内部实现了同步机制(如
ReentrantLock),无需手动处理锁;
- 简化协作:自动处理生产者 - 消费者的速度匹配(满则等、空则等);
- 高效:比手动用锁实现的队列更简洁,性能更稳定。
总结
阻塞队列是多线程协作的重要工具,尤其适合生产者 - 消费者模型。ArrayBlockingQueue 作为有界队列,通过 put() 和 take() 的阻塞特性,轻松实现了生产与消费的同步,避免了线程安全问题和复杂的锁管理。
代码:
以下是阻塞队列(ArrayBlockingQueue 及其他常用实现)的代码示例,结合不同场景展示其用法:
示例 1:基础生产者 - 消费者模型(ArrayBlockingQueue)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| import java.util.Random; import java.util.concurrent.ArrayBlockingQueue;
public class BlockingQueueDemo { public static void main(String[] args) { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
new Thread(new Producer(queue), "生产者").start(); new Thread(new Consumer(queue), "消费者").start(); }
static class Producer implements Runnable { private final ArrayBlockingQueue<Integer> queue;
public Producer(ArrayBlockingQueue<Integer> queue) { this.queue = queue; }
@Override public void run() { Random random = new Random(); while (true) { int num = random.nextInt(100); try { queue.put(num); System.out.println(Thread.currentThread().getName() + " 放入:" + num + ",当前队列大小:" + queue.size()); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } }
static class Consumer implements Runnable { private final ArrayBlockingQueue<Integer> queue;
public Consumer(ArrayBlockingQueue<Integer> queue) { this.queue = queue; }
@Override public void run() { while (true) { try { int num = queue.take(); System.out.println(Thread.currentThread().getName() + " 取出:" + num + ",当前队列大小:" + queue.size()); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
|
运行效果:
- 生产者每 500ms 放一个数据,消费者每 1000ms 取一个数据;
- 当队列满(3 个元素)时,生产者会阻塞等待,直到消费者取走数据后再继续放入;
- 输出类似:
1 2 3 4 5 6 7
| 生产者 放入:45,当前队列大小:1 消费者 取出:45,当前队列大小:0 生产者 放入:72,当前队列大小:1 生产者 放入:33,当前队列大小:2 消费者 取出:72,当前队列大小:1 生产者 放入:18,当前队列大小:2 ...
|
示例 2:非阻塞与超时方法(ArrayBlockingQueue)
展示阻塞队列的三类方法(阻塞、非阻塞、超时):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit;
public class BlockingQueueMethods { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
queue.put("A"); queue.put("B"); System.out.println("阻塞式放入后:" + queue);
String s1 = queue.take(); System.out.println("阻塞式取出:" + s1 + ",剩余:" + queue);
boolean addOk = queue.add("C"); System.out.println("add成功:" + addOk + ",队列:" + queue);
try { queue.add("D"); } catch (IllegalStateException e) { System.out.println("add失败:队列已满"); }
boolean offerOk = queue.offer("D", 1, TimeUnit.SECONDS); System.out.println("offer超时后是否成功:" + offerOk);
String s2 = queue.poll(1, TimeUnit.SECONDS); System.out.println("poll超时取出:" + s2 + ",剩余:" + queue); } }
|
示例 3:无界队列(LinkedBlockingQueue)
LinkedBlockingQueue 默认无界(容量为 Integer.MAX_VALUE),适合任务数不确定的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueDemo { public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
for (int i = 0; i < 1000; i++) { queue.put("任务" + i); } System.out.println("放入1000个任务后,队列大小:" + queue.size());
System.out.println("取出:" + queue.take()); } }
|
示例 4:同步队列(SynchronousQueue)
SynchronousQueue 容量为 0,放入元素后必须等待被取走才能继续:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue<String> queue = new SynchronousQueue<>();
new Thread(() -> { try { System.out.println("生产者:准备放入数据"); queue.put("同步数据"); System.out.println("生产者:数据被取走,继续执行"); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { Thread.sleep(1000); String data = queue.take(); System.out.println("消费者:取出数据:" + data); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
|
运行效果:
生产者放入数据后阻塞 1 秒,直到消费者取走数据才继续执行。
总结
阻塞队列通过内置的阻塞机制,简化了多线程协作:
ArrayBlockingQueue 适合固定容量场景,生产者 - 消费者速度匹配;
LinkedBlockingQueue 适合任务数不确定的场景;
SynchronousQueue 适合线程间直接传递数据(无缓冲);
- 核心方法分为阻塞式(
put/take)、非阻塞式(add/remove)、超时式(offer/poll),根据场景选择即可。