Skip to content

Commit

Permalink
Merge pull request #178 from cloudwu/dev
Browse files Browse the repository at this point in the history
release 0.7.3
  • Loading branch information
cloudwu committed Oct 13, 2014
2 parents 0aa5274 + d649d0a commit d2aa206
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 5 deletions.
6 changes: 6 additions & 0 deletions lualib/skynet.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ end

local coroutine_pool = {}
local coroutine_yield = coroutine.yield
local coroutine_count = 0

local function co_create(f)
local co = table.remove(coroutine_pool)
Expand All @@ -109,6 +110,11 @@ local function co_create(f)
f(coroutine_yield())
end
end)
coroutine_count = coroutine_count + 1
if coroutine_count > 1024 then
skynet.error("May overload, create 1024 task")
coroutine_count = 0
end
else
coroutine.resume(co, f)
end
Expand Down
1 change: 1 addition & 0 deletions lualib/snax/gateserver.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ function gateserver.start(handler)
local port = assert(conf.port)
maxclient = conf.maxclient or 1024
nodelay = conf.nodelay
skynet.error(string.format("Listen on %s:%d", address, port))
socket = socketdriver.listen(address, port)
socketdriver.start(socket)
if handler.open then
Expand Down
1 change: 0 additions & 1 deletion service-src/service_snlua.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <lua.h>
#include <lualib.h>
#include <lauxlib.h>
#include <assert.h>

#include <assert.h>
#include <string.h>
Expand Down
12 changes: 12 additions & 0 deletions skynet-src/skynet_harbor.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,17 @@ skynet_harbor_init(int harbor) {

void
skynet_harbor_start(void *ctx) {
// the HARBOR must be reserved to ensure the pointer is valid.
// It will be released at last by calling skynet_harbor_exit
skynet_context_reserve(ctx);
REMOTE = ctx;
}

void
skynet_harbor_exit() {
struct skynet_context * ctx = REMOTE;
REMOTE= NULL;
if (ctx) {
skynet_context_release(ctx);
}
}
1 change: 1 addition & 0 deletions skynet-src/skynet_harbor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ void skynet_harbor_send(struct remote_message *rmsg, uint32_t source, int sessio
int skynet_harbor_message_isremote(uint32_t handle);
void skynet_harbor_init(int harbor);
void skynet_harbor_start(void * ctx);
void skynet_harbor_exit();

#endif
36 changes: 33 additions & 3 deletions skynet-src/skynet_mq.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// 1 means mq is in global mq , or the message is dispatching.

#define MQ_IN_GLOBAL 1
#define MQ_OVERLOAD 1024

struct message_queue {
uint32_t handle;
Expand All @@ -24,6 +25,8 @@ struct message_queue {
int lock;
int release;
int in_global;
int overload;
int overload_threshold;
struct skynet_message *queue;
struct message_queue *next;
};
Expand Down Expand Up @@ -116,6 +119,8 @@ skynet_mq_create(uint32_t handle) {
// If the service init success, skynet_context_new will call skynet_mq_force_push to push it to global queue.
q->in_global = MQ_IN_GLOBAL;
q->release = 0;
q->overload = 0;
q->overload_threshold = MQ_OVERLOAD;
q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);
q->next = NULL;

Expand Down Expand Up @@ -150,17 +155,42 @@ skynet_mq_length(struct message_queue *q) {
return tail + cap - head;
}

int
skynet_mq_overload(struct message_queue *q) {
if (q->overload) {
int overload = q->overload;
q->overload = 0;
return overload;
}
return 0;
}

int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
int ret = 1;
LOCK(q)

if (q->head != q->tail) {
*message = q->queue[q->head];
*message = q->queue[q->head++];
ret = 0;
if ( ++ q->head >= q->cap) {
q->head = 0;
int head = q->head;
int tail = q->tail;
int cap = q->cap;

if (head >= cap) {
q->head = head = 0;
}
int length = tail - head;
if (length < 0) {
length += cap;
}
while (length > q->overload_threshold) {
q->overload = length;
q->overload_threshold *= 2;
}
} else {
// reset overload_threshold when queue is empty
q->overload_threshold = MQ_OVERLOAD;
}

if (ret) {
Expand Down
1 change: 1 addition & 0 deletions skynet-src/skynet_mq.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ void skynet_mq_push(struct message_queue *q, struct skynet_message *message);

// return the length of message queue, for debug
int skynet_mq_length(struct message_queue *q);
int skynet_mq_overload(struct message_queue *q);

void skynet_mq_init();

Expand Down
12 changes: 12 additions & 0 deletions skynet-src/skynet_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ skynet_context_grab(struct skynet_context *ctx) {
__sync_add_and_fetch(&ctx->ref,1);
}

void
skynet_context_reserve(struct skynet_context *ctx) {
skynet_context_grab(ctx);
// don't count the context reserved, because skynet abort (the worker threads terminate) only when the total context is 0 .
// the reserved context will be release at last.
context_dec();
}

static void
delete_context(struct skynet_context *ctx) {
if (ctx->logfile) {
Expand Down Expand Up @@ -283,6 +291,10 @@ skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}

skynet_monitor_trigger(sm, msg.source , handle);

Expand Down
1 change: 1 addition & 0 deletions skynet-src/skynet_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct skynet_monitor;

struct skynet_context * skynet_context_new(const char * name, const char * parm);
void skynet_context_grab(struct skynet_context *);
void skynet_context_reserve(struct skynet_context *ctx);
struct skynet_context * skynet_context_release(struct skynet_context *);
uint32_t skynet_context_handle(struct skynet_context *);
int skynet_context_push(uint32_t handle, struct skynet_message *message);
Expand Down
5 changes: 4 additions & 1 deletion skynet-src/skynet_start.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ _worker(void *p) {
for (;;) {
q = skynet_context_message_dispatch(sm, q, weight);
if (q == NULL) {
CHECK_ABORT
if (pthread_mutex_lock(&m->mutex) == 0) {
++ m->sleep;
// "spurious wakeup" is harmless,
Expand All @@ -140,6 +139,7 @@ _worker(void *p) {
}
}
}
CHECK_ABORT
}
return NULL;
}
Expand Down Expand Up @@ -232,6 +232,9 @@ skynet_start(struct skynet_config * config) {
bootstrap(ctx, config->bootstrap);

_start(config->thread);

// harbor_exit may call socket send, so it should exit before socket_free
skynet_harbor_exit();
skynet_socket_free();
if (config->daemon) {
daemon_exit(config->daemon);
Expand Down
44 changes: 44 additions & 0 deletions test/testoverload.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
local skynet = require "skynet"

local mode = ...

if mode == "slave" then

local CMD = {}

function CMD.sum(n)
skynet.error("for loop begin")
local s = 0
for i = 1, n do
s = s + i
end
skynet.error("for loop end")
end

function CMD.blackhole()
end

skynet.start(function()
skynet.dispatch("lua", function(_,_, cmd, ...)
local f = CMD[cmd]
f(...)
end)
end)

else

skynet.start(function()
local slave = skynet.newservice(SERVICE_NAME, "slave")
for step = 1, 20 do
skynet.error("overload test ".. step)
for i = 1, 512 * step do
skynet.send(slave, "lua", "blackhole")
end
skynet.sleep(step)
end
local n = 1000000000
skynet.error(string.format("endless test n=%d", n))
skynet.send(slave, "lua", "sum", n)
end)

end

0 comments on commit d2aa206

Please sign in to comment.