SpringBoot项目实现发布订阅模式
大家好我是老三在项目里经常会有一些主线业务之外的其它业务比如下单之后发送通知、监控埋点、记录日志……这些非核心业务如果全部一梭子写下去有两个问题一个是业务耦合一个是串行耗时。下单之后的逻辑所以一般在开发的时候都会把这些操作å抽象成观察者模式也就是发布/订阅模式这里就不讨论观察者模式和发布/订阅模式的不同而且一般会采用多线程的方式来异步执行这些观察者方法。观察者模式一开始我们都是自己去写观察者模式。自己实现观察者模式观察者简图观察者观察者定义接口/** * Author: fighter3 * Description: 观察者接口 * Date: 2022/11/7 11:40 下午 */ public interface OrderObserver { void afterPlaceOrder(PlaceOrderMessage placeOrderMessage); }具体观察者Slf4jpublic class OrderMetricsObserver implements OrderObserver {Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info([afterPlaceOrder] metrics);}}Slf4jpublic class OrderLogObserver implements OrderObserver{Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info([afterPlaceOrder] log.);}}Slf4jpublic class OrderNotifyObserver implements OrderObserver{Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info([afterPlaceOrder] notify.);}}业务通知观察者日志记录观察者监控埋点观察者被观察者消息实体定义Data public class PlaceOrderMessage implements Serializable { /** * 订单号 */ private String orderId; /** * 订单状态 */ private Integer orderStatus; /** * 下单用户ID */ private String userId; //…… }被观察者抽象类public abstract class OrderSubject { //定义一个观察者列表 private ListOrderObserver orderObserverList new ArrayList(); //定义一个线程池,这里参数随便写的 ThreadPoolExecutor threadPoolExecutor new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue(30)); //增加一个观察者 public void addObserver(OrderObserver o) { this.orderObserverList.add(o); } //删除一个观察者 public void delObserver(OrderObserver o) { this.orderObserverList.remove(o); } //通知所有观察者 public void notifyObservers(PlaceOrderMessage placeOrderMessage) { for (OrderObserver orderObserver : orderObserverList) { //利用多线程异步执行 threadPoolExecutor.execute(() - { orderObserver.afterPlaceOrder(placeOrderMessage); }); } } }这里利用了多线程来异步执行观察者。被观察者实现类/** * Author: fighter3 * Description: 订单实现类-被观察者实现类 * Date: 2022/11/7 11:52 下午 */ Service Slf4j public class OrderServiceImpl extends OrderSubject implements OrderService { /** * 下单 */ Override public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) { PlaceOrderResVO resVO new PlaceOrderResVO(); //添加观察者 this.addObserver(new OrderMetricsObserver()); this.addObserver(new OrderLogObserver()); this.addObserver(new OrderNotifyObserver()); //通知观察者 this.notifyObservers(new PlaceOrderMessage()); log.info([placeOrder] end.); return resVO; } }测试Test DisplayName(下单) void placeOrder() { PlaceOrderReqVO placeOrderReqVO new PlaceOrderReqVO(); orderService.placeOrder(placeOrderReqVO); }测试执行结果2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-1] c.f.obverser.OrderMetricsObserver : [afterPlaceOrder] metrics 2022-11-08 00:11:13.618 INFO 20235 --- [ main] cn.fighter3.obverser.OrderServiceImpl : [placeOrder] end. 2022-11-08 00:11:13.618 INFO 20235 --- [pool-1-thread-3] c.fighter3.obverser.OrderNotifyObserver : [afterPlaceOrder] notify. 2022-11-08 00:11:13.617 INFO 20235 --- [pool-1-thread-2] cn.fighter3.obverser.OrderLogObserver : [afterPlaceOrder] log.可以看到观察者是异步执行的。利用Spring精简可以看到观察者模式写起来还是比较简单的但是既然都用到了Spring来管理Bean的生命周期代码还可以更精简一些。Spring精简观察者模式观察者实现类定义成BeanOrderLogObserverSlf4jServicepublic class OrderLogObserver implements OrderObserver {Overridepublic void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) {log.info([afterPlaceOrder] log.);}}OrderMetricsObserverSlf4j Service public class OrderMetricsObserver implements OrderObserver { Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info([afterPlaceOrder] metrics); } }OrderNotifyObserverSlf4j Service public class OrderNotifyObserver implements OrderObserver { Override public void afterPlaceOrder(PlaceOrderMessage placeOrderMessage) { log.info([afterPlaceOrder] notify.); } }被观察者自动注入BeanOrderSubjectpublic abstract class OrderSubject {/*** 利用Spring的特性直接注入观察者*/Autowiredprotected ListOrderObserver orderObserverList;//定义一个线程池,这里参数随便写的ThreadPoolExecutor threadPoolExecutor new ThreadPoolExecutor(6, 12, 6, TimeUnit.SECONDS, new ArrayBlockingQueue(30));//通知所有观察者public void notifyObservers(PlaceOrderMessage placeOrderMessage) {for (OrderObserver orderObserver : orderObserverList) {//利用多线程异步执行threadPoolExecutor.execute(() - {orderObserver.afterPlaceOrder(placeOrderMessage);});}}}OrderServiceImplService Slf4j public class OrderServiceImpl extends OrderSubject implements OrderService { /** * 实现类里也要注入一下 */ Autowired private ListOrderObserver orderObserverList; /** * 下单 */ Override public PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) { PlaceOrderResVO resVO new PlaceOrderResVO(); //通知观察者 this.notifyObservers(new PlaceOrderMessage()); log.info([placeOrder] end.); return resVO; } }这样一来发现被观察者又简洁了很多但是后来我发现在SpringBoot项目里利用Spring事件驱动驱动模型event模型来实现更加地简练。Spring Event实现发布/订阅模式Spring Event对发布/订阅模式进行了封装使用起来更加简单还是以我们这个场景为例看看怎么来实现吧。自定义事件PlaceOrderEvent继承ApplicationEvent并重写构造函数。ApplicationEvent是Spring提供的所有应用程序事件扩展类。public class PlaceOrderEvent extends ApplicationEvent { public PlaceOrderEvent(PlaceOrderEventMessage source) { super(source); } }PlaceOrderEventMessage事件消息定义了事件的消息体。Data public class PlaceOrderEventMessage implements Serializable { /** * 订单号 */ private String orderId; /** * 订单状态 */ private Integer orderStatus; /** * 下单用户ID */ private String userId; //…… }事件监听者事件监听者有两种实现方式一种是实现ApplicationListener接口另一种是使用EventListener注解。事件监听者实现实现ApplicationListener接口实现ApplicationListener接口,重写onApplicationEvent方法将类定义为Bean这样一个监听者就完成了。OrderLogListenerSlf4j Service public class OrderLogListener implements ApplicationListenerPlaceOrderEvent { Override public void onApplicationEvent(PlaceOrderEvent event) { log.info([afterPlaceOrder] log.); } }OrderMetricsListenerSlf4j Service public class OrderMetricsListener implements ApplicationListenerPlaceOrderEvent { Override public void onApplicationEvent(PlaceOrderEvent event) { log.info([afterPlaceOrder] metrics); } }OrderNotifyListenerSlf4j Service public class OrderNotifyListener implements ApplicationListenerPlaceOrderEvent { Override public void onApplicationEvent(PlaceOrderEvent event) { log.info([afterPlaceOrder] notify.); } }使用EventListener注解使用EventListener注解就更简单了直接在方法上加上EventListener注解就行了。OrderLogListenerSlf4jServicepublic class OrderLogListener {EventListenerpublic void orderLog(PlaceOrderEvent event) {log.info([afterPlaceOrder] log.);}}OrderMetricsListenerSlf4jServicepublic class OrderMetricsListener {EventListenerpublic void metrics(PlaceOrderEvent event) {log.info([afterPlaceOrder] metrics);}}OrderNotifyListenerSlf4jServicepublic class OrderNotifyListener{EventListenerpublic void notify(PlaceOrderEvent event) {log.info([afterPlaceOrder] notify.);}}异步和自定义线程池异步执行异步执行也非常简单使用Spring的异步注解Async就可以了。例如OrderLogListenerSlf4j Service public class OrderLogListener { EventListener Async public void orderLog(PlaceOrderEvent event) { log.info([afterPlaceOrder] log.); } }当然还需要开启异步SpringBoot项目默认是没有开启异步的我们需要手动配置开启异步功能很简单只需要在配置类上加上EnableAsync注解就行了这个注解用于声明启用Spring的异步方法执行功能需要和Configuration注解一起使用也可以直接加在启动类上。SpringBootApplication EnableAsync public class DailyApplication { public static void main(String[] args) { SpringApplication.run(DairlyLearnApplication.class, args); } }自定义线程池使用Async的时候一般都会自定义线程池因为Async的默认线程池为SimpleAsyncTaskExecutor不是真的线程池这个类不重用线程默认每次调用都会创建一个新的线程。自定义线程池有三种方式Async自定义线程池实现接口AsyncConfigurer继承AsyncConfigurerSupport配置由自定义的TaskExecutor替代内置的任务执行器我们来看看三种写法实现接口AsyncConfigurerConfiguration Slf4j public class AsyncConfiguration implements AsyncConfigurer { Bean(fighter3AsyncExecutor) public ThreadPoolTaskExecutor executor() { //Spring封装的一个线程池 ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor(); //随便写的一些配置 executor.setCorePoolSize(10); executor.setMaxPoolSize(50); executor.setQueueCapacity(30); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setThreadNamePrefix(fighter3AsyncExecutor-); executor.initialize(); return executor; } Override public Executor getAsyncExecutor() { return executor(); } Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) - log.error(String.format([async] task{} error:, method), ex); } }继承AsyncConfigurerSupportConfiguration Slf4j public class SpringAsyncConfigurer extends AsyncConfigurerSupport { Bean public ThreadPoolTaskExecutor asyncExecutor() { ThreadPoolTaskExecutor threadPool new ThreadPoolTaskExecutor(); //随便写的一些配置 threadPool.setCorePoolSize(10); threadPool.setMaxPoolSize(30); threadPool.setWaitForTasksToCompleteOnShutdown(true); threadPool.setAwaitTerminationSeconds(60 * 15); return threadPool; } Override public Executor getAsyncExecutor() { return asyncExecutor(); } Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) - log.error(String.format([async] task{} error:, method), ex); } }配置自定义的TaskExecutorSlf4jServicepublic class OrderLogListener {EventListenerAsync(asyncExecutor)public void orderLog(PlaceOrderEvent event) {log.info([afterPlaceOrder] log.);}}配置线程池Configurationpublic class TaskPoolConfig {Bean(name asyncExecutor)public Executor taskExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();//随便写的一些配置executor.setCorePoolSize(10);executor.setMaxPoolSize(20);executor.setQueueCapacity(200);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix(asyncExecutor-);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}}使用Async注解的时候指定线程池推荐使用这种方式因为在项目里尽量做到线程池隔离不同的任务使用不同的线程池异步和自定义线程池这一部分只是一些扩展稍微占了一些篇幅大家可不要觉得Spring Event用起来很繁琐。发布事件发布事件也非常简单只需要使用Spring 提供的ApplicationEventPublisher来发布自定义事件。OrderServiceImplServiceSlf4jpublic class OrderServiceImpl implements OrderService {Autowiredprivate ApplicationEventPublisher applicationEventPublisher;/*** 下单*/Overridepublic PlaceOrderResVO placeOrder(PlaceOrderReqVO reqVO) {log.info([placeOrder] start.);PlaceOrderResVO resVO new PlaceOrderResVO();//消息PlaceOrderEventMessage eventMessage new PlaceOrderEventMessage();//发布事件applicationEventPublisher.publishEvent(new PlaceOrderEvent(eventMessage));log.info([placeOrder] end.);return resVO;}}在Idea里查看事件的监听者也比较方便点击下面图中的图标就可以查看监听者。查看监听者监听者测试最后我们还是测试一下。Test void placeOrder() { PlaceOrderReqVO placeOrderReqVO new PlaceOrderReqVO(); orderService.placeOrder(placeOrderReqVO); }执行结果2022-11-08 10:05:14.415 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] start. 2022-11-08 10:05:14.424 INFO 22674 --- [ main] c.f.o.event.event.OrderServiceImpl : [placeOrder] end. 2022-11-08 10:05:14.434 INFO 22674 --- [sync-executor-3] c.f.o.event.event.OrderNotifyListener : [afterPlaceOrder] notify. 2022-11-08 10:05:14.435 INFO 22674 --- [sync-executor-2] c.f.o.event.event.OrderMetricsListener : [afterPlaceOrder] metrics 2022-11-08 10:05:14.436 INFO 22674 --- [sync-executor-1] c.f.o.event.event.OrderLogListener : [afterPlaceOrder] log.可以看到异步执行而且用到了我们自定义的线程池。小结这篇文章里从最开始自己实现的观察者模式再到利用Spring简化的观察者模式再到使用Spring Event实现发布/订阅模式可以看到Spring Event用起来还是比较简单的。除此之外还有Guava EventBus这样的事件驱动实现大家更习惯使用哪种呢小结这篇文章里从最开始自己实现的观察者模式再到利用Spring简化的观察者模式再到使用Spring Event实现发布/订阅模式可以看到Spring Event用起来还是比较简单的。除此之外还有Guava EventBus这样的事件驱动实现大家更习惯使用哪种呢

相关新闻

【论文常识】快降重科研小助手“降重+降AIGC”功能,打造论文高效降重的黄金组合

【论文常识】快降重科研小助手“降重+降AIGC”功能,打造论文高效降重的黄金组合

在论文降重的过程中,很多人都会陷入两个极端:要么完全依赖免费 AIGC 工具,改完后重复率不达标还出现语义失真;要么一味追求付费查重降重服务,花费不菲却收效甚微。其实,论文高效降重的关键,在于…

2026/5/17 3:37:49 阅读更多 →
【零基础必看】HBuilderX 安装教程(超详细),入门到精通一篇就够

【零基础必看】HBuilderX 安装教程(超详细),入门到精通一篇就够

一、下载安装 1.在地址栏中直接输入https://www.dcloud.io 2.点击箭头所指,进去过后点击DOWNLOAD。 3.点击过后 选择自己所需要的版本。 HBuilder目前有两个版本,一个是windows版,一个是mac版。下载的时候根据自己的电脑选择适合自己的版本…

2026/5/17 3:37:48 阅读更多 →
【IEEE出版,往届已见刊并完成EI检索 | 杭州市仪器仪表学会主办 | 】第二届信号处理、通信与控制系统国际学术会议(SPCCS 2026)

【IEEE出版,往届已见刊并完成EI检索 | 杭州市仪器仪表学会主办 | 】第二届信号处理、通信与控制系统国际学术会议(SPCCS 2026)

第二届信号处理、通信与控制系统国际学术会议(SPCCS 2026) The 2nd International Conference on Signal Processing, Communication and Control Systems 会议时间:2026年3月13-15日 会议地点:中国-杭州 大会官网&#xff1a…

2026/5/17 3:37:48 阅读更多 →

最新新闻

鼠标性能终极测试:如何用免费开源工具精准评估你的鼠标表现

鼠标性能终极测试:如何用免费开源工具精准评估你的鼠标表现

鼠标性能终极测试:如何用免费开源工具精准评估你的鼠标表现 【免费下载链接】MouseTester 项目地址: https://gitcode.com/gh_mirrors/mou/MouseTester 你是否在游戏中总感觉鼠标"飘"得厉害?或者工作时鼠标指针不够精准?别…

2026/7/3 5:01:20 阅读更多 →
单例模式 超详细完整版

单例模式 超详细完整版

一、单例模式是什么?单例模式(Singleton) 是创建型设计模式。 核心定义: 保证一个类在整个程序运行中,有且仅有一个实例对象,并提供一个全局访问入口。二、单例模式三大核心特点(必背&#xff0…

2026/7/3 4:59:20 阅读更多 →
口碑出众的精准尺寸烤盘定制厂家

口碑出众的精准尺寸烤盘定制厂家

做工业化烘焙生产的技术和采购人员都懂,烤盘尺寸哪怕只有1mm的误差,放到自动化隧道炉、连续生产线上就容易出现卡盘、跳盘问题,轻则耽误生产进度,重则刮坏传输设备、提升产品报废率,因此找到靠谱的烘焙器具定制厂家&am…

2026/7/3 4:59:20 阅读更多 →
基于STM32的智能手环设计与实现

基于STM32的智能手环设计与实现

摘要:为满足对人体基础生理信息与日常活动状态的综合监测需求,设计了一套基于STM32的智能手环系统。系统以STM32F103C8T6为控制核心,结合MAX30102心率血氧传感器、DS18B20温度传感器、ADXL345加速度传感器、OLED显示屏、按键、蜂鸣器及ESP826…

2026/7/3 4:57:19 阅读更多 →
2026 年 7 月 openclaw 龙虾替代品推荐 九款分场景商用AI智能体实测对比参考

2026 年 7 月 openclaw 龙虾替代品推荐 九款分场景商用AI智能体实测对比参考

前言 OpenClaw 俗称龙虾,作为海外开源 AI 智能体框架,依托自主操控电脑、多技能扩展的能力积累不少使用者,但原版工具存在部署流程繁琐、国内网络适配度有限、数据跨境存在合规压力、中文长任务运行稳定性一般等现实使用门槛。2026 年国内市场…

2026/7/3 4:57:19 阅读更多 →
JVM 全套面试题整理(由简到难,2026最新完整版)

JVM 全套面试题整理(由简到难,2026最新完整版)

很多同学面试 JVM 很痛苦:知识点杂乱、背了不会用、面试问深一点就崩。本文按照 入门基础 → 内存模型 → GC 垃圾回收 → 类加载机制 → 底层原理 → 线上调优与故障排查 难度逐级递增整理,可直接背诵、可直接口述、可解决线上问题。 适合:J…

2026/7/3 4:53:18 阅读更多 →

日新闻

Nginx防御TLS重协商攻击实战:从原理到配置与监控

Nginx防御TLS重协商攻击实战:从原理到配置与监控

1. 项目概述:为什么TLS重协商攻击至今仍需警惕十多年前的CVE-2011-1473,一个关于TLS/SSL协议重协商机制的漏洞,现在提起来还有必要吗?很多运维和开发朋友可能会觉得,这都老掉牙了,现代服务器和客户端不都默…

2026/7/3 0:03:59 阅读更多 →
华为防火墙双通道远程管理实战:Web与SSH配置详解

华为防火墙双通道远程管理实战:Web与SSH配置详解

1. 项目概述:为什么需要双通道远程管理防火墙?在任何一个稍具规模的企业网络里,防火墙都是那个默默守护在边界的关键角色。作为网络工程师,我们不可能每次都跑到机房,插上console线去配置它。远程管理能力,…

2026/7/3 0:03:59 阅读更多 →
AD74413R与PIC18F65K40的高精度工业数据采集方案

AD74413R与PIC18F65K40的高精度工业数据采集方案

1. 项目概述:AD74413R与PIC18F65K40的协同工作在工业自动化和精密测量领域,同时实现高精度模数转换(ADC)和数模转换(DAC)功能是许多复杂系统的核心需求。AD74413R作为一款四通道可配置模拟输入/输出器件,与PIC18F65K40微控制器的组合&#xf…

2026/7/3 0:05:59 阅读更多 →

周新闻

月新闻