前言
开源社区活跃着很多RPC框架
,例如rpcx,tars等等,用于在不同场景中实现不同的需求。
本篇我们尝试深入理解Go原生的RPC包,研究它在一些特殊场景下的用法。
rpc客户端的实现原理
俗话说,知其然,更要知其所以然。前两篇博文我们已经通过net/rpc
包写过rpc服务
,并且实现了客户端
来调用服务接口。可这个客户端是怎么完成服务调用的呢?我们可以通过它的源码一窥其原理。
同步阻塞调用
Go 的 rpc 包最简单的使用方式是通过 Client.Call
方法进行同步阻塞调用
,该方法的实现如下:
1
2
3
4
5
6
7
|
func (client *Client) Call(
serviceMethod string, args interface{},
reply interface{},
) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
|
通过 Client.Go
方法进行一次异步调用
,返回一个表示这次调用的 Call
结构体。然后等待 Call
结构体的 Done
管道返回调用结果。
异步调用
执行异步调用的Client.Go
方法是怎么实现的呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (client *Client) Go(
serviceMethod string, args interface{},
reply interface{},
done chan *Call,
) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
call.Done = make(chan *Call, 10) // buffered.
client.send(call)
return call
}
|
源码很简单,分为两步
:
- 构造一个表示当前调用的 call 变量;
- 通过
client.send
将 call 的完整参数发送到 RPC 框架。
client.send
方法调用是线程安全
的,可以从多个 Goroutine 同时向同一个 RPC 连接发送调用指令。
当调用完成或者发生错误时,即调用 call.done
方法通知完成:
1
2
3
4
5
6
7
8
9
|
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
}
}
|
可以看到,call.Done
管道会将处理后的 call 返回。
用户侧异步调用
了解了Client的调用原理,我们就可以通过 Client.Go
方法异步调用
前面的 HelloService 服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func doClientWork(client *rpc.Client) {
helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)
// do some thing
helloCall = <-helloCall.Done
if err := helloCall.Error; err != nil {
log.Fatal(err)
}
args := helloCall.Args.(string)
reply := helloCall.Reply.(*string)
fmt.Println(args, *reply)
}
|
在异步调用
命令发出后,一般会执行其他的任务,等异步调用结束,输入参数和返回值可以通过返回的 Call
变量进行获取。
基于RPC实现watch功能
服务简介
watch
是一个用于监视数据变化
的功能。它可以监听
一个或多个数据,并在数据发生变化时
执行开发人员预定义好的逻辑。
很多系统都提供了类似功能的接口,当系统满足某种条件
时, 接口返回监控的结果。
这一节我们尝试通过 RPC 框架实现一个基本的 Watch 功能。如前文所描述,因为 client.send
是线程安全的,我们可以在不同的 Goroutine 中同时并发阻塞调用 RPC 方法。通过在一个独立的 Goroutine 中调用 Watch 函数进行监控。
一个栗子
为了便于演示,我们通过 RPC 构造一个简单的内存 KV 数据库
。除了数据库必备的Get
和Set
方法,我们额外提供一个Watch
接口,客户端可以通过调用这个接口,来监听Key的变化。
KVStore服务
服务定义
我们参考系列博客第一篇,这里为了演示我们直接构造最简单的rpc服务,你可以自行尝试把它封装成更安全
的RPC接口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
type KVStoreService struct {
// m 用于存储 KV 数据
m map[string]string
// filter 对应每个Watch调用时定义的过滤器函数列表
filter map[string]func(key string)
// 多个Goroutine并发访问m时的互斥锁
mu sync.Mutex
}
func NewKVStoreService() *KVStoreService {
return &KVStoreService{
m: make(map[string]string),
filter: make(map[string]func(key string)),
}
}
|
Get接口
既然是K-V数据库,那我们自然要提供一个Get接口
用于获取内存数据
。注意接口定义必须满足Go的RPC规则
:
1
2
3
4
5
6
7
8
9
10
11
|
func (p *KVStoreService) Get(key string, value *string) error {
p.mu.Lock()
defer p.mu.Unlock()
if v, ok := p.m[key]; ok {
*value = v
return nil
}
return fmt.Errorf("not found")
}
|
Set接口
同样的,我们需要提供一个Set接口
用于往K-V数据库中写入数据
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
p.mu.Lock()
defer p.mu.Unlock()
key, value := kv[0], kv[1]
if oldValue := p.m[key]; oldValue != value {
for _, fn := range p.filter {
fn(key)
}
}
p.m[key] = value
return nil
}
|
大致的实现思路
如下:
- 将 key 和 value 组成的
数组
作为输入参数
传入接口;
- 简单起见,
Set接口
不需要返回数据。用一个匿名的空结构体
表示忽略
了输出参数,同时又能保证不会有额外的内存开销;
- 我们主要用于监听
key
的变动,所以修改某个 key 对应的值时,需要调用每一个过滤器函数
,逻辑就是将变动的key写入管道
回给客户端。
Watch接口
客户端可以通过这个接口,在一定时间范围
内监听key的变化
。另外,过滤器列表也在 Watch 方法中提供:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
ch := make(chan string, 10) // buffered
p.mu.Lock()
// 过滤器函数非自定义 当有key变动时就把key写入ch
p.filter[id] = func(key string) { ch <- key }
p.mu.Unlock()
select {
case <-time.After(time.Duration(timeoutSecond) * time.Second):
return fmt.Errorf("timeout")
case key := <-ch:
*keyChanged = key
return nil
}
}
|
Watch 方法的输入参数
是超时的秒数,返回值有两种情况:
- 将变化的 key 作为返回值返回;
- 如果超过时间内没有 key 被修改,则返回
超时错误
。
Watch 的实现中,用唯一 id
表示每个 Watch 调用,然后根据 id 将自身对应的过滤器函数注册到 p.filter
列表。
之所以叫“过滤器”,应该是支持自定义
,可以将某些key过滤掉,即使有变动也不通知客户端。
测试
KV数据库服务的注册和启动过程不再赘述,可自行实现或参考完整代码。
我们按顺序启动5个client:
- watch1和watch2分别调用watch接口,监听key的变化。同步的 watch 调用会阻塞,直到有 key 发生变化或者超时;
- setKey调用Set接口设置KV,服务器会将变化的 key 通过 Watch 方法返回;
- getKey调用Get接口测试内存数据库有没有正常工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// getKey.go
var v string
err = client.Call("KVStoreService.Get", "蜡笔小新主角", &v)
if err != nil {
log.Fatal(err)
}
fmt.Println("key's v|", v)
// setKey.go
err = client.Call(
"KVStoreService.Set", [2]string{"蜡笔小新主角", "风间"},
new(struct{}),
)
// watch1 & watch2
var keyChanged string
err = client.Call("KVStoreService.Watch", 30, &keyChanged)
if err != nil {
log.Fatal(err)
}
fmt.Println("watch:", keyChanged)
|
以上。我们就实现了对系统某些状态的监控
。
反向 RPC
通常的 RPC 是C/S 结构
,RPC 的服务端对应网络的服务器,RPC 的客户端也对应网络客户端。但对于一些特殊场景
,比如在公司内网提供一个 RPC 服务,但是在外网无法连接
到内网的服务器。这种时候我们可以参考类似反向代理
的技术,首先从内网主动连接
到外网的 TCP 服务器,然后基于 TCP 连接向外网提供 RPC 服务。
todo: 怎么模拟这种情况呢?s能ping通c,但c没办法ping通s…
server
以下是启动反向 RPC 服务的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func main() {
rpc.Register(new(HelloService))
for {
conn, _ := net.Dial("tcp", "localhost:1234")
if conn == nil {
time.Sleep(time.Second)
continue
}
rpc.ServeConn(conn)
conn.Close()
}
}
|
反向 RPC 的内网服务将不再主动提供 TCP 监听服务,而是首先主动连接
到对方的 TCP 服务器。然后基于每个建立的 TCP 连接向对方提供 RPC 服务。
client
反向 RPC 客户端则需要在一个公共的地址
提供一个 TCP 服务,用于接受 RPC 服务器的连接请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
clientChan := make(chan *rpc.Client)
go func() {
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
clientChan <- rpc.NewClient(conn)
}
}()
doClientWork(clientChan)
}
|
当每个连接建立后,基于网络连接构造 RPC 客户端对象并发送到 clientChan 管道,这是个无缓冲通道
。
客户端执行 RPC 调用的操作在 doClientWork 函数完成:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func doClientWork(clientChan <-chan *rpc.Client) {
// 从管道取出一个 RPC 客户端对象
client := <-clientChan
// 在函数退出前关闭客户端
defer client.Close()
// 执行 RPC 调用
var reply string
err := client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
|
上下文信息
有时候,我们希望针对不同客户端提供定制化的 RPC 服务
。这就需要通过为每个连接提供独立的 RPC 服务,来实现对上下文特性
的支持。
首先改造 HelloService,里面增加了对应连接的 conn
成员:
1
2
3
|
type HelloService struct {
conn net.Conn
}
|
然后为每个连接启动独立的 RPC 服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
func main() {
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
go func() {
defer conn.Close()
p := rpc.NewServer()
p.Register(&HelloService{conn: conn})
p.ServeConn(conn)
} ()
}
}
|
Hello 方法中就可以根据 conn 成员识别不同连接
的 RPC 调用:
1
2
3
4
|
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
|
基于上下文信息,我们可以方便地为 RPC 服务增加简单的登陆状态的验证:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
type HelloService struct {
conn net.Conn
isLogin bool
}
func (p *HelloService) Login(request string, reply *string) error {
if request != "user:password" {
return fmt.Errorf("auth failed")
}
log.Println("login ok")
p.isLogin = true
return nil
}
func (p *HelloService) Hello(request string, reply *string) error {
if !p.isLogin {
return fmt.Errorf("please login")
}
*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
return nil
}
|
这样可以要求在客户端连接 RPC 服务时,首先要执行登陆操作,登陆成功后才能正常执行其他的服务。