Nacos Client 源码分析(一)事件的发布与订阅
本文使用的 Nacos 版本为 2.2.2
1. 基本概念
发布—订阅模式是一种消息传递模式,它包含两种角色:发布者和订阅者。发布者负责发送消息,而订阅者则负责接收消息。发布者和订阅者之间没有直接联系,它们通过一个称为“消息队列”或“事件总线”的中介来交互。
在 Nacos Client 中,实际上是使用了观察者模式来实现事件的发布与订阅机制。观察者模式定义了一种一对多的依赖关系,即一个被观察者对象可以被多个观察者对象同时监听,当被观察者对象发生变化时,会通知所有的观察者对象。
Nacos Client 的发布订阅模型主要涉及到以下几个类和接口:
-
NotifyCenter:统一的事件通知中心。
-
Event:事件抽象类,主要实现类为
InstancesChangeEvent,表示实例变化事件。 -
EventPublisher: 事件发布者接口,主要实现类为
DefaultPublisher。 -
Subscriber:事件订阅者抽象类,主要实现类为
InstancesChangeNotifier,订阅InstancesChangeEvent事件并通知EventListener回调。 -
EventListener:事件监听器,处理订阅的事件,通常由用户实现。
2. 依赖关系
先分析各类以及接口之间的依赖关系。当我们采用如下代码订阅一个服务。
NamingService naming = NamingFactory.createNamingService(serverAddr);
List<String> cluster = new ArrayList<>();
cluster.add("A");
naming.subscribe("myService",cluster, event -> {
print(event);
});
NamingService 的实现类 NacosNamingService 的 init 方法会先在 NotifyCenter 中注册一个 InstancesChangeEvent 事件的发布者与订阅者。其中订阅者就是一个 InstancesChangeNotifier ,而发布者又是什么呢?继续追 NotifyCenter 的 registerToPublisher 方法。
private void init(Properties properties) throws NacosException {
// ...
this.notifierEventScope = UUID.randomUUID().toString();
this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
// ...
}
在 registerToPublisher 中最终执行了以下代码,该方法以 topic 为键,利用 EventPublisherFactory 生成一个 EventPublisher 作为值加入到 NotifyCenter 实例的 publisherMap 中。publisherMap 是一个 ConcurrentHashMap,其中topic 为监听事件的类名,EventPublishe 的默认类型为 DefaultPusher。NotifyCenter 的所有方法都为静态方法,操作一个静态实例INSTAENCE,在 NotifyCenter 中管理了所有的发布者,每一种事件类型对应一个发布者。
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
final EventPublisherFactory factory, final int queueMaxSize) {
// ...
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
}
// ...
}
继续查看 Subscriber 的去向。追踪 NotifyCenter 的 registerSubscriber 方法,最终在 addSubscriber 方法执行以下代码。该方法在 publisherMap 中找到对应 topic 的 EventPublisher,并将订阅者加入到发布者中。
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
// ...
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
publisher.addSubscriber(consumer);
}
DefaultPulisher 的 addSubscriber 方法如下,将订阅者加入到了其的订阅者集合 subscribers 中,subscribers 的数据类型为 ConcurrentHashSet<Subscriber>。
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}
分析完 init 方法,我们继续看 NacosNamingService 的 subscribe 方法做了什么,追踪 subscribe,最终会执行到下面这句代码
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener)
throws NacosException {
// ...
changeNotifier.registerListener(groupName, serviceName, clusterString, listener);
// ...
}
InstancesChangeNotifier 的 registerListener 方法如下。
public void registerListener(String groupName, String serviceName, String clusters, EventListener listener) {
String key = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
ConcurrentHashSet<EventListener> eventListeners = listenerMap.computeIfAbsent(key, keyInner -> new ConcurrentHashSet<>());
eventListeners.add(listener);
}
我们知道 changeNotifier 是在 NacosNamingService 中定义的一个订阅者,之前已经被 NotifyCenter 加入到了 InstancesChangeEvent 事件的发布者的订阅者集合,这个方法将一个 EventListener 注册到了订阅者的监听者集合 listenerMap,其类型为 Map<String, ConcurrentHashSet<EventListener>>。key 为 由服务名、组名、集群所构成字符串,可以唯一标识一种服务订阅类型,值为监听该服务的 EventListrner 的集合。
综上,在 Nacos Client 中,NotifyCenter 拥有所有 EnventPulisher 的集合,某一个具体的 EnventPulisher 拥有它的所有 Subscriber 的集合,而 Subscriber 拥有它的所有 EventLsiter 的集合,这也体现出了观察者模式的一对多的关系。
3. 发布订阅流程。
我们从 NotifyCenter 的 publishEvent 方法开始入手,研究 Nacos Client 的发布订阅流程,该方法会请求发布者发布事件。
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
// 1. 通过事件类型获得发布者主题
final String topic = ClassUtils.getCanonicalName(eventType);
// 2. 获得对应主题下的事件发布者
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
// 3. 发布事件
if (publisher != null) {
return publisher.publish(event);
}
if (event.isPluginEvent()) {
return true;
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
事件发布调用了发布者的 publish 方法, 先来分析 DefaultPublisher 类。

DefaultPublisher 实现了 EventPublisher 接口和 Thread 类,也是就是说 DefaultPublisher 可以作为一个独立的线程运行。我们先来看一下它的 pushlish 方法。
可以看出 event 被直接放入到了 queue 中。queue 是在 DefaultPublisher 中定义的 BlockingQueue,我在之前有过对阻塞队列的简单介绍(阻塞队列)。但这里没有使用阻塞方法 put,而是直接使用了 offer,若队列已满则去执行 receiveEvent 方法。
public boolean publish(Event event) {
checkIsStart();
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
receiveEvent(event);
return true;
}
return true;
}
DefaultPublisher 的 run 方法如下,run 方法调用了 openEventHandler 方法,而 openEventHandler 会阻塞在 queue 的 take 方法上,当拿到一个 event 后就会执行 receiveEvent 方法,然后循环等待下一个事件的到来
@Override
public void run() {
openEventHandler();
}
void openEventHandler() {
// ...
while (!shutdown) {
final Event event = queue.take();
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
// ...
}
在 receiveEvent 方法中,会遍历改发布者的所有订阅者,并调用 notifySubscriber 方法通知订阅者。
void receiveEvent(Event event) {
// ...
for (Subscriber subscriber : subscribers) {
// 省略其他细节判断
notifySubscriber(subscriber, event);
}
}
notifySubscriber 方法会调用 Subscriber 的 onEnven 方法。当 Subscriber 拥有自己的线程池时,会在 Subscriber 的线程池中执行异步回调,否者就在 DefaultPublisher 自身的线程中同步执行回调。
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
InstancesChangeNotifier 的继承关系如下所示,其实现了 Subscriber 类,每个 Subscriber 都会关注一种事件。

public class InstancesChangeNotifier extends Subscriber<InstancesChangeEvent> {//...}
InstancesChangeNotifier 的 onEvent 方法如下所示。
@Override
public void onEvent(InstancesChangeEvent event) {
// 1. 获取服务标识
String key = ServiceInfo
.getKey(NamingUtils.getGroupedName(event.getServiceName(), event.getGroupName()), event.getClusters());
// 2. 获得对监听该服务的所有 EventListener
ConcurrentHashSet<EventListener> eventListeners = listenerMap.get(key);
if (CollectionUtils.isEmpty(eventListeners)) {
return;
}
// 3. 执行 EventListener 的回调方法
for (final EventListener listener : eventListeners) {
// 事件类型转换
final com.alibaba.nacos.api.naming.listener.Event namingEvent = transferToNamingEvent(event);
// 如果 EventListener 有线程池就异步执行
if (listener instanceof AbstractEventListener && ((AbstractEventListener) listener).getExecutor() != null) {
((AbstractEventListener) listener).getExecutor().execute(() -> listener.onEvent(namingEvent));
} else {
// 否则在当前线程中同步执行
listener.onEvent(namingEvent);
}
}
}
需要注意是这里将 InstancesChangeEvent 事件转化为了 NamingEvent,但这两种事件实现的是不同的类或接口,前者是com.alibaba.nacos.common.notify包下的抽象类,后者是com.alibaba.nacos.api.naming.listener包下的接口。

最终会执行用户定义的 EventListener 的 onEvent 方法。
naming.subscribe("myService",cluster, event -> {
print(event);
});
4. 总结
- Nacos Cilent 采用了观察者模式实现了事件的发布订阅机制。其中 NotifyCenter 拥有所有 EventPublisher 的集合,每一种 Event 对应一个 EventPublisher。 EventPublisher 拥订阅对应 Event 的 Subscriber 的集合。Subscriber 拥有EventListener的集合,EventListener 通过由(serviceName,groupName,clusters)构成的三元组来标识其监听的服务类型。
- NacosNamingService 在初始化时会注册 InstancesChangeEvent 事件的发布者以及订阅者到 NotifyCenter,其中发布者由NotifyCenter 根据事件类型生成,订阅者为在 NacosNamingService 定义的 InstancesChangeNotifier。用户定义的 EventListener 会被注册到这个 InstancesChangeNotifier 中。
- DefaultPublisher 实现了 Thread 类,其内部维护了一个事件阻塞队列,NotifyCenter 将事件加入到 DefaultPublisher 的阻塞队列,DefaultPublisher 在自身线程的 run 方法中循环等待事件的到来并处理。