Skip to content

Commit

Permalink
Spring Boot 2.0 WebFlux编程
Browse files Browse the repository at this point in the history
  • Loading branch information
wuyouzhuguli committed Apr 4, 2019
1 parent 951a3d2 commit 1a9df12
Show file tree
Hide file tree
Showing 16 changed files with 423 additions and 0 deletions.
44 changes: 44 additions & 0 deletions 57.Spring-Boot-WebFlux/async-servlet/src/AsyncServlet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* @author MrBird
*/
@WebServlet(urlPatterns = "/async", asyncSupported = true)
public class AsyncServlet extends HttpServlet {
private static final long serialVersionUID = 393375716683413545L;

private Logger log = Logger.getLogger(AsyncServlet.class.getName());

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) {
long start = System.currentTimeMillis();
AsyncContext asyncContext = request.startAsync();

CompletableFuture.runAsync(() -> execute(asyncContext, asyncContext.getRequest(), asyncContext.getResponse()));
log.info("总耗时:" + (System.currentTimeMillis() - start) + "ms");
}

private void execute(AsyncContext asyncContext, ServletRequest request, ServletResponse response) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
response.getWriter().append("hello");
} catch (IOException e) {
e.printStackTrace();
}
asyncContext.complete();
}
}
39 changes: 39 additions & 0 deletions 57.Spring-Boot-WebFlux/async-servlet/src/SyncServlet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* @author MrBird
*/
@WebServlet(urlPatterns = "/sync")
public class SyncServlet extends HttpServlet {

private static final long serialVersionUID = 7583536145022393360L;

private Logger log = Logger.getLogger(SyncServlet.class.getName());

@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) {

long start = System.currentTimeMillis();
this.execute(request, response);
log.info("总耗时:" + (System.currentTimeMillis() - start) + "ms");
}

private void execute(HttpServletRequest request, HttpServletResponse response) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
response.getWriter().append("hello");
} catch (IOException e) {
e.printStackTrace();
}
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
6 changes: 6 additions & 0 deletions 57.Spring-Boot-WebFlux/async-servlet/web/WEB-INF/web.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"
version="4.0">
</web-app>
10 changes: 10 additions & 0 deletions 57.Spring-Boot-WebFlux/async-servlet/web/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>hello</title>
</head>
<body>

</body>
</html>
41 changes: 41 additions & 0 deletions 57.Spring-Boot-WebFlux/webflux/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>webflux</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>webflux</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.example.webflux;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Random;

/**
* @author MrBird
*/
public class FluxTest {

public static void main(String[] args) throws InterruptedException {
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[]{1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 4).subscribe(System.out::println);
// Flux.interval(Duration.of(1, ChronoUnit.SECONDS)).subscribe(System.out::println);

Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);


final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);

Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);

Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

Flux.range(1, 20).take(10).subscribe(System.out::println);
Flux.range(1, 20).takeLast(10).subscribe(System.out::println);
Flux.range(1, 20).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 20).takeUntil(i -> i == 10).subscribe(System.out::println);

Flux.range(1, 10).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 10).reduceWith(() -> 10, (x, y) -> x + y).subscribe(System.out::println);

Flux.merge(
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(2),
Flux.interval(Duration.of(500, ChronoUnit.MILLIS)).take(2)
).toStream().forEach(System.out::println);

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

Flux.just("a", "b", "c", "d")
.zipWith(Flux.just("e", "f", "g", "h", "i"))
.subscribe(System.out::println);

Flux.just("a", "b", "c", "d")
.zipWith(Flux.just("e", "f", "g", "h", "i"), (s1, s2) -> String.format("%s-%s", s1, s2))
.subscribe(System.out::println);


Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);

Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.onErrorReturn(0)
.subscribe(System.out::println);


Flux.just(1, 2)
.concatWith(Mono.error(new IllegalArgumentException()))
.onErrorResume(e -> {
if (e instanceof IllegalStateException) {
return Mono.just(0);
} else if (e instanceof IllegalArgumentException) {
return Mono.just(-1);
}
return Mono.empty();
}).subscribe(System.out::println);

Thread.currentThread().join(20000);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.example.webflux;

import java.util.concurrent.TimeUnit;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoFluxTest {

public static void main(String[] args) {
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}

@Override
public void onNext(Integer item) {
System.out.println("接受到数据: " + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
this.subscription.cancel();
}

@Override
public void onComplete() {
System.out.println("处理完了!");
}

};

String[] strs = {"1", "2", "3"};
Flux.fromArray(strs).map(Integer::parseInt).subscribe(subscriber);
Mono.fromSupplier(() -> 1).map(s -> s + 1).subscribe(subscriber);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.example.webflux;

import reactor.core.publisher.Mono;

import java.util.Optional;

/**
* @author MrBird
*/
public class MonoTest {

public static void main(String[] args) {
Mono.just("are").subscribe(System.out::println);
Mono.empty().subscribe(System.out::println);
Mono.fromSupplier(() -> "you").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("ok")).subscribe(System.out::println);

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.example.webflux;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
* @author MrBird
*/
@RestController
public class TestController {
// Mono 表示 0-1 个元素,Flux 0-N 个元素


private Logger logger = LoggerFactory.getLogger(this.getClass());

@GetMapping("sync")
public String sync() {
logger.info("sync method start");
String result = this.execute();
logger.info("sync method end");
return result;
}

@GetMapping("async/mono")
public Mono<String> asyncMono() {
logger.info("async method start");
Mono<String> result = Mono.fromSupplier(this::execute);
logger.info("async method end");
return result;
}

// SSE(Server Sent Event)
// https://developer.mozilla.org/zh-CN/docs/Server-sent_events/Using_server-sent_events
// http://www.ruanyifeng.com/blog/2017/05/server-sent_events.html
@GetMapping(value = "async/flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> asyncFlux() {
logger.info("async method start");
Flux<String> result = Flux.fromStream(IntStream.range(1, 5).mapToObj(i -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "int value:" + i;
}));
logger.info("async method end");
return result;
}

private String execute() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}
}
Loading

0 comments on commit 1a9df12

Please sign in to comment.