Skip to content

Commit

Permalink
Fix comments
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Jan 27, 2025
1 parent 4bc4573 commit 7bc5171
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 12 deletions.
1 change: 1 addition & 0 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ class ResourceHandler {
if (_ctx->unref()) {
delete _ctx;
}
_ctx = nullptr;
}

private:
Expand Down
17 changes: 17 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,21 @@ void StreamLoadContext::release(StreamLoadContext* context) {
}
}

bool StreamLoadContext::tsl_reach_timeout() {
return timeout_second > 0 && (UnixSeconds() - begin_txn_ts) > timeout_second;
}

bool StreamLoadContext::tsl_reach_idle_timeout(int32_t check_interval) {
if (idle_timeout_sec <= 0) {
return false;
}
// if there is data to consume, the load is till active
std::shared_ptr<MessageBodySink> sink = body_sink;
if (sink && !sink->exhausted()) {
last_active_ts = UnixSeconds();
return false;
}
return (UnixSeconds() - last_active_ts) > idle_timeout_sec + check_interval;
}

} // namespace starrocks
4 changes: 4 additions & 0 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ class StreamLoadContext {

static void release(StreamLoadContext* context);

// for transaction stream load
bool tsl_reach_timeout();
bool tsl_reach_idle_timeout(int32_t check_interval);

public:
// 1) Before the stream load receiving thread exits, Fragment may have been destructed.
// At this time, mem_tracker may have been destructed,
Expand Down
16 changes: 4 additions & 12 deletions be/src/runtime/stream_load/transaction_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,21 +396,13 @@ void TransactionMgr::_clean_stream_context() {
for (const auto& id : ids) {
auto ctx = _exec_env->stream_context_mgr()->get(id);
if (ctx != nullptr) {
int64_t now = UnixSeconds();
Status status;
if ((now - ctx->begin_txn_ts) > ctx->timeout_second && ctx->timeout_second > 0) {
if (ctx->tsl_reach_timeout()) {
status = Status::Aborted(
fmt::format("transaction is aborted by timeout {} seconds.", ctx->timeout_second));
} else if ((now - ctx->last_active_ts) > ctx->idle_timeout_sec + interval && ctx->idle_timeout_sec > 0) {
status = Status::Aborted(fmt::format("transaction is aborted by idle timeout {} seconds.",
ctx->idle_timeout_sec + interval));
} else {
std::shared_ptr<MessageBodySink> body_sink = ctx->body_sink;
if (body_sink != nullptr) {
if (!body_sink->exhausted()) {
ctx->last_active_ts = UnixSeconds();
}
}
} else if (ctx->tsl_reach_idle_timeout(interval)) {
status = Status::Aborted(
fmt::format("transaction is aborted by idle timeout {} seconds.", ctx->idle_timeout_sec));
}
if (!status.ok()) {
if (ctx->lock.try_lock()) {
Expand Down

0 comments on commit 7bc5171

Please sign in to comment.