From 124627be799488d13f8a1c75b6656c45a0dd4c72 Mon Sep 17 00:00:00 2001 From: Peter Bourgon Date: Wed, 23 Mar 2016 15:41:53 +0100 Subject: [PATCH] Remove all global log invocations --- connection.go | 16 +++++++++---- connection_maker.go | 8 ++++--- examples/increment-only-counter/main.go | 26 ++++++++++----------- gossip_channel.go | 8 ++++--- gossip_test.go | 6 +++-- local_peer.go | 16 +++++++------ metcd/metcdsrv/main.go | 2 +- metcd/server.go | 2 +- mocks_test.go | 6 +++-- peer.go | 3 +-- peers.go | 17 ++++++++------ peers_test.go | 6 +++-- router.go | 30 +++++++++++++------------ 13 files changed, 84 insertions(+), 62 deletions(-) diff --git a/connection.go b/connection.go index 7677253..099535a 100644 --- a/connection.go +++ b/connection.go @@ -35,15 +35,17 @@ type remoteConnection struct { remoteTCPAddr string outbound bool established bool + logger *log.Logger } -func newRemoteConnection(from, to *Peer, tcpAddr string, outbound bool, established bool) *remoteConnection { +func newRemoteConnection(from, to *Peer, tcpAddr string, outbound bool, established bool, logger *log.Logger) *remoteConnection { return &remoteConnection{ local: from, remote: to, remoteTCPAddr: tcpAddr, outbound: outbound, established: established, + logger: logger, } } @@ -57,6 +59,12 @@ func (conn *remoteConnection) isOutbound() bool { return conn.outbound } func (conn *remoteConnection) isEstablished() bool { return conn.established } +func (conn *remoteConnection) shutdown(error) {} + +func (conn *remoteConnection) log(args ...interface{}) { + conn.logger.Println(append(append([]interface{}{}, fmt.Sprintf("->[%s|%s]:", conn.remoteTCPAddr, conn.remote)), args...)...) +} + // LocalConnection is the local (our) side of a connection. // It implements ProtocolSender, and manages per-channel GossipSenders. type LocalConnection struct { @@ -82,7 +90,7 @@ type LocalConnection struct { // connections map. func startLocalConnection(connRemote *remoteConnection, tcpConn *net.TCPConn, router *Router, acceptNewPeer bool) { if connRemote.local != router.Ourself.Peer { - log.Fatal("Attempt to create local connection from a peer which is not ourself") + panic("attempt to create local connection from a peer which is not ourself") } actionChan := make(chan connectionAction, ChannelSize) errorChan := make(chan error, 1) @@ -377,14 +385,14 @@ func (conn *LocalConnection) actorLoop(actionChan <-chan connectionAction, error func (conn *LocalConnection) teardown(err error) { if conn.remote == nil { - log.Printf("->[%s] connection shutting down due to error during handshake: %v", conn.remoteTCPAddr, err) + conn.logger.Printf("->[%s] connection shutting down due to error during handshake: %v", conn.remoteTCPAddr, err) } else { conn.log("connection shutting down due to error:", err) } if conn.tcpConn != nil { if err := conn.tcpConn.Close(); err != nil { - log.Printf("warning: %v", err) + conn.logger.Printf("warning: %v", err) } } diff --git a/connection_maker.go b/connection_maker.go index bece670..9ce12c7 100644 --- a/connection_maker.go +++ b/connection_maker.go @@ -28,6 +28,7 @@ type connectionMaker struct { connections map[Connection]struct{} directPeers peerAddrs actionChan chan<- connectionMakerAction + logger *log.Logger } // TargetState describes the connection state of a remote target. @@ -57,7 +58,7 @@ type connectionMakerAction func() bool // peers, making outbound connections from localAddr, and listening on // port. If discovery is true, ConnectionMaker will attempt to // initiate new connections with peers it's not directly connected to. -func newConnectionMaker(ourself *localPeer, peers *Peers, localAddr string, port int, discovery bool) *connectionMaker { +func newConnectionMaker(ourself *localPeer, peers *Peers, localAddr string, port int, discovery bool, logger *log.Logger) *connectionMaker { actionChan := make(chan connectionMakerAction, ChannelSize) cm := &connectionMaker{ ourself: ourself, @@ -69,6 +70,7 @@ func newConnectionMaker(ourself *localPeer, peers *Peers, localAddr string, port targets: make(map[string]*target), connections: make(map[Connection]struct{}), actionChan: actionChan, + logger: logger, } go cm.queryLoop(actionChan) return cm @@ -365,9 +367,9 @@ func (cm *connectionMaker) connectToTargets(validTarget map[string]struct{}, dir } func (cm *connectionMaker) attemptConnection(address string, acceptNewPeer bool) { - log.Printf("->[%s] attempting connection", address) + cm.logger.Printf("->[%s] attempting connection", address) if err := cm.ourself.createConnection(cm.localAddr, address, acceptNewPeer); err != nil { - log.Printf("->[%s] error during connection attempt: %v", address, err) + cm.logger.Printf("->[%s] error during connection attempt: %v", address, err) cm.connectionAborted(address, err) } } diff --git a/examples/increment-only-counter/main.go b/examples/increment-only-counter/main.go index 26b670b..127e0a6 100644 --- a/examples/increment-only-counter/main.go +++ b/examples/increment-only-counter/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "io/ioutil" "log" "net" "net/http" @@ -29,20 +30,20 @@ func main() { flag.Var(peers, "peer", "initial peer (may be repeated)") flag.Parse() - log.SetPrefix(*nickname + "> ") + logger := log.New(os.Stderr, *nickname+"> ", log.LstdFlags) host, portStr, err := net.SplitHostPort(*meshListen) if err != nil { - log.Fatalf("mesh address: %s: %v", *meshListen, err) + logger.Fatalf("mesh address: %s: %v", *meshListen, err) } port, err := strconv.Atoi(portStr) if err != nil { - log.Fatalf("mesh address: %s: %v", *meshListen, err) + logger.Fatalf("mesh address: %s: %v", *meshListen, err) } name, err := mesh.PeerNameFromString(*hwaddr) if err != nil { - log.Fatalf("%s: %v", *hwaddr, err) + logger.Fatalf("%s: %v", *hwaddr, err) } router := mesh.NewRouter(mesh.Config{ @@ -53,38 +54,35 @@ func main() { ConnLimit: 64, PeerDiscovery: true, TrustedSubnets: []*net.IPNet{}, - }, name, *nickname, mesh.NullOverlay{}) + }, name, *nickname, mesh.NullOverlay{}, log.New(ioutil.Discard, "", 0)) - peer := newPeer(name, log.New(os.Stderr, *nickname+"> ", log.LstdFlags)) + peer := newPeer(name, logger) gossip := router.NewGossip(*channel, peer) peer.register(gossip) func() { - log.Printf("mesh router starting (%s)", *meshListen) + logger.Printf("mesh router starting (%s)", *meshListen) router.Start() }() defer func() { - log.Printf("mesh router stopping") + logger.Printf("mesh router stopping") router.Stop() }() router.ConnectionMaker.InitiateConnections(peers.slice(), true) - errs := make(chan error, 2) - + errs := make(chan error) go func() { c := make(chan os.Signal) signal.Notify(c, syscall.SIGINT) errs <- fmt.Errorf("%s", <-c) }() - go func() { - log.Printf("HTTP server starting (%s)", *httpListen) + logger.Printf("HTTP server starting (%s)", *httpListen) http.HandleFunc("/", handle(peer)) errs <- http.ListenAndServe(*httpListen, nil) }() - - log.Print(<-errs) + logger.Print(<-errs) } type counter interface { diff --git a/gossip_channel.go b/gossip_channel.go index 3b585e8..c7bd36a 100644 --- a/gossip_channel.go +++ b/gossip_channel.go @@ -13,16 +13,18 @@ type gossipChannel struct { ourself *localPeer routes *routes gossiper Gossiper + logger *log.Logger } // newGossipChannel returns a named, usable channel. // It delegates receiving duties to the passed Gossiper. -func newGossipChannel(channelName string, ourself *localPeer, r *routes, g Gossiper) *gossipChannel { +func newGossipChannel(channelName string, ourself *localPeer, r *routes, g Gossiper, logger *log.Logger) *gossipChannel { return &gossipChannel{ name: channelName, ourself: ourself, routes: r, gossiper: g, + logger: logger, } } @@ -134,7 +136,7 @@ func (c *gossipChannel) makeBroadcastMsg(srcName PeerName, msg []byte) protocolM } func (c *gossipChannel) log(args ...interface{}) { - log.Println(append(append([]interface{}{}, "[gossip "+c.name+"]:"), args...)...) + c.logger.Println(append(append([]interface{}{}, "[gossip "+c.name+"]:"), args...)...) } // GobEncode gob-encodes each item and returns the resulting byte slice. @@ -143,7 +145,7 @@ func gobEncode(items ...interface{}) []byte { enc := gob.NewEncoder(buf) for _, i := range items { if err := enc.Encode(i); err != nil { - log.Fatal(err) + panic(err) } } return buf.Bytes() diff --git a/gossip_test.go b/gossip_test.go index df5a346..89bd6a7 100644 --- a/gossip_test.go +++ b/gossip_test.go @@ -2,6 +2,8 @@ package mesh import ( "fmt" + "io/ioutil" + "log" "sync" "testing" @@ -22,7 +24,7 @@ var _ gossipConnection = &mockGossipConnection{} func newTestRouter(name string) *Router { peerName, _ := PeerNameFromString(name) - router := NewRouter(Config{}, peerName, "nick", nil) + router := NewRouter(Config{}, peerName, "nick", nil, log.New(ioutil.Discard, "", 0)) router.Start() return router } @@ -74,7 +76,7 @@ func (router *Router) newTestGossipConnection(r *Router) *mockGossipConnection { toPeer = router.Peers.fetchWithDefault(toPeer) // Has side-effect of incrementing refcount conn := &mockGossipConnection{ - remoteConnection: remoteConnection{router.Ourself.Peer, toPeer, "", false, true}, + remoteConnection: remoteConnection{router.Ourself.Peer, toPeer, "", false, true, log.New(ioutil.Discard, "", 0)}, dest: r, start: make(chan struct{}), } diff --git a/local_peer.go b/local_peer.go index 5e1c224..fa05dbf 100644 --- a/local_peer.go +++ b/local_peer.go @@ -16,18 +16,20 @@ type localPeer struct { *Peer router *Router actionChan chan<- localPeerAction + logger *log.Logger } // The actor closure used by localPeer. type localPeerAction func() // newLocalPeer returns a usable LocalPeer. -func newLocalPeer(name PeerName, nickName string, router *Router) *localPeer { +func newLocalPeer(name PeerName, nickName string, router *Router, logger *log.Logger) *localPeer { actionChan := make(chan localPeerAction, ChannelSize) peer := &localPeer{ Peer: newPeer(name, nickName, randomPeerUID(), 0, randomPeerShortID()), router: router, actionChan: actionChan, + logger: logger, } go peer.actorLoop(actionChan) return peer @@ -95,7 +97,7 @@ func (peer *localPeer) createConnection(localAddr string, peerAddr string, accep if err != nil { return err } - connRemote := newRemoteConnection(peer.Peer, nil, tcpConn.RemoteAddr().String(), true, false) + connRemote := newRemoteConnection(peer.Peer, nil, tcpConn.RemoteAddr().String(), true, false, peer.logger) startLocalConnection(connRemote, tcpConn, peer.router, acceptNewPeer) return nil } @@ -150,10 +152,10 @@ func (peer *localPeer) actorLoop(actionChan <-chan localPeerAction) { func (peer *localPeer) handleAddConnection(conn ourConnection) error { if peer.Peer != conn.getLocal() { - log.Fatal("Attempt made to add connection to peer where peer is not the source of connection") + panic("Attempt made to add connection to peer where peer is not the source of connection") } if conn.Remote() == nil { - log.Fatal("Attempt made to add connection to peer with unknown remote peer") + panic("Attempt made to add connection to peer with unknown remote peer") } toName := conn.Remote().Name dupErr := fmt.Errorf("Multiple connections to %s added to %s", conn.Remote(), peer.String()) @@ -196,7 +198,7 @@ func (peer *localPeer) handleAddConnection(conn ourConnection) error { func (peer *localPeer) handleConnectionEstablished(conn ourConnection) { if peer.Peer != conn.getLocal() { - log.Fatal("Peer informed of active connection where peer is not the source of connection") + panic("Peer informed of active connection where peer is not the source of connection") } if dupConn, found := peer.connections[conn.Remote().Name]; !found || conn != dupConn { conn.shutdown(fmt.Errorf("Cannot set unknown connection active")) @@ -211,10 +213,10 @@ func (peer *localPeer) handleConnectionEstablished(conn ourConnection) { func (peer *localPeer) handleDeleteConnection(conn ourConnection) { if peer.Peer != conn.getLocal() { - log.Fatal("Attempt made to delete connection from peer where peer is not the source of connection") + panic("Attempt made to delete connection from peer where peer is not the source of connection") } if conn.Remote() == nil { - log.Fatal("Attempt made to delete connection to peer with unknown remote peer") + panic("Attempt made to delete connection to peer with unknown remote peer") } toName := conn.Remote().Name if connFound, found := peer.connections[toName]; !found || connFound != conn { diff --git a/metcd/metcdsrv/main.go b/metcd/metcdsrv/main.go index 3468a28..a81a3fb 100644 --- a/metcd/metcdsrv/main.go +++ b/metcd/metcdsrv/main.go @@ -77,7 +77,7 @@ func main() { ConnLimit: 64, PeerDiscovery: true, TrustedSubnets: []*net.IPNet{}, - }, name, *nickname, mesh.NullOverlay{}) + }, name, *nickname, mesh.NullOverlay{}, logger) // Create a meshconn.Peer. peer := meshconn.NewPeer(name, router.Ourself.UID, logger) diff --git a/metcd/server.go b/metcd/server.go index a86406d..6a077e6 100644 --- a/metcd/server.go +++ b/metcd/server.go @@ -140,7 +140,7 @@ func NewDefaultServer(minPeerCount int, logger *log.Logger) *wackygrpc.Server { ConnLimit: 64, PeerDiscovery: true, TrustedSubnets: []*net.IPNet{}, - }, peerName, nickName, mesh.NullOverlay{}) + }, peerName, nickName, mesh.NullOverlay{}, logger) // Create a meshconn.Peer and connect it to a channel. peer := meshconn.NewPeer(router.Ourself.Peer.Name, router.Ourself.UID, logger) diff --git a/mocks_test.go b/mocks_test.go index 9cc436b..37cb8bf 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -7,6 +7,8 @@ package mesh import ( "fmt" + "io/ioutil" + "log" "testing" "github.com/stretchr/testify/require" @@ -29,7 +31,7 @@ func (peers *Peers) AddTestRemoteConnection(p1, p2 *Peer) { fromPeer = peers.fetchWithDefault(fromPeer) toPeer := newPeerFrom(p2) toPeer = peers.fetchWithDefault(toPeer) - peers.ourself.addConnection(&remoteConnection{fromPeer, toPeer, "", false, false}) + peers.ourself.addConnection(&remoteConnection{fromPeer, toPeer, "", false, false, log.New(ioutil.Discard, "", 0)}) } func (peers *Peers) DeleteTestConnection(p *Peer) { @@ -46,7 +48,7 @@ func (peers *Peers) DeleteTestConnection(p *Peer) { // from what is created by the real code. func newMockConnection(from, to *Peer) Connection { type mockConnection struct{ remoteConnection } - return &mockConnection{remoteConnection{from, to, "", false, false}} + return &mockConnection{remoteConnection{from, to, "", false, false, log.New(ioutil.Discard, "", 0)}} } func checkEqualConns(t *testing.T, ourName PeerName, got, wanted map[PeerName]Connection) { diff --git a/peer.go b/peer.go index 899441b..22b4138 100644 --- a/peer.go +++ b/peer.go @@ -4,7 +4,6 @@ import ( "crypto/rand" "encoding/binary" "fmt" - "log" "sort" "strconv" ) @@ -169,7 +168,7 @@ func randomPeerShortID() PeerShortID { func randBytes(n int) []byte { buf := make([]byte, n) if _, err := rand.Read(buf); err != nil { - log.Fatal(err) + panic(err) } return buf } diff --git a/peers.go b/peers.go index 56bcd01..dee9f70 100644 --- a/peers.go +++ b/peers.go @@ -19,6 +19,8 @@ type Peers struct { // Called when the mapping from short IDs to peers changes onInvalidateShortIDs []func() + + logger *log.Logger } type shortIDPeers struct { @@ -56,11 +58,12 @@ type peersPendingNotifications struct { localPeerModified bool } -func newPeers(ourself *localPeer) *Peers { +func newPeers(ourself *localPeer, logger *log.Logger) *Peers { peers := &Peers{ ourself: ourself, byName: make(map[PeerName]*Peer), byShortID: make(map[PeerShortID]shortIDPeers), + logger: logger, } peers.fetchWithDefault(ourself.Peer) return peers @@ -492,7 +495,7 @@ func (peers *Peers) applyDecodedUpdate(decodedUpdate []*Peer, decodedConns [][]c pending.localPeerModified = peers.ourself.setVersionBeyond(newPeer.Version) } case newPeer: - peer.connections = makeConnsMap(peer, connSummaries, peers.byName) + peer.connections = makeConnsMap(peer, connSummaries, peers.byName, peers.logger) newUpdate[name] = struct{}{} default: // existing peer if newPeer.Version < peer.Version || @@ -505,7 +508,7 @@ func (peers *Peers) applyDecodedUpdate(decodedUpdate []*Peer, decodedConns [][]c peer.Version = newPeer.Version peer.UID = newPeer.UID peer.NickName = newPeer.NickName - peer.connections = makeConnsMap(peer, connSummaries, peers.byName) + peer.connections = makeConnsMap(peer, connSummaries, peers.byName, peers.logger) if newPeer.ShortID != peer.ShortID || newPeer.HasShortID != peer.HasShortID { peers.deleteByShortID(peer, pending) @@ -521,7 +524,7 @@ func (peers *Peers) applyDecodedUpdate(decodedUpdate []*Peer, decodedConns [][]c func (peer *Peer) encode(enc *gob.Encoder) { if err := enc.Encode(peer.peerSummary); err != nil { - log.Fatal(err) + panic(err) } connSummaries := []connectionSummary{} @@ -535,7 +538,7 @@ func (peer *Peer) encode(enc *gob.Encoder) { } if err := enc.Encode(connSummaries); err != nil { - log.Fatal(err) + panic(err) } } @@ -549,12 +552,12 @@ func decodePeer(dec *gob.Decoder) (ps peerSummary, connSummaries []connectionSum return } -func makeConnsMap(peer *Peer, connSummaries []connectionSummary, byName map[PeerName]*Peer) map[PeerName]Connection { +func makeConnsMap(peer *Peer, connSummaries []connectionSummary, byName map[PeerName]*Peer, logger *log.Logger) map[PeerName]Connection { conns := make(map[PeerName]Connection) for _, connSummary := range connSummaries { name := PeerNameFromBin(connSummary.NameByte) remotePeer := byName[name] - conn := newRemoteConnection(peer, remotePeer, connSummary.RemoteTCPAddr, connSummary.Outbound, connSummary.Established) + conn := newRemoteConnection(peer, remotePeer, connSummary.RemoteTCPAddr, connSummary.Outbound, connSummary.Established, logger) conns[name] = conn } return conns diff --git a/peers_test.go b/peers_test.go index c9bec85..e3dcd6f 100644 --- a/peers_test.go +++ b/peers_test.go @@ -2,6 +2,8 @@ package mesh import ( "fmt" + "io/ioutil" + "log" "math/rand" "testing" "time" @@ -19,8 +21,8 @@ import ( // - non-gc of peers that are only referenced locally func newNode(name PeerName) (*Peer, *Peers) { - peer := newLocalPeer(name, "", nil) - peers := newPeers(peer) + peer := newLocalPeer(name, "", nil, log.New(ioutil.Discard, "", 0)) + peers := newPeers(peer, log.New(ioutil.Discard, "", 0)) return peer.Peer, peers } diff --git a/router.go b/router.go index 08d9fe3..9e1048c 100644 --- a/router.go +++ b/router.go @@ -53,10 +53,11 @@ type Router struct { gossipChannels gossipChannels topologyGossip Gossip acceptLimiter *tokenBucket + logger *log.Logger } // NewRouter returns a new router. It must be started. -func NewRouter(config Config, name PeerName, nickName string, overlay Overlay) *Router { +func NewRouter(config Config, name PeerName, nickName string, overlay Overlay, logger *log.Logger) *Router { router := &Router{Config: config, gossipChannels: make(gossipChannels)} if overlay == nil { @@ -64,15 +65,16 @@ func NewRouter(config Config, name PeerName, nickName string, overlay Overlay) * } router.Overlay = overlay - router.Ourself = newLocalPeer(name, nickName, router) - router.Peers = newPeers(router.Ourself) + router.Ourself = newLocalPeer(name, nickName, router, logger) + router.Peers = newPeers(router.Ourself, logger) router.Peers.OnGC(func(peer *Peer) { - log.Println("Removed unreachable peer", peer) + logger.Println("Removed unreachable peer", peer) }) router.Routes = newRoutes(router.Ourself, router.Peers) - router.ConnectionMaker = newConnectionMaker(router.Ourself, router.Peers, net.JoinHostPort(router.Host, "0"), router.Port, router.PeerDiscovery) + router.ConnectionMaker = newConnectionMaker(router.Ourself, router.Peers, net.JoinHostPort(router.Host, "0"), router.Port, router.PeerDiscovery, logger) router.topologyGossip = router.NewGossip("topology", router) router.acceptLimiter = newTokenBucket(acceptMaxTokens, acceptTokenDelay) + router.logger = logger return router } @@ -96,18 +98,18 @@ func (router *Router) usingPassword() bool { func (router *Router) listenTCP() { localAddr, err := net.ResolveTCPAddr("tcp4", net.JoinHostPort(router.Host, fmt.Sprint(router.Port))) if err != nil { - log.Fatal(err) + panic(err) } ln, err := net.ListenTCP("tcp4", localAddr) if err != nil { - log.Fatal(err) + panic(err) } go func() { defer ln.Close() for { tcpConn, err := ln.AcceptTCP() if err != nil { - log.Println(err) + router.logger.Println(err) continue } router.acceptTCP(tcpConn) @@ -118,8 +120,8 @@ func (router *Router) listenTCP() { func (router *Router) acceptTCP(tcpConn *net.TCPConn) { remoteAddrStr := tcpConn.RemoteAddr().String() - log.Printf("->[%s] connection accepted", remoteAddrStr) - connRemote := newRemoteConnection(router.Ourself.Peer, nil, remoteAddrStr, false, false) + router.logger.Printf("->[%s] connection accepted", remoteAddrStr) + connRemote := newRemoteConnection(router.Ourself.Peer, nil, remoteAddrStr, false, false, router.logger) startLocalConnection(connRemote, tcpConn, router, true) } @@ -127,11 +129,11 @@ func (router *Router) acceptTCP(tcpConn *net.TCPConn) { // // TODO(pb): rename? func (router *Router) NewGossip(channelName string, g Gossiper) Gossip { - channel := newGossipChannel(channelName, router.Ourself, router.Routes, g) + channel := newGossipChannel(channelName, router.Ourself, router.Routes, g, router.logger) router.gossipLock.Lock() defer router.gossipLock.Unlock() if _, found := router.gossipChannels[channelName]; found { - log.Fatalf("[gossip] duplicate channel %s", channelName) + panic(fmt.Sprintf("[gossip] duplicate channel %s", channelName)) } router.gossipChannels[channelName] = channel return channel @@ -149,7 +151,7 @@ func (router *Router) gossipChannel(channelName string) *gossipChannel { if channel, found = router.gossipChannels[channelName]; found { return channel } - channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{}) + channel = newGossipChannel(channelName, router.Ourself, router.Routes, &surrogateGossiper{}, router.logger) channel.log("created surrogate channel") router.gossipChannels[channelName] = channel return channel @@ -277,7 +279,7 @@ func (router *Router) trusts(remote *remoteConnection) bool { } } else { // Should not happen as remoteTCPAddr was obtained from TCPConn - log.Printf("Unable to parse remote TCP addr: %s", err) + router.logger.Printf("Unable to parse remote TCP addr: %s", err) } return false }