前言
开源社区活跃着很多RPC框架,例如rpcx,tars等等,用于在不同场景中实现不同的需求。
本篇我们尝试深入理解Go原生的RPC包,研究它在一些特殊场景下的用法。
rpc客户端的实现原理
俗话说,知其然,更要知其所以然。前两篇博文我们已经通过net/rpc包写过rpc服务,并且实现了客户端来调用服务接口。可这个客户端是怎么完成服务调用的呢?我们可以通过它的源码一窥其原理。
同步阻塞调用
Go 的 rpc 包最简单的使用方式是通过 Client.Call 方法进行同步阻塞调用,该方法的实现如下:
1
2
3
4
5
6
7
|
func (client *Client) Call(
serviceMethod string, args interface{},
reply interface{},
) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
|
通过 Client.Go 方法进行一次异步调用,返回一个表示这次调用的 Call 结构体。然后等待 Call 结构体的 Done 管道返回调用结果。
异步调用
执行异步调用的Client.Go方法是怎么实现的呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (client *Client) Go(
serviceMethod string, args interface{},
reply interface{},
done chan *Call,
) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
call.Done = make(chan *Call, 10) // buffered.
client.send(call)
return call
}
|
源码很简单,分为两步:
- 构造一个表示当前调用的 call 变量;
- 通过
client.send 将 call 的完整参数发送到 RPC 框架。
client.send 方法调用是线程安全的,可以从多个 Goroutine 同时向同一个 RPC 连接发送调用指令。
当调用完成或者发生错误时,即调用 call.done 方法通知完成:
1
2
3
4
5
6
7
8
9
|
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
}
}
|
可以看到,call.Done 管道会将处理后的 call 返回。
用户侧异步调用
了解了Client的调用原理,我们就可以通过 Client.Go 方法异步调用前面的 HelloService 服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func doClientWork(client *rpc.Client) {
helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)
// do some thing
helloCall = <-helloCall.Done
if err := helloCall.Error; err != nil {
log.Fatal(err)
}
args := helloCall.Args.(string)
reply := helloCall.Reply.(*string)
fmt.Println(args, *reply)
}
|
在异步调用命令发出后,一般会执行其他的任务,等异步调用结束,输入参数和返回值可以通过返回的 Call 变量进行获取。
基于RPC实现watch功能
服务简介
watch是一个用于监视数据变化的功能。它可以监听一个或多个数据,并在数据发生变化时执行开发人员预定义好的逻辑。
很多系统都提供了类似功能的接口,当系统满足某种条件时, 接口返回监控的结果。
这一节我们尝试通过 RPC 框架实现一个基本的 Watch 功能。如前文所描述,因为 client.send 是线程安全的,我们可以在不同的 Goroutine 中同时并发阻塞调用 RPC 方法。通过在一个独立的 Goroutine 中调用 Watch 函数进行监控。
一个栗子
为了便于演示,我们通过 RPC 构造一个简单的内存 KV 数据库。除了数据库必备的Get和Set方法,我们额外提供一个Watch接口,客户端可以通过调用这个接口,来监听Key的变化。
KVStore服务
服务定义我们参考系列博客第一篇,这里为了演示我们直接构造最简单的rpc服务,你可以自行尝试把它封装成更安全的RPC接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
type KVStoreService struct {
// m 用于存储 KV 数据
m map[string]string
// filter 对应每个Watch调用时定义的过滤器函数列表
filter map[string]func(key string)
// 多个Goroutine并发访问m时的互斥锁
mu sync.Mutex
}
func NewKVStoreService() *KVStoreService {
return &KVStoreService{
m: make(map[string]string),
filter: make(map[string]func(key string)),
}
}
|
Get接口
既然是K-V数据库,那我们自然要提供一个Get接口用于获取内存数据。注意接口定义必须满足Go的RPC规则:
1
2
3
4
5
6
7
8
9
10
11
|
func (p *KVStoreService) Get(key string, value *string) error {
p.mu.Lock()
defer p.mu.Unlock()
if v, ok := p.m[key]; ok {
*value = v
return nil
}
return fmt.Errorf("not found")
}
|
Set接口
同样的,我们需要提供一个Set接口用于往K-V数据库中写入数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
p.mu.Lock()
defer p.mu.Unlock()
key, value := kv[0], kv[1]
if oldValue := p.m[key]; oldValue != value {
for _, fn := range p.filter {
fn(key)
}
}
p.m[key] = value
return nil
}
|
大致的实现思路如下:
- 将 key 和 value 组成的
数组作为输入参数传入接口;
- 简单起见,
Set接口不需要返回数据。用一个匿名的空结构体表示忽略了输出参数,同时又能保证不会有额外的内存开销;
- 我们主要用于监听
key的变动,所以修改某个 key 对应的值时,需要调用每一个过滤器函数,逻辑就是将变动的key写入管道回给客户端。
Watch接口
客户端可以通过这个接口,在一定时间范围内监听key的变化。另外,过滤器列表也在 Watch 方法中提供:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
ch := make(chan string, 10) // buffered
p.mu.Lock()
// 过滤器函数非自定义 当有key变动时就把key写入ch
p.filter[id] = func(key string) { ch <- key }
p.mu.Unlock()
select {
case <-time.After(time.Duration(timeoutSecond) * time.Second):
return fmt.Errorf("timeout")
case key := <-ch:
*keyChanged = key
return nil
}
}
|
Watch 方法的输入参数是超时的秒数,返回值有两种情况:
- 将变化的 key 作为返回值返回;
- 如果超过时间内没有 key 被修改,则返回
超时错误。
Watch 的实现中,用唯一 id 表示每个 Watch 调用,然后根据 id 将自身对应的过滤器函数注册到 p.filter 列表。
之所以叫“过滤器”,应该是支持自定义,可以将某些key过滤掉,即使有变动也不通知客户端。
测试
KV数据库服务的注册和启动过程不再赘述,可自行实现或参考完整代码。
我们按顺序启动5个client:
- watch1和watch2分别调用watch接口,监听key的变化。同步的 watch 调用会阻塞,直到有 key 发生变化或者超时;
- setKey调用Set接口设置KV,服务器会将变化的 key 通过 Watch 方法返回;
- getKey调用Get接口测试内存数据库有没有正常工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// getKey.go
var v string
err = client.Call("KVStoreService.Get", "蜡笔小新主角", &v)
if err != nil {
log.Fatal(err)
}
fmt.Println("key's v|", v)
// setKey.go
err = client.Call(
"KVStoreService.Set", [2]string{"蜡笔小新主角", "风间"},
new(struct{}),
)
// watch1 & watch2
var keyChanged string
err = client.Call("KVStoreService.Watch", 30, &keyChanged)
if err != nil {
log.Fatal(err)
}
fmt.Println("watch:", keyChanged)
|
以上。我们就实现了对系统某些状态的监控。
反向 RPC
通常的 RPC 是C/S 结构,RPC 的服务端对应网络的服务器,RPC 的客户端也对应网络客户端。但对于一些特殊场景,比如在公司内网提供一个 RPC 服务,但是在外网无法连接到内网的服务器。这种时候我们可以参考类似反向代理的技术,首先从内网主动连接到外网的 TCP 服务器,然后基于 TCP 连接向外网提供 RPC 服务。
todo: 怎么模拟这种情况呢?s能ping通c,但c没办法ping通s…
server
以下是启动反向 RPC 服务的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func main() {
rpc.Register(new(HelloService))
for {
conn, _ := net.Dial("tcp", "localhost:1234")
if conn == nil {
time.Sleep(time.Second)
continue
}
rpc.ServeConn(conn)
conn.Close()
}
}
|
反向 RPC 的内网服务将不再主动提供 TCP 监听服务,而是首先主动连接到对方的 TCP 服务器。然后基于每个建立的 TCP 连接向对方提供 RPC 服务。
client
反向 RPC 客户端则需要在一个公共的地址提供一个 TCP 服务,用于接受 RPC 服务器的连接请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
clientChan := make(chan *rpc.Client)
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
clientChan <- rpc.NewClient(conn)
}
}()
doClientWork(clientChan)
}
|
当每个连接建立后,基于网络连接构造 RPC 客户端对象并发送到 clientChan 管道,这是个无缓冲通道。
客户端执行 RPC 调用的操作在 doClientWork 函数完成:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func doClientWork(clientChan <-chan *rpc.Client) {
// 从管道取出一个 RPC 客户端对象
client := <-clientChan
// 在函数退出前关闭客户端
defer client.Close()
// 执行 RPC 调用
var reply string
err := client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
|
上下文信息
有时候,我们希望针对不同客户端提供定制化的 RPC 服务。这就需要通过为每个连接提供独立的 RPC 服务,来实现对上下文特性的支持。
首先改造 HelloService,里面增加了对应连接的 conn 成员:
1
2
3
|
type HelloService struct {
conn net.Conn
}
|
然后为每个连接启动独立的 RPC 服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go func() {
defer conn.Close()
p := rpc.NewServer()
p.Register(&HelloService{conn: conn})
p.ServeConn(conn)
} ()
}
}
|
Hello 方法中就可以根据 conn 成员识别不同连接的 RPC 调用:
1
2
3
4
|
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
|
基于上下文信息,我们可以方便地为 RPC 服务增加简单的登陆状态的验证:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
type HelloService struct {
conn net.Conn
isLogin bool
}
func (p *HelloService) Login(request string, reply *string) error {
if request != "user:password" {
return fmt.Errorf("auth failed")
}
log.Println("login ok")
p.isLogin = true
return nil
}
func (p *HelloService) Hello(request string, reply *string) error {
if !p.isLogin {
return fmt.Errorf("please login")
}
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
|
这样可以要求在客户端连接 RPC 服务时,首先要执行登陆操作,登陆成功后才能正常执行其他的服务。