Skip to content

Commit

Permalink
First draft of Helidon 4 threading example
Browse files Browse the repository at this point in the history
  • Loading branch information
barchetta committed Mar 26, 2024
1 parent f268194 commit 35ab0dc
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 0 deletions.
23 changes: 23 additions & 0 deletions examples/webserver/threads/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# helidon-examples-webserver-threads


TODO XXXXXXXXXXXXX

## Build and run


With JDK21
```bash
mvn package
java -jar target/helidon-examples-webserver-threads.jar
```

## Exercise the application

Basic:
```
curl -X GET http://localhost:8080/thread/compute/5
Hello World!
```


68 changes: 68 additions & 0 deletions examples/webserver/threads/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.helidon.applications</groupId>
<artifactId>helidon-se</artifactId>
<version>4.0.0-SNAPSHOT</version>
<relativePath>../../../applications/se/pom.xml</relativePath>
</parent>
<groupId>io.helidon.examples.webserver</groupId>
<artifactId>helidon-examples-webserver-threads</artifactId>
<version>4.0.0-SNAPSHOT</version>

<properties>
<mainClass>io.helidon.examples.webserver.threads.Main</mainClass>
</properties>

<dependencies>
<dependency>
<groupId>io.helidon.webserver</groupId>
<artifactId>helidon-webserver</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.webclient</groupId>
<artifactId>helidon-webclient</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.config</groupId>
<artifactId>helidon-config-yaml</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.logging</groupId>
<artifactId>helidon-logging-jul</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.webserver.testing.junit5</groupId>
<artifactId>helidon-webserver-testing-junit5</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-libs</id>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

package io.helidon.examples.webserver.threads;


import io.helidon.logging.common.LogConfig;
import io.helidon.config.Config;
import io.helidon.webserver.WebServer;
import io.helidon.webserver.http.HttpRouting;




/**
* The application main class.
*/
public class Main {


/**
* Cannot be instantiated.
*/
private Main() {
}


/**
* Application main entry point.
* @param args command line arguments.
*/
public static void main(String[] args) {

// load logging configuration
LogConfig.configureRuntime();

// initialize global config from default configuration
Config config = Config.create();
Config.global(config);


WebServer server = WebServer.builder()
.config(config.get("server"))
.routing(Main::routing)
.build()
.start();


System.out.println("WEB server is up! http://localhost:" + server.port() + "/thread");

}


/**
* Updates HTTP Routing.
*/
static void routing(HttpRouting.Builder routing) {
routing
.register("/thread", new ThreadService());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package io.helidon.examples.webserver.threads;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.lang.System.Logger.Level;
import java.util.concurrent.RejectedExecutionException;

import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.config.Config;
import io.helidon.http.Status;
import io.helidon.webclient.api.HttpClientResponse;
import io.helidon.webclient.api.WebClient;
import io.helidon.webserver.http.HttpRules;
import io.helidon.webserver.http.HttpService;
import io.helidon.webserver.http.ServerRequest;
import io.helidon.webserver.http.ServerResponse;

class ThreadService implements HttpService {

private static final System.Logger LOGGER = System.getLogger(ThreadService.class.getName());
private static final Random rand = new Random(System.currentTimeMillis());

// ThreadPool of platform threads.
private static ExecutorService platformExecutorService;
// Executor of virtual threads.
private static ExecutorService virtualExecutorService;

WebClient client = WebClient.builder()
.baseUri("http://localhost:8080/thread")
.build();

/**
* The config value for the key {@code greeting}.
*/

ThreadService() {
this(Config.global().get("app"));
}

ThreadService(Config appConfig) {
/*
* We create two executor services. One is a thread pool of platform threads.
* The second is a virtual thread executor service.
* See `application.yaml` for configuration of each of these.
*/
ThreadPoolSupplier platformThreadPoolSupplier = ThreadPoolSupplier.builder()
.config(appConfig.get("application-platform-executor"))
.build();
platformExecutorService = platformThreadPoolSupplier.get();

ThreadPoolSupplier virtualThreadPoolSupplier = ThreadPoolSupplier.builder()
.config(appConfig.get("application-virtual-executor"))
.build();
virtualExecutorService = virtualThreadPoolSupplier.get();
}

/**
* A service registers itself by updating the routing rules.
*
* @param rules the routing rules.
*/
@Override
public void routing(HttpRules rules) {
rules
.get("/compute", this::computeHandler)
.get("/compute/{iterations}", this::computeHandler)
.get("/fanout", this::fanOutHandler)
.get("/fanout/{count}", this::fanOutHandler)
.get("/sleep", this::sleepHandler)
.get("/sleep/{seconds}", this::sleepHandler);
}

/**
* Perform a CPU intensive operation.
* The optional path parameter controls the number of iterations of the computation. The more
* iterations the longer it will take.
*
* @param request server request
* @param response server response
*/
private void computeHandler(ServerRequest request, ServerResponse response) {
String iterations = request.path().pathParameters().first("iterations").orElse("1");
try {
// We execute the computation on a platform thread. This prevents pining of the virtual
// thread, plus provides us the ability to limit the number of concurrent computation requests
// we handle by limiting the thread pool work queue length (as defined in application.yaml)
Future<Double> future = platformExecutorService.submit(() -> compute(Integer.parseInt(iterations)));
response.send(future.get().toString());
} catch (RejectedExecutionException e) {
// Work queue is full! We reject the request
LOGGER.log(Level.WARNING, e);
response.status(Status.SERVICE_UNAVAILABLE_503).send("Server busy");
} catch (ExecutionException | InterruptedException e) {
LOGGER.log(Level.ERROR, e);
response.status(Status.INTERNAL_SERVER_ERROR_500).send();
}
}

/**
* Sleep for a specified number of secons.
* The optional path parameter controls the number of seconds to sleep. Defaults to 1
*
* @param request server request
* @param response server response
*/
private void sleepHandler(ServerRequest request, ServerResponse response) {
String seconds = request.path().pathParameters().first("seconds").orElse("1");
response.send(Integer.toString(sleep(Integer.parseInt(seconds))));
}

/**
* Fan out a number of remote requests in parallel.
* The optional path parameter controls the number of parallel requests to make.
*
* @param request server request
* @param response server response
*/
private void fanOutHandler(ServerRequest request, ServerResponse response) {
int count = Integer.parseInt(request.path().pathParameters().first("count").orElse("1"));
LOGGER.log(Level.INFO, "Fanning out " + count + " parallel requests");
// We simulate multiple client requests running in parallel by calling our sleep endpoint.
try {
// For this we use our virtual thread based executor. We submit the work and save the Futures
var futures = new ArrayList<Future<String>>();
for (int i = 0; i < count; i++) {
futures.add(virtualExecutorService.submit(() -> callRemote(rand.nextInt(5))));
}

// After work has been submitted we loop through the future and block getting the results.
// We aggregate the results in a list of Strings
var responses = new ArrayList<String>();
for (var future : futures) {
try {
responses.add(future.get());
} catch (InterruptedException e) {
responses.add(e.getMessage());
}
}

// All parallel calls are complete!
response.send(String.join(":", responses));
} catch (ExecutionException e) {
LOGGER.log(Level.ERROR, e);
response.status(Status.INTERNAL_SERVER_ERROR_500).send();
}
}

/**
* Simulate a remote client call be calling the sleep endpoint on ourself.
*
* @param seconds number of seconds the endpoint should sleep.
* @return
*/
private String callRemote(int seconds) {
LOGGER.log(Level.INFO, Thread.currentThread() + ": Calling remote sleep for " + seconds + "s");
try (HttpClientResponse response = client.get("/sleep/" + seconds).request()) {
if (response.status().equals(Status.OK_200)) {
return response.as(String.class);
} else {
return (response.status().toString());
}
}
}

/**
* Sleep current thread
* @param seconds number of seconds to sleep
* @return
*/
private int sleep(int seconds) {
try {
Thread.sleep(seconds * 1_000L);
} catch (InterruptedException e) {
LOGGER.log(Level.WARNING, e);
}
return seconds;
}

/**
* Perform a CPU intensive computation
* @param iterations: number of times to perform computation
*/
private double compute(int iterations) {
LOGGER.log(Level.INFO, Thread.currentThread() + ": Computing with " + iterations + " iterations");
double d = 123456789.123456789 * rand.nextInt(100);
for (int i=0; i < iterations; i++) {
for (int n=0; n < 1_000_000; n++) {
for (int j = 0; j < 5; j++) {
d = Math.tan(d);
d = Math.atan(d);
}
}
}
return d;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package io.helidon.examples.webserver.threads;
14 changes: 14 additions & 0 deletions examples/webserver/threads/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
server:
port: 8080
host: 0.0.0.0

app:
greeting: "Hello"
application-platform-executor:
thread-name-prefix: "application-platform-executor-"
core-pool-size: 1
max-pool-size: 2
queue-capacity: 10
application-virtual-executor:
thread-name-prefix: "application-virtual-executor-"
virtual-threads: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
handlers=io.helidon.logging.jul.HelidonConsoleHandler
java.util.logging.SimpleFormatter.format=%1$tY.%1$tm.%1$td %1$tH:%1$tM:%1$tS.%1$tL %5$s%6$s%n
# Global logging level. Can be overridden by specific loggers
.level=INFO

io.helidon.common.configurable.ThreadPool.level=ALL
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.helidon.examples.webserver.threads;

import io.helidon.webserver.testing.junit5.RoutingTest;

@RoutingTest
class MainTest {
MainTest() {
}
}
Empty file.

0 comments on commit 35ab0dc

Please sign in to comment.