Skip to content

Commit

Permalink
Fix a few issues (#48)
Browse files Browse the repository at this point in the history
* Fix a few issues

* Make sure that adding a dispatch return event to a node stream creates the stream if needed.
  This could cause jobs that are added right on startup to fail to be dispatched.
* Fix potential panic in cleanup goroutine when the pending jobs map contains nil values
* Make sure to close all maps when closing the node.
* Properly log event information when discarding stale ack events.
* More consistent logging

* Fix a few additional issues

* Ensure stale node streams are gced
* Make close and shutdown code cleaner
* Cleanup all maps properly on node close
  • Loading branch information
raphael authored Nov 30, 2024
1 parent 9881791 commit 67b2457
Show file tree
Hide file tree
Showing 16 changed files with 626 additions and 355 deletions.
4 changes: 2 additions & 2 deletions examples/streaming/multi-sinks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {
}

// Don't forget to close the sink when done
defer sink1.Close()
defer sink1.Close(ctx)

// Read and acknowlege event
ev := <-sink1.Subscribe()
Expand All @@ -70,7 +70,7 @@ func main() {
if err != nil {
panic(err)
}
defer sink2.Close()
defer sink2.Close(ctx)

// Read second event
ev = <-sink2.Subscribe()
Expand Down
2 changes: 1 addition & 1 deletion examples/streaming/multi-streams/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
}

// Don't forget to close the sink when done
defer sink.Close()
defer sink.Close(ctx)

// Subscribe to events
c := sink.Subscribe()
Expand Down
2 changes: 1 addition & 1 deletion examples/streaming/pub-sub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func main() {
}

// Don't forget to close the sink when done
defer sink.Close()
defer sink.Close(ctx)

// Read both events
c := sink.Subscribe()
Expand Down
2 changes: 1 addition & 1 deletion examples/streaming/single-sink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func main() {
}

// Don't forget to close the sink when done
defer sink.Close()
defer sink.Close(ctx)

// Consume event
ev := <-sink.Subscribe()
Expand Down
Loading

0 comments on commit 67b2457

Please sign in to comment.