diff --git a/include/qt_spawncache.h b/include/qt_spawncache.h
index 6ff151112..25e7d5318 100644
--- a/include/qt_spawncache.h
+++ b/include/qt_spawncache.h
@@ -10,8 +10,8 @@ typedef struct _qt_threadqueue_node qt_threadqueue_node_t;
 typedef struct _qt_threadqueue_private {
   qt_threadqueue_node_t *head, *tail;
   qt_threadqueue_node_t *on_deck;
-  long qlength;
-  long qlength_stealable;
+  _Atomic long qlength;
+  _Atomic long qlength_stealable;
   struct _qt_threadqueue_private *next;
 } qt_threadqueue_private_t;
diff --git a/src/threadqueues/sherwood_threadqueues.c b/src/threadqueues/sherwood_threadqueues.c
index 5a1b5cccd..48a44335b 100644
--- a/src/threadqueues/sherwood_threadqueues.c
+++ b/src/threadqueues/sherwood_threadqueues.c
@@ -34,11 +34,11 @@ struct _qt_threadqueue_node {
 struct _qt_threadqueue {
   qt_threadqueue_node_t *head;
   qt_threadqueue_node_t *tail;
-  long qlength;
-  long qlength_stealable; /* number of stealable tasks on queue - stop steal
-                           * attempts that will fail because tasks cannot be
-                           * moved - 4/1/11 AKP
-                           */
+  _Atomic long qlength;
+  _Atomic long qlength_stealable; /* number of stealable tasks on queue - stop
+                                   * steal attempts that will fail because
+                                   * tasks cannot be moved - 4/1/11 AKP
+                                   */
   QTHREAD_TRYLOCK_TYPE qlock;
 } /* qt_threadqueue_t */;
@@ -144,12 +144,12 @@ ssize_t INTERNAL qt_threadqueue_advisory_queuelen(qt_threadqueue_t *q) { /*{{{*/
 #if (QTHREAD_ASSEMBLY_ARCH == QTHREAD_AMD64) ||                               \
   (QTHREAD_ASSEMBLY_ARCH == QTHREAD_POWERPC64)
   /* only works if a basic load is atomic */
-  return q->qlength;
+  return atomic_load_explicit(&q->qlength, memory_order_relaxed);
 #else
   ssize_t tmp;
   QTHREAD_TRYLOCK_LOCK(&q->qlock);
-  tmp = q->qlength;
+  tmp = atomic_load_explicit(&q->qlength, memory_order_relaxed);
   QTHREAD_TRYLOCK_UNLOCK(&q->qlock);
   return tmp;
 #endif /* if (QTHREAD_ASSEMBLY_ARCH == QTHREAD_AMD64) || \
@@ -170,8 +170,8 @@ qt_threadqueue_t INTERNAL *qt_threadqueue_new(void) { /*{{{*/
   if (q != NULL) {
     q->head = NULL;
     q->tail = NULL;
-    q->qlength = 0;
-    q->qlength_stealable = 0;
+    atomic_store_explicit(&q->qlength, 0, memory_order_relaxed);
+    atomic_store_explicit(&q->qlength_stealable, 0, memory_order_relaxed);
     QTHREAD_TRYLOCK_INIT(q->qlock);
   }
@@ -202,8 +202,8 @@ void INTERNAL qt_threadqueue_free(qt_threadqueue_t *q) { /*{{{*/
     }
     assert(q->head == NULL);
     assert(q->tail == NULL);
-    q->qlength = 0;
-    q->qlength_stealable = 0;
+    atomic_store_explicit(&q->qlength, 0, memory_order_relaxed);
+    atomic_store_explicit(&q->qlength_stealable, 0, memory_order_relaxed);
     QTHREAD_TRYLOCK_UNLOCK(&q->qlock);
   }
   assert(q->head == q->tail);
@@ -241,8 +241,11 @@ void INTERNAL qt_threadqueue_enqueue(qt_threadqueue_t *restrict q,
   } else {
     node->prev->next = node;
   }
-  q->qlength++;
-  q->qlength_stealable += node->stealable;
+  // q->qlength++;
+  atomic_fetch_add_explicit(&q->qlength, 1, memory_order_relaxed);
+  // q->qlength_stealable += node->stealable;
+  atomic_fetch_add_explicit(
+    &q->qlength_stealable, node->stealable, memory_order_relaxed);
   QTHREAD_TRYLOCK_UNLOCK(&q->qlock);
 } /*}}}*/
@@ -269,8 +272,12 @@ void INTERNAL qt_threadqueue_enqueue_yielded(qt_threadqueue_t *restrict q,
   } else {
     node->next->prev = node;
   }
-  q->qlength++;
-  if (node->stealable) { q->qlength_stealable++; }
+  // q->qlength++;
+  atomic_fetch_add_explicit(&q->qlength, 1, memory_order_relaxed);
+  if (node->stealable) {
+    // q->qlength_stealable++;
+    atomic_fetch_add_explicit(&q->qlength_stealable, 1, memory_order_relaxed);
+  }
   QTHREAD_TRYLOCK_UNLOCK(&q->qlock);
 } /*}}}*/
@@ -323,19 +330,20 @@ int INTERNAL qt_keep_adding_agg_task(
   qt_threadqueue_node_t **head_addr, *head_l = NULL;
   qt_threadqueue_node_t **tail_addr, *tail_l = NULL;
-  long *length, *stealable, len_l = 0, ste_l = 0;
+  _Atomic long *length, *stealable, len_l = 0, ste_l = 0;
   if (lock) {
     qt_threadqueue_t *public_q = (qt_threadqueue_t *)q;
     head_addr = &(public_q->head);
     tail_addr = &(public_q->tail);
    length = &(public_q->qlength);
-    assert(*length > 0);
+    assert(atomic_load_explicit(length, memory_order_relaxed) > 0);
     stealable = &(public_q->qlength_stealable);
     int stop_adding = 0;
     if (count > 0) {
       int local_max_t =
-        (count + *length) / qthread_readstate(TOTAL_WORKERS) / DIV_FACTOR;
+        (count + atomic_load_explicit(length, memory_order_relaxed)) /
+        qthread_readstate(TOTAL_WORKERS) / DIV_FACTOR;
       local_max_t = (local_max_t > MAX_ABS_AGG ? MAX_ABS_AGG : local_max_t);
       if (local_max_t < max_t) { max_t = local_max_t; }
       if (max_t <= count) { stop_adding = 1; }
@@ -346,25 +354,31 @@ int INTERNAL qt_keep_adding_agg_task(
     tail_l = *tail_addr;
     if (tail_l != NULL) {
       int max_allowed = max_t - count;
-      while ((len_l < max_allowed) &&
-             QTHREAD_TASK_IS_AGGREGABLE(atomic_load_explicit(
-               &tail_l->value->flags, memory_order_relaxed)) &&
-             tail_l != head_l) {
-        len_l++;
-        if (tail_l->stealable) { ste_l++; }
+      while (
+        (atomic_load_explicit(&len_l, memory_order_relaxed) < max_allowed) &&
+        QTHREAD_TASK_IS_AGGREGABLE(atomic_load_explicit(
+          &tail_l->value->flags, memory_order_relaxed)) &&
+        tail_l != head_l) {
+        atomic_fetch_add_explicit(&len_l, 1, memory_order_relaxed);
+        if (tail_l->stealable) {
+          atomic_fetch_add_explicit(&ste_l, 1, memory_order_relaxed);
+        }
         tail_l = tail_l->prev;
       }
     }
     if (tail_l == *tail_addr) { // didn't keep anything
-      assert(len_l == 0 && ste_l == 0);
+      assert(atomic_load_explicit(&len_l, memory_order_relaxed) == 0 &&
+             atomic_load_explicit(&ste_l, memory_order_relaxed) == 0);
       head_l = NULL;
       tail_l = NULL;
     } else if (head_l == tail_l) { // kept eveything
       head_l = *head_addr;
       tail_l = *tail_addr;
       *head_addr = *tail_addr = NULL; // empty q
-      *length = *stealable = 0;
+      //*length = *stealable = 0;
+      atomic_store_explicit(length, 0, memory_order_relaxed);
+      atomic_store_explicit(stealable, 0, memory_order_relaxed);
     } else { // tail_l is the new q tail, = first task which should remain
              // there interchange tail_l and real tail using head_l as tmp
       head_l = tail_l;
@@ -374,9 +388,17 @@ int INTERNAL qt_keep_adding_agg_task(
       head_l = (*tail_addr)->next;
       head_l->prev = NULL;
       (*tail_addr)->next = NULL;
-      *length -= len_l;
-      assert(*length > 0);
-      *stealable -= ste_l;
+      //*length -= len_l;
+      long val = atomic_fetch_sub_explicit(
+        length,
+        atomic_load_explicit(&len_l, memory_order_relaxed),
+        memory_order_relaxed);
+      assert(val > 0);
+      atomic_fetch_sub_explicit(
+        stealable,
+        atomic_load_explicit(&ste_l, memory_order_relaxed),
+        memory_order_relaxed);
+      //*stealable -= ste_l;
     }
   }
   QTHREAD_TRYLOCK_UNLOCK(&public_q->qlock);
@@ -421,9 +443,12 @@ int INTERNAL qt_keep_adding_agg_task(
     } else {
       (*tail_addr)->next = NULL;
     }
-    *length = *length - 1;
-    assert(*length >= 0);
-    if (node->stealable) { *stealable = *stealable - 1; }
+    long val = atomic_fetch_sub_explicit(length, 1, memory_order_relaxed);
+    assert(val >= 0);
+    if (node->stealable) {
+      atomic_fetch_sub_explicit(
+        stealable, 1, memory_order_relaxed); /**stealable = *stealable - 1;*/
+    }
     // update the other info
     list_of_fret[count] = t->ret;
     count++;
@@ -456,10 +481,20 @@ int INTERNAL qt_keep_adding_agg_task(
      } else {
        head_l->prev = (*tail_addr);
        (*tail_addr) = tail_l;
      }
-      assert(public_q->qlength >= 0);
-      public_q->qlength += len_l;
-      assert(public_q->qlength > 0);
-      public_q->qlength_stealable += ste_l;
+      long val = atomic_fetch_add_explicit(
+        &public_q->qlength,
+        atomic_load_explicit(&len_l, memory_order_relaxed),
+        memory_order_relaxed);
+      assert(val >= 0);
+      val = atomic_fetch_add_explicit(
+        &public_q->qlength_stealable,
+        atomic_load_explicit(&ste_l, memory_order_relaxed),
+        memory_order_relaxed);
+      assert(val >= 0);
+      // assert(public_q->qlength >= 0);
+      // public_q->qlength += len_l;
+      // assert(public_q->qlength > 0);
+      // public_q->qlength_stealable += ste_l;
    }
  }
  return count;
@@ -527,8 +562,10 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
 #ifdef QTHREAD_TASK_AGGREGATION
     if (QTHREAD_TASK_IS_AGGREGABLE(
           atomic_load_explicit(&node->value->flags, memory_order_relaxed)) &&
-        ((max_t = (qc->qlength + 1 + q->qlength) /
-                  qthread_readstate(ACTIVE_WORKERS) / DIV_FACTOR) > 1)) {
+        ((max_t =
+            (atomic_load_explicit(&qc->qlength, memory_order_relaxed) + 1 +
+             atomic_load_explicit(&q->qlength, memory_order_relaxed)) /
+            qthread_readstate(ACTIVE_WORKERS) / DIV_FACTOR) > 1)) {
       max_t = (max_t > MAX_ABS_AGG ? MAX_ABS_AGG : max_t);
       assert(node->value->thread_state != QTHREAD_STATE_TERM_SHEP);
       qt_add_first_agg_task(t, &curr_cost, node);
@@ -536,7 +573,7 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
       int *count_addr = &(((int *)t->preconds)[0]);
       int lcount = qt_keep_adding_agg_task(t, max_t, &curr_cost, qc, 0);
-      if ((qc->qlength == 0) &&
+      if ((atomic_load_explicit(&qc->qlength, memory_order_relaxed) == 0) &&
           ((curr_cost < qlib->max_c) && (*count_addr < max_t))) {
         // cache empty and can still add, get more from q
         QTHREAD_TRYLOCK_LOCK(&q->qlock);
@@ -552,7 +589,7 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
     }
 #endif /* ifdef QTHREAD_TASK_AGGREGATION */
-    if (qc->qlength > 0) {
+    if (atomic_load_explicit(&qc->qlength, memory_order_relaxed) > 0) {
       qt_threadqueue_node_t *first = qc->head;
       qt_threadqueue_node_t *last = qc->tail;
       assert(last->next == NULL);
@@ -600,13 +637,21 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
       } else {
         first->prev->next = first;
       }
-      q->qlength += qc->qlength;
-      q->qlength_stealable += qc->qlength_stealable;
+      atomic_fetch_add_explicit(
+        &q->qlength,
+        atomic_load_explicit(&qc->qlength, memory_order_relaxed),
+        memory_order_relaxed);
+      atomic_fetch_add_explicit(
+        &q->qlength_stealable,
+        atomic_load_explicit(&qc->qlength_stealable, memory_order_relaxed),
+        memory_order_relaxed);
       assert(q->tail->next == NULL);
       assert(q->head->prev == NULL);
       QTHREAD_TRYLOCK_UNLOCK(&q->qlock);
       qc->head = qc->tail = NULL;
-      qc->qlength = qc->qlength_stealable = 0;
+      // qc->qlength = qc->qlength_stealable = 0;
+      atomic_store_explicit(&qc->qlength_stealable, 0, memory_order_relaxed);
+      atomic_store_explicit(&qc->qlength, 0, memory_order_relaxed);
 #endif /* if 0 */
     }
   } else if (q->head) {
@@ -614,7 +659,7 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
     node = q->tail;
     if (node != NULL) {
       assert(q->head);
-      assert(q->qlength > 0);
+      assert(atomic_load_explicit(&q->qlength, memory_order_relaxed) > 0);
       q->tail = node->prev;
       if (q->tail == NULL) {
@@ -622,14 +667,15 @@ qthread_t INTERNAL *qt_scheduler_get_thread(qt_threadqueue_t *q,
      } else {
        q->tail->next = NULL;
      }
-      assert(q->qlength > 0);
-      q->qlength--;
-      q->qlength_stealable -= node->stealable;
+      assert(atomic_load_explicit(&q->qlength, memory_order_relaxed) > 0);
+      atomic_fetch_sub_explicit(&q->qlength, 1, memory_order_relaxed);
+      atomic_fetch_sub_explicit(
+        &q->qlength_stealable, node->stealable, memory_order_relaxed);
 #ifdef QTHREAD_TASK_AGGREGATION
       if (QTHREAD_TASK_IS_AGGREGABLE(atomic_load_explicit(
             &node->value->flags, memory_order_relaxed)) &&
-          ((max_t =
-             (q->qlength) / qthread_readstate(ACTIVE_WORKERS) / DIV_FACTOR) >
+          ((max_t = atomic_load_explicit(&q->qlength, memory_order_relaxed) /
+                    qthread_readstate(ACTIVE_WORKERS) / DIV_FACTOR) >
            1)) { // no point creating an agg task with a single simple task
         max_t = (max_t > MAX_ABS_AGG ? MAX_ABS_AGG : max_t);
         assert(node->value->thread_state != QTHREAD_STATE_TERM_SHEP);
@@ -738,8 +784,9 @@ void INTERNAL qt_threadqueue_enqueue_multiple(
   } else {
     first->prev->next = first;
   }
-  q->qlength += addCnt;
-  q->qlength_stealable += addCnt;
+  atomic_fetch_add_explicit(&q->qlength, addCnt, memory_order_relaxed);
+  atomic_fetch_add_explicit(
+    &q->qlength_stealable, addCnt, memory_order_relaxed);
   QTHREAD_TRYLOCK_UNLOCK(&q->qlock);
 } /*}}}*/
@@ -754,7 +801,8 @@ qt_threadqueue_dequeue_steal(qt_threadqueue_t *h,
   long desired_stolen;
   if (steal_chunksize == 0) {
-    desired_stolen = v->qlength_stealable / 2;
+    desired_stolen =
+      atomic_load_explicit(&v->qlength_stealable, memory_order_relaxed) / 2;
   } else {
     desired_stolen = steal_chunksize;
   }
@@ -765,7 +813,9 @@ qt_threadqueue_dequeue_steal(qt_threadqueue_t *h,
   if (desired_stolen == 0) { desired_stolen = 1; }
   if (!QTHREAD_TRYLOCK_TRY(&v->qlock)) { return NULL; }
-  while (v->qlength_stealable > 0 && amtStolen < desired_stolen) {
+  while (atomic_load_explicit(&v->qlength_stealable, memory_order_relaxed) >
+           0 &&
+         amtStolen < desired_stolen) {
     node = (qt_threadqueue_node_t *)v->head;
     do {
       // Find next stealable node (if one exists)
@@ -783,8 +833,11 @@ qt_threadqueue_dequeue_steal(qt_threadqueue_t *h,
       amtStolen++;
       // Adjust queue length(s)
-      v->qlength--;
-      v->qlength_stealable--;
+      // v->qlength--;
+      atomic_fetch_sub_explicit(&v->qlength, 1, memory_order_relaxed);
+      // v->qlength_stealable--;
+      atomic_fetch_sub_explicit(
+        &v->qlength_stealable, 1, memory_order_relaxed);
       // Find next unstealable node, or amount we want to steal
       qt_threadqueue_node_t *next_to_steal = last_stolen->next;
@@ -797,8 +850,9 @@ qt_threadqueue_dequeue_steal(qt_threadqueue_t *h,
         amtStolen++;
         // Adjust queue length(s)
-        v->qlength--;
-        v->qlength_stealable--;
+        atomic_fetch_sub_explicit(&v->qlength, 1, memory_order_relaxed);
+        atomic_fetch_sub_explicit(
+          &v->qlength_stealable, 1, memory_order_relaxed);
         next_to_steal = next_to_steal->next;
       }
@@ -831,7 +885,9 @@ qt_threadqueue_dequeue_steal(qt_threadqueue_t *h,
      } else {
        break;
      }
-    } while (v->qlength_stealable > 0 && amtStolen < desired_stolen);
+    } while (atomic_load_explicit(&v->qlength_stealable, memory_order_relaxed) >
+               0 &&
+             amtStolen < desired_stolen);
     break;
   }
   QTHREAD_TRYLOCK_UNLOCK(&v->qlock);
@@ -877,7 +933,8 @@ qthread_steal(qthread_shepherd_t *thief_shepherd) { /*{{{*/
   while (stolen == NULL) {
     qt_threadqueue_t *victim_queue = shepherds[sorted_sheplist[i]].ready;
-    if (0 != victim_queue->qlength_stealable) {
+    if (0 != atomic_load_explicit(&victim_queue->qlength_stealable,
+                                  memory_order_relaxed)) {
       stolen = qt_threadqueue_dequeue_steal(myqueue, victim_queue);
       if (stolen) {
         qt_threadqueue_node_t *surplus = stolen->next;
@@ -890,7 +947,7 @@ qthread_steal(qthread_shepherd_t *thief_shepherd) { /*{{{*/
      }
    }
-    if ((0 < myqueue->qlength) ||
+    if ((0 < atomic_load_explicit(&myqueue->qlength, memory_order_relaxed)) ||
        steal_disable) { // work at home quit steal attempt
      break;
    }
@@ -918,7 +975,7 @@ void INTERNAL qt_threadqueue_filter(qt_threadqueue_t *q,
    */
   QTHREAD_TRYLOCK_LOCK(&q->qlock);
-  if (q->qlength > 0) {
+  if (atomic_load_explicit(&q->qlength, memory_order_relaxed) > 0) {
     qt_threadqueue_node_t **lp = NULL;
     qt_threadqueue_node_t **rp = NULL;
@@ -952,8 +1009,11 @@ void INTERNAL qt_threadqueue_filter(qt_threadqueue_t *q,
         *lp = node->next;
         *rp = node->prev;
-        q->qlength--;
-        q->qlength_stealable -= node->stealable;
+        // q->qlength--;
+        // q->qlength_stealable -= node->stealable;
+        atomic_fetch_sub_explicit(&q->qlength, 1, memory_order_relaxed);
+        atomic_fetch_sub_explicit(
+          &q->qlength_stealable, node->stealable, memory_order_relaxed);
         freeme = node;
         node = node->prev;
         if (q->head == node) {
@@ -967,8 +1027,11 @@ void INTERNAL qt_threadqueue_filter(qt_threadqueue_t *q,
       case REMOVE_AND_STOP: // remove, stop looking
         *lp = node->next;
         *rp = node->prev;
-        q->qlength--;
-        q->qlength_stealable -= node->stealable;
+        // q->qlength--;
+        // q->qlength_stealable -= node->stealable;
+        atomic_fetch_sub_explicit(&q->qlength, 1, memory_order_relaxed);
+        atomic_fetch_sub_explicit(
+          &q->qlength_stealable, node->stealable, memory_order_relaxed);
         FREE_TQNODE(node);
         node = NULL;
         break;
@@ -989,7 +1052,7 @@ qthread_t INTERNAL *qt_threadqueue_dequeue_specific(qt_threadqueue_t *q,
   assert(q != NULL);
   QTHREAD_TRYLOCK_LOCK(&q->qlock);
-  if (q->qlength > 0) {
+  if (atomic_load_explicit(&q->qlength, memory_order_relaxed) > 0) {
     node = (qt_threadqueue_node_t *)q->tail;
     t = (node) ? (qthread_t *)node->value : NULL;
     while ((t != NULL) && (t->ret != value)) {
diff --git a/test/basics/queue.c b/test/basics/queue.c
index 46599fe0b..140b5b198 100644
--- a/test/basics/queue.c
+++ b/test/basics/queue.c
@@ -103,7 +103,7 @@ int main(int argc, char *argv[]) {
   test_check(qthread_queue_length(the_queue) == 0);
   // Again, this relies on approximate estimates that aren't reliable.
-  // test_check(qthread_readstate(NODE_BUSYNESS) == 1);
+  test_check(qthread_readstate(NODE_BUSYNESS) == 1);
   iprintf("6/6 Test passed!\n");
   free(retvals);
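For reference, a minimal standalone sketch (not part of the patch) of the conversion pattern applied above to qlength and qlength_stealable: declare the shared counter _Atomic and touch it only through atomic_*_explicit with memory_order_relaxed, since the count is advisory and the queue links themselves remain protected by the trylock. The counter_t and bump names here are hypothetical.

/* Illustrative only; mirrors the qlength conversion, not qthreads code. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

typedef struct {
  _Atomic long qlength; /* advisory length; relaxed operations only */
} counter_t;

static void *bump(void *arg) {
  counter_t *c = arg;
  for (int i = 0; i < 100000; i++) {
    /* replaces the old plain c->qlength++, which was a data race */
    atomic_fetch_add_explicit(&c->qlength, 1, memory_order_relaxed);
  }
  return NULL;
}

int main(void) {
  counter_t c;
  atomic_store_explicit(&c.qlength, 0, memory_order_relaxed);

  pthread_t t1, t2;
  pthread_create(&t1, NULL, bump, &c);
  pthread_create(&t2, NULL, bump, &c);
  pthread_join(t1, NULL);
  pthread_join(t2, NULL);

  /* a relaxed load suffices for an advisory read, as in
   * qt_threadqueue_advisory_queuelen() */
  printf("%ld\n", atomic_load_explicit(&c.qlength, memory_order_relaxed));
  return 0;
}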