1、Spring Cloud Gateway 是由 WebFlux + Netty + Reactor 实现的响应式的 API 网关。它不能在传统的 servlet 容器中工作,也不能构建成 war 包;
2、Spring Cloud Gateway 旨在为微服务架构提供一种简单且有效的 API 路由的管理方式,并基于 Filter 的方式提供网关的基本功能,例如说安全认证、监控、限流等等。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
spring:
application:
name: api-gateway
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848 # 将gateway注册到nacos
service: ${spring.application.name}
gateway:
discovery:
locator:
enabled: true # 让gateway从nacos中获取服务信息
routes:
- id: product_route
uri: lb://demo-provider
order: 1
predicates:
- Path=/demo-provider/**
filters:
- StripPrefix=1
GatewayAutoConfiguration
自动配置类
DispatcherHandler
DispatcherHandler#handle
,转发器。
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return this.createNotFoundError();
} else {
return CorsUtils.isPreFlightRequest(exchange.getRequest()) ? this.handlePreFlight(exchange) : Flux.fromIterable(this.handlerMappings).concatMap((mapping) -> {
return mapping.getHandler(exchange);
}).next().switchIfEmpty(this.createNotFoundError()).flatMap((handler) -> {
return this.invokeHandler(exchange, handler);
}).flatMap((result) -> {
return this.handleResult(exchange, result);
});
}
}
RoutePredicateHandlerMapping
通过断言找到路由,存储route信息到GATEWAY_ROUTE_ATTR
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
if (this.managementPortType == RoutePredicateHandlerMapping.ManagementPortType.DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
} else {
exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_HANDLER_MAPPER_ATTR, this.getSimpleName());
return this.lookupRoute(exchange).flatMap((r) -> {
exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Mapping [" + this.getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR, r);
return Mono.just(this.webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR);
if (this.logger.isTraceEnabled()) {
this.logger.trace("No RouteDefinition found for [" + this.getExchangeDesc(exchange) + "]");
}
})));
}
}
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes().concatMap((route) -> {
return Mono.just(route).filterWhen((r) -> {
exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return (Publisher)r.getPredicate().apply(exchange);
}).doOnError((e) -> {
this.logger.error("Error applying predicate for route: " + route.getId(), e);
}).onErrorResume((e) -> {
return Mono.empty();
});
}).next().map((route) -> {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Route matched: " + route.getId());
}
this.validateRoute(route, exchange);
return route;
});
}
FilteringWebHandler
FilteringWebHandler#handle
,通过适配器将GlobalFilter生成GatewayFilterAdapter,生成Filter责任链处理请求
public Mono<Void> handle(ServerWebExchange exchange) {
Route route = (Route)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
List<GatewayFilter> gatewayFilters = route.getFilters();
List<GatewayFilter> combined = new ArrayList(this.globalFilters);
combined.addAll(gatewayFilters);
AnnotationAwareOrderComparator.sort(combined);
if (logger.isDebugEnabled()) {
logger.debug("Sorted gatewayFilterFactories: " + combined);
}
return (new FilteringWebHandler.DefaultGatewayFilterChain(combined)).filter(exchange);
}
ReactiveLoadBalancerClientFilter
ReactiveLoadBalancerClientFilter#filter
,进行负载均衡。
private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId, Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
ReactorLoadBalancer<ServiceInstance> loadBalancer = (ReactorLoadBalancer)this.clientFactory.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
if (loadBalancer == null) {
throw new NotFoundException("No loadbalancer available for " + serviceId);
} else {
supportedLifecycleProcessors.forEach((lifecycle) -> {
lifecycle.onStart(lbRequest);
});
return loadBalancer.choose(lbRequest);
}
}
NettyRoutingFilter
NettyRoutingFilter#filter
,通过netty进行请求转发。
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (!ServerWebExchangeUtils.isAlreadyRouted(exchange) && ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme))) {
ServerWebExchangeUtils.setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest();
HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
String url = requestUrl.toASCIIString();
HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange);
DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
boolean preserveHost = (Boolean)exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
Route route = (Route)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
Flux<HttpClientResponse> responseFlux = ((RequestSender)this.getHttpClient(route, exchange).headers((headers) -> {
headers.add(httpHeaders);
headers.remove("Host");
if (preserveHost) {
String host = request.getHeaders().getFirst("Host");
headers.add("Host", host);
}
}).request(method).uri(url)).send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound.withConnection((connection) -> {
log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix());
});
}
return nettyOutbound.send(request.getBody().map(this::getByteBuf));
}).responseConnection((res, connection) -> {
exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR, connection);
ServerHttpResponse response = exchange.getResponse();
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach((entry) -> {
headers.add((String)entry.getKey(), (String)entry.getValue());
});
String contentTypeValue = headers.getFirst("Content-Type");
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put("original_response_content_type", contentTypeValue);
}
this.setResponseStatus(res, response);
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(this.getHeadersFilters(), headers, exchange, Type.RESPONSE);
if (!filteredResponseHeaders.containsKey("Transfer-Encoding") && filteredResponseHeaders.containsKey("Content-Length")) {
response.getHeaders().remove("Transfer-Encoding");
}
exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
return Mono.just(res);
});
Duration responseTimeout = this.getResponseTimeout(route);
if (responseTimeout != null) {
responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class, (th) -> {
return new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th);
});
}
return responseFlux.then(chain.filter(exchange));
} else {
return chain.filter(exchange);
}
}
NettyWriteResponseFilter
NettyWriteResponseFilter
用来写响应
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).then(Mono.defer(() -> {
Connection connection = (Connection)exchange.getAttribute(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR);
if (connection == null) {
return Mono.empty();
} else {
log.trace("NettyWriteResponseFilter start");
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory)response.bufferFactory();
ByteBufFlux var10000 = connection.inbound().receive().retain();
factory.getClass();
Flux<NettyDataBuffer> body = var10000.map(factory::wrap);
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
} catch (Exception var8) {
log.trace("invalid media type", var8);
}
return this.isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body);
}
}));
}
因篇幅问题不能全部显示,请点此查看更多更全内容