what
观察者模式中通常可以分为: 主题(可被观察的对象),观察者(处理被观察对象发生事件的应对对象)。
主题和观察者通常通过抽象的方式实现松耦合,在双方出现改动的情况下减少代码的改动。
NOTE: 需要在 主题 中维护观察者对象列表,如果观察者数量大且处理的事件属于延时性比较大的情况,各个观察者收到的通知可能会出现不一致。
Spring
以前的一些开发情景中了解过 Spring 中的 事件模式 采用的是 观察者模式。
可以通过自定义 Event 继承 ApplicationEvent,实现 ApplicationListener、ApplicationEventPublisherAware, 来对自定义事件进行监听。
// 主题事件
public class MyEvent extends ApplicationEvent {
public MyEvent(Object source) {
super(source);
}
}
// 观察者
@Component
public class MyEventListener implements ApplicationListener<MyEvent> {
@Override
public void onApplicationEvent(MyEvent event) {
System.out.println("收到 myEvent 事件通知 from: " + event.getSource().toString());
}
}
// 关联发布(不太准确的样子)
@Component
public class MyEventPublish implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public void publish(Object obj) {
this.publisher.publishEvent(obj);
// 可以判断事件类型进行条件发布
// if (obj instanceof MyEvent) this.publisher.publishEvent(obj);
}
}
// 简单的调用
@RestController
@RequestMapping("/")
public class IndexController {
@Autowired
private MyEventPublish publish;
@GetMapping("/")
public String index() {
publish.publish(new MyEvent(this));
return "hello-spring-event";
}
}
// log
// 收到 myEvent 事件通知 from: com.example.IndexController@11f886a6
rxjava
rxjava 通过封装,实现基于事件流的链式调用。
// rxjava 3.*
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onNext("D");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("onNext===>" + s);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
observable.subscribe(observer);