首页 > 编程语言 > Spring Cloud Gateway 记录请求应答数据日志操作
2020
12-02

Spring Cloud Gateway 记录请求应答数据日志操作

我就废话不多说了,大家还是直接看代码吧~

/**
 * Mutable per-request holder for data captured while a request passes
 * through the gateway: the request path, the cached JSON body and the
 * cached form data.
 */
public class GatewayContext {

 /** Attribute key under which a context instance can be stored. */
 public static final String CACHE_GATEWAY_CONTEXT = "cacheGatewayContext";

 /** Request path. */
 private String path;

 /** Cached JSON body, kept as the raw string. */
 private String cacheBody;

 /** Cached form data. */
 private MultiValueMap<String, String> formData;

 /**
  * @return the cached request path, or {@code null} if none was set
  */
 public String getPath() {
  return this.path;
 }

 /**
  * @param path the request path to remember
  */
 public void setPath(String path) {
  this.path = path;
 }

 /**
  * @return the cached JSON body, or {@code null} if none was set
  */
 public String getCacheBody() {
  return this.cacheBody;
 }

 /**
  * @param cacheBody the JSON body to remember
  */
 public void setCacheBody(String cacheBody) {
  this.cacheBody = cacheBody;
 }

 /**
  * @return the cached form data, or {@code null} if none was set
  */
 public MultiValueMap<String, String> getFormData() {
  return this.formData;
 }

 /**
  * @param formData the form data to remember
  */
 public void setFormData(MultiValueMap<String, String> formData) {
  this.formData = formData;
 }
}
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import io.netty.buffer.ByteBufAllocator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
// https://segmentfault.com/a/1190000017898354
/**
 * Global filter that logs the method, raw path and {@code token} header of
 * every request and, for POST requests carrying a JSON or
 * {@code application/x-www-form-urlencoded} body, also logs the body.
 *
 * <p>Because a request body can only be consumed once, the body is read into
 * memory, stored in a {@link GatewayContext} (exchange attribute
 * {@link GatewayContext#CACHE_GATEWAY_CONTEXT}) and re-published to the rest
 * of the chain through a {@link ServerHttpRequestDecorator}.
 */
@Component
public class LogRequestGlobalFilter
  implements GlobalFilter {
 /**
  * default HttpMessageReader
  */
 private static final List<HttpMessageReader<?>> messageReaders =
   HandlerStrategies.withDefaults().messageReaders();
 private static final Logger log = LoggerFactory.getLogger(LogRequestGlobalFilter.class);

 /**
  * Logs the request line and dispatches to the body-logging variant that
  * matches the request's content type.
  *
  * @param exchange the current exchange
  * @param chain the remaining filter chain
  * @return completion signal of the (possibly decorated) chain; never {@code null}
  */
 @Override
 public Mono<Void> filter(
   ServerWebExchange exchange,
   GatewayFilterChain chain) {
  // save request path into gateway context
  ServerHttpRequest request = exchange.getRequest();
  String path = request.getPath().pathWithinApplication().value();
  GatewayContext gatewayContext = new GatewayContext();
  gatewayContext.setPath(path);
  // save gateway context into exchange
  exchange.getAttributes().put(GatewayContext.CACHE_GATEWAY_CONTEXT,
    gatewayContext);
  HttpHeaders headers = request.getHeaders();
  MediaType contentType = headers.getContentType();
  log.info("start-------------------------------------------------");
  log.info("HttpMethod:{},Url:{}", request.getMethod(),
    request.getURI().getRawPath());
  // NOTE(review): this writes a credential-like header to the log in clear
  // text; consider masking it before using this filter in production.
  log.info("Headers token: {}", headers.getFirst("token"));
  if (request.getMethod() == HttpMethod.GET) {
   log.info("end-------------------------------------------------");
  }
  if (request.getMethod() == HttpMethod.POST) {
   // includes() compares type/subtype only, so "application/json" with any
   // charset parameter is matched; it returns false for a null content type.
   if (MediaType.APPLICATION_JSON.includes(contentType)) {
    return readBody(exchange, chain, gatewayContext);
   }
   if (MediaType.APPLICATION_FORM_URLENCODED.includes(contentType)) {
    return readFormData(exchange, chain, gatewayContext);
   }
   // Any other POST content type (multipart, text, none, ...) falls through
   // to the plain chain below instead of returning null.
  }
  return chain.filter(exchange);
 }

 /**
  * Reads and logs form data, then forwards the request with a body rebuilt
  * from the parsed form (the original body has been consumed by parsing).
  *
  * @param exchange the current exchange
  * @param chain the remaining filter chain
  * @param gatewayContext context that receives the parsed form data
  * @return completion signal of the chain
  */
 private Mono<Void> readFormData(
   ServerWebExchange exchange,
   GatewayFilterChain chain,
   GatewayContext gatewayContext) {
  final ServerHttpRequest request = exchange.getRequest();
  HttpHeaders headers = request.getHeaders();
  return exchange.getFormData()
    .doOnNext(multiValueMap -> {
     gatewayContext.setFormData(multiValueMap);
     log.info("Post x-www-form-urlencoded:{}",
       multiValueMap);
     log.info(
       "end-------------------------------------------------");
    })
    .then(Mono.defer(() -> {
     MediaType contentType = headers.getContentType();
     Charset declared = contentType == null ? null : contentType.getCharset();
     Charset charset = declared == null ? StandardCharsets.UTF_8 : declared;
     MultiValueMap<String, String> formData =
       gatewayContext.getFormData();
     // formData is empty: nothing to rebuild, just continue
     if (null == formData || formData.isEmpty()) {
      return chain.filter(exchange);
     }
     byte[] bodyBytes = encodeFormData(formData, charset).getBytes(charset);
     int contentLength = bodyBytes.length;
     ServerHttpRequestDecorator decorator =
       new ServerHttpRequestDecorator(
         request) {
        /**
         * Copies the original headers and adjusts the length
         * information to the rebuilt body.
         */
        @Override
        public HttpHeaders getHeaders() {
         HttpHeaders httpHeaders = new HttpHeaders();
         httpHeaders.putAll(super.getHeaders());
         if (contentLength > 0) {
          httpHeaders.setContentLength(contentLength);
         } else {
          httpHeaders.set(HttpHeaders.TRANSFER_ENCODING,
            "chunked");
         }
         return httpHeaders;
        }
        /**
         * Publishes the rebuilt form body.
         */
        @Override
        public Flux<DataBuffer> getBody() {
         if (contentLength == 0) {
          // DataBufferUtils.read is called with the content length as
          // buffer size; avoid passing 0 for an empty body.
          return Flux.empty();
         }
         return DataBufferUtils
           .read(new ByteArrayResource(bodyBytes),
             new NettyDataBufferFactory(
               ByteBufAllocator.DEFAULT),
             contentLength);
        }
       };
     ServerWebExchange mutateExchange =
       exchange.mutate().request(decorator).build();
     return chain.filter(mutateExchange);
    }));
 }

 /**
  * Serializes form data back to {@code key=value&key=value} form.
  *
  * <p>Both keys and values are URL-encoded. A {@code null} value is written
  * as the bare key without {@code =}; a key with an empty value list
  * contributes nothing.
  *
  * @param formData the parsed form data
  * @param charset charset used for URL-encoding
  * @return the encoded form body, possibly empty
  * @throws IllegalStateException if the runtime rejects the charset name
  */
 private static String encodeFormData(
   MultiValueMap<String, String> formData,
   Charset charset) {
  String charsetName = charset.name();
  StringBuilder formDataBodyBuilder = new StringBuilder();
  try {
   for (Map.Entry<String, List<String>> entry : formData.entrySet()) {
    String encodedKey = URLEncoder.encode(entry.getKey(), charsetName);
    for (String value : entry.getValue()) {
     if (formDataBodyBuilder.length() > 0) {
      formDataBodyBuilder.append('&');
     }
     formDataBodyBuilder.append(encodedKey);
     if (value != null) {
      formDataBodyBuilder.append('=')
        .append(URLEncoder.encode(value, charsetName));
     }
    }
   }
  } catch (UnsupportedEncodingException e) {
   // The name comes from an existing Charset instance, so this is not
   // expected; fail loudly rather than forward a truncated body.
   throw new IllegalStateException(
     "Unsupported form charset: " + charsetName, e);
  }
  return formDataBodyBuilder.toString();
 }

 /**
  * Reads the whole request body into memory, logs it, and forwards the
  * request with a replayable copy of the body.
  *
  * @param exchange the current exchange
  * @param chain the remaining filter chain
  * @param gatewayContext context that receives the body string
  * @return completion signal of the chain
  */
 private Mono<Void> readBody(
   ServerWebExchange exchange,
   GatewayFilterChain chain,
   GatewayContext gatewayContext) {
  return DataBufferUtils.join(exchange.getRequest().getBody())
    .map(dataBuffer -> {
     // copy the joined buffer and release it right away
     byte[] bytes = new byte[dataBuffer.readableByteCount()];
     dataBuffer.read(bytes);
     DataBufferUtils.release(dataBuffer);
     return bytes;
    })
    // An empty body produces no joined buffer; substitute an empty array
    // so the chain is still invoked for body-less POST requests.
    .defaultIfEmpty(new byte[0])
    .flatMap(bytes ->
      forwardWithCachedBody(exchange, chain, gatewayContext, bytes));
 }

 /**
  * Logs the cached body bytes and continues the chain with a request whose
  * body replays those bytes.
  */
 private Mono<Void> forwardWithCachedBody(
   ServerWebExchange exchange,
   GatewayFilterChain chain,
   GatewayContext gatewayContext,
   byte[] bytes) {
  if (bytes.length == 0) {
   // nothing was consumed, so the original exchange can be forwarded as is
   log.info("PostBody:{}", "");
   log.info("end-------------------------------------------------");
   return chain.filter(exchange);
  }
  Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
   DataBuffer buffer =
     exchange.getResponse().bufferFactory().wrap(bytes);
   // NOTE(review): extra retain kept from the original code; confirm it
   // is still required for a buffer wrapped around a byte array.
   DataBufferUtils.retain(buffer);
   return Mono.just(buffer);
  });
  // repackage ServerHttpRequest so the body can be read again
  ServerHttpRequest mutatedRequest =
    new ServerHttpRequestDecorator(exchange.getRequest()) {
     @Override
     public Flux<DataBuffer> getBody() {
      return cachedFlux;
     }
    };
  ServerWebExchange mutatedExchange =
    exchange.mutate().request(mutatedRequest).build();
  // read body string with default messageReaders
  return ServerRequest.create(mutatedExchange, messageReaders)
    .bodyToMono(String.class)
    .doOnNext(objectValue -> {
     log.info("PostBody:{}", objectValue);
     log.info(
       "end-------------------------------------------------");
     gatewayContext.setCacheBody(objectValue);
    }).then(chain.filter(mutatedExchange));
 }
}
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
@Component
@Slf4j
public class LogResponseGlobalFilter implements GlobalFilter, Ordered {
 private static final String REQUEST_PREFIX = "Request Info [ ";
 private static final String REQUEST_TAIL = " ]";
 private static final String RESPONSE_PREFIX = "Response Info [ ";
 private static final String RESPONSE_TAIL = " ]";
 private StringBuilder normalMsg = new StringBuilder();
 @Override
 public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  ServerHttpRequest request = exchange.getRequest();
  ServerHttpResponse response = exchange.getResponse();
  DataBufferFactory bufferFactory = response.bufferFactory();
  normalMsg.append(RESPONSE_PREFIX);
  ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(response) {
   @Override
   public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
    if (body instanceof Flux) {
     Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;
     return super.writeWith(fluxBody.map(dataBuffer -> {
      // probably should reuse buffers
      byte[] content = new byte[dataBuffer.readableByteCount()];
      dataBuffer.read(content);
      String responseResult = new String(content, Charset.forName("UTF-8"));
      normalMsg.append("status=").append(this.getStatusCode());
      normalMsg.append(";header=").append(this.getHeaders());
      normalMsg.append(";responseResult=").append(responseResult);
      normalMsg.append(RESPONSE_TAIL);
      log.info(normalMsg.toString());
      return bufferFactory.wrap(content);
     }));
    }
    return super.writeWith(body); // if body is not a flux. never got there.
   }
  };
  return chain.filter(exchange.mutate().response(decoratedResponse).build());
 }
 @Override
 public int getOrder() {
  return -2;
 }
}

补充知识:Spring Cloud Gateway 2.x 打印 Log

场景

在服务网关层面,需要打印出用户每次的请求body和其他的参数,gateway使用的是Reactor响应式编程,和Zuul网关获取流的写法还有些不同,

不过基本的思路是一样的,都是在filter中读取body流,然后缓存回去,因为body流,框架默认只允许读取一次。

思路

1. 添加一个filter做一次请求的拦截

GatewayConfig.java

添加一个配置类,配置一个高优先级的filter,并且注入一个PayloadServerWebExchangeDecorator 对request和response做包装的类。

package com.demo.gateway2x.config;
import com.demo.gateway2x.decorator.PayloadServerWebExchangeDecorator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.server.WebFilter;
/**
 * Configuration that registers a {@link WebFilter} wrapping every exchange
 * in a {@link PayloadServerWebExchangeDecorator}.
 */
@Configuration
public class GatewayConfig {

 /**
  * Creates the wrapping filter, ordered with the highest precedence.
  *
  * @return a filter that hands a decorated exchange to the rest of the chain
  */
 @Bean
 @Order(Ordered.HIGHEST_PRECEDENCE) // filter order
 public WebFilter webFilter() {
  return (exchange, chain) -> {
   PayloadServerWebExchangeDecorator decorated =
     new PayloadServerWebExchangeDecorator(exchange);
   return chain.filter(decorated);
  };
 }
}

PayloadServerWebExchangeDecorator.java

这个类中,我们实现了框架的ServerWebExchangeDecorator类,同时注入了自定义的两个类,PartnerServerHttpRequestDecorator 和 PartnerServerHttpResponseDecorator ,

这两个类用于后面对请求与响应的拦截。

package com.demo.gateway2x.decorator;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebExchangeDecorator;
/**
 * Exchange decorator that substitutes the request and response of the
 * wrapped exchange with {@link PartnerServerHttpRequestDecorator} and
 * {@link PartnerServerHttpResponseDecorator} instances.
 */
public class PayloadServerWebExchangeDecorator extends ServerWebExchangeDecorator {

 /** Wrapper around the delegate's request, created once at construction. */
 private final PartnerServerHttpRequestDecorator requestDecorator;

 /** Wrapper around the delegate's response, created once at construction. */
 private final PartnerServerHttpResponseDecorator responseDecorator;

 /**
  * @param delegate the exchange whose request and response are wrapped
  */
 public PayloadServerWebExchangeDecorator(ServerWebExchange delegate) {
  super(delegate);
  this.requestDecorator = new PartnerServerHttpRequestDecorator(delegate.getRequest());
  this.responseDecorator = new PartnerServerHttpResponseDecorator(delegate.getResponse());
 }

 /**
  * @return the wrapped request
  */
 @Override
 public ServerHttpRequest getRequest() {
  return this.requestDecorator;
 }

 /**
  * @return the wrapped response
  */
 @Override
 public ServerHttpResponse getResponse() {
  return this.responseDecorator;
 }
}

2. 在请求进入时,对request做一次拦截

PartnerServerHttpRequestDecorator.java

这个类继承了 ServerHttpRequestDecorator,并在构造函数中使用响应式编程调用了打印 log 的方法,注意关注 Mono<DataBuffer> mono = DataBufferUtils.join(flux); ,

这里将Flux合并成了一个Mono,因为如果不这么做,body内容过多时将会被分段打印,这是一个很重要的点,

在RequestParamsHandle.chain打印过日志后,我们又返回了一个dataBuffer,用作向下传递,否则dataBuffer被读取过一次后就不能继续使用了。

package com.demo.gateway2x.decorator;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static reactor.core.scheduler.Schedulers.single;
/**
 * Request decorator that, for content types listed in
 * {@code ParamsUtils.CHAIN_MEDIA_TYPE}, joins the body into a single buffer
 * and passes it through {@link RequestParamsHandle#chain} before handing it
 * on. Other requests keep their original body stream.
 */
@Slf4j
public class PartnerServerHttpRequestDecorator extends ServerHttpRequestDecorator {

 /** Body stream returned by {@link #getBody()}; fixed at construction. */
 private Flux<DataBuffer> body;

 /**
  * @param delegate the request to wrap
  */
 public PartnerServerHttpRequestDecorator(ServerHttpRequest delegate) {
  super(delegate);
  Flux<DataBuffer> source = super.getBody();
  boolean traced = ParamsUtils.CHAIN_MEDIA_TYPE.contains(delegate.getHeaders().getContentType());
  this.body = traced ? joinAndHandle(delegate, source) : source;
 }

 /**
  * Joins all buffers of {@code source} into one, switches to the
  * {@code single()} scheduler and maps the buffer through
  * {@link RequestParamsHandle#chain}.
  */
 private static Flux<DataBuffer> joinAndHandle(ServerHttpRequest delegate, Flux<DataBuffer> source) {
  Mono<DataBuffer> joined = DataBufferUtils.join(source);
  return joined
    .publishOn(single())
    .map(dataBuffer -> RequestParamsHandle.chain(delegate, log, dataBuffer))
    .flux();
 }

 /**
  * @return the body stream selected in the constructor
  */
 @Override
 public Flux<DataBuffer> getBody() {
  return this.body;
 }
}

RequestParamsHandle.java

这个类主要用来读取dataBuffer并做了日志打印处理,也可以做一些其他的例如参数校验等使用。

package com.demo.gateway2x.decorator;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
/**
 * Helper that logs the parameters of a request (query parameters merged
 * with the JSON body) and returns a buffer to pass on downstream.
 */
public class RequestParamsHandle {

 /**
  * Logs the request parameters as JSON and returns the buffer supplied by
  * the body decorator.
  *
  * @param delegate request whose query parameters are included in the log
  * @param log logger that receives the {@code Payload: ...} line
  * @param buffer buffer holding the request body
  * @param <T> concrete buffer type
  * @return the buffer obtained from {@code ParamsUtils.buildBodyDecorator}
  */
 @SuppressWarnings("unchecked") // NOTE(review): cast kept from the original; getDataBuffer()'s declared type is not visible here
 public static <T extends DataBuffer> T chain(ServerHttpRequest delegate, Logger log, T buffer) {
  ParamsUtils.BodyDecorator bodyDecorator = ParamsUtils.buildBodyDecorator(buffer);
  // validate and log the parameters
  Map<String,Object> params = validParams(getParams(delegate, bodyDecorator.getBody()));
  log.info("Payload: {}", JSON.toJSONString(params));
  return (T) bodyDecorator.getDataBuffer();
 }

 /**
  * Collects query parameters and body content into one map.
  *
  * <p>A JSON object body contributes its members as entries. Any other
  * non-null JSON value (array, string, number, ...) is stored under the key
  * {@code "body"} instead of failing the request; a JSON {@code null} body
  * contributes nothing. Malformed JSON still raises the parser's exception.
  *
  * @param delegate request supplying the query parameters
  * @param body request body text, may be {@code null} or empty
  * @return a new mutable map of parameters
  */
 public static Map<String,Object> getParams(ServerHttpRequest delegate, String body) {
  // collect parameters
  Map<String,Object> params = new HashMap<>();
  if (delegate.getQueryParams() != null) {
   params.putAll(delegate.getQueryParams());
  }
  if (StringUtils.hasLength(body)) {
   Object parsed = JSON.parse(body);
   if (parsed instanceof Map) {
    ((Map<?, ?>) parsed).forEach((key, value) -> params.put(String.valueOf(key), value));
   } else if (parsed != null) {
    params.put("body", parsed);
   }
  }
  return params;
 }

 /**
  * Validation hook; currently returns the parameters unchanged.
  *
  * @param params parameters to validate
  * @return the same map
  */
 public static Map<String,Object> validParams(Map<String,Object> params) {
  // todo: parameter validation
  return params;
 }
}

3. 在结果返回时,对response做一次拦截

PartnerServerHttpResponseDecorator.java

这个类和上面request的处理异曲同工,拦截响应流,并做日志记录处理。

package com.demo.gateway2x.decorator;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static reactor.core.scheduler.Schedulers.single;
/**
 * Response decorator that, for content types listed in
 * {@code ParamsUtils.CHAIN_MEDIA_TYPE}, passes the response body through
 * {@link ResponseParamsHandle#chain} before it is written.
 */
@Slf4j
public class PartnerServerHttpResponseDecorator extends ServerHttpResponseDecorator {

 /**
  * @param delegate the response to wrap
  */
 PartnerServerHttpResponseDecorator(ServerHttpResponse delegate) {
  super(delegate);
 }

 /**
  * Delegates to the parent implementation unchanged.
  */
 @Override
 public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
  return super.writeAndFlushWith(body);
 }

 /**
  * Writes the body, routing it through {@link ResponseParamsHandle#chain}
  * when the response content type is one of the handled media types.
  * A {@link Mono} body is handled as is; a {@link Flux} body is first
  * joined into a single buffer. Any other publisher is written unchanged.
  *
  * @param body the response body
  * @return completion signal of the write
  */
 @Override
 public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
  final MediaType contentType = super.getHeaders().getContentType();
  if (!ParamsUtils.CHAIN_MEDIA_TYPE.contains(contentType)) {
   return super.writeWith(body);
  }
  if (body instanceof Mono) {
   final Mono<DataBuffer> source = (Mono<DataBuffer>) body;
   return super.writeWith(
     source.publishOn(single()).map(dataBuffer -> ResponseParamsHandle.chain(log, dataBuffer)));
  }
  if (body instanceof Flux) {
   Mono<DataBuffer> joined = DataBufferUtils.join(body);
   final Flux<DataBuffer> handled = joined
     .publishOn(single())
     .map(dataBuffer -> ResponseParamsHandle.chain(log, dataBuffer))
     .flux();
   return super.writeWith(handled);
  }
  return super.writeWith(body);
 }
}

ResponseParamsHandle.java

响应流的日志打印

package com.demo.gateway2x.decorator;
import org.slf4j.Logger;
import org.springframework.core.io.buffer.DataBuffer;
/**
 * Helper that logs a response body and returns a buffer to pass on.
 */
public class ResponseParamsHandle {

 /**
  * Logs the body text extracted from {@code buffer} and returns the buffer
  * supplied by the body decorator.
  *
  * @param log logger that receives the {@code Payload: ...} line
  * @param buffer buffer holding the response body
  * @param <T> concrete buffer type
  * @return the buffer obtained from {@code ParamsUtils.buildBodyDecorator}
  */
 public static <T extends DataBuffer> T chain(Logger log, T buffer) {
  ParamsUtils.BodyDecorator decorated = ParamsUtils.buildBodyDecorator(buffer);
  // log the payload (validation could be added here as well)
  String payload = decorated.getBody();
  log.info("Payload: {}", payload);
  return (T) decorated.getDataBuffer();
 }
}

下面是实际操作,发送一次http请求:

控制台log结果:

github源码地址:https://github.com/qiaomengnan16/gateway-2x-log-demo

总结

gateway和zuul打印参数的方式思路是一致的,只是gateway采用的是reactor,写法上与zuul的直接读取流有些不同,这里需要知道的是Flux需要转换为Mono这个地方,如果不转换容易分多批打印。

以上这篇Spring Cloud Gateway 记录请求应答数据日志操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持自学编程网。

编程技巧