網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
??EvenBus??? 是 Guava 中 Pub/Sub 模式的輕量級(jí)實(shí)現(xiàn)。平時(shí)開(kāi)發(fā)如果我們要實(shí)現(xiàn)自己的 Pub/Sub 模型,要寫(xiě)不少類(lèi),設(shè)計(jì)也挺復(fù)雜,對(duì)業(yè)務(wù)代碼也有一定的侵入,但是在使用了 ??EventBus?? 之后就很方便了。
在 Pub/Sub 模式中有 push 和 pull 兩種實(shí)現(xiàn),比如 ActiveMQ 就是 push,Kafka 就是 pull,而 ??EventBus??? 就是 push 這種方式,也可以把 ??EventBus?? 看成一個(gè)簡(jiǎn)易版的消息中間件吧。我個(gè)人感覺(jué)所謂的 Publisher 和 Subscriber 其實(shí)沒(méi)有必要強(qiáng)制分的,當(dāng)然也不是說(shuō)不分,主要想表達(dá)的是 Publisher 也可以做 Subscriber 的事,同樣的,Subscriber 也可以做 Publisher 的事。
先看一個(gè)簡(jiǎn)單的 Demo,看看 ??EventBus?? 是怎么玩的。
初識(shí) Demo
兩個(gè) Listener:
/**
* @author dongguabai
* @date 2019-03-18 18:09
* Listener1
*/
public class Listener1 {
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 String 的消息
public void doSth(String info) {
System.out.println("Listener1 接收到了消息:" + info);
}
}
/**
* @author dongguabai
* @date 2019-03-18 18:09
* Listener2
*/
public class Listener2 {
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 Date 的消息
public void doSth(Date info) {
System.out.println("Listener2 接收到了消息:" + info.toLocaleString());
}
}
Source:
/**
* @author dongguabai
* @date 2019-03-18 18:15
*/
public class EventBusSource {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new Listener1());
eventBus.register(new Listener2());
eventBus.post("EventBus 發(fā)送的 String 消息");
eventBus.post(new Date());
}
}
運(yùn)行結(jié)果:
在上面的 Demo 中有兩個(gè) ??Listener???,一個(gè)監(jiān)聽(tīng) ??String??? 類(lèi)型的事件消息,一個(gè)監(jiān)聽(tīng) ??Date??? 類(lèi)型的事件消息。這兩個(gè) ??Listener??? 都被注冊(cè)到了 ??EventBus?? 中,通過(guò)這個(gè) Demo 非常簡(jiǎn)單的實(shí)現(xiàn)了一個(gè) Pub/Sub 模型。
要注意的是 ??EventBus??? 的 ??post()??? 方法只能傳入一個(gè)參數(shù),如果有多個(gè)參數(shù)的需求那就只能自己根據(jù)需要進(jìn)行封裝了。這里是 ??@Subscribe?? 的官方注釋?zhuān)f(shuō)的也很詳細(xì):
/**
* Marks a method as an event subscriber.
*
* <p>The type of event will be indicated by the method's first (and only) parameter. If this annotation is applied to methods with zero parameters, or more than one parameter, the object containing the method will not be able to register for event delivery from the {@link EventBus}.
*
* <p>Unless also annotated with @{@link AllowConcurrentEvents}, event subscriber methods will be invoked serially by each event bus that they are registered with.
*
* @author Cliff Biffle
* @since 10.0
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface Subscribe {}
上面提到了 ??@AllowConcurrentEvents??,可以看看這個(gè)注解,說(shuō)白了就是讓 Subscribers 的 Method 是線(xiàn)程安全的,即串行執(zhí)行:
/**
* Marks an event subscriber method as being thread-safe. This annotation indicates that EventBus
* may invoke the event subscriber simultaneously from multiple threads.
*
* <p>This does not mark the method, and so should be used in combination with {@link Subscribe}.
*
* @author Cliff Biffle
* @since 10.0
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Beta
public @interface AllowConcurrentEvents {}
其實(shí)保證線(xiàn)程安全就是使用的 ??synchronized???,可以看 ??com.google.common.eventbus.Subscriber.SynchronizedSubscriber#invokeSubscriberMethod??:
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
而且 EventBus 沒(méi)有無(wú)法獲取執(zhí)行結(jié)果的返回值,EventBus 并未提供相應(yīng)的方法獲取執(zhí)行結(jié)果:
DeadEvent
其實(shí) ??DeadEvent??? 源碼很簡(jiǎn)單,它的具體使用場(chǎng)景我也不知道咋說(shuō),說(shuō)白了,它可以幫你封裝一下 ??event???,讓你給你可以獲取 ??EventBus?? 的一些信息。官方注釋是這么說(shuō)的:
/**
* Wraps an event that was posted, but which had no subscribers and thus could not be delivered.
*
* <p>Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect misconfigurations in a system's event distribution.
*
* @author Cliff Biffle
* @since 10.0
*/
@Beta
public class DeadEvent {
private final Object source;
private final Object event;
...
...
}
DeadEvent 就是對(duì) ??post()??? 方法中的 ??event??? 的一個(gè)包裝。在 ??com.google.common.eventbus.EventBus#post?? 中有這樣一段描述:
* <p>If no subscribers have been subscribed for {@code event}'s class, and {@code event} is * not already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted.
使用也很簡(jiǎn)單,就是 ??@Subscribe??? 標(biāo)注的方法的參數(shù)要是 ??DeadEvent??。
public class DeadEventListener {
@Subscribe
public void deadEventMethod(DeadEvent deadEvent){
System.out.println("DeadEvent_Source_Class:"+deadEvent.getSource().getClass());
System.out.println("DeadEvent_Source:"+deadEvent.getSource());
System.out.println("DeadEvent_Event:"+deadEvent.getEvent());
}
}
public class DeadEventEventBusSource {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new DeadEventListener());
eventBus.post("EventBus 發(fā)送的 String 消息");
}
}
運(yùn)行結(jié)果:
DeadEvent_Source_Class:class com.google.common.eventbus.EventBus
DeadEvent_Source:EventBus{default}
DeadEvent_Event:EventBus 發(fā)送的 String 消息
可以看到 ??DeadEvent??? 就是將 ??event??? 進(jìn)行了一層封裝。運(yùn)行結(jié)果第二行輸出的 default 是因?yàn)???EventBus?? 默認(rèn)名稱(chēng)為 default:
/** Creates a new EventBus named "default". */
public EventBus() {
this("default");
}
異常處理
如果 ??@Subscribe?? 標(biāo)注的方法出現(xiàn)了異常怎么辦呢,這里先測(cè)試一下:
兩個(gè) Listener:
/**
* @author dongguabai
* @date 2019-03-18 18:09
* ExceptionListener1
*/
public class ExceptionListener1 {
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 String 的消息
public void exceptionMethod(String info) {
System.out.println("ExceptionListener1 接收到了消息:" + info);
int i = 1/0;
}
}
/**
* @author dongguabai
* @date 2019-03-18 18:09
* Listener2
*/
public class Listener2 {
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 Date 的消息
public void doSth(Date info) {
System.out.println("Listener2 接收到了消息:" + info.toLocaleString());
}
}
Source:
public class ExceptionEventBusSource {
public static void main(String[] args) {
EventBus eventBus = new EventBus();
eventBus.register(new ExceptionListener1());
eventBus.register(new Listener2());
eventBus.post("EventBus 發(fā)送的 String 消息");
eventBus.post(new Date());
}
}
運(yùn)行結(jié)果:
根據(jù)異常信息的提示去 ??com.google.common.eventbus.Subscriber#invokeSubscriberMethod?? 方法看看:
/**
* Invokes the subscriber method. This method can be overridden to make the invocation
* synchronized.
*/
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
從這個(gè)方法也可以看出本質(zhì)還是在通過(guò)反射調(diào)用相應(yīng)的 subscribe 方法。其實(shí)如果再往上琢磨的話(huà)還可以看出這個(gè)方法是這么執(zhí)行的:
/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
看到 ??executor??? 是不是就有一種眼前一亮的感覺(jué)了,就是使用線(xiàn)程池去執(zhí)行的,額,不能這么說(shuō),只能說(shuō)我們可以根據(jù)構(gòu)造傳入 ??Executor??? 的實(shí)現(xiàn),自定義方法執(zhí)行的方式,當(dāng)然也可以使用 JUC 中的線(xiàn)程池,這個(gè)在 ??AsyncEventBus??? 中會(huì)進(jìn)一步介紹。這里異常也使用了 ??try…catch??? 處理,在 ??catch??? 中調(diào)用了 ??handleSubscriberException()?? 方法:
/** Handles the given exception thrown by a subscriber with the given context. */
void handleSubscriberException(Throwable e, SubscriberExceptionContext context) {
checkNotNull(e);
checkNotNull(context);
try {
exceptionHandler.handleException(e, context);
} catch (Throwable e2) {
// if the handler threw an exception... well, just log it
logger.log(
Level.SEVERE,
String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, e),
e2);
}
}
很自然要去看看這個(gè) ??exceptionHandler?? 是個(gè)啥東東,因?yàn)槌霈F(xiàn)異常怎么處理就是它來(lái)操作的。
原來(lái)就是個(gè)接口,我們可以通過(guò) ??EventBus?? 的構(gòu)造傳入:
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this(
"default",
MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(),
exceptionHandler);
}
這也是一種讓代碼可擴(kuò)展的思路,是不是有一種好多源碼都是這個(gè)套路的感覺(jué),而且這里還使用了 ??context?? 去封裝一些相應(yīng)的參數(shù):
/**
* Context for an exception thrown by a subscriber.
*
* @since 16.0
*/
public class SubscriberExceptionContext {
private final EventBus eventBus;
private final Object event;
private final Object subscriber;
private final Method subscriberMethod;
}
自定義異常處理
既然可擴(kuò)展,那我們就擴(kuò)展一個(gè)來(lái)玩玩:
/**
* @author dongguabai
* @date 2019-04-01 13:21
* 自定義異常處理
*/
public class ExceptionHandler implements SubscriberExceptionHandler {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
System.out.println("自定義異常處理....");
System.out.println(exception.getLocalizedMessage());
System.out.println("異常方法名稱(chēng):"+context.getSubscriberMethod().getName());
System.out.println("異常方法參數(shù):"+context.getSubscriberMethod().getParameters()[0]);
}
}
在構(gòu)造 EventBus 的時(shí)候傳入自定義的 Handler:
public class ExceptionEventBusSource {
public static void main(String[] args) {
//傳入自定義異常處理 Handler
EventBus eventBus = new EventBus(new ExceptionHandler());
eventBus.register(new ExceptionListener1());
eventBus.register(new Listener2());
eventBus.post("EventBus 發(fā)送的 String 消息");
eventBus.post(new Date());
}
}
執(zhí)行結(jié)果:
ExceptionListener1 接收到了消息:EventBus 發(fā)送的 String 消息
自定義異常處理…
/ by zero
異常方法名稱(chēng):exceptionMethod
異常方法參數(shù):java.lang.String info
Listener2 接收到了消息:2019-4-1 13:27:15
AsyncEventBus
??AsyncEventBus??? 是 ??EventBus?? 的子類(lèi)。看名稱(chēng)就能猜出可以異步執(zhí)行 subscribe 方法。先看一個(gè) Demo:
public class Listener1 {
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 String 的消息
public void doSth0(String info) {
System.out.println(LocalDateTime.now()+" 方法0 執(zhí)行完畢");
}
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 String 的消息
public void doSth1(String info) {
System.out.println(LocalDateTime.now()+" 方法1 執(zhí)行開(kāi)始");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(LocalDateTime.now()+" 方法1 執(zhí)行完畢");
}
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 String 的消息
public void doSth2(String info) {
System.out.println(LocalDateTime.now()+" 方法2 執(zhí)行開(kāi)始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(LocalDateTime.now()+" 方法2 執(zhí)行完畢");
}
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 String 的消息
public void doSth3(String info) {
System.out.println(LocalDateTime.now()+" 方法3 執(zhí)行開(kāi)始");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(LocalDateTime.now()+" 方法3 執(zhí)行完畢");
}
@Subscribe //監(jiān)聽(tīng) 參數(shù)為 String 的消息
public void doSth4(String info) {
System.out.println(LocalDateTime.now()+" 方法4 執(zhí)行開(kāi)始");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(LocalDateTime.now()+" 方法4 執(zhí)行完畢");
}
}
public class EventBusSource {
public static void main(String[] args) {
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(10));
eventBus.register(new Listener1());
eventBus.post("EventBus 發(fā)送的 String 消息");
}
}
運(yùn)行結(jié)果:
2019-04-01T16:35:56.223 方法3 執(zhí)行開(kāi)始
2019-04-01T16:35:56.223 方法0 執(zhí)行完畢
2019-04-01T16:35:56.223 方法1 執(zhí)行開(kāi)始
2019-04-01T16:35:56.223 方法4 執(zhí)行開(kāi)始
2019-04-01T16:35:56.223 方法2 執(zhí)行開(kāi)始
2019-04-01T16:35:57.226 方法1 執(zhí)行完畢
2019-04-01T16:35:58.226 方法2 執(zhí)行完畢
2019-04-01T16:35:59.224 方法3 執(zhí)行完畢
2019-04-01T16:36:00.225 方法4 執(zhí)行完畢
可以看到的確是異步執(zhí)行了。其實(shí) ??AsyncEventBus??? 和 ??EventBus??? 所謂的異步和同步主要是跟 ??executor??? 有關(guān)。從 ??post()??? 方法一路進(jìn)去到 ??com.google.common.eventbus.Subscriber#dispatchEvent?? 方法:
/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
本質(zhì)是通過(guò) ??executor??? 去執(zhí)行的,那么這個(gè) ??executor?? 是個(gè)啥呢:
/** Executor to use for dispatching events to this subscriber. */
private final Executor executor;
private Subscriber(EventBus bus, Object target, Method method) {
this.bus = bus;
this.target = checkNotNull(target);
this.method = method;
method.setAccessible(true);
this.executor = bus.executor();
}
就是 ??Subscriber??? 中的 ??EventBus??? 的 ??executor???,是 ??Executor??? 接口。在同步的 ??EventBus??? 中的 ??executor?? 是 Guava 自己提供的:
@GwtCompatible
enum DirectExecutor implements Executor {
INSTANCE;
@Override
public void execute(Runnable command) {
command.run();
}
@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}
說(shuō)白了就是一個(gè)很普通的 ??Executor?? 實(shí)現(xiàn)。
在 ??AsyncEventBus??? 中 ??executor?? 我們可以自行傳入:
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
其實(shí)這個(gè)地方設(shè)計(jì)就不太好,前面也分析過(guò)了所謂的同步和異步主要與 ??executor??? 有關(guān)。如果我傳入的是一個(gè)同步的 ??executor??? 那怎么辦呢,這里測(cè)試一下(這里偷個(gè)懶,直接用 ??EventBus??? 中的同步的 ??Executor?? 實(shí)現(xiàn)):
public class EventBusSource {
public static void main(String[] args) {
AsyncEventBus eventBus = new AsyncEventBus(MoreExecutors.directExecutor());
eventBus.register(new Listener1());
eventBus.post("EventBus 發(fā)送的 String 消息");
}
}
運(yùn)行結(jié)果:
2019-04-01T17:13:23.158 方法4 執(zhí)行開(kāi)始
2019-04-01T17:13:27.162 方法4 執(zhí)行完畢
2019-04-01T17:13:27.162 方法0 執(zhí)行完畢
2019-04-01T17:13:27.162 方法1 執(zhí)行開(kāi)始
2019-04-01T17:13:28.166 方法1 執(zhí)行完畢
2019-04-01T17:13:28.166 方法2 執(zhí)行開(kāi)始
2019-04-01T17:13:30.171 方法2 執(zhí)行完畢
2019-04-01T17:13:30.171 方法3 執(zhí)行開(kāi)始
2019-04-01T17:13:33.175 方法3 執(zhí)行完畢
可以發(fā)現(xiàn)居然同步執(zhí)行了。這個(gè)地方也是設(shè)計(jì)不嚴(yán)謹(jǐn)?shù)牡胤剑烙?jì)作者本意是讓我們傳入一個(gè)線(xiàn)程池,如果是這樣的話(huà)可以把 ??executor??? 級(jí)別調(diào)低點(diǎn),比如使用 ??ThreadPoolExecutor??。
原文鏈接:https://blog.csdn.net/qq_32907491/article/details/131502174
- 上一篇:沒(méi)有了
- 下一篇:沒(méi)有了
相關(guān)推薦
- 2022-08-13 VMware vCenter 無(wú)法創(chuàng)建自定義規(guī)范
- 2022-07-30 Redis?keys命令的具體使用_Redis
- 2022-02-03 ionic 富文本編輯樣式后,前臺(tái)不能回顯樣式
- 2022-04-18 ASP.Net?Core?MVC基礎(chǔ)系列之獲取配置信息_基礎(chǔ)應(yīng)用
- 2023-02-10 Docker?跨主機(jī)容器間相互訪(fǎng)問(wèn)的實(shí)現(xiàn)_docker
- 2023-07-16 springboot動(dòng)態(tài)端口
- 2022-04-09 react中鍵盤(pán)事件無(wú)法在div上觸發(fā)的問(wèn)題解決
- 2022-11-28 go?mod文件內(nèi)容版本號(hào)簡(jiǎn)單用法詳解_Golang
- 欄目分類(lèi)
-
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門(mén)
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支