服务容错、降级
Table of Contents generated with DocToc (opens new window)
why
在网络通信中有很多不确定的因素,比如网络延迟、网络中断等,此类情况出现的话会造成当前这次请求出现失败。当服务通信出现这类问题时,需要采取一定措施来应对。 Dubbo 提供了容错机制来处理这类错误
# 服务容错
# 集群容错方式
方式概述
Dubbo 的集群容错机制就有以下 10 种 ,其中默认的扩展类为 FailoverCluster。 可双击Shifit搜索集群容错机制接口自行查看接口扩展类 org.apache.dubbo.rpc.cluster.Cluster
//默认为FailoverCluster
@SPI(FailoverCluster.NAME)
public interface Cluster {
/**
* Merge the directory invokers to a virtual invoker.
*
* @param <T>
* @param directory
* @return cluster invoker
* @throws RpcException
*/
@Adaptive
<T> Invoker<T> join(Directory<T> directory) throws RpcException;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster
failfast=org.apache.dubbo.rpc.cluster.support.FailfastCluster
failsafe=org.apache.dubbo.rpc.cluster.support.FailsafeCluster
failback=org.apache.dubbo.rpc.cluster.support.FailbackCluster
forking=org.apache.dubbo.rpc.cluster.support.ForkingCluster
available=org.apache.dubbo.rpc.cluster.support.AvailableCluster
mergeable=org.apache.dubbo.rpc.cluster.support.MergeableCluster
broadcast=org.apache.dubbo.rpc.cluster.support.BroadcastCluster
zone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster
2
3
4
5
6
7
8
9
10
# 集群容错路线
路线概述
集群容错真正发生在消费端。当消费端发起调用时,会先从服务目录(注册中心)查询满足需求的服务提供者信息,在此基础上进行路由,路由后的结果才会真正进行容错处理
# 容错机制使用
在consumer消费方(或服务提供者,但一般都是在消费方配置)使用Dubbo相关注解来实现容错机制
timeout
表示等待时间,超过这个时间就认为失败了;retries
是失败后重试的次数
# 容错机制
# 1. Failover Cluster (默认)
失败自动切换,当出现失败,重试其他服务器(缺省);通常用于读操作,但是重试会带来更长延迟。可以通过
retries = "2"
来设置重试次数(不含第一次)
@DubboReference(loadbalance = "random", retries=2, cluster = "failover")
# 2. Failfast Cluster
快速失败,只会发起一次调用,失败立即抛出
RpcException
。通常用于非幂等性的写操作,比如:新增记录
@DubboReference(loadbalance = "random", cluster = "failfase")
# 3. Failsafe Cluster
失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。不抛出异常,直接继续执行后续的业务
@DubboReference(loadbalance = "random", cluster = "failsafe")
# 4. Failback Cluster
失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
@DubboReference(loadbalance = "random", cluster = "failback")
# 5. Forking Cluster
并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过设置参数
forks="2"
来设置最大并行数。
@DubboReference(loadbalance = "random", cluster = "forking", parameters = {"forks","2"})
# 6. Broadcast Cluster
广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。
在广播调用中,可以通过
broadcast.fail.percent
配置节点调用失败的比例,当达到这个比例后,BroadcastClusterInvoker
将不再调用其他节点,直接抛出异常。
broadcast.fail.percent
取值在 0~100 范围内。默认情况下当全部调用失败后,才会抛出异常。broadcast.fail.percent
只是控制的当失败后是否继续调用其他节点,并不改变结果(任意一台报错则报错)。broadcast.fail.percent
参数 在 dubbo2.7.10 及以上版本生效。例如配置
broadcast.fail.percent=20
代表了当 20% 的节点调用失败就抛出异常,不再调用其他节点。
@DubboReference(cluster = "broadcast", parameters = {"broadcast.fail.percent", "20"})
# 7. Available Cluster
调用目前可用的实例(只调用一个),如果当前没有可用的实例,则抛出异常。通常用于不需要负载均衡的场景。
# 8. Mergeable Cluster
将集群中的调用结果聚合起来返回结果,通常和
group
一起配合使用。通过分组对结果进行聚合并返回聚合后的结果,比如菜单服务,用group
区分同一接口的多种实现,现在消费方需从每种group
中调用一次并返回结果,对结果进行合并之后返回,这样就可以实现聚合菜单项。
# 9. ZoneAware Cluster
多注册中心订阅的场景,注册中心集群间的负载均衡。对于多注册中心间的选址策略有如下四种
- 指定优先级:preferred="true"注册中心的地址将被优先选择
<dubbo:registry address="zookeeper://127.0.0.1:2181" preferred="true" />
- 同中心优先:检查当前请求所属的区域,优先选择具有相同区域的注册中心
<dubbo:registry address="zookeeper://127.0.0.1:2181" zone="beijing" />
- 权重轮询:根据每个注册中心的权重分配流量
<dubbo:registry id="beijing" address="zookeeper://127.0.0.1:2181" weight="100" /> <dubbo:registry id="shanghai" address="zookeeper://127.0.0.1:2182" weight="10" />
- 缺省值:选择一个可用的注册中心
# 10. 集群模式配置
按照以下示例在服务提供方和消费方配置集群模式 <dubbo:service cluster="failsafe" /> 或 <dubbo:reference cluster="failsafe" />
# 总结
笔记
在实际应用中查询语句容错策略建议使用默认 Failover Cluster ,而增删改建议使用 Failfast Cluster 或 者 使用 Failover Cluster(retries=”0”) 策略 防止出现数据 重复添加等等其它问题!建议在设计接口时候把查询 接口方法单独做一个接口提供查询
# 服务降级
Dubbo可以通过服务降级功能临时屏蔽某个出错的非关键性服务,并定义降级后的返回策略
# 降级策略
提示
- 所谓降级策略,就是当系统遇到无法承受的压力时,选择暂时关闭一些非关键的功能,或者延时提供一些功能,把此刻所有的资源都提供给现在最关键的服务。
- 在秒杀场景中下订单就是最核心最关键的功能。当系统压力将要到达临界值时,可以暂时先关闭一些非核心功能如查询功能。当秒杀活动结束后,再将暂时关闭的功能开启。这样既保证了秒杀活动的顺利进行,也保护了系统没有崩溃。
- 还有一种降级策略,当系统依赖的下游服务出现错误,甚至已经完全不可用了,那么此时就不能再调用这个下游服务了,否则可能导致雪崩。所以直接返回兜底方案,把下游服务直接降级。
- 这里比较两个概念:服务降级与服务熔断,因为这两个概念比较相似。一般认为服务熔断是服务降级的一个方法,而服务降级还有很多其它方法,例如开关降级、流量降级等等。
# 延时策略
- 物理隔离:应用分别部署在不同物理机、不同机房,资源不会互相影响。
- 线程隔离:不同类型的请求进行分类,交给不同的线程池处理,当一类请求出现高耗时和异常,不影响另一类请求访问。
# Dubbo服务降级
# 降级策略配置
Dubbo框架是自带服务降级策略的,提供了三种常用的降级策略
# 1. 强制降级策略
@DubboService(methods = {@Method(name = "userInfo")},
version = "1.0.0"
// ,mock = "com.zdk.mock.MockMethods"
,mock = "force:return 222"
//强制返回222
)
2
3
4
5
6
# 2. 异常降级策略
<dubbo:reference id="xxxService" mock="throw com.zdk.exception.BizException" interface="com.zdk.service.xxxService" />
# 3. 自定义降级策略
package com.zdk.mock.impl;
import com.zdk.service.UserService;
/**
* @author zhangdikai
* @date 2022-07-18 9:46
*/
public class UserServiceMock implements UserService {
@Override
public String userInfo() {
return "我是mock";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
配置:
package com.zdk.service.impl;
import com.zdk.service.OrderService;
import com.zdk.service.UserService;
import org.apache.dubbo.config.annotation.DubboReference;
import org.apache.dubbo.config.annotation.Method;
import org.springframework.stereotype.Service;
/**
* @author zdk
* @date 2022/5/28 20:40
*/
@Service
public class OrderServiceImpl implements OrderService {
@DubboReference(methods = {
@Method(
name = "userInfo",
timeout = 2000
)}, version = "1.0.0",
retries = 0
,mock = "com.zdk.mock.impl.UserServiceMock"
)
private UserService userService;
@Override
public String initOrder() {
//查询用户的收货地址
return userService.userInfo();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
注意
自定义降级策略的时候要注意,这个类必须实现你调用的服务的接口,实际调用的就是新的实现的类的相同的方法,只是方法返回就是我们自定义的结果。
还有一个问题,在Dubbo 2.7.8版本下,必须将Mock实现类和配置都写在consumer
端,才能实现mock
的调用,如果写在provider
端,则会爆 Class Not Found com.zdk.mock.impl.UserServiceMock
的异常,显示类加载器不能加载到这个Mock实现类,目前不知道不是版本问题
示例:
@Service
@DubboService(methods = {@Method(name = "userInfo")},
version = "1.0.0"
,mock = "com.zdk.mock.MockMethods"
)
public class UserServiceImpl implements UserService {
@Override
public String userInfo() {
System.out.println("进入服务方法内");
if (Math.random()<0.5){
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return "success";
}
}
// 必须实现对应的接口
@Component
public class MockMethods implements UserService {
@Override
public String userInfo() {
return "执行失败mock";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 源码分析
当Consumer端调用Provider时,都会调用
MockClusterInvoker
,在这里判断调用是否抛出RpcException
,如果有则执行Mock
public class MockClusterInvoker<T> implements ClusterInvoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = this.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
// 有mock属性的情况下 如果没有 直接执行消费逻辑
if (value.length() != 0 && !"false".equalsIgnoreCase(value)) {
// 如果是force 直接执行降级mock返回
if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.getUrl());
}
// 执行mock逻辑
result = this.doMockInvoke(invocation, (RpcException)null);
} else {
//不是force
try {
// 进行消费
result = this.invoker.invoke(invocation);
if (result.getException() != null && result.getException() instanceof RpcException) {
RpcException rpcException = (RpcException)result.getException();
//如果消费失败(产生异常) 抛出rpc异常 由下面catch
if (rpcException.isBiz()) {
throw rpcException;
}
// 服务消费失败执行mock逻辑
result = this.doMockInvoke(invocation, rpcException);
}
} catch (RpcException var5) {
if (var5.isBiz()) {
throw var5;
}
// catch后执行mock逻辑
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.getUrl(), var5);
}
result = this.doMockInvoke(invocation, var5);
}
}
} else {
// 服务消费默认执行FailoverClusterInvoker
result = this.invoker.invoke(invocation);
}
return result;
}
}
// MockInvoker执行
public class MockInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation invocation) throws RpcException {
String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(this);
}
if (StringUtils.isBlank(mock)) {
mock = getUrl().getParameter(Constants.MOCK_KEY);
}
if (StringUtils.isBlank(mock)) {
throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
}
mock = normalizeMock(URL.decode(mock));
// <mock="force:return 1">直接包装返回结果
if (mock.startsWith(Constants.RETURN_PREFIX)) {
mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
try {
Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
Object value = parseMockValue(mock, returnTypes);
return new RpcResult(value);
} catch (Exception ew) {
throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: " + url, ew);
}
}
// <mock="throw">抛出异常
else if (mock.startsWith(Constants.THROW_PREFIX)) {
mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
if (StringUtils.isBlank(mock)) {
throw new RpcException("mocked exception for service degradation.");
} else {
// 获取自定义异常
Throwable t = getThrowable(mock);
throw new RpcException(RpcException.BIZ_EXCEPTION, t);
}
}
// <mock="com.zdk.mock.HelloServiceMock">自定义mock策略
else {
try {
Invoker<T> invoker = getInvoker(mock);
return invoker.invoke(invocation);
} catch (Throwable t) {
throw new RpcException("Failed to create mock implementation class " + mock, t);
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# 一些问题
通过上述源码我们知道,如果在mock
属性中配置force
,那么不会执行真正的业务逻辑,而是强制只执行mock
逻辑,这一部分比较容易理解:
// 不执行消费逻辑直接返回
else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
// 直接执行mock逻辑
result = doMockInvoke(invocation, null);
}
2
3
4
5
6
7
8
但是如果是其他mock
配置,则先执行业务代码,如果业务代码发生异常了再执行mock
逻辑:
try {
// 服务消费默认执行FailoverClusterInvoker
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
// 服务消费失败执行mock逻辑
result = doMockInvoke(invocation, e);
}
2
3
4
5
6
7
8
9
10
11
12
13
# 超时异常
测试的时候,在consumer
端引入Mock
实现类,并配置@DubboReference
的mock
参数为Mock实现类的全类名,并设置timeout
为2000(2秒),然后在provider
端对服务进行TimeUnit._SECONDS_.sleep(4);
,这种情况mock
是正常执行了,说明超时异常也属于RpcException
提示
要分析超时异常为什么可以被降级策略捕获,我们从以下两个类分析。DefaultFuture.get
方法采用了经典多线程保护性暂停模式,并且实现了异步转同步的效果,如果发生超时异常则抛出TimeoutException
异常:
public class DefaultFuture extends CompletableFuture<Object> {
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
// response对象为空
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
// 进行循环
while (!isDone()) {
// 放弃锁并使当前线程阻塞,直到发出信号或中断它或者达到超时时间
done.await(timeout, TimeUnit.MILLISECONDS);
// 阻塞结束后再判断是否完成
if (isDone()) {
break;
}
// 阻塞结束后判断超过超时时间
if(System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// response对象仍然为空则抛出超时异常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
提示
DubboInvoker
调用进行获取future的时候,如果捕获到上述TimeoutException
则会抛出RpcException
:
protected Result doInvoke(final Invocation invocation) throws Throwable {
// .. 省略
try {
// .....
} catch (TimeoutException var10) {
throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var10.getMessage(), var10);
} catch (RemotingException var11) {
throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
}
}
2
3
4
5
6
7
8
9
10
# 业务异常
我们把把非超时异常统称为业务异常,例如生产者业务执行时发生运行时异常可以归为业务异常,下面我们进行试验
5.2.1 代码实例 生产者执行过程中抛出运行时异常:
public class HelloServiceImpl implements HelloService {
public String sayHello(String name) throws Exception {
throw new RuntimeException("BizException");
}
}
2
3
4
5
消费者调用直接抛出异常:
源码分析 发现服务降级对业务异常没有生效,需要分析原因,从以下两点进行分析: (1) 消费者接收到什么消息
public class DefaultFuture extends CompletableFuture<Object> {
public static void received(Channel channel, Response response, boolean timeout) {
try {
DefaultFuture future = (DefaultFuture)FUTURES.remove(response.getId());
if (future != null) {
Timeout t = future.timeoutCheckTask;
if (!timeout) {
t.cancel();
}
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")).format(new Date()) + ", response status is " + response.getStatus() + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
response
用来接收服务端发送的消息,我们看到异常信息存放在Response的exception
属性:
Response [id=0, version=null, status=20, event=false, error=null, result=RpcResult [result=null, exception=java.lang.RuntimeException: BizException]]
(2) 异常在哪里被抛出
我们知道消费者对象是一个代理对象,首先会执行到InvokerInvocationHandler
:
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[JAVA前线], attachments={}]
RpcInvocation rpcInvocation = createInvocation(method, args);
// 消费者Invoker -> MockClusterInvoker(FailoverClusterInvoker(RegistryDirectory(invokers)))
Result result = invoker.invoke(rpcInvocation);
// 结果包含异常信息则抛出异常 -> 例如异常结果对象RpcResult [result=null, exception=java.lang.RuntimeException: sayHelloError1 error]
return result.recreate();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
AppResponse.recreate
方法会处理异常,如果发现异常对象不为空则抛出异常:
public class AppResponse implements Result {
@Override
public Object recreate() throws Throwable {
if (this.exception != null) {
try {
Class clazz;
for(clazz = this.exception.getClass(); !clazz.getName().equals(Throwable.class.getName()); clazz = clazz.getSuperclass()) {
}
Field stackTraceField = clazz.getDeclaredField("stackTrace");
stackTraceField.setAccessible(true);
Object stackTrace = stackTraceField.get(this.exception);
if (stackTrace == null) {
this.exception.setStackTrace(new StackTraceElement[0]);
}
} catch (Exception var4) {
}
throw this.exception;
} else {
return this.result;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
通过上述示例我们知道Dubbo自带的服务降级策略只能降级超时异常,而不能降级业务异常。
那么业务异常应该如何降级呢?我们可以整合Hystrix
进行业务异常熔断
参考整合hystrix (opens new window)