服务发现(consumer服务引入)
Table of Contents generated with DocToc (opens new window)
# 摘要
Dubbo服务引入全流程
# 服务发现大致原理
Provider将注册自己的信息到注册中心, Consumer操作从注册中心得知 Provider 的信息,然后自己封装一个调用类和Provider 进行深入地交流。而之前已经提到在 Dubbo中一个可执行体就是 Invoker,所有调用都要向 Invoker 靠拢,因此可以推断出应该要先生成一个 Invoker,然后又因为框架需要往不侵入业务代码的方向发展,那我们的 Consumer 需要无感知的调用远程接口,因此需要搞个代理类,包装一下屏蔽底层的细节。整体大致流程如下:
# 服务发现开端
服务的引入和服务的暴露一样,也是通过 spring 自定义标签机制解析生成对应的 Bean,Provider Service 对应解析的是 ServiceBean 而 Consumer Reference 对应的是 ReferenceBean
服务在 Spring 容器刷新完成之后开始暴露,而服务的引入时机有两种,第一种是饿汉式,第二种是懒汉式。
- 饿汉式是通过实现 Spring 的InitializingBean接口中的 afterPropertiesSet 方法,容器通过调用 ReferenceBean 的 afterPropertiesSet 方法时引入服务。
- 懒汉式是只有当这个服务被注入到其他类中时启动引入流程,也就是说用到了才会开始服务引入。
- 默认情况下,Dubbo 使用懒汉式引入服务,如果需要使用饿汉式,可通过配置 dubbo:reference 的 init 属性开启。
# BeanFactory 、FactoryBean、ObjectFactory
BeanFactory
其实就是 IOC 容器,简单的说就是 Spring 里面的 Bean 都归它管,而FactoryBean
也是 Bean 所以说也是归BeanFactory
管理的。那
FactoryBean
到底是个什么 Bean 呢?它其实就是把我们真实想要的 Bean 封装了一层,在真正要获取这个 Bean 的时候容器会调用FactoryBean#getObject()
方法,而在这个方法里面你可以进行一些复杂的组装操作。这个方法就封装了我们真实想要的对象复杂的创建过程。使用FactoryBean
封装了一层,屏蔽了底层创建的细节,便于 Bean 的使用。ObjectFactory
这个是用于延迟查找的场景,它就是一个普通工厂,当得到ObjectFactory
对象时,相当于 Bean 没有被创建,只有当getObject()
方法时,才会触发 Bean 实例化等生命周期。 主要用于暂时性地获取某个Bean Holder
对象,如果过早的加载,可能会引起一些意外的情况,比如当 Bean A 依赖 Bean B 时,如果过早地初始化 A,那么 B 里面的状态可能是中间状态,这时候使用 A 容易导致一些错误。(循环依赖的错误)。
总结的说 BeanFactory 就是 IOC 容器,FactoryBean 是特殊的 Bean, 用来封装创建比较复杂的对象,而 ObjectFactory 主要用于延迟查找的场景,延迟实例化对象。
# 服务发现的三种方式
服务的引入又分为了三种,第一种是本地引入;第二种是直接连接引入远程服务;第三种是通过注册中心引入远程服务。
# 本地服务发现
含义
之前服务暴露的流程每个服务都会通过一个本地暴露,走 injvm 协议(当然要是 scope = remote 就没本地引用了),因为存在一个服务端既是 Provider 又是 Consumer 的情况,然后有可能自己会调用自己的服务,因此就弄了一个本地引入,这样就避免了远程网络调用的开销。所以服务引入会先去本地缓存找找看有没有本地服务。
# 远程服务发现
这个其实就是平日测试的情况下用用,不需要启动注册中心,由 Consumer 直接配置写死Provider 的地址,然后直连即可
# 注册中心发现
含义
Consumer 通过注册中心得知 Provider 的相关信息,然后进行服务的引入,这里还包括多注册中心,同一个服务多个提供者的情况,如何抉择如何封装,如何进行负载均衡、容错并且让使用者无感知
# 服务发现源码解析
默认是懒汉式的,所以服务引入的入口就是 ReferenceBean 的 getObject 方法
@Override
public Object getObject() {
//懒汉式的
return get();
}
2
3
4
5
public synchronized T get() {
//懒汉式的
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init();
}
return ref;
}
2
3
4
5
6
7
8
9
10
# init()源码
//大部分就是检查配置然后将配置构建成 map 主要的方式的是 ref = createProxy(map);
public synchronized void init() {
//******** 省略前面的代码
serviceMetadata.getAttachments().putAll(map);
//从名字可以得到就是要创建的一个代理
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
initialized = true;
checkInvokerAvailable();
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# createProxy()
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
//如果是走本地的话,那么直接构建个走本地协议的 URL 然后进行服务的引入
// LOCAL_PROTOCOL: injvm , LOCALHOST_VALUE: 127.0.0.1
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
//如果配置里面设置了url ,那要么是点对点直连,要么是配置中心地址,都经过处理加入到urls中
urls.clear();
if (url != null && url.length() > 0) {
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
//得到配置的url,进行循环
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
if (UrlUtils.isRegistry(url)) {
//如果是注册中心地址将 map转换为查询字符串,并作为 refer 参数的值添加到 url中
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
//如果是点对点会合并url,移除服务提供者的一些配置
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// 然后就是没配置 url 的情况 如果配置了url 那么不是直连的地址,到这里肯定走的就是注册中心引入远程服务了。
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry();
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException(
"No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() +
" use dubbo version " + Version.getVersion() +
", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
}
if (urls.size() == 1) {
//如果只有一个URL直接转换成invoker
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 多个url挨个转换成invoker
Invoker<?> referInvoker = REF_PROTOCOL.refer(interfaceClass, url);
if (shouldCheck()) {
if (referInvoker.isAvailable()) {
invokers.add(referInvoker);
} else {
referInvoker.destroy();
}
} else {
invokers.add(referInvoker);
}
if (UrlUtils.isRegistry(url)) {
//用最后一个注册中心的地址
registryURL = url; // use last registry url
}
}
if (shouldCheck() && invokers.size() == 0) {
throw new IllegalStateException("Failed to check the status of the service "
+ interfaceName
+ ". No provider available for the service "
+ (group == null ? "" : group + "/")
+ interfaceName +
(version == null ? "" : ":" + version)
+ " from the multi registry cluster"
+ " use dubbo version " + Version.getVersion());
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// The invoker wrap sequence would be: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
// //创建StaticDirectory实例,并由 Cluster对多个Invoker 进行合并,只暴露出一个 invoker便于调用
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
?
(invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) :
Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
MetadataUtils.publishServiceDefinition(consumerURL);
// create service proxy
// 通过代理封装invoker返回代理
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
- 整个流程简述一下就是,先检查配置,通过配置构建一个 map ,然后利用 map 来构建 URL,再通过 URL 上的协议利用自适应扩展机制调用对应的
protocol.refer
得到相应的invoker
。 - 在有多个 URL 的时候,先遍历构建出
invoker
然后再由StaticDirectory
封装一下,然后通过cluster
进行合并,只暴露出一个invoker
。 - 然后再构建代理,封装
invoker
返回服务引用,之后Comsumer
调用的就是这个代理类。
# RegistryProtocol.refer()
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//取registry参数值,并将其设置为协议头
url = getRegistryUrl(url);
// 获取中心实例
Registry registry = getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*" 将参数转为map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
// 如果分group 的话
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url, qs);
}
}
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url, qs);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# RegistryProtocol.doCreateInvoker()
主要就是获取注册中心实例,然后调用doCreateInvoker()
进行真正的 refer
protected <T> ClusterInvoker<T> doCreateInvoker(DynamicDirectory<T> directory, Cluster cluster, Registry registry, Class<T> type) {
// 设置注册中心实例
directory.setRegistry(registry);
// 设置动态生成的protocol $Adaptive
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 生成服务者消费链接
URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(urlToRegistry);
// 向注册中心 注册消费者 在consumer 目录下创建新节点
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(urlToRegistry);
//再订阅注册中心的 providers目录、 configurators目录和routers目录,订阅好了之后就会触发 DubboProtocol的refer方法.
directory.subscribe(toSubscribeUrl(urlToRegistry));
//利用cluser封装direcotry其实就是封装多个invoker
return (ClusterInvoker<T>) cluster.join(directory);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心实例
directory.setRegistry(registry);
// 设置动态生成的protocol $Adaptive
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
// 生成服务者消费链接
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
// 向注册中心 注册消费者 在consumer 目录下创建新节点
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//再订阅注册中心的 providers目录、 configurators目录和routers目录,
//订阅好了之后就会触发 DubboProtocol的refer方法.
directory.subscribe(toSubscribeUrl(subscribeUrl));
//利用cluser封装direcotry:其实就是封装多个invoker
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
这个方法很关键,可以看到生成了 RegistryDirectory
这个 directory
,并向其中set了注册中心实例,它自身也实现了 NotifyListener
接口,因此注册中心的监听其实是它来处理的。
然后向注册中心注册自身的信息,并且向注册中心订阅了 providers
节点、 configurators
节点 和 routers
节点,订阅了之后 RegistryDirectory
会收到这几个节点下的信息,就会触发 DubboInvoker
的生成了,即用于远程调用的 Invoker。
然后通过 cluster
再包装一下得到 Invoker
,因此一个服务可能有多个提供者,最终在 ProviderConsumerRegTable
中记录这些信息,然后返回 Invoker
。
这时我们从注册中心拿到了远程provider
的信息,然后执行DubboProtocol.getClients()
方法进行服务的引入
private ExchangeClient[] getClients(URL url) {
// 是否共享连接
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// 如果没有配置,连接是共享的,否则,一个服务一个连接
if (connections == 0) {
useShareConnect = true;
/*
* xml 配置应该比属性配置具有更高的优先级
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
//如果使用的共享连接
if (useShareConnect) {
//得到共享的客户端
clients[i] = shareClients.get(i);
} else {
//得到初始化的新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
这里的重点在 getClients
方法,因为终究是要跟远程服务进行网络调用的,而 getClients
就是用于获取客户端实例,实例类型为 ExchangeClient
,底层依赖 Netty
来进行网络通信,并且可以看到默认是共享连接
然后最终得到的 Invoker
如下图,可以看到记录的很多信息,基本上该有的都有了,我这里走的是对应的服务只有一个 url
的情况,多个 url
无非也是利用 directory
和 cluster
封装一层
# 总结
总结
总的来说就是,通过配置组成 URL
,然后通过自适应得到对于的实现类进行服务引入,如果是注册中心那么会向注册中心注册自己的信息,然后订阅注册中心相关信息,得到远程 provider
的 ip
等信息,再通过netty
客户端进行连接。
并且通过directory
和cluster
进行底层多个服务提供者的屏蔽、容错和负载均衡等,最终得到封装好的 invoker
再通过动态代理封装得到代理类,让接口调用者无感知的调用方法