译:响应式Spring Cloud初探

865 阅读10分钟
原文链接: www.spring4all.com

响应式Spring Cloud初探

原文链接:The Road to Reactive Spring Cloud

作者: img JOSH LONG

译者: helloworldtang

Spring Cloud Finchley GA release 充满了好用的新特性,它是响应式微服务之旅的一个重要里程碑。我不可能把所有的东西都列出来,在此我向你推荐Spencer Gibb的新发布的公告。相反,在这篇文章中,我想把重点放在我们对响应式Spring Cloud的道路上。

我们发布Spring Framework 5 in September 2017。这是第一个引入新的响应式编程支持的版本,以帮助构建更健壮、可伸缩的服务。它建立在Pivotal Reactor项目之上,我们的响应式流兼容的响应式运行时。Spring Framework 5还包含了大量的新特性,我也不打算把它们都列出来,相反,选择关注于响应式特性的支持。响应式编程是什么?为什么它重要吗?当你在构建网络服务时,这很重要。

简而言之,Spring的服务集成的基本原理已经被刷新,以完全接受响应式编程。那么,什么是“响应式编程”呢?响应式编程是一种识别,一旦您开始在网络上传输更多的数据,通过API调用来填满您的IO缓冲区,您就会在任何给定的请求中花费更多的时间来执行IO。

IO本身并不是问题 。IO传统上是块——线程必须等待InputStream来产生新的字节。(通常在while循环read() ’ingbyte的缓冲区)。当一个线程等待时,它不能被重新用于其他任何东西。线程是昂贵的!

想想传统服务器是如何工作的,不管是用Java实现,还是通过相同的方法使用线程的其它平台 。如果您的web服务器的线程池中有100个线程,当有101个请求到达,那么最后一个额外的请求将在另一个完成处理它们的请求之前不会被处理。如果其他人能够完成(从而释放他们所独占的线程),在第101次请求到来之前,太棒了!可能不需要进行响应式编程。如果您能够在新请求到来前更快地释放线程,并且在这些线程中花费的时间主要是由于输入/输出,那么就不需要进行响应式编程了。

当您迁移到一个微服务、大数据和长期会话(例如在websockets、服务器端发送事件和任何其他长期存在的服务器端状态)的环境中,您将开始通过网络上处理更多的数据。

线程与IO之间的耦合是不必要的。你的操作系统已经支持了‘后台异步处理’的 IO,在你应该参与的时候通知你,这已经有几十年了。的确, Java 1.4 (from the early 2000s) supports NIO (Channels) 这就给了我们这个异步的IO机制。

在这套异步IO的机制中,有专门的组件管理IO,并在需要的时候调用你的代码。如果有任何延迟,该线程可以自由移动并处理其他请求。这个线程不是阻塞的。与您的代码从InputStream中提取字节不同,字节将被异步地推送到它。通过这种方式,就可以有效地翻转了与数据源的交互方式。

许多项目,比如来自@NetflixOSS的RxJava、来自@Pivotal的 @ProjectReactor、来自Eclipse的@vertx_project 以及来自@lightbend的@akkateam,都在寻求提供一种支持这种新的异步现实的编程模型。有一个共同的基础,在这个共同的基础上诞生出了Reactive Streams 规范,这些项目支持所有的支持。

Reactive Streams 规范支持将项目发布给订阅者的 Publisher 类型。当 onNextIT)方法被调用时,Subscribers将进行消费。当用户订阅时,它会得到一个Subscription,它可以用来表示它可以处理多少记录。最后一个bit,能够准确地指定订阅者准备处理多少记录,这是流量控制Publisher不能压倒Subscriber。这可以促进稳定。在响应式编程的上下文中,流控制被称为 反向压力.。

还有最后一个接口,Processor,它只是一座桥;它同时实现了PublisherSubscriber。 Project Reactor 支持两类 Publisher的约定:Flux,它适用于0-n的场景,以及Mono,适用于单条记录,或者没有记录的场景。

这是对IO发生方式的一种根本性的重新思考,因此它需要在上面的每一层进行集成;在数据访问层,安全层,在Boot和微服务层中。

Spring Framework5还包括一个崭新的响应式web运行时(甚至支持Netty项目),即Spring WebFlux。它甚至包括一些新的函数编程风格的响应式endpoints.。我在2016年做了一个关于这个方面的 Spring Tips视频!

Spring WebFlux建立在reactive streams规范之上,因此可以与任何其他支持库进行互操作。这里有一个 Spring Tips 视频,我演示了使用 Lightbend’s Akka Streams (和 Scala)的响应式Spring Webflux。

首先,新的Spring WebFlux组件模型是响应式的和异步的。它支持异步的情况,比如websockets和server-sent事件,就像传统处理同步情况一样。你最终会得到一种东西。想要在几纳秒内发送一个包含10条记录的简短的JSON字符串?那就使用 Publisher!如果想生成server-sent的事件,这里有一个关于server-sent事件的 Spring Tips视频。

这是一个关于响应式websockets的 Spring Tips视频。

新版本还包括一个新的响应式HTTP客户端,称为WebClient。我也做了一个关于这个的Spring Tips视频!

Spring Data Kay 支持对具有异步IO支持的数据访问技术的templates 和 repositories进行响应式的数据访问。下面是一个使用响应式Spring Data MongoDB的例子。

interface ReservationRepository extends ReactiveMongoRepository<Reservation, String> {

        Flux<Reservation> findByEmail(String email);
}

@Document
@AllArgsConstructor
@NoArgsConstructor
@Data
class Reservation {
        @Id
        private String id;
        private String email;
}

Spring Security 5 支持对传统用例的响应式身份验证和授权(如下所示)和OAuth:

  @Bean
  MapReactiveUserDetailsService authentication() {
    // don't do this! this is a hardcoded username and password and it
    // would literally pain Spring Security lead @rob_winch to see this!
    //
    return new MapReactiveUserDetailsService(
      User.withDefaultPasswordEncoder().username("user").password("pw").roles("USER").build());
  }

  @Bean
  SecurityWebFilterChain authorization(ServerHttpSecurity security) {
  //@formatter:off
  return security
  .csrf().disable()
  .httpBasic()
  .and()
  .authorizeExchange()
    .pathMatchers("/proxy").authenticated()
    .anyExchange().permitAll()
  .and()
  .build();
  //@formatter:on
  }

Spring Boot 2对这些都提供了支持,这样就可以构建REST endpoints,使用Actuator,管理安全性,以及其他一切“just works”,不管你选择使用Spring WebFlux还是Spring MVC。

从代码库的变化角度来看,这也意味着很多不稳定的地方,对于Spring Cloud团队来说,这就是为什么这个版本如此重要的原因。

新发布的新版本在现有的SpringCloud组件上无缝地集成了响应式编程:服务注册、发现、安全、CDC(T)和测试、消息传递、micro-proxy支持、断路器等等。让我们看一些例子。

您可以使用新的响应式 WebClient,并使用Spring Cloud的DiscoveryClient支持的任何服务注册中心(Netflix、Hashicorp、Apache Zookeeper、Cloud Foundry等)来解析主机。

@Bean
WebClient client(LoadBalancerExchangeFilterFunction eff) {
  return WebClient.builder().filter(eff).build();
}

然后您可以使用这个响应式的、基于服务注册中心的 WebClient。在下面的例子中,reservation-service是在服务注册中心注册的服务,而不是实际的主机名。

Publisher<String> emails = client
    .get()
    .uri("http://reservation-service/reservations")
    .retrieve()
    .bodyToFlux(Reservation.class)
    .map(Reservation::getEmail);

您也可以使用Spring Cloud Stream的对响应式特性的支持,分别在Kafka或RabbitMQ中使用来自主题或队列的消息。

@Configuration  
@EnableBinding(Sink.class)
public class MyStreamListener {

  @StreamListener
  public void incoming (@Input(Sink.INPUT) Flux<String> names ) {
    names
     .map ( x-> new Reservation( null, x))
     .flatMap ( this.reservationRepository::save )
     .subscribe( x -> log.info( "saved " + x.toString()));
   }
 }

您可以使用Hystrix断路器和响应式Publisher来保护和隔离潜在的错误服务调用。 在下面的例子中,我使用了可能失败的响应WebClient来进行HTTP调用。如果它失败了,我希望能够提供一个备用的 Publisher 来返回。调用时服务不能正常响应时,就调用预置的服务进行影响。这几乎和没有成功调用一样重要。我的代码没有抛出异常。它优雅地进行了降级。那个断路器好像有智能的一样,并且它是有状态的。如果有足够多的连续尝试失败,断路器最终会直接切换到备用Publisher。如果下游服务应该重新上线(如果您使用Cloud Foundry的话),那么它最终将重新注册到注册中心,注册中心将发送一个心跳事件,而心跳事件将被用来使注册中心在客户端本地的服务列表失效。客户端就会看到注册中心中有新的实例,它会重置断路器,关闭,并允许下一个调用通过,希望它能成功。

Publisher<String> emails = client
  .get()
  .uri("http://reservation-service/reservations")
  .retrieve()
  .bodyToFlux(Reservation.class)
  .map(Reservation::getEmail);

Publisher<String> fallback = HystrixCommands
  .from( emails )
  .eager()
  .commandName("emails")
  .fallback ( Flux.just ("EEK!") )
  .build();

虽然能够在一个响应式环境中使用这些现有的技术是很好的,但是最令人兴奋的是响应式编程开启了什么新的可能性!两个新项目,Spring Cloud Gateway和Spring Cloud Function,都从响应式编程中受益。

让我们简单了解下这两个项目。

Spring Cloud Gateway 是我们崭新的响应式API网关。它建立在Spring的响应式特性之上。毕竟,它的工作是将客户的请求传递给下游服务。对于响应式编程来说,这是一个完美的用例(和需求)。我也做了一个关于它的 Spring Tips视频。

这里有一个使用Spring Cloud Gateway的例子,它将一个请求从:9999/proxy代理到一个服务(通过服务注册中心解析和负载平衡)和速率限制。(NB:这个配置可以常驻在Spring Cloud Config Serve的(可刷新的)配置中,也可以在任何资源中创建一个Flux<Route>。)

这个例子限制每个经过身份验证的用户每秒100个请求。您不需要Spring Security来使用网关,但是根据配置它是隐含的。

@Bean
RouteLocator gateway (RouteLocatorBuilder rlb, RedisRateLimiter rrl) {
  return rlb
    .routes()
    .route( spec ->
      spec
       .path("/rl")
       .flters( fs -> fs
         .requestRateLimiter( c -> c.setRateLimiter( this.redisRateLimiter() ))
         .setPath("/reservations")
       )
       .uri("lb://reservation-service/")
    )
    .build();
}


@Bean // 100 reqs per second, burstable to 150
RedisRateLimiter redisRateLimiter (){
  return new RedisRateLimiter(100, 150);
}

Spring Cloud Function 是我们新的 function-as-a-service的一个项目。它将普通的-功能转换为不同function-as-a-service运行时所需的类型。它可以用于AWS Lambda,微软Azure,当然还有我们自己的Project Riff。Project Riff是一个Apache 2许可的、基于Kubernetes的支持多语言的function-as-a-service运行时。我也做了一个关于Spring Cloud Function和Project Riff的Spring Tips视频。

使用它并不容易!您需要创建java.util.function.Function<I,O> 的实例。在这种情况下,IO都可以是Publisher<X>

package com.example.uppercase;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.util.function.Function;

@SpringBootApplication
public class UppercaseApplication {

        @Bean
        Function<Flux<String>, Flux<String>> uppercase() {
                return incoming -> incoming.map(String::toUpperCase);
        }

        public static void main(String[] args) {
                SpringApplication.run(UppercaseApplication.class, args);
        }
}

正如您现在所希望的那样,响应式编程已经很好地在Spring中实现了!Spring Cloud是最后一个需要支持它的主要项目,用于对响应式编程进行全面的讨论。但这并不是故事的结局。事实上,我们才刚刚开始!请继续关注。:-)

在即将到来的SpringOne Platform活动中,我们将讨论响应式编程和基于响应式Spring Cloud微服务,以及其他许多事情。 Join us!