diff --git a/blox/blox.go b/blox/blox.go index 42ec5157..9813b3db 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -202,6 +202,24 @@ func (p *Blox) AnnounceJoinPoolRequestPeriodically(ctx context.Context) { go p.an.AnnounceJoinPoolRequestPeriodically(ctx) } +func (p *Blox) ProvideLinkByDht(l ipld.Link) error { + //This is for unit testing and no need to call directly + log.Debug("ProvideLinkByDht test") + return p.ex.ProvideDht(l) +} + +func (p *Blox) FindLinkProvidersByDht(l ipld.Link) ([]peer.AddrInfo, error) { + //This is for unit testing and no need to call directly + log.Debug("FindLinkProvidersByDht test") + return p.ex.FindProvidersDht(l) +} + +func (p *Blox) UpdateDhtPeers(peers []peer.ID) error { + //This is for unit testing and no need to call directly + log.Debug("UpdateDhtPeers test") + return p.ex.UpdateDhtPeers(peers) +} + func (p *Blox) Shutdown(ctx context.Context) error { log.Info("Shutdown in progress") diff --git a/cmd/blox/main.go b/cmd/blox/main.go index 2ea92d44..bdef3580 100644 --- a/cmd/blox/main.go +++ b/cmd/blox/main.go @@ -21,9 +21,11 @@ import ( logging "github.com/ipfs/go-log/v2" "github.com/ipni/index-provider/engine" "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/mdp/qrterminal" "github.com/multiformats/go-multiaddr" @@ -145,7 +147,7 @@ func init() { Name: "ipniPublishDirectAnnounce", Usage: "The list of IPNI URLs to which make direct announcements.", Destination: &app.config.directAnnounce, - Value: cli.NewStringSlice("https://cid.contact/ingest/announce"), + Value: cli.NewStringSlice("https://announce.relay2.functionyard.fula.network/ingest/announce"), }), altsrc.NewStringFlag(&cli.StringFlag{ Name: "ipniPublisherIdentity", @@ -489,6 +491,13 @@ func action(ctx *cli.Context) error { engine.WithPublisherKind(engine.DataTransferPublisher), engine.WithDirectAnnounce(app.config.IpniPublishDirectAnnounce...), ), + exchange.WithDhtProviderOptions( + dht.Datastore(namespace.Wrap(ds, datastore.NewKey("dht"))), + dht.ProtocolExtension(protocol.ID("/"+app.config.PoolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), ), ) if err != nil { diff --git a/cmd/dhttest/main.go b/cmd/dhttest/main.go new file mode 100644 index 00000000..1eab441e --- /dev/null +++ b/cmd/dhttest/main.go @@ -0,0 +1,475 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "time" + + "github.com/functionland/go-fula/blox" + "github.com/functionland/go-fula/exchange" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime/fluent" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" +) + +var log = logging.Logger("fula/dhttest") + +// requestLoggerMiddleware logs the details of each request +func requestLoggerMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Log the request + body, _ := io.ReadAll(r.Body) + log.Debugw("Received request", "url", r.URL.Path, "method", r.Method, "body", string(body)) + if r.URL.Path == "/fula/pool/vote" { + fmt.Printf("Voted on QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe %s", string(body)) + } + + // Create a new io.Reader from the read body as the original body is now drained + r.Body = io.NopCloser(bytes.NewBuffer(body)) + + // Call the next handler + next.ServeHTTP(w, r) + }) +} +func startMockServer(addr string) *http.Server { + handler := http.NewServeMux() + + handler.HandleFunc("/fula/pool/join", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/cancel_join", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/poolrequests", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "poolrequests": []map[string]interface{}{ + { + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + "voted": []string{}, + "positive_votes": 0, + "peer_id": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/all", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pools": []map[string]interface{}{ + { + "pool_id": 1, + "creator": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY", + "pool_name": "PoolTest1", + "region": "Ontario", + "parent": nil, + "participants": []string{ + "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + }, + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/users", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "users": []map[string]interface{}{ + { + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + "pool_id": nil, + "request_pool_id": 1, + "peer_id": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + }, + { + "account": "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + }, + { + "account": "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + }, + { + "account": "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/vote", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + // Wrap the handlers with the logging middleware + loggedHandler := requestLoggerMiddleware(handler) + + // Create an HTTP server + server := &http.Server{ + Addr: addr, + Handler: loggedHandler, + } + // Start the server in a new goroutine + go func() { + if err := server.ListenAndServe(); err != http.ErrServerClosed { + panic(err) // Handle the error as you see fit + } + }() + + // Give the server a moment to start + time.Sleep(time.Millisecond * 100) + + return server +} + +func updatePoolName(newPoolName string) error { + return nil +} + +func main() { + server := startMockServer("127.0.0.1:4000") + defer func() { + // Shutdown the server after test + if err := server.Shutdown(context.Background()); err != nil { + panic(err) // Handle the error as you see fit + } + }() + + const poolName = "1" + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Elevate log level to show internal communications. + if err := logging.SetLogLevel("*", "info"); err != nil { + panic(err) + } + + // Use a deterministic random generator to generate deterministic + // output for the example. + rng := rand.New(rand.NewSource(42)) + + // Instantiate the first node in the pool + pid1, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h1, err := libp2p.New(libp2p.Identity(pid1)) + if err != nil { + panic(err) + } + n1, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h1), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n1.Start(ctx); err != nil { + panic(err) + } + defer n1.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) + + // Instantiate the second node in the pool + pid2, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h2, err := libp2p.New(libp2p.Identity(pid2)) + if err != nil { + panic(err) + } + n2, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h2), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n2.Start(ctx); err != nil { + panic(err) + } + defer n2.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) + + // Instantiate the third node in the pool + pid3, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h3, err := libp2p.New(libp2p.Identity(pid3)) + if err != nil { + panic(err) + } + n3, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h3), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n3.Start(ctx); err != nil { + panic(err) + } + defer n3.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) + + // Instantiate the fourth node not in the pool + pid4, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h4, err := libp2p.New(libp2p.Identity(pid4)) + if err != nil { + panic(err) + } + n4, err := blox.New( + blox.WithPoolName("0"), + blox.WithTopicName("0"), + blox.WithHost(h4), + blox.WithUpdatePoolName(updatePoolName), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n4.Start(ctx); err != nil { + panic(err) + } + defer n4.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", "0", h4.ID().String()) + + // Connect n1 to n2 and n3 so that there is a path for gossip propagation. + // Note that we are not connecting n2 to n3 as they should discover + // each other via pool's iexist announcements. + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { + panic(err) + } + h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { + panic(err) + } + + // Wait until the nodes discover each other + for { + if len(h1.Peerstore().Peers()) == 4 && + len(h2.Peerstore().Peers()) == 4 && + len(h3.Peerstore().Peers()) == 4 { + break + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + h1Peers := h1.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) + for _, id := range h1Peers { + fmt.Printf("- %s\n", id) + } + + h2Peers := h2.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) + for _, id := range h2Peers { + fmt.Printf("- %s\n", id) + } + + h3Peers := h3.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) + for _, id := range h3Peers { + fmt.Printf("- %s\n", id) + } + + //Manually adding h4 as it is not in the same pool + h1.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + //Manually adding h4 as it is not in the same pool + h2.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + //Manually adding h4 as it is not in the same pool + h3.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + + // Wait until the fourth node discover others + for { + if len(h4.Peerstore().Peers()) >= 4 { + break + } else { + fmt.Printf("%s peerstore contains %d nodes:\n", h4.ID(), len(h4.Peerstore().Peers())) + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + //Store a link in h1 and find providers from h2 + + // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 + n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignBool(true) + }) + n1leafLink, err := n1.Store(ctx, n1leaf) + if err != nil { + panic(err) + } + n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignInt(42) + na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) + }) + n1RootLink, err := n1.Store(ctx, n1Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) + + // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 + n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignBool(false) + }) + n2leafLink, err := n2.Store(ctx, n2leaf) + if err != nil { + panic(err) + } + n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignInt(24) + na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) + }) + n2RootLink, err := n2.Store(ctx, n2Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n2RootLink, n2leafLink) + + //n1.UpdateDhtPeers(h2.Peerstore().Peers()) + //n2.UpdateDhtPeers(h1.Peerstore().Peers()) + + err = n1.ProvideLinkByDht(n1RootLink) + if err != nil { + fmt.Print("Error happened here") + panic(err) + } + peerlist1, err := n2.FindLinkProvidersByDht(n1RootLink) + if err != nil { + panic(err) + } + fmt.Println(peerlist1) + + // Unordered output: + // Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // Instantiated node in pool 0 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // [{QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT: [/ip4/127.0.0.1/udp/62495/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip4/192.168.2.14/tcp/42433 /ip4/192.168.2.14/udp/62494/quic-v1 /ip6/::1/udp/62497/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip4/127.0.0.1/tcp/42433 /ip4/127.0.0.1/udp/62494/quic-v1 /ip4/192.168.2.14/udp/62495/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip6/::1/tcp/42434 /ip6/::1/udp/62496/quic /ip6/::1/udp/62496/quic-v1 /dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835/p2p-circuit /ip4/127.0.0.1/udp/62494/quic /ip4/192.168.2.14/udp/62494/quic]}] +} diff --git a/exchange/dht.go b/exchange/dht.go new file mode 100644 index 00000000..0ec21a31 --- /dev/null +++ b/exchange/dht.go @@ -0,0 +1,81 @@ +package exchange + +import ( + "context" + "errors" + + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multihash" +) + +type fulaDht struct { + *options + h host.Host + dh *dht.IpfsDHT + ctx context.Context + cancel context.CancelFunc +} + +func newDhtProvider(h host.Host, opts *options) (*fulaDht, error) { + d := &fulaDht{ + h: h, + options: opts, + } + d.ctx, d.cancel = context.WithCancel(context.Background()) + var err error + d.dh, err = dht.New(d.ctx, h, opts.dhtProviderOpts...) + if err != nil { + return nil, err + } + log.Infow("Dht is initialized with this", "peers") + return d, nil +} + +func (d *fulaDht) AddPeer(p peer.ID) { + d.dh.RoutingTable().PeerAdded(p) + err := d.dh.Ping(d.ctx, p) + log.Infow("AddPeer", "self", d.dh.PeerID(), "added", p, "err", err) + d.dh.RefreshRoutingTable() +} + +func (d *fulaDht) Provide(l ipld.Link) error { + if l == nil { + err := errors.New("link is nil") + log.Errorw("Failed to Provide Dht, link is nil", "err", err) + return err + } + link, ok := l.(cidlink.Link) + if ok && + !cid.Undef.Equals(link.Cid) && + link.Cid.Prefix().MhType != multihash.IDENTITY { + return d.dh.Provide(d.ctx, link.Cid, true) + } + err := errors.New("cid is undefined or invalid") + return err +} + +func (d *fulaDht) FindProviders(l ipld.Link) ([]peer.AddrInfo, error) { + if l == nil { + err := errors.New("link is nil") + log.Errorw("Failed to Provide Dht, link is nil", "err", err) + return nil, err + } + link, ok := l.(cidlink.Link) + if ok && + !cid.Undef.Equals(link.Cid) && + link.Cid.Prefix().MhType != multihash.IDENTITY { + return d.dh.FindProviders(d.ctx, link.Cid) + } + err := errors.New("cid is undefined or invalid") + return nil, err +} + +func (d *fulaDht) Shutdown() error { + d.cancel() + return d.dh.Close() +} diff --git a/exchange/dht_test.go b/exchange/dht_test.go new file mode 100644 index 00000000..0c16cb41 --- /dev/null +++ b/exchange/dht_test.go @@ -0,0 +1,480 @@ +package exchange_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "testing" + "time" + + "github.com/functionland/go-fula/blox" + "github.com/functionland/go-fula/exchange" + logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime/fluent" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" +) + +var log = logging.Logger("fula/dhttest") + +// requestLoggerMiddleware logs the details of each request +func requestLoggerMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Log the request + body, _ := io.ReadAll(r.Body) + log.Debugw("Received request", "url", r.URL.Path, "method", r.Method, "body", string(body)) + if r.URL.Path == "/fula/pool/vote" { + fmt.Printf("Voted on QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe %s", string(body)) + } + + // Create a new io.Reader from the read body as the original body is now drained + r.Body = io.NopCloser(bytes.NewBuffer(body)) + + // Call the next handler + next.ServeHTTP(w, r) + }) +} +func startMockServer(addr string) *http.Server { + handler := http.NewServeMux() + + handler.HandleFunc("/fula/pool/join", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/cancel_join", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/poolrequests", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "poolrequests": []map[string]interface{}{ + { + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + "voted": []string{}, + "positive_votes": 0, + "peer_id": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/all", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pools": []map[string]interface{}{ + { + "pool_id": 1, + "creator": "5GrwvaEF5zXb26Fz9rcQpDWS57CtERHpNehXCPcNoHGKutQY", + "pool_name": "PoolTest1", + "region": "Ontario", + "parent": nil, + "participants": []string{ + "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + }, + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/users", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "users": []map[string]interface{}{ + { + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + "pool_id": nil, + "request_pool_id": 1, + "peer_id": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + }, + { + "account": "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT", + }, + { + "account": "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF", + }, + { + "account": "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + "pool_id": 1, + "request_pool_id": nil, + "peer_id": "QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA", + }, + }, + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/vote", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + handler.HandleFunc("/fula/pool/leave", func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "pool_id": 1, + "account": "QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe", + } + json.NewEncoder(w).Encode(response) + }) + + // Wrap the handlers with the logging middleware + loggedHandler := requestLoggerMiddleware(handler) + + // Create an HTTP server + server := &http.Server{ + Addr: addr, + Handler: loggedHandler, + } + // Start the server in a new goroutine + go func() { + if err := server.ListenAndServe(); err != http.ErrServerClosed { + panic(err) // Handle the error as you see fit + } + }() + + // Give the server a moment to start + time.Sleep(time.Millisecond * 100) + + return server +} + +func updatePoolName(newPoolName string) error { + return nil +} + +func TestDHTFunctionality(t *testing.T) { + server := startMockServer("127.0.0.1:4001") + defer func() { + // Shutdown the server after test + if err := server.Shutdown(context.Background()); err != nil { + panic(err) // Handle the error as you see fit + } + }() + + const poolName = "1" + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + // Elevate log level to show internal communications. + if err := logging.SetLogLevel("*", "info"); err != nil { + panic(err) + } + + // Use a deterministic random generator to generate deterministic + // output for the example. + rng := rand.New(rand.NewSource(42)) + + // Instantiate the first node in the pool + pid1, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h1, err := libp2p.New(libp2p.Identity(pid1)) + if err != nil { + panic(err) + } + n1, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h1), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n1.Start(ctx); err != nil { + panic(err) + } + defer n1.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h1.ID().String()) + + // Instantiate the second node in the pool + pid2, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h2, err := libp2p.New(libp2p.Identity(pid2)) + if err != nil { + panic(err) + } + n2, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h2), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n2.Start(ctx); err != nil { + panic(err) + } + defer n2.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h2.ID().String()) + + // Instantiate the third node in the pool + pid3, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h3, err := libp2p.New(libp2p.Identity(pid3)) + if err != nil { + panic(err) + } + n3, err := blox.New( + blox.WithPoolName(poolName), + blox.WithTopicName(poolName), + blox.WithHost(h3), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n3.Start(ctx); err != nil { + panic(err) + } + defer n3.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", poolName, h3.ID().String()) + + // Instantiate the fourth node not in the pool + pid4, _, err := crypto.GenerateECDSAKeyPair(rng) + if err != nil { + panic(err) + } + h4, err := libp2p.New(libp2p.Identity(pid4)) + if err != nil { + panic(err) + } + n4, err := blox.New( + blox.WithPoolName("0"), + blox.WithTopicName("0"), + blox.WithHost(h4), + blox.WithUpdatePoolName(updatePoolName), + blox.WithBlockchainEndPoint("127.0.0.1:4001"), + blox.WithRelays([]string{"/dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835"}), + blox.WithExchangeOpts( + exchange.WithDhtProviderOptions( + dht.ProtocolExtension(protocol.ID("/"+poolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + dht.Mode(dht.ModeAutoServer), + ), + ), + ) + if err != nil { + panic(err) + } + if err := n4.Start(ctx); err != nil { + panic(err) + } + defer n4.Shutdown(ctx) + fmt.Printf("Instantiated node in pool %s with ID: %s\n", "0", h4.ID().String()) + + // Connect n1 to n2 and n3 so that there is a path for gossip propagation. + // Note that we are not connecting n2 to n3 as they should discover + // each other via pool's iexist announcements. + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil { + panic(err) + } + h1.Peerstore().AddAddrs(h3.ID(), h3.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h3.ID(), Addrs: h3.Addrs()}); err != nil { + panic(err) + } + + // Wait until the nodes discover each other + for { + if len(h1.Peerstore().Peers()) == 4 && + len(h2.Peerstore().Peers()) == 4 && + len(h3.Peerstore().Peers()) == 4 { + break + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + h1Peers := h1.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h1.ID(), len(h1Peers)) + for _, id := range h1Peers { + fmt.Printf("- %s\n", id) + } + + h2Peers := h2.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h2.ID(), len(h2Peers)) + for _, id := range h2Peers { + fmt.Printf("- %s\n", id) + } + + h3Peers := h3.Peerstore().Peers() + fmt.Printf("Finally %s peerstore contains %d nodes:\n", h3.ID(), len(h3Peers)) + for _, id := range h3Peers { + fmt.Printf("- %s\n", id) + } + + //Manually adding h4 as it is not in the same pool + h1.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h1.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + //Manually adding h4 as it is not in the same pool + h2.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h2.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + //Manually adding h4 as it is not in the same pool + h3.Peerstore().AddAddrs(h4.ID(), h4.Addrs(), peerstore.PermanentAddrTTL) + if err = h3.Connect(ctx, peer.AddrInfo{ID: h4.ID(), Addrs: h4.Addrs()}); err != nil { + panic(err) + } + + // Wait until the fourth node discover others + for { + if len(h4.Peerstore().Peers()) >= 4 { + break + } else { + fmt.Printf("%s peerstore contains %d nodes:\n", h4.ID(), len(h4.Peerstore().Peers())) + } + select { + case <-ctx.Done(): + panic(ctx.Err()) + default: + time.Sleep(time.Second) + } + } + + //Store a link in h1 and find providers from h2 + + // Generate a sample DAG and store it on node 1 (n1) in the pool, which we will pull from n1 + n1leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignBool(true) + }) + n1leafLink, err := n1.Store(ctx, n1leaf) + if err != nil { + panic(err) + } + n1Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignInt(42) + na.AssembleEntry("oneLeafLink").AssignLink(n1leafLink) + }) + n1RootLink, err := n1.Store(ctx, n1Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n1RootLink, n1leafLink) + + // Generate a sample DAG and store it on node 2 (n1) in the pool, which we will push to n1 + n2leaf := fluent.MustBuildMap(basicnode.Prototype.Map, 1, func(na fluent.MapAssembler) { + na.AssembleEntry("that").AssignBool(false) + }) + n2leafLink, err := n2.Store(ctx, n2leaf) + if err != nil { + panic(err) + } + n2Root := fluent.MustBuildMap(basicnode.Prototype.Map, 2, func(na fluent.MapAssembler) { + na.AssembleEntry("this").AssignInt(24) + na.AssembleEntry("anotherLeafLink").AssignLink(n2leafLink) + }) + n2RootLink, err := n2.Store(ctx, n2Root) + if err != nil { + panic(err) + } + fmt.Printf("%s stored IPLD data with links:\n root: %s\n leaf:%s\n", h1.ID(), n2RootLink, n2leafLink) + + //n1.UpdateDhtPeers(h2.Peerstore().Peers()) + //n2.UpdateDhtPeers(h1.Peerstore().Peers()) + + err = n1.ProvideLinkByDht(n1RootLink) + if err != nil { + fmt.Print("Error happened here") + panic(err) + } + peerlist1, err := n2.FindLinkProvidersByDht(n1RootLink) + if err != nil { + panic(err) + } + fmt.Println(peerlist1) + + // Unordered output: + // Instantiated node in pool 1 with ID: QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // Instantiated node in pool 1 with ID: QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // Instantiated node in pool 1 with ID: QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // Instantiated node in pool 0 with ID: QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // Finally QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA peerstore contains 4 nodes: + // - QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT + // - QmPNZMi2LAhczsN2FoXXQng6YFYbSHApuP6RpKuHbBH9eF + // - QmYMEnv3GUKPNr34gePX2qQmBH4YEQcuGhQHafuKuujvMA + // - QmUg1bGBZ1rSNt3LZR7kKf9RDy3JtJLZZDZGKrzSP36TMe + // [{QmaUMRTBMoANXqpUbfARnXkw9esfz9LP2AjXRRr7YknDAT: [/ip4/127.0.0.1/udp/62495/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip4/192.168.2.14/tcp/42433 /ip4/192.168.2.14/udp/62494/quic-v1 /ip6/::1/udp/62497/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip4/127.0.0.1/tcp/42433 /ip4/127.0.0.1/udp/62494/quic-v1 /ip4/192.168.2.14/udp/62495/quic-v1/webtransport/certhash/uEiA8nNwsF3xgNvpbRn7TBQgFKFmTFHka5os9D8yXYaeK2Q/certhash/uEiCDw6zn5RGqUe6FHv_bz45QnLPkCoZDwNdx6wETFMx4pA /ip6/::1/tcp/42434 /ip6/::1/udp/62496/quic /ip6/::1/udp/62496/quic-v1 /dns/relay.dev.fx.land/tcp/4001/p2p/12D3KooWDRrBaAfPwsGJivBoUw5fE7ZpDiyfUjqgiURq2DEcL835/p2p-circuit /ip4/127.0.0.1/udp/62494/quic /ip4/192.168.2.14/udp/62494/quic]}] +} diff --git a/exchange/fx_exchange.go b/exchange/fx_exchange.go index b04d3e3c..83c24e3a 100644 --- a/exchange/fx_exchange.go +++ b/exchange/fx_exchange.go @@ -57,6 +57,7 @@ type ( authorizedPeers map[peer.ID]struct{} authorizedPeersLock sync.RWMutex pub *ipniPublisher + dht *fulaDht } pushRequest struct { Link cid.Cid `json:"link"` @@ -95,19 +96,41 @@ func NewFxExchange(h host.Host, ls ipld.LinkSystem, o ...Option) (*FxExchange, e return nil, err } } - if !e.ipniPublishDisabled { - e.pub, err = newIpniPublisher(h, opts) - if err != nil { - return nil, err - } + //if !e.ipniPublishDisabled { + e.pub, err = newIpniPublisher(h, opts) + if err != nil { + return nil, err + } + //} + e.dht, err = newDhtProvider(h, opts) + if err != nil { + return nil, err } return e, nil } +func (e *FxExchange) UpdateDhtPeers(peers []peer.ID) error { + for _, ma := range peers { + e.dht.AddPeer(ma) + } + log.Infow("peers are set", "peers", peers) + return nil +} + func (e *FxExchange) GetAuth(ctx context.Context) (peer.ID, error) { return e.authorizer, nil } +// The below method is exposed for unit testing only +func (e *FxExchange) ProvideDht(l ipld.Link) error { + return e.dht.Provide(l) +} + +// The below method is exposed for unit testing only +func (e *FxExchange) FindProvidersDht(l ipld.Link) ([]peer.AddrInfo, error) { + return e.dht.FindProviders(l) +} + func (e *FxExchange) GetAuthorizedPeers(ctx context.Context) ([]peer.ID, error) { var peerList []peer.ID for peerId := range e.authorizedPeers { @@ -120,14 +143,20 @@ func (e *FxExchange) GetAuthorizedPeers(ctx context.Context) ([]peer.ID, error) return peerList, nil } +func (e *FxExchange) IpniNotifyLink(link ipld.Link) { + log.Debugw("Notifying link to IPNI publisher...", "link", link) + e.pub.notifyReceivedLink(link) + log.Debugw("Successfully notified link to IPNI publisher", "link", link) +} + func (e *FxExchange) Start(ctx context.Context) error { gsn := gsnet.NewFromLibp2pHost(e.h) e.gx = gs.New(ctx, gsn, e.ls) + if err := e.pub.Start(ctx); err != nil { + return err + } if !e.ipniPublishDisabled { - if err := e.pub.Start(ctx); err != nil { - return err - } e.gx.RegisterIncomingBlockHook(func(p peer.ID, responseData graphsync.ResponseData, blockData graphsync.BlockData, hookActions graphsync.IncomingBlockHookActions) { go func(link ipld.Link) { log.Debugw("Notifying link to IPNI publisher...", "link", link) @@ -374,11 +403,11 @@ func (e *FxExchange) authorized(pid peer.ID, action string) bool { } func (e *FxExchange) Shutdown(ctx context.Context) error { - if !e.ipniPublishDisabled { - if err := e.pub.shutdown(); err != nil { - log.Warnw("Failed to shutdown IPNI publisher gracefully", "err", err) - } + //if !e.ipniPublishDisabled { + if err := e.pub.shutdown(); err != nil { + log.Warnw("Failed to shutdown IPNI publisher gracefully", "err", err) } + //} e.c.CloseIdleConnections() return e.s.Shutdown(ctx) } diff --git a/exchange/interface.go b/exchange/interface.go index d4b1c351..012bc51c 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,4 +13,5 @@ type Exchange interface { Pull(context.Context, peer.ID, ipld.Link) error SetAuth(context.Context, peer.ID, peer.ID, bool) error Shutdown(context.Context) error + IpniNotifyLink(link ipld.Link) } diff --git a/exchange/noop_exchange.go b/exchange/noop_exchange.go index b4eca218..d8f7a7ab 100644 --- a/exchange/noop_exchange.go +++ b/exchange/noop_exchange.go @@ -35,3 +35,7 @@ func (n NoopExchange) Shutdown(context.Context) error { log.Debug("Shut down noop exchange.") return nil } + +func (n NoopExchange) IpniNotifyLink(l ipld.Link) { + log.Debugw("IpniNotifyLink noop exchange.", "link", l) +} diff --git a/exchange/options.go b/exchange/options.go index d62ed253..c61a205b 100644 --- a/exchange/options.go +++ b/exchange/options.go @@ -4,6 +4,7 @@ import ( "time" "github.com/ipni/index-provider/engine" + dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/peer" ) @@ -19,6 +20,7 @@ type ( ipniPublishChanBuffer int ipniPublishMaxBatchSize int ipniProviderEngineOpts []engine.Option + dhtProviderOpts []dht.Option updateConfig ConfigUpdater } ) @@ -103,3 +105,10 @@ func WithIpniProviderEngineOptions(e ...engine.Option) Option { return nil } } + +func WithDhtProviderOptions(d ...dht.Option) Option { + return func(o *options) error { + o.dhtProviderOpts = d + return nil + } +} diff --git a/go.mod b/go.mod index e26e6a5c..1fde9d84 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger v0.3.0 github.com/ipfs/go-graphsync v0.14.6 - github.com/ipfs/go-ipld-format v0.4.0 + github.com/ipfs/go-ipld-format v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-ipld-prime v0.20.0 github.com/ipni/go-libipni v0.2.4 @@ -19,8 +19,10 @@ require ( github.com/joho/godotenv v1.5.1 github.com/libp2p/go-libp2p v0.28.1 github.com/libp2p/go-libp2p-gostream v0.6.0 + github.com/libp2p/go-libp2p-kad-dht v0.25.0 github.com/libp2p/go-libp2p-pubsub v0.9.3 github.com/mdp/qrterminal v1.0.1 + github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.9.0 github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 @@ -86,8 +88,10 @@ require ( github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect github.com/huin/goupnp v1.2.0 // indirect + github.com/ipfs/boxo v0.10.0 // indirect github.com/ipfs/go-block-format v0.1.2 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect @@ -108,6 +112,8 @@ require ( github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect + github.com/libp2p/go-libp2p-kbucket v0.6.3 // indirect + github.com/libp2p/go-libp2p-record v0.2.0 // indirect github.com/libp2p/go-msgio v0.3.0 // indirect github.com/libp2p/go-nat v0.2.0 // indirect github.com/libp2p/go-netroute v0.2.1 // indirect @@ -120,7 +126,6 @@ require ( github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect github.com/minio/sha256-simd v1.0.1 // indirect - github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect @@ -149,7 +154,9 @@ require ( github.com/twmb/murmur3 v1.1.6 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0 // indirect + github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel v1.16.0 // indirect go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/trace v1.16.0 // indirect @@ -165,6 +172,7 @@ require ( golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.15.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect + gonum.org/v1/gonum v0.13.0 // indirect google.golang.org/protobuf v1.30.0 // indirect lukechampine.com/blake3 v1.2.1 // indirect rsc.io/qr v0.2.0 // indirect diff --git a/go.sum b/go.sum index 36577bfa..bb42b772 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM= github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw= @@ -103,7 +104,9 @@ github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25Kn github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/filecoin-project/go-address v1.1.0 h1:ofdtUtEsNxkIxkDw67ecSmvtzaVSdcea4boAmLbnHfE= github.com/filecoin-project/go-amt-ipld/v4 v4.0.0 h1:XM81BJ4/6h3FV0WfFjh74cIDIgqMbJsMBLM0fIuLUUk= @@ -173,6 +176,13 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= @@ -183,7 +193,9 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= @@ -199,6 +211,7 @@ github.com/google/pprof v0.0.0-20230602150820-91b7bce49751 h1:hR7/MlvK23p6+lIw9S github.com/google/pprof v0.0.0-20230602150820-91b7bce49751/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= @@ -228,6 +241,7 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/golang-lru/v2 v2.0.2 h1:Dwmkdr5Nc/oBiXgJS3CDHNhJtIHkuZ3DZF5twqnfBdU= github.com/hashicorp/golang-lru/v2 v2.0.2/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= @@ -236,6 +250,8 @@ github.com/huin/goupnp v1.2.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFck github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= +github.com/ipfs/boxo v0.10.0 h1:tdDAxq8jrsbRkYoF+5Rcqyeb91hgWe2hp7iLu7ORZLY= +github.com/ipfs/boxo v0.10.0/go.mod h1:Fg+BnfxZ0RPzR0nOodzdIq3A7KgoWAOWsEIImrIQdBM= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= github.com/ipfs/go-block-format v0.1.2 h1:GAjkfhVx1f4YTODS6Esrj1wt2HhrtwTnhEr+DyPUaJo= @@ -280,9 +296,9 @@ github.com/ipfs/go-ipld-cbor v0.0.6 h1:pYuWHyvSpIsOOLw4Jy7NbBkCyzLDcl64Bf/LZW7eB github.com/ipfs/go-ipld-cbor v0.0.6/go.mod h1:ssdxxaLJPXH7OjF5V4NSjBbcfh+evoR4ukuru0oPXMA= github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dCDnkOJhcZkms= github.com/ipfs/go-ipld-format v0.0.2/go.mod h1:4B6+FM2u9OJ9zCV+kSbgFAZlOrv1Hqbf0INGQgiKf9k= -github.com/ipfs/go-ipld-format v0.4.0 h1:yqJSaJftjmjc9jEOFYlpkwOLVKv68OD27jFLlSghBlQ= -github.com/ipfs/go-ipld-format v0.4.0/go.mod h1:co/SdBE8h99968X0hViiw1MNlh6fvxxnHpvVLnH7jSM= -github.com/ipfs/go-ipld-legacy v0.1.1 h1:BvD8PEuqwBHLTKqlGFTHSwrwFOMkVESEvwIYwR2cdcc= +github.com/ipfs/go-ipld-format v0.5.0 h1:WyEle9K96MSrvr47zZHKKcDxJ/vlpET6PSiQsAFO+Ds= +github.com/ipfs/go-ipld-format v0.5.0/go.mod h1:ImdZqJQaEouMjCvqCe0ORUS+uoBmf7Hf+EO/jh+nk3M= +github.com/ipfs/go-ipld-legacy v0.2.1 h1:mDFtrBpmU7b//LzLSypVrXsD8QxkEWxu5qVxN99/+tk= github.com/ipfs/go-libipfs v0.7.0 h1:Mi54WJTODaOL2/ZSm5loi3SwI3jI2OuFWUrQIkJ5cpM= github.com/ipfs/go-log v1.0.0/go.mod h1:JO7RzlMK6rA+CIxFMLOuB6Wf5b81GDiKElL7UPSIKjA= github.com/ipfs/go-log v1.0.1/go.mod h1:HuWlQttfN6FWNHRhlY5yMk/lW7evQC0HHGOxEwMRR8I= @@ -298,7 +314,7 @@ github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fG github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= github.com/ipfs/go-unixfs v0.4.5 h1:wj8JhxvV1G6CD7swACwSKYa+NgtdWC1RUit+gFnymDU= -github.com/ipfs/go-unixfsnode v1.6.0 h1:JOSA02yaLylRNi2rlB4ldPr5VcZhcnaIVj5zNLcOjDo= +github.com/ipfs/go-unixfsnode v1.7.1 h1:RRxO2b6CSr5UQ/kxnGzaChTjp5LWTdf3Y4n8ANZgB/s= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipld/go-car/v2 v2.10.0 h1:0Wrt0uk3IoBge1PjEokXsS1eOX6v8QxeTxjPQ9TH71M= github.com/ipld/go-car/v2 v2.10.0/go.mod h1:mBZ4d86IKvL7eKhNHhQgywQ5coZHAGhmG1P+cMrdby8= @@ -363,8 +379,14 @@ github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLE github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w= github.com/libp2p/go-libp2p-gostream v0.6.0 h1:QfAiWeQRce6pqnYfmIVWJFXNdDyfiR/qkCnjyaZUPYU= github.com/libp2p/go-libp2p-gostream v0.6.0/go.mod h1:Nywu0gYZwfj7Jc91PQvbGU8dIpqbQQkjWgDuOrFaRdA= +github.com/libp2p/go-libp2p-kad-dht v0.25.0 h1:T2SXQ/VlXTQVLChWY/+OyOsmGMRJvB5kiR+eJt7jtvI= +github.com/libp2p/go-libp2p-kad-dht v0.25.0/go.mod h1:P6fz+J+u4tPigvS5J0kxQ1isksqAhmXiS/pNaEw/nFI= +github.com/libp2p/go-libp2p-kbucket v0.6.3 h1:p507271wWzpy2f1XxPzCQG9NiN6R6lHL9GiSErbQQo0= +github.com/libp2p/go-libp2p-kbucket v0.6.3/go.mod h1:RCseT7AH6eJWxxk2ol03xtP9pEHetYSPXOaJnOiD8i0= github.com/libp2p/go-libp2p-pubsub v0.9.3 h1:ihcz9oIBMaCK9kcx+yHWm3mLAFBMAUsM4ux42aikDxo= github.com/libp2p/go-libp2p-pubsub v0.9.3/go.mod h1:RYA7aM9jIic5VV47WXu4GkcRxRhrdElWf8xtyli+Dzc= +github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= +github.com/libp2p/go-libp2p-record v0.2.0/go.mod h1:I+3zMkvvg5m2OcSdoL0KPljyJyvNDFGKX7QdlpYUcwk= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= github.com/libp2p/go-msgio v0.3.0/go.mod h1:nyRM819GmVaF9LX3l03RMh10QdOroF++NBbxAb0mmDM= @@ -557,12 +579,17 @@ github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb6 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= @@ -590,6 +617,8 @@ github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163/go.mod h1:f github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0 h1:XYEgH2nJgsrcrj32p+SAbx6T3s/6QknOXezXtz7kzbg= github.com/whyrusleeping/cbor-gen v0.0.0-20230418232409-daab9ece03a0/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= +github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= +github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= @@ -602,6 +631,8 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= +go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= @@ -714,6 +745,7 @@ golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= @@ -844,6 +876,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +gonum.org/v1/gonum v0.13.0 h1:a0T3bh+7fhRyqeNbiC3qVHYmkiQgit3wnNan/2c0HMM= +gonum.org/v1/gonum v0.13.0/go.mod h1:/WPYRckkfWrhWefxyYTfrTtQR0KH4iyHNuzxqXAKyAU= google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.0.0-20181030000543-1d582fd0359e/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0= google.golang.org/api v0.1.0/go.mod h1:UGEZY7KEX120AnNLIHFMKIo4obdJhkp2tPbaPlQx13Y= @@ -879,6 +913,7 @@ google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= @@ -886,9 +921,20 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= diff --git a/mobile/client.go b/mobile/client.go index 226b5398..d0a8168c 100644 --- a/mobile/client.go +++ b/mobile/client.go @@ -19,6 +19,7 @@ import ( basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/mr-tron/base58" "github.com/multiformats/go-multicodec" ) @@ -113,6 +114,14 @@ func toLink(key []byte) (cidlink.Link, error) { return cidlink.Link{Cid: cc}, nil } +func toLinkFromString(l string) (cidlink.Link, error) { + decodedBytes, err := base58.Decode(l) + if err != nil { + return cidlink.Link{}, err + } + return toLink(decodedBytes) +} + // Pull downloads the data corresponding to the given key from blox at Config.BloxAddr. // The key must be a valid ipld.Link. func (c *Client) Pull(key []byte) error { @@ -271,6 +280,13 @@ func (c *Client) SetAuth(on string, subject string, allow bool) error { return c.ex.SetAuth(context.TODO(), onp, subp, allow) } +func (c *Client) IpniNotifyLink(l string) { + link, err := toLinkFromString(l) + if err == nil { + c.ex.IpniNotifyLink(link) + } +} + // Shutdown closes all resources used by Client. // After calling this function Client must be discarded. func (c *Client) Shutdown() error { diff --git a/mobile/config.go b/mobile/config.go index c91fae10..91b18359 100644 --- a/mobile/config.go +++ b/mobile/config.go @@ -9,14 +9,17 @@ import ( "github.com/functionland/go-fula/blockchain" "github.com/functionland/go-fula/exchange" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" dssync "github.com/ipfs/go-datastore/sync" badger "github.com/ipfs/go-ds-badger" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p" + dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/p2p/host/autorelay" "github.com/multiformats/go-multiaddr" ) @@ -54,6 +57,7 @@ type Config struct { // AllowTransientConnection allows transient connectivity via relay when direct connection is // not possible. Defaults to enabled if unspecified. AllowTransientConnection bool + PoolName string // TODO: we don't need to take BloxAddr when there is a discovery mechanism facilitated via fx.land. // For now we manually take BloxAddr as config. @@ -65,6 +69,7 @@ func NewConfig() *Config { StaticRelays: []string{devRelay}, ForceReachabilityPrivate: true, AllowTransientConnection: true, + PoolName: "0", } } @@ -167,6 +172,12 @@ func (cfg *Config) init(mc *Client) error { exchange.WithAuthorizer(mc.h.ID()), exchange.WithAllowTransientConnection(cfg.AllowTransientConnection), exchange.WithIpniPublishDisabled(true), + exchange.WithDhtProviderOptions( + dht.Datastore(namespace.Wrap(mc.ds, datastore.NewKey("dht"))), + dht.ProtocolExtension(protocol.ID("/"+cfg.PoolName)), + dht.ProtocolPrefix("/fula"), + dht.Resiliency(1), + ), ) if err != nil { return err