admin管理员组文章数量:1607667
认识微服务
单体架构
单体架构:将业务的所有功能集中在一个项目中开发,打成一个包部署。
优点:
- 架构简单
- 部署成本低
缺点:
- 团队写作成本高
- 系统发布效率低
- 系统可用性差
总结:
单体架构适合开发功能相对简单,规模较小的项目。
微服务
微服务是一种软件架构风格,它是以专注于单一职责的很多小型项目为基础,组合出复杂的大型应用。
微服务架构,是服务化思想指导下的一套最佳实践架构方案。服务化,就是把单体架构中的功能模块拆分为多个单独项目。
- 粒度小
- 团队自治
- 服务自治
SpringCloud
SpringCloud是目前国内使用最广泛的微服务框架。官网地址:Spring Cloud。
SpringCloud集成了各种微服务功能组件,并基于SpringBoot实现了这些组件的自动装配,从而提供了良好的开箱即用体验:
SpringCloud和SpringBoot的版本对应:
SpringCloud版本 | SpringBoot版本 |
2022.0.x aka Kilburn | 3.0.x |
2021.0.x aka Jubilee | 2.6.x, 2.7.x (Starting with 2021.0.3) |
2020.0.x aka Ilford | 2.4.x, 2.5.x (Starting with 2020.0.3) |
Hoxton | 2.2.x, 2.3.x (Starting with SR5) |
Greenwich | 2.1.x |
Finchley | 2.0.x |
Edgware | 1.5.x |
Dalston | 1.5.x |
拆分微服务
服务拆分原则
什么时候拆分?
- 创业型项目:先采用单体架构,快速开发,快速试错。随着规模扩大,逐渐拆分。
- 确定的大型项目:资金充足,目标明确,可以直接选择微服务架构,避免后续拆分的麻烦。
怎么拆分?
从拆分目标来说,要做到:
- 高内聚:每个微服务的职责要尽量要尽量单一,包含的业务相互关联度高、完整度高。
- 低耦合:每个微服务的功能要相对独立,尽量减少对其它微服务的依赖。
从拆分方式来说,一般包含两种方式:
- 纵向拆分:按照业务模块来拆分
- 横向拆分:抽取公共服务,提高复用性
☆拆分服务
工程结构有两种:
- 独立Project
- Maven聚合
☆远程调用(复习苍穹外卖的HttpClient)
拆分后碰到的第一个问题是什么,如何解决?
- 拆分后,某些数据在不同服务,无法直接调用本地方法查询数据
- 利用RestTemplate发送Http请求,实现远程调用
☆服务治理
注册中心原理
服务治理中的三个角色分别是什么?
- 注册中心:记录并监控微服务各实例状态,推送服务变更信息。
- 服务提供者:暴露服务接口,供其他服务调用。
- 服务调用者:调用其他服务提供的接口
消费者如何指导提供者的地址?
- 服务提供者会在启动时注册自己的信息到注册中心,消费者可以从注册中心订阅和拉取服务信息。
消费者如何得知服务状态变更?
- 服务提供者通过心跳机制向注册中心报告自己的健康状态,当心跳异常时注册中心会将异常服务剔除,并通知订阅了该服务的消费者。
当提供者有多个实例,消费者该选哪一个?
- 消费者可以通过负载均衡算法,从多个实例中选择一个。
什么是负载均衡?
Nacos注册中心
Nacos是目前国内企业中占比最多的注册中心组件。它是阿里巴巴的产品,目前已经加入SpringCloudAlibaba中。
Nacos是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。
官网:Nacos官网| Nacos 配置中心 | Nacos 下载| Nacos 官方社区 | Nacos
服务注册
提供者需要连接nacos以暴露服务
在pom.xml文件中导入Nacos依赖
<!-- nacos 服务注册发现 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
在bootstrop.yaml中配置Nacos地址
spring:
application:
name: item-service # 服务名称
cloud:
nacos:
server-addr: 192.168.100.103:8848 # nacos地址
服务发现
消费者需要连接nacos以拉取和订阅服务,前两步与服务注册一样,后面要加上服务调用
服务调用:
// 加上类注解@RequiredArgsConstructor
private final DiscoveryClient discoveryClient;
private void handleCartItems(List<CartVO> vos) {
// 1根据服务名称获取服务的实例列表
List<ServiceInstance> instances = discoveryClient.getInstances("item-service");
if (CollUtils.isEmpty(instances)) {
return;
}
// 2手写负载均衡,从实例列表中挑选一个实例
ServiceInstance instance = instances.get(RandomUtil.randomInt(instances.size()));
// 3利用RestTemplate发起http请求,得到http的响应
ResponseEntity<List<ItemDTO>> response = restTemplate.exchange(
instance.getUri() + "/items/ids={ids}",
HttpMethod.GET,
null,
new ParameterizedTypeReference<List<ItemDTO>>() {
},
Map.of("ids", CollUtils.join(itemIds, ","))
);
// 4 解析响应
if (!response.getStatusCode().is2xxSuccessful()) {
return;
}
List<ItemDTO> items = response.getBody();
}
@RequiredArgsConstructor是Lombok库中的一个注解,用于自动生成一个包含所有final和@NonNull字段的构造函数。这个注解可以帮助简化Java类的编写,避免手动编写繁琐的构造函数代码。
OpenFeign
OpenFeign是一个声明式的http客户端,是SpringCloud在Eureka公司开源的Feign基础上改造而来。官网:GitHub - OpenFeign/feign: Feign makes writing java http clients easier
其作用是基于SpringMVC的常见注解,帮我们优雅的实现http请求发送。
快速入门
OpenFeign已经被SpringCloud自动装配,实现起来非常简单:
①.引入依赖
在
cart-service
服务的pom.xml中引入OpenFeign
的依赖和loadBalancer
依赖:
<!--openFeign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--负载均衡器-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
②.启用OpenFeign
在
cart-service
的CartApplication
启动类上添加注解,启动OpenFeign
功能:
@MapperScan("com.itcam.cart.mapper")
@SpringBootApplication
//@EnableFeignClients(basePackages = "com.itcam.api.client") // 指定FeignClient所在包
@EnableFeignClients(clients = {ItemClient.class},defaultConfiguration = DefaultFeignConfig.class) // 指定FeignClient字节码
public class CartApplication {
public static void main(String[] args) {
SpringApplication.run(CartApplication.class, args);
}
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
③.编写FeignClient
在
cart-service
中,定义一个新的接口,编写Feign
客户端:
@FeignClient(value = "item-service", fallbackFactory = ItemClientFallbackFactory.class)
public interface ItemClient {
@GetMapping("/items")
List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
}
这里只需要声明接口,无需实现方法。接口中的几个关键信息:
@FeignClient("item-service")
:声明服务名称
@GetMapping
:声明请求方式
@GetMapping("/items")
:声明请求路径
@RequestParam("ids") Collection<Long> ids
:声明请求参数
List<ItemDTO>
:返回值类型
④.使用FiengClient实现远程调用
在
cart-service
的com.hmall.cart.service.impl.CartServiceImpl
中改造代码,直接调用ItemClient
的方法:
@Service
@RequiredArgsConstructor
public class CartServiceImpl extends ServiceImpl<CartMapper, Cart> implements ICartService {
private final ItemClient itemClient;
@Override
public void addItem2Cart(CartFormDTO cartFormDTO) {
...
}
@Override
public List<CartVO> queryMyCarts() {
...
}
private void handleCartItems(List<CartVO> vos) {
// 1.获取商品id
Set<Long> itemIds = vos.stream().map(CartVO::getItemId).collect(Collectors.toSet());
// 2.查询商品
List<ItemDTO> items = itemClient.queryItemByIds(itemIds);
if (CollUtils.isEmpty(items)) {
return;
}
// 3.转为 id 到 item的map
Map<Long, ItemDTO> itemMap = items.stream().collect(Collectors.toMap(ItemDTO::getId, Function.identity()));
// 4.写入vo
for (CartVO v : vos) {
ItemDTO item = itemMap.get(v.getItemId());
if (item == null) {
continue;
}
v.setNewPrice(item.getPrice());
v.setStatus(item.getStatus());
v.setStock(item.getStock());
}
}
@Override
@Transactional
public void removeByItemIds(Collection<Long> itemIds) {
...
}
}
连接池
Feign底层发起http请求,依赖于其它的框架。其底层支持的http客户端实现包括:
HttpURLConnection:默认实现,不支持连接池
Apache HttpClient :支持连接池
OKHttp:支持连接池
①.引入依赖
在cart-service的pom.xml中引入依赖:
<!--OK http 的依赖 -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-okhttp</artifactId>
</dependency>
②.开启连接池
在cart-service的application.yml配置文件中开启Feign的连接池功能:
feign:
okhttp:
enabled: true # 开启OKHttp功能
最佳实践
避免重复编码的办法就是抽取。不过这里有两种抽取思路:
- 思路1:抽取到微服务之外的公共module
- 思路2:每个微服务自己抽取一个module
不难看出:
方案1抽取更加简单,工程结构也比较清晰,但缺点是整个项目耦合度偏高。
方案2抽取相对麻烦,工程结构相对更复杂,但服务之间耦合度降低。
抽取Feign客户端
创建hm-api模块
pom.xml的依赖如下:
<dependencies>
<!--open feign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- load balancer-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<!-- swagger 注解依赖 -->
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
<version>1.6.6</version>
<scope>compile</scope>
</dependency>
</dependencies>
扫描包
在其他模块导入hm-api模块的依赖:
<!--feign模块-->
<dependency>
<groupId>com.heima</groupId>
<artifactId>hm-api</artifactId>
<version>1.0.0</version>
</dependency>
案例
在hm-api的client包下定义一个接口(比如CartClient):
@FeignClient("cart-service")
public interface CartClient {
@DeleteMapping("/carts")
void deleteCartItemByIds(@RequestParam("ids") Collection<Long> ids);
}
在其他模块(比如cart-service)的启动类上添加@EableFeignClients注解:
@MapperScan("com.itcam.cart.mapper")
@SpringBootApplication
@EnableFeignClients(basePackages = "com.itcam.api.client") // 指定FeignClient所在包
public class CartApplication {
public static void main(String[] args) {
SpringApplication.run(CartApplication.class, args);
}
}
日志配置
日志级别有4级:
- NONE:不记录任何日志信息,这是默认值。
- BASIC:仅记录请求的方法,URL以及响应状态码和执行时间
- HEADERS:在BASIC的基础上,额外记录了请求和响应的头信息
- FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。
Feign默认的日志级别就是NONE,所以默认我们看不到请求日志。
在hm-api模块下新建一个配置类,定义Feign的日志级别:
public class DefaultFeignConfig {
@Bean
public Logger.Level feignLoggerLevel() {
return Logger.Level.FULL;
}
}
局部生效:在某个FeignClient中配置,只对当前FeignClient生效
@FeignClient(value = "item-service", configuration = DefaultFeignConfig.class)
比如:
@FeignClient(value = "cart-service",configuration = DefaultFeignConfig.class)
public interface CartClient {
@DeleteMapping("/carts")
void deleteCartItemByIds(@RequestParam("ids") Collection<Long> ids);
}
全局生效:在@EnableFeignClients中配置,针对所有FeignClient生效
@EnableFeignClients(defaultConfiguration = DefaultFeignConfig.class)
比如:
@MapperScan("com.itcam.cart.mapper")
@SpringBootApplication
@EnableFeignClients(basePackages = "com.itcam.api.client",defaultConfiguration = DefaultFeignConfig.class) // 指定FeignClient所在包
public class CartApplication {
public static void main(String[] args) {
SpringApplication.run(CartApplication.class, args);
}
}
☆网关路由
快速入门
1.创建新模块
2.引入网关依赖
<dependencies>
<!--common-->
<dependency>
<groupId>com.heima</groupId>
<artifactId>hm-common</artifactId>
<version>1.0.0</version>
</dependency>
<!--网关-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!--nacos discovery-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--负载均衡-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
</dependencies>
3.编写启动类
@SpringBootApplication(exclude={DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
public class GatewayApplication {
public static void main(String[] args) {
SpringApplication.run(GatewayApplication.class, args);
}
}
4.配置路由规则
在
hm-gateway
模块的resources
目录新建一个application.yaml
文件,内容如下:
server:
port: 8080
spring:
application:
name: gateway
cloud:
nacos:
server-addr: 192.168.100.103:8848
gateway:
routes:
- id: item # 路由规则id,自定义,唯一
uri: lb://item-service # 路由的目标服务,lb代表负载均衡,会从注册中心拉取服务列表
predicates: # 路由断言,判断当前请求是否符合当前规则,符合则路由到目标服务
- Path=/items/**,/search/** # 这里是以请求路径作为判断规则
- id: cart
uri: lb://cart-service
predicates:
- Path=/carts/**
- id: user
uri: lb://user-service
predicates:
- Path=/users/**,/addresses/**
- id: trade
uri: lb://trade-service
predicates:
- Path=/orders/**
- id: pay
uri: lb://pay-service
predicates:
- Path=/pay-orders/**
路由属性(bootstrap.yaml)
spring:
cloud:
gateway:
routes:
- id: item
uri: lb://item-service
predicates:
- Path=/items/**,/search/**
其中ctrl+鼠标左键进入routes对应的源码如下:
public void setRoutes(List<RouteDefinition> routes) {
this.routes = routes;
if (routes != null && routes.size() > 0 && this.logger.isDebugEnabled()) {
this.logger.debug("Routes supplied from Gateway Properties: " + routes);
}
}
routes是一个List<RouteDefinition>类型的集合,其中RouteDefinition的源码如下:
@Validated
public class RouteDefinition {
private String id;
private @NotEmpty @Valid List<PredicateDefinition> predicates = new ArrayList();
private @Valid List<FilterDefinition> filters = new ArrayList();
private @NotNull URI uri;
private Map<String, Object> metadata = new HashMap();
private int order = 0;
略...
}
四个属性含义如下:
id
:路由的唯一标示
predicates
:路由断言,其实就是匹配条件
filters
:路由过滤条件,后面讲
uri
:路由目标地址,lb://
代表负载均衡,从注册中心获取目标微服务的实例列表,并且负载均衡选择一个访问。
路由断言,也就是predicates,SpringCloudGateway中支持的断言类型有12种。
名称 | 说明 | 示例 |
After | 是某个时间点后的请求 | - After=2037-01-20T17:42:47.789-07:00[America/Denver] |
Before | 是某个时间点之前的请求 | - Before=2031-04-13T15:14:47.433+08:00[Asia/Shanghai] |
Between | 是某两个时间点之前的请求 | - Between=2037-01-20T17:42:47.789-07:00[America/Denver], 2037-01-21T17:42:47.789-07:00[America/Denver] |
Cookie | 请求必须包含某些cookie | - Cookie=chocolate, ch.p |
Header | 请求必须包含某些header | - Header=X-Request-Id, \d+ |
Host | 请求必须是访问某个host(域名) | - Host=**.somehost,**.anotherhost |
Method | 请求方式必须是指定方式 | - Method=GET,POST |
Path | 请求路径必须符合指定规则 | - Path=/red/{segment},/blue/** |
Query | 请求参数必须包含指定参数 | - Query=name, Jack或者- Query=name |
RemoteAddr | 请求者的ip必须是指定范围 | - RemoteAddr=192.168.1.1/24 |
Weight | 权重处理 | - Weight=group1, 2 |
XForwarded Remote Addr | 基于请求的来源IP做判断 | - XForwardedRemoteAddr=192.168.1.1/24 |
路由过滤器,也就是filters,网关中提供了33种,这里主要列举其中6种。
名称 | 说明 | 示例 |
AddRequestHeader | 给当前请求添加一个请求头 | AddrequestHeader=headerName,headerValue |
RemoveRequestHeader | 移除请求中的一个请求头 | RemoveRequestHeader=headerName |
AddResponseHeader | 给响应结果中添加一个响应头 | AddResponseHeader=headerName,headerValue |
RemoveResponseHeader | 从响应结果中移除有一个响应头 | RemoveResponseHeader=headerName |
RewritePath | 请求路径重写 | RewritePath=/red/?(?<segment>.*), /$\{segment} |
StripPrefix | 去除请求路径中的N段前缀 | StripPrefix=1,则路径/a/b转发时只保留/b |
... |
网关登录校验
思考
网关路由是配置的,请求转发是Gateway内部代码,我们如何在转发之前做登录校验?
网关校验JWT之后,如何将用户信息传递给微服务?
微服务之间也会相互调用,这种调用不经过网关,又该如何传递用户信息?
网关过滤器
登录校验必须在请求转发到微服务之前做,否则就失去了意义。而网关的请求转发是Gateway
内部代码实现的,要想在请求转发之前做登录校验,就必须了解Gateway
内部工作的基本原理。
网关过滤器链中的过滤器有两种:
-
GatewayFilter
:路由过滤器,作用范围比较灵活,可以是任意指定的路由Route
.(暂时跳过) -
GlobalFilter
:全局过滤器,作用范围是所有路由,不可配置。
@Component
public class MyGlobalFilter implements GlobalFilter, Ordered {
/**
* 处理请求并将其传递给下一个过滤器
* @param exchange 当前请求的上下文,其中包含request、response等各种数据
* @param chain 过滤器链,基于它向下传递请求
* @return 根据返回值标记当前请求是否被完成或拦截,chain.filter(exchange)就放行了。
*/
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// TODO:模拟登录校验逻辑
// 获取请求
ServerHttpRequest request = exchange.getRequest();
// 过滤器业务处理
HttpHeaders headers = request.getHeaders();
System.out.println("headers = " + headers);
// 因为filter是在调用过滤链中的下一个过滤器,调用过程中把exchange传入下一个过滤器,这样下一个过滤器也能够使用
return chain.filter(exchange);// 放行
}
@Override
public int getOrder() {
// 过滤器执行顺序,值越小,优先级越高
// todo: 注意 0不是最高优先级,自带的过滤器拥有更高的优先级
return 0;
}
}
☆身份认证
身份认证,也就是登录校验,它需要用到JWT,而且JWT的加密需要秘钥和加密工具。
按顺序添加以下代码:
@Data
@ConfigurationProperties(prefix = "hm.jwt")
public class JwtProperties {
private Resource location;
private String password;
private String alias;
private Duration tokenTTL = Duration.ofMinutes(10);
}
@Configuration
@EnableConfigurationProperties(JwtProperties.class)
public class SecurityConfig {
@Bean
public PasswordEncoder passwordEncoder(){
return new BCryptPasswordEncoder();
}
@Bean
public KeyPair keyPair(JwtProperties properties){
// 获取秘钥工厂
KeyStoreKeyFactory keyStoreKeyFactory =
new KeyStoreKeyFactory(
properties.getLocation(),
properties.getPassword().toCharArray());
//读取钥匙对
return keyStoreKeyFactory.getKeyPair(
properties.getAlias(),
properties.getPassword().toCharArray());
}
}
@Data
@Component
@ConfigurationProperties(prefix = "hm.auth")
public class AuthProperties {
private List<String> includePaths;
private List<String> excludePaths;
}
@Component
public class JwtTool {
private final JWTSigner jwtSigner;
public JwtTool(KeyPair keyPair) {
this.jwtSigner = JWTSignerUtil.createSigner("rs256", keyPair);
}
/**
* 创建 access-token
*
* @param userId 用户信息
* @return access-token
*/
public String createToken(Long userId, Duration ttl) {
// 1.生成jws
return JWT.create()
.setPayload("user", userId)
.setExpiresAt(new Date(System.currentTimeMillis() + ttl.toMillis()))
.setSigner(jwtSigner)
.sign();
}
/**
* 解析token
*
* @param token token
* @return 解析刷新token得到的用户信息
*/
public Long parseToken(String token) {
// 1.校验token是否为空
if (token == null) {
throw new UnauthorizedException("未登录");
}
// 2.校验并解析jwt
JWT jwt;
try {
jwt = JWT.of(token).setSigner(jwtSigner);
} catch (Exception e) {
throw new UnauthorizedException("无效的token", e);
}
// 2.校验jwt是否有效
if (!jwt.verify()) {
// 验证失败
throw new UnauthorizedException("无效的token");
}
// 3.校验是否过期
try {
JWTValidator.of(jwt).validateDate();
} catch (ValidateException e) {
throw new UnauthorizedException("token已经过期");
}
// 4.数据格式校验
Object userPayload = jwt.getPayload("user");
if (userPayload == null) {
// 数据为空
throw new UnauthorizedException("无效的token");
}
// 5.数据解析
try {
return Long.valueOf(userPayload.toString());
} catch (RuntimeException e) {
// 数据格式有误
throw new UnauthorizedException("无效的token");
}
}
}
@Component
@RequiredArgsConstructor // 生成构造函数
@EnableConfigurationProperties(AuthProperties.class)
public class AuthGlobalFilter implements GlobalFilter, Ordered {
private final AuthProperties authProperties;
private final JwtTool jwtTool;
private final AntPathMatcher antPathMatcher = new AntPathMatcher();
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1.获取request
ServerHttpRequest request = exchange.getRequest();
// 2.判断是否需要做登录拦截
if (isExclude(request.getPath().toString())) {
// 无需拦截,直接放行
return chain.filter(exchange);
}
// 3.获取token
String token = null;
List<String> headers = request.getHeaders().get("authorization"); // authorization n.批准书,授权书;批准,授权
if (headers != null && !headers.isEmpty()) {
token = headers.get(0);
}
// 4.校验并解析token
Long userId = null;
try {
userId = jwtTool.parseToken(token);
} catch (Exception e) {
// 拦截,设置响应状态码为401
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.UNAUTHORIZED); // 设置枚举类型显得专业
return response.setComplete(); // 完结
}
// 5.TODO:传递用户信息
System.out.println("userId = " + userId);
// 6.放行
return chain.filter(swe);
}
private boolean isExclude(String path) {
for (String pathPattern : authProperties.getExcludePaths()) {
if (antPathMatcher.match(pathPattern, path)) {
return true;
}
}
return false;
}
@Override
public int getOrder() {
return 0;
}
}
传递用户信息
保存用户到请求头
在第5步传递用户信息编写保存用户信息到请求头中的代码:
@Component
@RequiredArgsConstructor // 生成构造函数
@EnableConfigurationProperties(AuthProperties.class)
public class AuthGlobalFilter implements GlobalFilter, Ordered {
private final AuthProperties authProperties;
private final JwtTool jwtTool;
private final AntPathMatcher antPathMatcher = new AntPathMatcher();
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1.获取request
ServerHttpRequest request = exchange.getRequest();
// 2.判断是否需要做登录拦截
if (isExclude(request.getPath().toString())) {
// 无需拦截,直接放行
return chain.filter(exchange);
}
// 3.获取token
String token = null;
List<String> headers = request.getHeaders().get("authorization"); // authorization n.批准书,授权书;批准,授权
if (headers != null && !headers.isEmpty()) {
token = headers.get(0);
}
// 4.校验并解析token
Long userId = null;
try {
userId = jwtTool.parseToken(token);
} catch (Exception e) {
// 拦截,设置响应状态码为401
ServerHttpResponse response = exchange.getResponse();
response.setStatusCode(HttpStatus.UNAUTHORIZED); // 设置枚举类型显得专业
return response.setComplete(); // 完结
}
// 5.传递用户信息
String userInfo = userId.toString();
ServerWebExchange swe = exchange.mutate() // mutate 对下游请求做出改变
.request(builder -> builder.header("user-info", userInfo))
.build();
// 6.放行
return chain.filter(swe);
}
private boolean isExclude(String path) {
for (String pathPattern : authProperties.getExcludePaths()) {
if (antPathMatcher.match(pathPattern, path)) {
return true;
}
}
return false;
}
@Override
public int getOrder() {
return 0;
}
}
保存和获取用户
提供的保存和获取用户的方法:
public class UserContext {
private static final ThreadLocal<Long> tl = new ThreadLocal<>();
/**
* 保存当前登录用户信息到ThreadLocal
* @param userId 用户id
*/
public static void setUser(Long userId) {
tl.set(userId);
}
/**
* 获取当前登录用户信息
* @return 用户id
*/
public static Long getUser() {
return tl.get();
}
/**
* 移除当前登录用户信息
*/
public static void removeUser(){
tl.remove();
}
}
拦截器获取用户
在hm-common模块下定义一个拦截器:
public class UserInfoInterceptors implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取登录用户信息
String userInfo = request.getHeader("user-info");
// 2.判断是否获取了用户,如果有,存入ThreadLocal
if (StrUtil.isNotBlank(userInfo)) {
UserContext.setUser(Long.valueOf(userInfo));
}
// 3.放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 清理用户
UserContext.removeUser();
}
}
接着在hm-common模块下编写SpringMVC的配置类,配置登录拦截器:
@Configuration
@ConditionalOnClass(DispatcherServlet.class)
// 只要是微服务,都有SpringMvc,有SpringMvc就一定有SpringMvc的核心api就是DispatcherServlet.class
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new UserInfoInterceptors());
}
}
注意:记得在META-INFO/spring.factories添加com.hmallmon.config.MvcConfig,\
OpenFeign传递用户
微服务项目中的很多业务要多个微服务共同合作完成,而这个过程中也需要传递登录用户信息。
- 微服务之间调用是基于OpenFeign来实现的,并不是我们自己发送的请求,我们要借助Feign中提供的一个拦截器接口:RequestInterceptor
由于FeignClient之前我们都是放在了hm-api模块里,所以我们在hm-api中的com.hmall.api.config.DefaultFeignConfig中编写这个拦截器:
public class DefaultFeignConfig {
@Bean
public Logger.Level feignLoggerLevel() {
return Logger.Level.FULL;
}
@Bean
public RequestInterceptor userRequestInterceptor() {
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate requestTemplate) {
// 获取登录用户
Long userId = UserContext.getUser();
if (userId != null) {
// 如果不为空则放入请求头中,传递给下游微服务
requestTemplate.header("user-info", userId.toString());
}
}
};
}
}
☆配置管理
配置共享
分为两步:
- 在Nacos中添加共享配置
- 微服务拉取配置
在nacos的配置列表中配置以下的yaml文件:
jdbc相关的shared-jdbc.yaml
spring:
datasource:
url: jdbc:mysql://${hm.db.host}:${hm.db.port:3306}/${hm.db.database}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
driver-class-name: com.mysql.cj.jdbc.Driver
username: ${hm.db.un:root}
password: ${hm.db.pw:123456}
mybatis-plus:
configuration:
default-enum-type-handler: com.baomidou.mybatisplus.core.handlers.MybatisEnumTypeHandler
global-config:
db-config:
update-strategy: not_null
id-type: auto
${hm.db.port:3306}中:后面表示优先查找的值
日志log相关的shared-log.yaml
logging:
level:
com.hmall: debug
pattern:
dateformat: HH:mm:ss:SSS
file:
path: "logs/${spring.application.name}"
swagger相关的shared-swagger.yaml
knife4j:
enable: true
openapi:
title: ${hm.swagger.title:黑马商城接口文档}
description: ${hm.swagger.desc:黑马商城接口文档}
email: zhanghuyi@itcast
concat: Cammy
url: https://www.itcast
version: v1.0.0
group:
default:
group-name: default
api-rule: package
api-rule-resources:
- ${hm.swagger.package}
在每个微服务的application.yaml文件下可以占位数据,比如购物车服务
hm:
db:
database: hm-cart
swagger:
title: "黑马商城购物车服务接口文档"
package: com.itcam.cart.controller
拉取共享配置
基于NacosConfig拉取共享配置代替微服务的本地配置
1.引入依赖
<!--nacos配置管理-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--读取bootstrap文件-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
2.新建bootstrap.yaml
spring:
application:
name: cart-service #微服务名称
profiles:
active: dev
cloud:
nacos:
server-addr: 192.168.100.103:8848 #nacos地址
config:
file-extension: yaml # 文件后缀名
shared-configs: # 共享配置
- data-id: shared-jdbc.yaml # 共享mybatis配置
- data-id: shared-log.yaml # 共享日志配置
- data-id: shared-swagger.yaml # 共享日志配置
配置热更新
配置热更新:当修改配置文件中的配置时,微服务无需重启即可使配置生效。
前提条件:
nacos中要有一个与微服务名有关的配置文件
微服务中要以特定的方式读取需要热更新的配置属性
第一种方式是@ConfiturationProperties的方式读取
第二种方式使用传统的方式使用@Value读取属性,要在类上加上@RefreshScope
在购物车模块下演示
package com.itcam.cart.config;
@Data
@Component
@ConfigurationProperties(prefix = "hm.cart")
public class CartProperties {
private Integer maxItems;
}
package com.itcam.cart.service.impl;
@Service
@RequiredArgsConstructor
public class CartServiceImpl extends ServiceImpl<CartMapper, Cart> implements ICartService {
...
private final CartProperties cartProperties;
...
private void checkCartsFull(Long userId) {
Long count = lambdaQuery().eq(Cart::getUserId, userId).count();
if (count >= cartProperties.getMaxItems()) {
throw new BizIllegalException(StrUtil.format("用户购物车课程不能超过{}",
cartProperties.getMaxItems()));
}
}
...
}
动态路由
要实现动态路由首先要将路由配置保存到Nacos,当Nacos中的路由配置变更时,推送最新配置到网关,实时更新网关中的路由信息。
监听Nacos的配置
需要完成两件事情:
- 监听Nacos配置变更的消息
- 当配置变更时,将最新的路由信息更新到网关路由表
nacos官方文档:Java SDK
监听配置
如果希望 Nacos 推送配置变更,可以使用 Nacos 动态监听配置接口来实现。
public void addListener(String dataId, String group, Listener listener)
请求参数
参数名 | 参数类型 | 描述 |
dataId | string | 配置 ID,采用类似 package.class(如com.taobao.tc.refund.log.level)的命名规则保证全局唯一性,class 部分建议是配置的业务含义。 全部字符小写。只允许英文字符和 4 种特殊字符("."、":"、"-"、"_")。不超过 256 字节。 |
group | string | 配置分组,建议填写产品名:模块名(如 Nacos:Test)保证唯一性。 只允许英文字符和4种特殊字符("."、":"、"-"、"_"),不超过128字节。 |
listener | Listener | 监听器,配置变更进入监听器的回调函数。 |
返回值
参数类型 | 描述 |
---|---|
string | 配置值,初始化或者配置变更的时候通过回调函数返回该值。 |
示例
String serverAddr = "{serverAddr}";
String dataId = "{dataId}";
String group = "{group}";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
String content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("recieve1:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
// 测试让主线程不退出,因为订阅配置是守护线程,主线程退出守护线程就会退出。 正式代码中无需下面代码
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
核心有两个步骤:
-
第一步:创建ConfigService,目的是连接到Nacos
-
第二步:添加配置监听器,编写配置变更的通知处理逻辑
NacosConfigAutoConfiguration中就创建了ConfigService
只要我们拿到NacosConfigManager就等于拿到了ConfigService,第一步就实现了。然后就是第二步编写监听器。
编写监听器
1.在网关gateway中引入依赖
<!--统一配置管理-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--加载bootstrap-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
2.在网关gateway的resource目录下创建bootstrap.yaml文件
spring:
application:
name: gateway
cloud:
nacos:
server-addr: 192.168.100.103
config:
file-extension: yaml
shared-configs:
- dataId: shared-log.yaml # 共享日志配置
3.修改gateway的resources目录下的application.yml,把之前的路由移除
server:
port: 8080 # 端口
hm:
jwt:
location: classpath:hmall.jks # 秘钥地址
alias: hmall # 秘钥别名
password: hmall123 # 秘钥文件密码
tokenTTL: 30m # 登录有效期
auth:
excludePaths: # 无需登录校验的路径
- /search/**
- /users/login
- /items/**
4.在gateway中定义配置监听器
package com.itcam.gateway.route;
@Slf4j
@Component
@RequiredArgsConstructor
public class DynamicRouteLoader {
private final NacosConfigManager nacosConfigManager;
private final RouteDefinitionWriter routeDefinitionWriter;
// 路由配置文件的id和分组
private final String dataId = "gateway-routes.json";// 为了方便解析从Nacos读取到的路由配置,推荐使用json格式的路由配置
private final String group = "DEFAULT_GROUP";
// 保存更新过的路由id
private final Set<String> routeIds = new HashSet<>();
@PostConstruct // 在Bean初始化之后执行
public void initRouteConfigListener() throws NacosException {
// 1.项目启动时,先拉去一次配置,并且添加配置监听器
String configInfo = nacosConfigManager.getConfigService()
.getConfigAndSignListener(dataId, group, 5000, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(String configInfo) {
// 2.监听到配置变更,需要更新路由表
updateConfigInfo(configInfo);
}
});
// 3.第一次读取到配置文件,也需要更新路由表
updateConfigInfo(configInfo);
}
private void updateConfigInfo(String configInfo) {
log.debug("监听到路由配置信息:{}", configInfo);
// 1.解析配置信息,转为RouteDefinition
List<RouteDefinition> routeDefinitions = JSONUtil.toList(configInfo, RouteDefinition.class);
// 2.删除旧的路由表
for (String routeId : routeIds) {
routeDefinitionWriter.delete(Mono.just(routeId)).subscribe();
}
routeIds.clear();
// 3.更新路由表
for (RouteDefinition routeDefinition : routeDefinitions) {
// 3.1更新路由表
// Mono是响应式编程的容器
routeDefinitionWriter.save(Mono.just(routeDefinition)).subscribe();
// 3.2记录路由id,便于下一次更新时删除
routeIds.add(routeDefinition.getId());
}
}
}
6.最后在Nacos控制台添加路由,路由文件名为gateway-routes.json,类型为json
[
{
"id": "item",
"predicates": [{
"name": "Path",
"args": {"_genkey_0":"/items/**", "_genkey_1":"/search/**"}
}],
"filters": [],
"uri": "lb://item-service"
},
{
"id": "cart",
"predicates": [{
"name": "Path",
"args": {"_genkey_0":"/carts/**"}
}],
"filters": [],
"uri": "lb://cart-service"
},
{
"id": "user",
"predicates": [{
"name": "Path",
"args": {"_genkey_0":"/users/**", "_genkey_1":"/addresses/**"}
}],
"filters": [],
"uri": "lb://user-service"
},
{
"id": "trade",
"predicates": [{
"name": "Path",
"args": {"_genkey_0":"/orders/**"}
}],
"filters": [],
"uri": "lb://trade-service"
},
{
"id": "pay",
"predicates": [{
"name": "Path",
"args": {"_genkey_0":"/pay-orders/**"}
}],
"filters": [],
"uri": "lb://pay-service"
}
]
*.更新的路由,也就是RouteDefinition的常见字段
-
id:路由id
-
predicates:路由匹配规则
-
filters:路由过滤器
-
uri:路由目的地
☆服务保护
什么是雪崩问题
雪崩问题:微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,这就是雪崩。
雪崩问题产生的原因是什么?
- 微服务相互调用,服务提供者出现故障或阻塞
- 服务调用者没有做好异常处理,导致自身故障
- 调用链中所有服务级联失败,导致整个集群故障
解决问题的思路有哪些?
- 尽量避免服务出现故障或阻塞(保证代码健壮性、保证网络畅通、能应对较高的并发请求)
- 服务调用者做好远程调用异常的后备方案,避免故障扩散
雪崩问题的解决方案
请求限流
请求限流:限制访问微服务的请求的并发量,避免服务因流量激增出现故障。
线程隔离
线程隔离:也叫舱壁模式,模拟船舱隔板的防水原理。通过限定每个业务能使用的线程数量而将故障业务隔离,避免故障扩散。
服务熔断
服务熔断:由断路器统计请求的异常比例或慢调用比例,如果超出阈值则会熔断该业务,则拦截该接口的请求。熔断期间,所有请求快速失败,全都走fallback逻辑。
解决雪崩问题的常见方案有哪些?
- 请求限流:限制流量在服务可以处理的范围,避免因突发流量而故障
- 线程隔离:控制业务可用的线程数量,将故障隔离在一定范围
- 服务熔断:将异常比例过高的接口断开,拒绝所有请求,直接走fallback
- 失败处理:定义fallback逻辑,让业务失败不在抛出异常,而返回默认数据或友好提示
Sentinel | Hystrix | |
线程隔离 | 信号量隔离 | 线程池隔离/信号量隔离 |
熔断策略 | 基于慢调用比例或异常比例 | 基于异常比率 |
限流 | 基于 QPS,支持流量整形 | 有限的支持 |
Fallback | 支持 | 支持 |
控制台 | 开箱即用,可配置规则、查看秒级监控、机器发现等 | 不完善 |
配置方式 | 基于控制台,重启后失效 | 基于注解或配置文件,永久生效 |
初识Sentinel
Sentinel是阿里巴巴开源的一款微服务流量控制组件。官网:home | Sentinel
Sentinel 的使用可以分为两个部分:
-
核心库(Jar包):不依赖任何框架/库,能够运行于 Java 8 及以上的版本的运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。在项目中引入依赖即可实现服务限流、隔离、熔断等功能。
-
控制台(Dashboard):Dashboard 主要负责管理推送规则、监控、管理机器信息等。
搭建Sentinel的控制台
- 1.下载jar包
- 下载地址:https://github/alibaba/Sentinel/releases
- 2.运行
- 在jar包路径下cmd进入终端输入:java -Dserver.port=8090 -Dcsp.sentinel.dashboard.server=localhost:8090 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar
- 3.访问
- 访问http://localhost:8090页面,就可以看到sentinel的控制台了
簇点链路
簇点链路,就是单机调用链路。是一次请求进入服务后经过的每一个被Sentinel监控的资源链。默认Sentinel会监控SpringMVC的每一个Endpoint(Http接口)。限流、熔断等都是针对簇点链路的资源设置的。而资源名默认就是接口的请求路径。
微服务整合
在cart-service
模块中整合sentinel,连接sentinel-dashboard
控制台。
1.引入sentinel依赖
<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
2.配置控制台
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8090
Restful风格的API请求路径一般都相同,这会导致簇点资源名称重复。因此我们要修改配置,把请求方式+请求路径作为簇点资源名称:
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8090
http-method-specify: true # 开启请求方式前缀
请求限流
在簇点链路后面点击流控按钮,即可对其做限流配置
簇点资源的流量限制在了每秒6个,也就是最大QPS为6
QPS 是 "每秒查询数"(Queries Per Second),它是一个衡量系统性能的指标,特别是在数据库、网络和服务器等领域中常常被用来评估系统的处理能力。
线程隔离
限流可以降低服务器压力,尽量减少因并发流量引起的服务故障的概率,但并不能完全避免服务故障。一旦某个服务出现故障,我们必须隔离对这个服务的调用,避免发生雪崩。
比如,查询购物车的时候需要查询商品,为了避免因商品服务出现故障导致购物车服务级联失败,我们可以把购物车业务中查询商品的部分隔离起来,限制可用的线程资源:
这样,即便商品服务出现故障,最多导致查询购物车业务故障,并且可用的线程资源也被限定在一定范围,不会导致整个购物车服务崩溃。
所以,我们要对查询商品的FeignClient接口做线程隔离。
OpenFeign整合Sentinel
在需要开启线程隔离的模块的application.yml文件开启Feign的sentinel功能:
feign:
sentinel:
enabled: true # 开启feign对sentinel的支持
注意:
默认情况下SpringBoot项目的tomcat最大线程数是200,允许的最大连接是8492,单机测试很难打满。
server:
port: 8082
tomcat:
threads:
max: 50 # 允许的最大线程数
accept-count: 50 # 最大排队等待数量
max-connections: 100 # 允许的最大连接
配置线程隔离
点击对应的簇点资源后面的流控按钮
请求限流选QPS,线程隔离选并发线程数,两者不同。
例如,如果有5个并发线程,如果单线程的QPS为2,则5线程的QPS为10。
服务熔断
利用线程隔离对查询购物车业务进行隔离,保护了购物车服务的其它接口。由于查询商品的功能耗时较高(我们模拟了500毫秒延时),再加上线程隔离限定了线程数为5,导致接口吞吐能力有限,最终QPS只有10左右。这就导致了几个问题:
- 第一,超出的QPS上限的请求就只能抛出异常,从而导致购物车的查询失败。但从业务角度来说,即便没有查询到最新的商品信息,购物车也应该展示给用户,用户体验更好。也就是给查询失败设置一个降级处理逻辑。
- 第二,由于查询商品的延迟较高(模拟的500ms),从而导致查询购物车的响应时间也变的很长。这样不仅拖慢了购物车服务,消耗了购物车服务的更多资源,而且用户体验也很差。对于商品服务这种不太健康的接口,我们应该直接停止调用,直接走降级逻辑,避免影响到当前服务。也就是将商品查询接口熔断。
编写降级逻辑
触发限流或熔断后的请求不一定要直接报错,也可以返回一些默认数据或者友好提示,用户体验会更好。
比如:在查询cart业务中,对item远程调用的client编写fallback;这样查询cart不会失败,而是返回默认值。
给FeignClient编写失败后的降级逻辑有两种方式:
-
方式一:FallbackClass,无法对远程调用的异常做处理
-
方式二:FallbackFactory,可以对远程调用的异常做处理,我们一般选择这种方式。
演示方式二的失败降级处理
步骤一:在hm-api模块中给ItemClient
定义降级处理类,实现FallbackFactory
package com.hmall.api.client.fallback;
@Slf4j
public class ItemClientFallback implements FallbackFactory<ItemClient> {
@Override
public ItemClient create(Throwable cause) {
return new ItemClient() {
@Override
public List<ItemDTO> queryItemByIds(Collection<Long> ids) {
log.error("远程调用ItemClient#queryItemByIds方法出现异常,参数:{}", ids, cause);
// 查询购物车允许失败,查询失败,返回空集合
return CollUtils.emptyList();
}
@Override
public void deductStock(List<OrderDetailDTO> items) {
// 库存扣减业务需要触发事务回滚,查询失败,抛出异常
throw new BizIllegalException(cause);
}
};
}
}
步骤二:在hm-api
模块中的com.hmall.api.config.DefaultFeignConfig
类中将ItemClientFallback
注册为一个Bean
public class DefaultFeignConfig {
...
@Bean
public ItemClientFallbackFactory itemClientFallbackFactory() {
return new ItemClientFallbackFactory();
}
步骤三:在hm-api
模块中的ItemClient
接口中使用ItemClientFallbackFactory
@FeignClient(value = "item-service", fallbackFactory = ItemClientFallbackFactory.class)
public interface ItemClient {
@GetMapping("/items")
List<ItemDTO> queryItemByIds(@RequestParam("ids") Collection<Long> ids);
@PutMapping("items/stock/deduct")
void deductStock(@RequestBody List<OrderDetailDTO> items);
@PutMapping("items/stock/restore")
void restoreStock(@RequestBody List<OrderDetailDTO> orderDetailDTOS);
}
熔断
熔断是解决雪崩问题的重要手段。思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求。
断路器的工作状态切换由一个状态机来控制:
状态机包括三个状态:
-
closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例。超过阈值则切换到open状态
-
open:打开状态,服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑。Open状态持续一段时间后会进入half-open状态
-
half-open:半开状态,放行一次请求,根据执行结果来判断接下来的操作。
-
请求成功:则切换到closed状态
-
请求失败:则切换到open状态
-
配置熔断策略
在控制台通过点击簇点后的熔断
按钮来配置熔断策略
按照慢调用比例来做熔断
-
RT超过200毫秒的请求调用就是慢调用
-
统计最近1000ms内的最少5次请求,如果慢调用比例不低于0.5,则触发熔断
-
熔断持续时长20s
RT是平均响应时间(Response Time),是用于衡量服务或资源处理请求的速度的一个关键指标。
☆分布式事务
在分布式系统中,如果一个业务需要多个服务合作完成,而且每一个服务都有事务,多个事务必须同时成功或失败,这样的事务就是分布式事务。其中的每个服务的事务就是一个分支事务。整个业务称为全局事务。
由于订单、购物车、商品分别在三个不同的微服务,而每个微服务都有自己独立的数据库,因此下单过程中就会跨多个数据库完成业务。而每个微服务都会执行自己的本地事务:
-
交易服务:下单事务
-
购物车服务:清理购物车事务
-
库存服务:扣减库存事务
整个业务中,各个本地事务是有关联的。因此每个微服务的本地事务,也可以称为分支事务。多个有关联的分支事务一起就组成了全局事务。我们必须保证整个全局事务同时成功或失败。
虽然每个单独的业务都能在本地遵循ACID,但是它们互相之间没有感知,不知道有人失败了,无法保证最终结果的统一,也就无法遵循ACID的事务特性了。
这就是分布式事务问题,出现以下情况之一就可能产生分布式事务问题:
-
业务跨多个服务实现
-
业务跨多个数据源实现
初识Seata
Seata是 2019 年 1 月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案。
Seata官网:Apache Seata
分布式事务解决思路:
解决分布式事务,各个子事务之间必须能感知到彼此的事务状态,才能保证状态一致。
Seata事务管理中有三个重要的角色:
- TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器:管理分支事务,与TC交谈以注册分支事务和报告分支事务的状态
- TM和RM可以理解为Seata的客户端部分,引入到参与事务的微服务依赖中即可。将来TM和RM就会协助微服务,实现本地分支事务与TC之间交互,实现事务的提交或回滚。
- TC服务则是事务协调中心,是一个独立的微服务,需要单独部署。
部署TC服务
微服务集成Seata
引入依赖
为了方便各个微服务集成seata,我们需要把seata配置共享到nacos,因此trade-service
模块不仅仅要引入seata依赖,还要引入nacos依赖
<!--统一配置管理-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--读取bootstrap文件-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
改造配置
在nacos上添加一个共享的seata配置,命名为shared-seata.yaml
seata:
registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
type: nacos # 注册中心类型 nacos
nacos:
server-addr: 192.168.100.103:8848 # nacos地址
namespace: "" # namespace,默认为空
group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP
application: seata-server # seata服务名称
username: nacos
password: nacos
tx-service-group: hmall # 事务组名称
service:
vgroup-mapping: # 事务组与tc集群的映射关系
hmall: "default"
改造trade-service
模块,添加bootstrap.yaml
spring:
application:
name: trade-service # 服务名称
profiles:
active: dev
cloud:
nacos:
server-addr: 192.168.100.103 # nacos地址
config:
file-extension: yaml # 文件后缀名
shared-configs: # 共享配置
- dataId: shared-jdbc.yaml # 共享mybatis配置
- dataId: shared-log.yaml # 共享日志配置
- dataId: shared-swagger.yaml # 共享日志配置
- dataId: shared-seata.yaml # 共享seata配置
改造trade-service
模块的application.yaml文件
server:
port: 8085
feign:
okhttp:
enabled: true # 开启OKHttp连接池支持
sentinel:
enabled: true # 开启Feign对Sentinel的整合
hm:
swagger:
title: 交易服务接口文档
package: com.hmall.trade.controller
db:
database: hm-trade
测试
在trade-service
模块下的com.hmall.trade.service.impl.OrderServiceImpl
类中的createOrder
方法,也就是下单业务方法。将其上的@Transactional
注解改为Seata提供的@GlobalTransactional
@GlobalTransactional
注解就是在标记事务的起点,将来TM就会基于这个方法判断全局事务范围,初始化全局事务。
通过对trade-service模块的改造再分别改造
item-service
、cart-service
这两个微服务。
分布式事务解决方案
Seata支持四种不同的分布式事务解决方案:
-
XA
-
TCC
-
AT
-
SAGA
XA模式
一阶段的工作:
- ①RM注册分支事务到TC
- ②RM执行分支业务sql但不提交
- ③RM报告执行状态到TC
二阶段的工作:
- TC检测各分支事务执行状态
- a.如果都成功,通知所有RM提交事务
- b.如果有失败,通知所有RM回滚事务
- RM接收TC指令,提交或回滚事务
XA模式优缺点:
优点:1.事务的强一致性,满足ACID原则
2.常用数据库都支持,实现简单,并且没有代码侵入
缺点:1.因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差
2.依赖关系型数据库实现事务
实现步骤:
- 在Nacos中的共享shared-seata.yaml配置文件中设置
seata:
data-source-proxy-mode: XA
- 利用@GlobalTransactional标记分布式事务的入口
AT模式
AT模式同样是分阶段提交的事务模型,不过缺弥补了XA模型中资源锁定周期过长的缺陷。
阶段一RM的工作:
- 注册分支事务
- 记录undo-log(数据快照)
- 执行业务sql并提交
- 报告事务状态
阶段二提交时RM的工作:
- 删除undo-log即可
- 阶段二回滚时RM的工作:
- 根据undo-log恢复数据到更新前
实现步骤:
- 再数据库中对应模块添加AT表
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT '分支事务id',
`xid` VARCHAR(128) NOT NULL COMMENT '全局事务id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
- 在Nacos中的共享shared-seata.yaml配置文件中设置
seata:
data-source-proxy-mode: AT # 开启数据源代理的AT模式
简述AT模式与XA模式最大的区别是什么?
- XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源
- XA模式依赖数据库机制实现回滚;AT模式利用数据快照实现数据回滚。
- XA模式强一致;AT模式最终一致
☆异步通信
初识MQ
同步调用
同步调用的优势是什么?
- 时效性强等待结果后才返回
同步调用的问题是什么?
- 拓展性差
- 性能下降
- 级联失败问题
异步调用
异步调用通常是基于消息通知的方式,包含三个角色:
- 消息发送者:投递消息的人,就是原来的调用者
- 消息接收者:接收和处理消息的人,就是原来的服务提供者
- 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器
异步调用的优势是什么?
- 解除耦合,拓展性强
- 无需等待,性能好
- 故障隔离
- 缓存消息,流量削峰填谷
异步调用的问题是什么?
- 不能立即得到调用结果,时效性差
- 不确定下游业务执行是否成功
- 业务安全依赖于Broker的可靠性
MQ技术选型
消息队列(MessageQueue),简称为MQ,也就是异步调用中的Broker
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
RabbitMQ
RabbitMQ是基于Erlang语言开发的开源消息通信中间件
官网地址:RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ的整体架构及核心概念:
- virtual-host:虚拟主机,起到数据隔离的作用
- publisher:消息发送者
- consumer:消息的消费者
- queue:队列,存储消息
- exchange:交换机,负责路由消息
消息发送的注意事项有哪些?
- 交换机只能路由消息,无法存储消息
- 交换机只会路由消息给与其绑定的队列,因此队列必须与交换机绑定
SpringAMQP
SpringAmqp的官方地址:Spring AMQP
SpringAMQP提供了三个功能:
-
自动声明队列、交换机及其绑定关系
-
基于注解的监听器模式,异步接收消息
-
封装了RabbitTemplate工具,用于发送消息
SpringAMQP快速入门
新建一个项目,包括三部分:
-
mq-demo:父工程,管理项目依赖
-
publisher:消息的发送者
-
consumer:消息的消费者
步骤一:引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息发送
步骤二:配置MQ地址
在publisher
服务的application.yml
中添加配置:
spring:
rabbitmq:
host: 192.168.100.103 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: hmall # 密码
步骤三:在publisher
服务中编写测试类SpringAmqpTest
,并利用RabbitTemplate
实现消息发送
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收
步骤四:在consumer
服务的application.yml
中添加配置:
spring:
rabbitmq:
host: 192.168.100.103 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: hmall # 密码
步骤五:在consumer
服务的listener
包中新建一个类SpringRabbitListener
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
步骤六:启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息。
SpringAMQP如何收发消息?
- ①、引入spring-boot-starter-amqp依赖
- ②、配置rabbitmq服务端信息
- ③、利用RabbitTemplate发送消息
- ④、利用@RabbitListener注解声明要监听的队列,监听消息
Work Queues
Work Queues,即任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。
消息发送
在publisher服务中的SpringAmqpTest类中添加一个测试方法:
/**
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() {
// 队列名称
String queueName = "work.queue";
// 消息
for (int i = 0; i <= 50; i++) {
String message = "hello, spring amqp!——" + i;
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收
在consumer服务的SpringRabbitListener中添加2个新的方法:
@RabbitListener(queues = "work.queue")
public void listenerWorkQueue1(String message) throws InterruptedException {
log.info("消费者1——监听到 work.queue 的消息:【{}】", message);
System.out.println("消费者1——监听到 work.queue 的消息:" + message + "," + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenerWorkQueue2(String message) throws InterruptedException {
System.err.println("消费者2——监听到 work.queue 的消息:" + message + "," + LocalTime.now());
Thread.sleep(200);
}
运行结果可以看到消费者1和消费者2竟然每人消费了25条消息:
-
消费者1很快完成了自己的25条消息
-
消费者2却在缓慢的处理自己的25条消息。
也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。
能者多劳
修改consumer服务的application.yml文件:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
总结
Work模型的使用:
-
多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
-
通过设置prefetch来控制消费者预取的消息数量
交换机Exchange
-
Publisher:生产者,不再发送消息到队列中,而是发给交换机
-
Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
-
Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
-
Consumer:消费者,与以前一样,订阅队列,没有变化
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失
交换机的类型有四种:
-
Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
-
Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
-
Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
-
Headers:头匹配,基于MQ的消息头匹配,用的较少。
1.Fanout交换机
在广播模式下,消息发送流程是这样的:
-
1) 可以有多个队列
-
2) 每个队列都要绑定到Exchange(交换机)
-
3) 生产者发送的消息,只能发送到交换机
-
4) 交换机把消息发送给绑定过的所有队列
-
5) 订阅队列的消费者都能拿到消息
fanout交换机的演示:
1.创建一个名为hmall.fanout
的交换机,类型是fanout
2.在控制台创建队列fanout.queue1
3.再创建一个队列fanout.queue2
4.然后绑定两个队列到交换机
5.消息发送(在publisher服务的SpringAmqpTest类中添加测试方法,作为生产者)
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
// 发送信息
rabbitTemplate.convertAndSend(exchangeName, null, message);
}
6.消息接收(在consumer服务的SpringRabbitListener中添加两个方法,作为消费者)
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
7.结果
交换机的作用是什么?
-
接收publisher发送的消息
-
将消息按照规则路由到与之绑定的队列
-
不能缓存消息,路由失败,消息丢失
-
FanoutExchange的会将消息路由到每个绑定的队列
2.Direct交换机
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) -
消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 -
Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消
1.创建一个名为hmall.direct
的交换机,类型是direct
2.在控制台创建队列direct.queue1,
然后使用red
和blue
作为key,绑定direct.queue1
到hmall.direct
3.再创建一个队列direct.queue2,
使用red
和yellow
作为key,绑定direct.queue2
到hmall.direct(同上)
4.然后绑定两个队列到交换机
5.消息发送(在publisher服务的SpringAmqpTest类中添加测试方法,作为生产者)
@Test
public void testDirectExchange() {
// 交换机名称
String exchangeName = "hmall.direct";
// 消息
String message1 = "红色:震惊,大学男宿舍后面竟然发现女尸!";
String message2 = "蓝色:通知:女尸是充气的!!!";
// 发送信息
rabbitTemplate.convertAndSend(exchangeName, "red", message1);
rabbitTemplate.convertAndSend(exchangeName, "blue", message2);
}
6.消息接收(在consumer服务的SpringRabbitListener中添加两个方法,作为消费者)
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
7.结果
描述下Direct交换机与Fanout交换机的差异?
-
Fanout交换机将消息路由给每一个与之绑定的队列
-
Direct交换机根据RoutingKey判断路由给哪个队列
-
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
3.Topic交换机
Topic类型的Exchange与Direct都是可以根据Routngkey把消息路由到不同的队列。只不过topic类型的Exchange可以让队列在绑定(Bindingkey)的时候使用通配符。
Bindingkey一般都是有一个或多个单词组成,多个单词之间以.
分割,如,itcam.insert
通配符规则:
- #:匹配一个或多个词(itcam.#能够匹配到itcam.a.b和itcam.a)
- *:匹配一个词(itcam.*只能匹配itcam.a)
topic交换机的演示:
假如此时publisher发送的消息使用的RoutingKey
共有四种:
-
china.news
代表有中国的新闻消息; -
china.weather
代表中国的天气消息; -
japan.news
则代表日本新闻 -
japan.weather
代表日本的天气消息;
1.创建一个名为hmall.topic
的交换机,类型是direct
2.在控制台创建队列topic.queue1,topic.queue1
绑定的是china.#
,凡是以 china.
开头的routing key
都会被匹配到,包括 china.news 、china.weather
3.在控制台创建队列topic.queue2,topic.queue2
绑定的是#.news
,凡是以 .news
结尾的 routing key
都会被匹配。包括 china.news 、japan.news
4.然后绑定两个队列到交换机
5.消息发送(在publisher服务的SpringAmqpTest类中添加测试方法,作为生产者)
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "hmall.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
6.消息接收(在consumer服务的SpringRabbitListener中添加两个方法,作为消费者)
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
7.结果
描述下Direct交换机与Topic交换机的差异?
- Topic交换机接收的消息RoutingKey可以是多个单词,以 . 分割
- Topic交换机与队列绑定时的bindingKey可以指定通配符
- #:代表0个或多个词
- *:代表1个词
4.声明队列和交换机
SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类QueueBuilder构建
- Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建
基于API(以Fanout为例)
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout1");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
// ---------------------------------------------------------------------------------------
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
基于注解声明(以Direct为例)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange、DirectExchange、TopicExchange
- Binding
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
- @Queue
- @Exchange
-
消息转换器
-
Spring的消息发送代码接收的消息体是一个Object,在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
-
Spring采用的序列化方式是JDK序列化,而JDK序列化存在下列问题:
-
数据体积过大
-
有安全漏洞
-
可读性差
测试默认转换器
1)创建测试队列
首先,我们在consumer服务中声明一个新的配置类:
@Configuration
public class MessageConfig {
@Bean
public Queue objectQueue() {
return new Queue("object.queue");
}
}
重启consumer服务以后,object.queue队列就会被自动创建出来了
2)发送消息
在publisher模块的SpringAmqpTest中新增一个消息发送的代码,发送一个Map对象:
@Test
public void testSendObject() throws InterruptedException {
// 准备消息
Map<String, Object> msg = new HashMap<>();
msg.put("name", "cammy");
msg.put("age", 21);
// 发送消息
rabbitTemplate.convertAndSend("object.queue", msg);
}
发送消息后查看控制台:
配置JSON转换器
在publisher
和consumer
两个服务中都引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>jackson.version</version>
</dependency>
在publisher
和consumer
两个服务的启动类中添加以下Bean
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
// todo:消息转换器中添加的messageId可以便于我们将来做幂等性判断
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
}
☆消息可靠性
当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:
-
消息投递的过程中出现了网络故障
-
消费者接收到消息后突然宕机
-
消费者接收到消息后,因处理不当导致异常
一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败才能重新投递消息。
消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)
当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack:成功处理消息,RabbitMQ从队列中删除该消息
nack:消息处理失败,RabbitMQ需要再次投递消息
reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.
由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:
-
none
:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用 -
manual
:手动模式。需要自己在业务代码中调用api,发送ack
或reject
,存在业务入侵,但更灵活 -
auto
:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
. 当业务出现异常时,根据异常判断返回不同结果:-
如果是业务异常,会自动返回
nack
; -
如果是消息处理或校验异常,自动返回
reject
;
-
通过在publisher服务的application.yaml文件配置下面的字段可以修改SpringAMQP的ACK处理方式:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 不做处理
# - none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
# - manual:手动模式。需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活
# - auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack. 当业务出现异常时,根据异常判断返回不同结果:
# - 如果是业务异常,会自动返回nack;
# - 如果是消息处理或校验异常,自动返回reject;
失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启consumer服务,重复之前的测试。可以发现:
-
消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次
-
本地重试3次以后,抛出了
AmqpRejectAndDontRequeueException
异常。查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是reject
结论:
-
开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
-
重试达到最大次数后,Spring会返回reject,消息会被丢弃
失败处理策略
在之前的测试中,本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
-
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式 -
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队 -
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfiguration {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue() {
return new Queue("error.queue");
}
@Bean
public Binding errorQueueBinding(DirectExchange errorExchange, Queue errorQueue) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
业务幂等性
何为幂等性?
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x))
,例如求绝对值函数。
在程序开发中,则是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:
-
根据id删除数据
-
查询数据
-
新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
-
取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
-
退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行。
然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:
-
页面卡顿时频繁刷新导致表单重复提交
-
服务间调用的重试
-
MQ消息的重复投递
我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息重复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。
举例:
-
假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
-
由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
-
但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
-
退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。
因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:
-
唯一消息ID
-
业务状态判断
唯一消息ID
这个思路非常简单:
-
每一条消息都生成一个唯一的id,与消息一起投递给消费者。
-
消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
-
如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
添加唯一ID的方式:SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
return jjmc;
}
业务判断
业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
例如我们当前案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。
相比较而言,唯一消息ID的方案需要改造原有的数据库,所以我更推荐使用业务判断的方案。
兜底方案
MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。下图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。
不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。
那么问题来了,我们到底该在什么时间主动查询支付状态呢?
这个时间是无法确定的,因此,通常我们采取的措施就是利用定时任务定期查询,例如每隔20秒就查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。
综上,支付服务与交易服务之间的订单状态一致性是如何保证的?
首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
死信交换机与延迟消息
死信交换机
什么是死信?
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
-
消费者使用
basic.reject
或basic.nack
声明消费失败,并且消息的requeue
参数设置为false -
消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
-
要投递的队列消息堆积满了,无法投递,最早的消息可能称为死信
如果一个队列中的消息已经成为死信,并且这个队列通过dead-letter-exchange
属性指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机就称为死信交换机(Dead Letter Exchange)。
死信交换机有什么作用呢?
-
收集那些因处理失败而被拒绝的消息
-
收集那些因队列满了而被拒绝的消息
-
收集因TTL(有效期)到期的消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dlx.queue", durable = "true"),
exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
key = {"hi"}
))
public void listDlxQueue(String message) {
log.info("消费者监听到dlx.queue消息:【{}】", message);
}
@Configuration
public class NormalConfig {
@Bean
public DirectExchange normalExchange() {
return new DirectExchange("normal.exchange");
}
@Bean
public Queue normalQueue() {
// return new Queue("normal.queue");
return QueueBuilder
.durable("normal.queue")
.deadLetterExchange("dlx.direct")
.build();
}
@Bean
public Binding normalExchangeBinding(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");
}
}
☆延迟消息
前面两种作用场景可以看做是把利用死信交换机的机制实现延迟消息当做一种消息处理的最终兜底方案,与消费者重试时讲的RepublishMessageRecoverer
作用类似。
如图,有一组绑定的交换机(ttl.fanout
)和队列(ttl.queue
)。但是ttl.queue
没有消费者监听,而是设定了死信交换机hmall.direct
,而队列direct.queue1
则与死信交换机绑定,RoutingKey是blue:
假如我们现在发送一条消息到ttl.fanout
,RoutingKey为blue,并设置消息的有效期为5000毫秒:
注意:尽管这里的
ttl.fanout
不需要RoutingKey,但是当消息变为死信并投递到死信交换机时,会沿用之前的RoutingKey,这样hmall.direct
才能正确路由消息。
消息肯定会被投递到ttl.queue
之后,由于没有消费者,因此消息无人消费。5秒之后,消息的有效期到期,成为死信:
死信被再次投递到死信交换机hmall.direct
,并沿用之前的RoutingKey,也就是blue
:
由于direct.queue1
与hmall.direct
绑定的key是blue,因此最终消息被成功路由到direct.queue1
,如果此时有消费者与direct.queue1
绑定, 也就能成功消费消息了。但此时已经是5秒钟以后了:也就是说,publisher发送了一条消息,但最终consumer在5秒后才收到消息。我们成功实现了延迟消息。
注意:
RabbitMQ的消息过期是基于追溯方式来实现的,也就是说当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。
当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。
DelayExchange插件
声明延迟交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:【{}】", msg);
}
@Configuration
public class DelayExchangeConfig {
@Bean
public DirectExchange delayExchange(){
return ExchangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() // 设置delay的属性为true
.durable(true) // 持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayQueueBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");
}
}
发送延时消息
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
超时订单问题
假如订单超时支付时间为15分钟,理论上说我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。
即用户下单完成后,发送15分钟延迟消息。在15分钟后接收消息。检查支付状态:
已支付:更新订单状态为已支付
未支付:更新订单状态为关闭订单,恢复商品库存
☆分布式搜索
正向索引
id(索引) | title | price |
---|---|---|
1 | 小米手机 | 3499 |
2 | 华为手机 | 4999 |
3 | 华为小米充电器 | 49 |
4 | 小米手环 | 49 |
... | ... | ... |
其中的id字段已经创建了索引,由于索引底层采用了B+树结构,因此我们根据id搜索的速度会非常快。但是其他字段例如title
,只在叶子节点上存在。
因此要根据title
搜索的时候只能遍历树中的每一个叶子节点,判断title数据是否符合要求。
比如用户的SQL语句为:
select * from tb_goods where title like '%手机%';
上图说明:
-
1)检查到搜索条件为
like '%手机%'
,需要找到title
中包含手机
的数据 -
2)逐条遍历每行数据(每个叶子节点),比如第1次拿到
id
为1的数据 -
3)判断数据中的
title
字段值是否符合条件 -
4)如果符合则放入结果集,不符合则丢弃
-
5)回到步骤1
综上,根据id精确匹配时,可以走索引,查询效率较高。而当搜索条件为模糊匹配时,由于索引无法生效,导致从索引查询退化为全表扫描,效率很差。
因此,正向索引适合于根据索引字段的精确搜索,不适合基于部分词条的模糊匹配。
{
"id": 1,
"title": "小米手机",
"price": 3499
}
{
"id": 2,
"title": "华为手机",
"price": 4999
}
{
"id": 3,
"title": "华为小米充电器",
"price": 49
}
{
"id": 4,
"title": "小米手环",
"price": 299
}
☆倒排索引
倒排索引的概念是基于MySQL这样的正向索引而言的。
倒排索引中有两个非常重要的概念:
-
文档(
Document
):用来搜索的数据,其中的每一条数据就是一个文档。例如一个网页、一个商品信息 -
词条(
Term
):对文档数据或用户搜索数据,利用某种算法分词,得到的具备含义的词语就是词条。例如:我是中国人,就可以分为:我、是、中国人、中国、国人这样的几个词条
创建倒排索引是对正向索引的一种特殊处理和应用,流程如下:
-
将每一个文档的数据利用分词算法根据语义拆分,得到一个个词条
-
创建表,每行数据包括词条、词条所在文档id、位置等信息
-
因为词条唯一性,可以给词条创建正向索引
此时形成的这张以词条为索引的表,就是倒排索引表,两者对比如下:
正向索引
id(索引) | title | price |
---|---|---|
1 | 小米手机 | 3499 |
2 | 华为手机 | 4999 |
3 | 华为小米充电器 | 49 |
4 | 小米手环 | 49 |
... | ... | ... |
倒排索引
词条(索引) | 文档id |
---|---|
小米 | 1,3,4 |
手机 | 1,2 |
华为 | 2,3 |
充电器 | 3 |
手环 | 4 |
倒排索引的搜索流程如下(以搜索"华为手机"为例),如图:
上图流程描述:
1)用户输入条件"华为手机"
进行搜索。
2)对用户输入条件分词,得到词条:华为
、手机
。
3)拿着词条在倒排索引中查找(由于词条有索引,查询效率很高),即可得到包含词条的文档id:1、2、3
。
4)拿着文档id
到正向索引中查找具体文档即可(由于id
也有索引,查询效率也很高)。
注意:虽然要先查询倒排索引,再查询正向索引,但是无论是词条、还是文档id都建立了索引,查询速度非常快!无需全表扫描。
为什么一个叫做正向索引,一个叫做倒排索引呢?
-
正向索引是最传统的,根据id索引的方式。但根据词条查询时,必须先逐条获取每个文档,然后判断文档中是否包含所需要的词条,是根据文档找词条的过程。
-
而倒排索引则相反,是先找到用户要搜索的词条,根据词条得到保护词条的文档的id,然后根据id获取文档。是根据词条找文档的过程。
什么是文档和词条?
- 每一条数据就是一个文档
- 对文档中的内容分词,得到的词语就是词条
什么是正向索引?
基于文旦id创建索引。根据id查询快,但是查询词条时必须先找到文档,而后判断是否包含词条。
什么是倒排索引?
对文档内容分词,对词条创建索引,并记录词条所在文档的id。查询时先根据词条查询到文档id,而后根据文档id查询文档。
两者的优缺点是什么呢?
正向索引:
-
优点:
-
可以给多个字段创建索引
-
根据索引字段搜索、排序速度非常快
-
-
缺点:
-
根据非索引字段,或者索引字段中的部分词条查找时,只能全表扫描。
-
倒排索引:
-
优点:
-
根据词条搜索、模糊搜索时,速度非常快
-
-
缺点:
-
只能给词条创建索引,而不是字段
-
无法根据字段做排序
-
Elasticsearch
本文标签: 商城项目SpringCloud
版权声明:本文标题:基于商城项目了解SpringCloud 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/xitong/1728534139a1162458.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论