生产者消费者问题

一、基本概念

生产者 - 消费者问题是多线程同步与通信的经典案例。存在两类线程:

  • 生产者线程:负责生成数据,将数据放入共享缓冲区。

  • 消费者线程:负责从共享缓冲区取出数据并进行处理。

    共享缓冲区作为两者传递数据的中介,需保证操作线程安全。

二、核心问题

(一)线程同步

多个线程(生产者和消费者)同时操作共享缓冲区时,要保证操作原子性,避免数据不一致。比如生产者添加数据时,消费者不能同时取数据;生产者修改数据过程中,消费者不能读 “半成品” 数据。

(二)线程通信

  • 缓冲区满时,生产者应等待,直到消费者取出数据腾出空间。
  • 缓冲区空时,消费者应等待,直到生产者放入数据。

三、解决方案

(一)同步机制(以 synchronized 为例)

通过给共享资源(如案例中的 Queue 对象)加锁,保证同一时间只有一个线程能操作它,解决线程同步问题。例如代码中用 synchronized (q) 确保生产者修改数据和消费者读取数据的操作互斥,避免数据不一致。

(二)等待 - 通知机制(wait() 和 notify()/notifyAll()

  • wait():使当前线程进入等待状态,释放对象锁,直到其他线程调用该对象的 notify() 或 notifyAll() 方法唤醒。
  • notify():唤醒在此对象监视器上等待的单个线程。
  • notifyAll():唤醒在此对象监视器上等待的所有线程。

在生产者 - 消费者模型中,缓冲区满时生产者调用 wait() 等待;消费者取出数据后,调用 notify() 通知生产者继续生产。缓冲区空时,消费者调用 wait() 等待,生产者放入数据后调用 notify() 通知消费者继续消费。

四、代码示例与分析

(一)基础同步代码(解决数据一致性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package test1017;

public class test08 {
public static void main(String[] args) {
Queue q = new Queue();
new Thread(new 生产者(q)).start();
new Thread(new 消费者(q)).start();
}
}

class Queue{
String name;
String tools;
}

class 生产者 implements Runnable{
Queue q;
生产者(Queue q){
this.q = q;
}

@Override
public void run() {
int i = 0;
while(true){
synchronized (q) {
if (i == 0) {
q.name = "蛋";
q.tools = "平底锅";
} else {
q.name = "核桃";
q.tools = "锤子";
}
i = (i + 1) % 2;
}
}
}
}

class 消费者 implements Runnable{
Queue q;
消费者(Queue q){
this.q = q;
}

@Override
public void run() {
while(true){
synchronized (q) {
System.out.println(q.name + " -- " + q.tools);
}
}
}
}
  • 该代码通过 synchronized 保证了数据一致性,消费者不会读到 “半成品” 数据,但存在线程 “忙等”(无意义循环,浪费 CPU 资源)问题。

(二)优化后的等待 - 通知代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package test1017;

public class test08 {
public static void main(String[] args) {
Queue q = new Queue();
new Thread(new 生产者(q)).start();
new Thread(new 消费者(q)).start();
}
}

class Queue{
String name;
String tools;
}

class 生产者 implements Runnable{
Queue q;
生产者(Queue q){
this.q = q;
}

@Override
public void run() {
int i = 0;
while(true){
synchronized (q) {
if (i == 0) {
q.name = "蛋";
q.tools = "平底锅";
} else {
q.name = "核桃";
q.tools = "锤子";
}
i = (i + 1) % 2;
q.notify();
try {
q.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

class 消费者 implements Runnable{
Queue q;
消费者(Queue q){
this.q = q;
}

@Override
public void run() {
while(true){
synchronized (q) {
try {
q.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(q.name + " -- " + q.tools);
q.notify();
}
}
}
}
  • 生产者生产完数据后,调用 notify() 通知消费者消费,然后自己调用 wait() 进入等待状态,释放锁。
  • 消费者等待到通知后,消费数据,然后调用 notify() 通知生产者生产,自己再调用 wait() 进入等待状态。实现了生产者和消费者有序协作,避免无意义循环,提高系统资源利用效率。

五、总结

生产者 - 消费者问题是理解多线程同步与通信的重要场景。通过同步机制保证线程安全,通过等待 - 通知机制实现线程间高效协作,能构建稳定、高效的多线程程序。实际开发中,还可使用更高级的并发工具(如 BlockingQueue 等)更简洁地实现生产者 - 消费者模型。