Skip to content

Commit

Permalink
Update the threading example in 13 and 14 docs, and add some basic ex…
Browse files Browse the repository at this point in the history
…planation text.
  • Loading branch information
taikedz committed Dec 13, 2024
1 parent 08afb41 commit 25131b7
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,77 @@ test "threading" {

Threads, however, aren't particularly useful without strategies for thread
safety.

## Mutexes and WaitGroups

After starting multiple threads, our main thread may need to know when all are finished; in this case we can use a _thread pool_ which, together with a _wait group_ element, allows the spawned threads to declare when they are started and finished working, as well as allowing the main thread to wait for all to have finished working. The `std.Thread.WaitGroup` instance is passed to each threaded function to allow it to declare its state; the `std.Thread.Pool` receives the `WorkGroup` as an element to wait on.

The `Pool` needs to be initialized with an thread safe allocator, one which can allocate memory with awareness of threading. We create a basic allocator (General Purpose, Arena, Heap, etc), and then wrap it in a `ThreadSafeAllocator` to provide this safety for us. This thread-safe allocator can now be passed into any threaded functions.

When different threads try to access a shared variable simultaneously, a race condition can occur where the value is improperly stored to memory. This is a case of thread-unsafe behaviour. To provide thread safety for our own code, we must use a mutually exclusive lock, a _mutex_, provided as `std.Thread.Mutex`. This allows different threads to work together on a same piece of data in memory without clashing.

With these concepts together, we can write the following example program which allows each thread to update a line counter, and print a message. Note that neither thread can attempt to update the counter until the other has finished with accessing it, due to the encapsulation in a mutex, which is declared at the start of the accessing block via `lock()`, and whose release via `.unlock()` is deferred to the end to guarantee its execution. Similarly, the `WaitGroup` is also declared at start of block, and guaranteed to release with `.finished()` at end of block via a `defer`.

```zig
const std = @import("std");
const Thread = std.Thread;
var line_counter:u8 = 0;
fn sleepy(name:[]const u8, steps:u8, mut:*Thread.Mutex, wg:*Thread.WaitGroup) void {
var i:u8 = 0;
wg.start();
defer wg.finish();
while (i < steps) {
std.time.sleep(1 * std.time.ns_per_s);
{
mut.lock();
defer mut.unlock();
line_counter += 1;
i += 1;
std.debug.print("{d} {s}\n", .{line_counter, name});
}
}
}
pub fn main() !void {
// ----- Memory
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
// ----- Thread safety wrapping
var tsafe_allocator: std.heap.ThreadSafeAllocator = .{
.child_allocator = gpa.allocator(),
};
const alloc = tsafe_allocator.allocator();
// To wait on threads we need a waitgroup, and a thread pool
// to wrap the waitgroup.
var wg:Thread.WaitGroup = undefined;
wg.reset();
var pool:Thread.Pool = undefined;
try pool.init(.{.allocator = alloc});
defer pool.deinit();
// A mutex to ensure we don't write the counter simultaneously
var mut:Thread.Mutex = undefined;
// Use OS Thread spawning, pass in a function, and the arguments to pass
// down to it in an anonymous struct
_ = try std.Thread.spawn(.{}, sleepy, .{"One", 1, &mut, &wg});
_ = try std.Thread.spawn(.{}, sleepy, .{"Two", 2, &mut, &wg});
// Wait a little for a thread to call .start() - sometimes we get to the waitgroup
// here and see it empty... before any thread acquires it ...!!
std.time.sleep(1 * std.time.ns_per_s);
pool.waitAndWork(&wg);
std.debug.print("Finished.\n", .{});
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,78 @@ test "threading" {

Threads, however, aren't particularly useful without strategies for thread
safety.

## Mutexes and WaitGroups

After starting multiple threads, our main thread may need to know when all are finished; in this case we can use a _thread pool_ which, together with a _wait group_ element, allows the spawned threads to declare when they are started and finished working, as well as allowing the main thread to wait for all to have finished working. The `std.Thread.WaitGroup` instance is passed to each threaded function to allow it to declare its state; the `std.Thread.Pool` receives the `WorkGroup` as an element to wait on.

The `Pool` needs to be initialized with an thread safe allocator, one which can allocate memory with awareness of threading. We create a basic allocator (General Purpose, Arena, Heap, etc), and then wrap it in a `ThreadSafeAllocator` to provide this safety for us. This thread-safe allocator can now be passed into any threaded functions.

When different threads try to access a shared variable simultaneously, a race condition can occur where the value is improperly stored to memory. This is a case of thread-unsafe behaviour. To provide thread safety for our own code, we must use a mutually exclusive lock, a _mutex_, provided as `std.Thread.Mutex`. This allows different threads to work together on a same piece of data in memory without clashing.

With these concepts together, we can write the following example program which allows each thread to update a line counter, and print a message. Note that neither thread can attempt to update the counter until the other has finished with accessing it, due to the encapsulation in a mutex, which is declared at the start of the accessing block via `lock()`, and whose release via `.unlock()` is deferred to the end to guarantee its execution. Similarly, the `WaitGroup` is also declared at start of block, and guaranteed to release with `.finished()` at end of block via a `defer`.

```zig
const std = @import("std");
const Thread = std.Thread;
var line_counter:u8 = 0;
fn sleepy(name:[]const u8, steps:u8, mut:*Thread.Mutex, wg:*Thread.WaitGroup) void {
var i:u8 = 0;
wg.start();
defer wg.finish();
while (i < steps) {
std.time.sleep(1 * std.time.ns_per_s);
{
mut.lock();
defer mut.unlock();
line_counter += 1;
i += 1;
std.debug.print("{d} {s}\n", .{line_counter, name});
}
}
}
pub fn main() !void {
// ----- Memory
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
defer _ = gpa.deinit();
// ----- Thread safety wrapping
var tsafe_allocator: std.heap.ThreadSafeAllocator = .{
.child_allocator = gpa.allocator(),
};
const alloc = tsafe_allocator.allocator();
// To wait on threads we need a waitgroup, and a thread pool
// to wrap the waitgroup.
var wg:Thread.WaitGroup = undefined;
wg.reset();
var pool:Thread.Pool = undefined;
try pool.init(.{.allocator = alloc});
defer pool.deinit();
// A mutex to ensure we don't write the counter simultaneously
var mut:Thread.Mutex = undefined;
// Use OS Thread spawning, pass in a function, and the arguments to pass
// down to it in an anonymous struct
_ = try std.Thread.spawn(.{}, sleepy, .{"One", 1, &mut, &wg});
_ = try std.Thread.spawn(.{}, sleepy, .{"Two", 2, &mut, &wg});
// Wait a little for a thread to call .start() - sometimes we get to the waitgroup
// here and see it empty... before any thread acquires it ...!!
std.time.sleep(1 * std.time.ns_per_s);
pool.waitAndWork(&wg);
std.debug.print("Finished.\n", .{});
}
```

0 comments on commit 25131b7

Please sign in to comment.