
Vertx 4.2.6 WebsocketEndpoint InflightRequest Not Reducing #5415

Open
gonit opened this issue Dec 4, 2024 · 16 comments · May be fixed by #5438

@gonit

gonit commented Dec 4, 2024

Version

Which version(s) did you encounter this bug in?
4.2.6

Context

I encountered a state in my Vert.x Java application where WebSocket connection requests never reach the server (confirmed with tcpdump). Digging deeper, I noticed that each Vert.x context has an HttpClient, and each HttpClient has two ConnectionManagers, one for HTTP and one for WebSocket. The WebsocketConnectionManager contains an endpoint map that stores a WebSocketEndpoint object per server key. WebSocketEndpoint maintains two important properties: the waiters queue and the inflightConnections counter. For each connection request, if inflightConnections is already at or above maxPoolSize, the request is appended to the waiters queue; otherwise inflightConnections is incremented and a connection attempt is made. The counter is only decremented in onEvict, i.e. when a successfully established connection is later evicted (see the Listener class inside tryConnect below).

Once the number of inflight connections for a server reaches maxPoolSize, all further requests are queued. We noticed that when maxPoolSize connection requests fail, inflightConnections is never decremented, so every subsequent request lands in the waiters queue. If this queue is unbounded, the endpoint never recovers. All of these details were confirmed with a heap dump.

public void requestConnection2(ContextInternal ctx, long timeout, Handler<AsyncResult<HttpClientConnection>> handler) {
        synchronized(this) {
            if (this.inflightConnections >= this.maxPoolSize) {
                this.waiters.add(new Waiter(handler, ctx));
                return;
            }
 
            ++this.inflightConnections;
        }
 
        this.tryConnect(ctx, handler);
    }
 
private void tryConnect(ContextInternal ctx, final Handler<AsyncResult<HttpClientConnection>> handler) {
        EventLoopContext eventLoopContext;
        if (ctx instanceof EventLoopContext) {
            eventLoopContext = (EventLoopContext)ctx;
        } else {
            eventLoopContext = ctx.owner().createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
        }
 
        class Listener implements Handler<AsyncResult<HttpClientConnection>> {
            Listener() {
            }
 
            private void onEvict() {
                WebSocketEndpoint.this.decRefCount();
                Waiter h;
                synchronized(WebSocketEndpoint.this) {
                    if (--WebSocketEndpoint.this.inflightConnections > WebSocketEndpoint.this.maxPoolSize || WebSocketEndpoint.this.waiters.isEmpty()) {
                        return;
                    }
 
                    h = (Waiter)WebSocketEndpoint.this.waiters.poll();
                }
 
                WebSocketEndpoint.this.tryConnect(h.context, h.handler);
            }
 
            public void handle(AsyncResult<HttpClientConnection> ar) {
                if (ar.succeeded()) {
                    HttpClientConnection c = (HttpClientConnection)ar.result();
                    if (WebSocketEndpoint.this.incRefCount()) {
                        c.evictionHandler((v) -> {
                            this.onEvict();
                        });
                        handler.handle(Future.succeededFuture(c));
                    } else {
                        c.close();
                        handler.handle(Future.failedFuture("Connection closed"));
                    }
                } else {
                    handler.handle(Future.failedFuture(ar.cause()));
                }
 
            }
        }
 
        this.connector.httpConnect(eventLoopContext, new Listener());
    }
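To make the accounting easier to follow, here is a hypothetical, single-threaded model of the counter logic quoted above. It is not Vert.x code: `LeakyEndpointModel`, `request` and `failOldest` are illustrative names, in-progress attempts are simulated by a queue, and locking and eviction are left out. It only reproduces the rule described here: a slot is reserved before connecting and is not returned when the attempt fails.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded model of the counter logic; not Vert.x code.
class LeakyEndpointModel {
    final int maxPoolSize;
    int inflightConnections;
    final Queue<String> waiters = new ArrayDeque<>();    // requests parked because the pool looks full
    final Queue<String> connecting = new ArrayDeque<>(); // attempts currently holding a slot

    LeakyEndpointModel(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    // Same admission rule as requestConnection2: park when full, otherwise reserve a slot.
    void request(String id) {
        if (inflightConnections >= maxPoolSize) {
            waiters.add(id);
            return;
        }
        inflightConnections++;
        connecting.add(id);
    }

    // The oldest attempt fails. As in Listener.handle, the failure is only reported;
    // the counter would be decremented by onEvict(), which is never registered on failure.
    void failOldest() {
        connecting.poll();
    }

    public static void main(String[] args) {
        LeakyEndpointModel endpoint = new LeakyEndpointModel(5);
        for (int i = 0; i < 5; i++) {
            endpoint.request("req-" + i);
            endpoint.failOldest(); // e.g. the server is unreachable
        }
        endpoint.request("req-5"); // the server is reachable again, but the request is parked
        System.out.println(endpoint.inflightConnections + " " + endpoint.waiters); // prints "5 [req-5]"
    }
}
```

After five failures no attempt is actually in progress, yet the counter still reads 5, so the sixth request is parked and nothing in this model will ever poll it.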

Do you have a reproducer?

Sorry, this code is part of my repository, for which I have not created a reproducer yet. I would like some guidance on the current problem.

Extra

  • JDK17
@gonit gonit added the bug label Dec 4, 2024
@AnjaliTajane

Thank you for the detailed explanation of the issue. I’ve tried to analyze the behavior and reviewed the provided code. Below is my understanding of the problem and proposed steps to resolve it.

The root cause lies in the inflightConnections counter. When a connection request fails, inflightConnections is not decremented. This results in:

  • An inflated inflightConnections value, preventing new requests from being processed.
  • Unbounded growth of the waiters queue, causing a deadlock-like state in the application.

@AnjaliTajane

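We could use this code in Listener.handle:

public void handle(AsyncResult<HttpClientConnection> ar) {
    if (ar.failed()) {
        synchronized (WebSocketEndpoint.this) {
            // Release the slot reserved in requestConnection2; otherwise every
            // failed attempt permanently consumes one unit of inflightConnections.
            // Note: requests already parked in waiters are not polled here, so only
            // requests that arrive later can use the freed slot.
            WebSocketEndpoint.this.inflightConnections--;
        }
    }

    if (ar.succeeded()) {
        HttpClientConnection c = ar.result();
        if (WebSocketEndpoint.this.incRefCount()) {
            // Successful connection: its slot is released later by onEvict()
            c.evictionHandler(v -> this.onEvict());
            handler.handle(Future.succeededFuture(c));
        } else {
            c.close();
            handler.handle(Future.failedFuture("Connection closed"));
        }
    } else {
        handler.handle(Future.failedFuture(ar.cause()));
    }
}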
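A minimal sketch of a failure path that also drains the queue, again on a hypothetical single-threaded model rather than Vert.x itself (`PatchedEndpointModel`, `request` and `failOldest` are illustrative names). On failure it returns the reserved slot and, if a request is parked, starts that request on the freed slot. Whether the eventual fix in #5438 takes this approach is not confirmed here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, single-threaded model with the failure path patched; not Vert.x code.
class PatchedEndpointModel {
    final int maxPoolSize;
    int inflightConnections;
    final Queue<String> waiters = new ArrayDeque<>();    // requests parked because the pool is full
    final Queue<String> connecting = new ArrayDeque<>(); // attempts currently holding a slot

    PatchedEndpointModel(int maxPoolSize) {
        this.maxPoolSize = maxPoolSize;
    }

    // Admission rule unchanged: park when full, otherwise reserve a slot.
    void request(String id) {
        if (inflightConnections >= maxPoolSize) {
            waiters.add(id);
            return;
        }
        inflightConnections++;
        connecting.add(id);
    }

    // The oldest attempt fails: give its slot back, then hand it to the next parked request.
    void failOldest() {
        connecting.poll();
        inflightConnections--;
        String next = waiters.poll();
        if (next != null) {
            inflightConnections++; // the parked request now owns the freed slot
            connecting.add(next);
        }
    }

    public static void main(String[] args) {
        PatchedEndpointModel endpoint = new PatchedEndpointModel(2);
        endpoint.request("a");
        endpoint.request("b");
        endpoint.request("c");  // pool full: parked
        endpoint.failOldest();  // "a" fails, "c" takes its slot
        endpoint.failOldest();  // "b" fails, nothing is parked
        // prints "1 [c] []"
        System.out.println(endpoint.inflightConnections + " " + endpoint.connecting + " " + endpoint.waiters);
    }
}
```

Here `c` is parked while the pool is full and is started as soon as `a` fails. With only the decrement from the snippet above, `c` would stay in `waiters` while `inflightConnections` dropped to 0.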

@gonit
Author

gonit commented Dec 5, 2024

@AnjaliTajane - This is Vert.x code. Does this mean there is a bug in this class? If so, I wonder why it hasn't been reported already; I skimmed all the issues raised on this repo over the last 4 years.

@AnjaliTajane

Thank you for pointing that out. Yes, this code is part of Vert.x, and from my point of view, it seems like the inflightConnections count might not be decremented in certain failure scenarios, leading to all further connection requests being queued indefinitely.

Regarding why this hasn’t been reported yet, there could be several reasons:

  • Usage Patterns: This issue might only surface in edge cases or specific usage scenarios that aren't common among most users of this library.
  • Unbounded Waiter Queue: If users haven't monitored or debugged unbounded queue growth in similar circumstances, they might not have identified the root cause.
  • Custom Implementations: Some users might have workarounds or custom logic to manage connection pools, which bypass this behavior.
Since the issue is reproducible and backed by a heap dump analysis, it might be worth reporting it as a potential bug to the Vert.x repository. This will allow the maintainers to review and confirm if it’s an overlooked edge case or expected behavior under specific conditions.

Would you like me to draft a clear and concise bug report for submission?

@gonit
Author

gonit commented Dec 5, 2024

Sure, that would be helpful. Are you not one of the maintainers?

@AnjaliTajane

Thank you for asking! No, I’m not one of the maintainers, but I’ve been actively exploring and working with the Vert.x framework. This issue caught my attention while troubleshooting a specific use case in my project, so I wanted to analyze and understand it better.

I’m happy to assist in drafting a detailed bug report or providing a reproduction scenario to ensure the maintainers have the necessary context to investigate further. Let me know if that sounds good, and I’ll prepare it!

@gonit
Author

gonit commented Dec 11, 2024

@AnjaliTajane I'd be happy to collaborate; let me know if I can provide any more details for a better bug report. Do you know how long it takes for the maintainers to respond?

@AnjaliTajane

No, I don't have any idea when the maintainers will respond, @gonit.

@vietj
Member

vietj commented Dec 11, 2024

Can you provide a reproducer for this?

@AnjaliTajane

Yes, sure, @vietj.

@AnjaliTajane

WebSocket Issue reproducer
import io.vertx.core.*;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.WebSocket;
import io.vertx.core.net.SocketAddress;

public class WebSocketIssueReproducer {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();

        HttpClient client = vertx.createHttpClient();

        SocketAddress serverAddress = SocketAddress.inetSocketAddress(8080, "localhost");

        int totalConnections = 20;
        int maxPoolSize = 5; // Simulated pool limit, tracked below rather than configured on the client

        simulateWebSocketConnections(vertx, client, serverAddress, totalConnections, maxPoolSize);
    }

    private static void simulateWebSocketConnections(Vertx vertx, HttpClient client, SocketAddress serverAddress,
                                                     int totalConnections, int maxPoolSize) {
        int[] inflightConnections = {0};
        int[] queuedConnections = {0};

        for (int i = 0; i < totalConnections; i++) {
            // Vert.x rejects timer delays below 1 ms, so the first attempt starts at 100 ms instead of 0
            vertx.setTimer((i + 1) * 100L, id -> {
                synchronized (inflightConnections) {
                    if (inflightConnections[0] >= maxPoolSize) {
                        System.out.println("Connection queued: " + (++queuedConnections[0]));
                        return;
                    }
                    inflightConnections[0]++;
                }

                client.webSocket(serverAddress.port(), serverAddress.host(), "/", result -> {
                    // This simulated counter is released on success and on failure alike
                    synchronized (inflightConnections) {
                        inflightConnections[0]--;
                    }

                    if (result.succeeded()) {
                        WebSocket webSocket = result.result();
                        System.out.println("Connected: " + webSocket.textHandlerID());

                        vertx.setTimer(500, t -> {
                            webSocket.close();
                            System.out.println("Connection closed: " + webSocket.textHandlerID());
                        });
                    } else {
                        System.out.println("Connection failed: " + result.cause().getMessage());
                    }
                });
            });
        }

        vertx.setPeriodic(500, id -> {
            synchronized (inflightConnections) {
                System.out.println("Inflight Connections: " + inflightConnections[0]);
                System.out.println("Queued Connections: " + queuedConnections[0]);
            }
        });
    }
}

I hope this code helps to solve the problem.
@vietj, please check it.

@vietj
Member

vietj commented Dec 12, 2024

Thanks @AnjaliTajane, I'll have a look.

@vietj vietj added this to the 4.5.12 milestone Dec 12, 2024
@vietj vietj self-assigned this Dec 12, 2024
@gonit
Author

gonit commented Dec 19, 2024

@vietj Thanks for picking this up. Does this issue look like a bug?

@vietj
Member

vietj commented Dec 19, 2024

I haven't had time to investigate yet

@AnjaliTajane

Please investigate, @vietj; I hope it helps improve the library.

@vietj
Member

vietj commented Jan 9, 2025

onto it.

3 participants