RPC即远程过程调用,是一种计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互作用编程。
假如我们有一个应用,刚开始的时候业务很简单,只需要一台计算机就能完全独立运行,但是随着业务的发展,业务逐渐增多,这个时候我们需要对我们的业务重新进行划分解耦,解耦完成后,一台计算机已经无法解决那么多的业务,因此就需要将不同的业务部署到不同的计算机上,让一台计算机对应一个服务,因为有些业务之间存在联系,便导致不同的计算机该怎么相互调用呢?,此时便需要我们使用RPC。 总结来说,RPC解决了分布式系统中服务与服务之间的调用问题。
- 服务注册中心,负责将本地的服务发布成远程的服务,管理远程服务,提供给消费者使用。
- 服务提供者,主要负责提供服务的接口和服务的实现
- 服务的消费者,调用远程的服务
- 客户端通过本地的方式调用服务
- stub接收到调用后,负责传输
- 客户端通过socket将消息发送至server
- stub收到消息进行解码
- stub开始调用本地服务
- 服务端将结果给stub
- stub进行发送
- 通过socket发送给客户端
- 客户端收到消息解码
- 得到最终结果
RPC则是将2~9这些步骤进行封装,使调用更加方便。
首先需要安装rpcx:
go get -u -v -tags "reuseport quic kcp zookeeper etcd consul ping" github.com/smallnest/rpcx/...
**tags **对应:
- quic: 支持 quic 协议
- kcp: 支持 kcp 协议
- zookeeper: 支持 zookeeper 注册中心
- etcd: 支持 etcd 注册中心
- consul: 支持 consul 注册中心
- ping: 支持 网络质量负载均衡
- reuseport: 支持 reuseport
作为服务提供者,首先你需要定义服务。且这个服务必须满足以下条件
- 必须是可导出类型的方法
- 接受3个参数,第一个是
context.Context
类型,其他2个都是可导出(或内置)的类型。 - 第3个参数是一个指针
- 有一个 error 类型的返回值
package RPC
import (
"context"
"fmt"
)
//作为调用服务的参数
type Args struct {
A int
B int
}
//作为服务端返回结果
type Reply struct {
C int
}
type Arith int
func (t *Arith) Mul(ctx context.Context, args *Args, reply *Reply) error {
reply.C = args.A * args.B
fmt.Printf("call: %d * %d = %d\n", args.A, args.B, reply.C)
return nil
}
暴露服务即是启动一个TCP或者UDP服务器来监听我们的服务器。代码解释如下:
package main
import (
"context"
"flag"
example "pratice/RPC"
"github.com/smallnest/rpcx/server"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
type Arith struct{}
// the second parameter is not a pointer
func (t *Arith) Mul(ctx context.Context, args example.Args, reply *example.Reply) error {
reply.C = args.A * args.B
return nil
}
func main() {
flag.Parse()
//创建一个服务器实例
s := server.NewServer()
//注册我们的服务
s.RegisterName("Arith", new(Arith), "")
//监听服务
err := s.Serve("tcp", *addr)
if err != nil {
panic(err)
}
}
- 客户端进行服务的发现和治理
客户端对服务的发现有多种方式
- Peer to Peer: 客户端直连每个服务节点。 the client connects the single service directly. It acts like the
client
type. - Peer to Multiple: 客户端可以连接多个服务。服务可以被编程式配置。
- Zookeeper: 通过 zookeeper 寻找服务。
- Etcd: 通过 etcd 寻找服务。
- Consul: 通过 consul 寻找服务。
- mDNS: 通过 mDNS 寻找服务(支持本地服务发现)。
- In process: 在同一进程寻找服务。客户端通过进程调用服务,不走TCP或UDP,方便调试使用。
- 调用服务
一个XCLinet只对一个服务负责,它可以通过serviceMethod参数来调用这个服务的所有方法。如果你想调用多个服务,你必须为每个服务创建一个XClient。
type XClient interface {
//用来设置 Plugin 容器, Auth 可以用来设置鉴权token。
SetPlugins(plugins PluginContainer)
ConfigGeoSelector(latitude, longitude float64)
Auth(auth string)
//Go 代表异步调用,
Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
// Call 代表同步调用。
Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error
Close() error
}
客户端代码示例
package main
import (
"context"
"flag"
"log"
example "pratice/RPC"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
//客户端直连每个服务节点。
d := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
//xclient是对客户端的封装,返回一个客户端提供服务的发现和治理
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := example.Args{
A: 10,
B: 20,
}
reply := &example.Reply{}
//服务的调用
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
}