目的
毒药丸是已知的预定义数据项,允许为单独的分布式消费过程提供优雅的关闭策略。
解释
真实世界的案例
让我们考虑一个消息队列,其中有一个生产者和一个消费者。生产者不断在队列中推送新消息,消费者不断阅读它们。最后,当该优雅地关闭时,生产商会发送毒药丸信息。
简而言之
毒药丸是结束消息交换的已知消息结构。
编程案例
让我们先定义消息结构。有接口 Message 和实现类SimpleMessage.
java
public interface Message {
...
enum Headers {
DATE, SENDER
}
void addHeader(Headers header, String value);
String getHeader(Headers header);
Map<Headers, String> getHeaders();
void setBody(String body);
String getBody();
}
public class SimpleMessage implements Message {
private final Map<Headers, String> headers = new HashMap<>();
private String body;
@Override
public void addHeader(Headers header, String value) {
headers.put(header, value);
}
@Override
public String getHeader(Headers header) {
return headers.get(header);
}
@Override
public Map<Headers, String> getHeaders() {
return Collections.unmodifiableMap(headers);
}
@Override
public void setBody(String body) {
this.body = body;
}
@Override
public String getBody() {
return body;
}
}我们使用消息队列来传递消息. 这里我们定义与消息队列相关的类型:MqPublishPoint, MqSubscribePoint and MessageQueue. SimpleMessageQueue实现了所有这些功能接口。
java
public interface MqPublishPoint {
void put(Message msg) throws InterruptedException;
}
public interface MqSubscribePoint {
Message take() throws InterruptedException;
}
public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {
}
public class SimpleMessageQueue implements MessageQueue {
private final BlockingQueue<Message> queue;
public SimpleMessageQueue(int bound) {
queue = new ArrayBlockingQueue<>(bound);
}
@Override
public void put(Message msg) throws InterruptedException {
queue.put(msg);
}
@Override
public Message take() throws InterruptedException {
return queue.take();
}
}现在我们需要消息 Producer 和 Consumer.在内部,他们使用上面的消息队列。
重要的是要注意,当 Producer 停止时,它会发出毒药丸通知 Consumer 表示消息传递已完成。
java
public class Producer {
...
public void send(String body) {
if (isStopped) {
throw new IllegalStateException(String.format(
"Producer %s was stopped and fail to deliver requested message [%s].", body, name));
}
var msg = new SimpleMessage();
msg.addHeader(Headers.DATE, new Date().toString());
msg.addHeader(Headers.SENDER, name);
msg.setBody(body);
try {
queue.put(msg);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
}
}
public void stop() {
isStopped = true;
try {
queue.put(Message.POISON_PILL);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
}
}
}
public class Consumer {
...
public void consume() {
while (true) {
try {
var msg = queue.take();
if (Message.POISON_PILL.equals(msg)) {
LOGGER.info("Consumer {} receive request to terminate.", name);
break;
}
var sender = msg.getHeader(Headers.SENDER);
var body = msg.getBody();
LOGGER.info("Message [{}] from [{}] received by [{}]", body, sender, name);
} catch (InterruptedException e) {
// allow thread to exit
LOGGER.error("Exception caught.", e);
return;
}
}
}
}最后,我们准备好展示整个实例。
java
var queue = new SimpleMessageQueue(10000);
final var producer = new Producer("PRODUCER_1", queue);
final var consumer = new Consumer("CONSUMER_1", queue);
new Thread(consumer::consume).start();
new Thread(() -> {
producer.send("hand shake");
producer.send("some very important information");
producer.send("bye!");
producer.stop();
}).start();程序输出
Message [hand shake] from [PRODUCER_1] received by [CONSUMER_1]
Message [some very important information] from [PRODUCER_1] received by [CONSUMER_1]
Message [bye!] from [PRODUCER_1] received by [CONSUMER_1]
Consumer CONSUMER_1 receive request to terminate.类图

应用
适合使用毒药丸设计模式的情况:
- 需要从一个线程/进程向另一个线程/进程发送终止信号。