(rpc系列)03 玩转RPC

rpc在一些特殊场景中的应用...

前言

开源社区活跃着很多RPC框架,例如rpcxtars等等,用于在不同场景中实现不同的需求。

本篇我们尝试深入理解Go原生的RPC包,研究它在一些特殊场景下的用法。

rpc客户端的实现原理

俗话说,知其然,更要知其所以然。前两篇博文我们已经通过net/rpc包写过rpc服务,并且实现了客户端来调用服务接口。可这个客户端是怎么完成服务调用的呢?我们可以通过它的源码一窥其原理。

同步阻塞调用

Go 的 rpc 包最简单的使用方式是通过 Client.Call 方法进行同步阻塞调用,该方法的实现如下:

1
2
3
4
5
6
7
func (client *Client) Call(
	serviceMethod string, args interface{},
	reply interface{},
) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

通过 Client.Go 方法进行一次异步调用,返回一个表示这次调用的 Call 结构体。然后等待 Call 结构体的 Done 管道返回调用结果。

异步调用

执行异步调用的Client.Go方法是怎么实现的呢?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (client *Client) Go(
	serviceMethod string, args interface{},
	reply interface{},
	done chan *Call,
) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	call.Done = make(chan *Call, 10) // buffered.

	client.send(call)
	return call
}

源码很简单,分为两步

  1. 构造一个表示当前调用的 call 变量;
  2. 通过 client.send 将 call 的完整参数发送到 RPC 框架。

client.send 方法调用是线程安全的,可以从多个 Goroutine 同时向同一个 RPC 连接发送调用指令。

当调用完成或者发生错误时,即调用 call.done 方法通知完成:

1
2
3
4
5
6
7
8
9
func (call *Call) done() {
	select {
	case call.Done <- call:
		// ok
	default:
		// We don't want to block here. It is the caller's responsibility to make
		// sure the channel has enough buffer space. See comment in Go().
	}
}

可以看到,call.Done 管道会将处理后的 call 返回。

用户侧异步调用

了解了Client的调用原理,我们就可以通过 Client.Go 方法异步调用前面的 HelloService 服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func doClientWork(client *rpc.Client) {
	helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)

	// do some thing

	helloCall = <-helloCall.Done
	if err := helloCall.Error; err != nil {
		log.Fatal(err)
	}

	args := helloCall.Args.(string)
	reply := helloCall.Reply.(*string)
	fmt.Println(args, *reply)
}

异步调用命令发出后,一般会执行其他的任务,等异步调用结束,输入参数和返回值可以通过返回的 Call 变量进行获取。

基于RPC实现watch功能

服务简介

watch是一个用于监视数据变化的功能。它可以监听一个或多个数据,并在数据发生变化时执行开发人员预定义好的逻辑。

很多系统都提供了类似功能的接口,当系统满足某种条件时, 接口返回监控的结果。

这一节我们尝试通过 RPC 框架实现一个基本的 Watch 功能。如前文所描述,因为 client.send 是线程安全的,我们可以在不同的 Goroutine 中同时并发阻塞调用 RPC 方法。通过在一个独立的 Goroutine 中调用 Watch 函数进行监控。

一个栗子

为了便于演示,我们通过 RPC 构造一个简单的内存 KV 数据库。除了数据库必备的GetSet方法,我们额外提供一个Watch接口,客户端可以通过调用这个接口,来监听Key的变化。

KVStore服务

服务定义我们参考系列博客第一篇,这里为了演示我们直接构造最简单的rpc服务,你可以自行尝试把它封装成更安全的RPC接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
type KVStoreService struct {
    // m 用于存储 KV 数据
	m      map[string]string
    // filter 对应每个Watch调用时定义的过滤器函数列表
	filter map[string]func(key string)
    // 多个Goroutine并发访问m时的互斥锁
	mu     sync.Mutex
}

func NewKVStoreService() *KVStoreService {
	return &KVStoreService{
		m:      make(map[string]string),
		filter: make(map[string]func(key string)),
	}
}

Get接口

既然是K-V数据库,那我们自然要提供一个Get接口用于获取内存数据。注意接口定义必须满足Go的RPC规则

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (p *KVStoreService) Get(key string, value *string) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if v, ok := p.m[key]; ok {
		*value = v
		return nil
	}

	return fmt.Errorf("not found")
}

Set接口

同样的,我们需要提供一个Set接口用于往K-V数据库中写入数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	key, value := kv[0], kv[1]

	if oldValue := p.m[key]; oldValue != value {
        for _, fn := range p.filter {
			fn(key)
		}
	}

	p.m[key] = value
	return nil
}

大致的实现思路如下:

  1. 将 key 和 value 组成的数组作为输入参数传入接口;
  2. 简单起见,Set接口不需要返回数据。用一个匿名的空结构体表示忽略了输出参数,同时又能保证不会有额外的内存开销;
  3. 我们主要用于监听key的变动,所以修改某个 key 对应的值时,需要调用每一个过滤器函数,逻辑就是将变动的key写入管道回给客户端。

Watch接口

客户端可以通过这个接口,在一定时间范围监听key的变化。另外,过滤器列表也在 Watch 方法中提供:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
	id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
	ch := make(chan string, 10) // buffered

	p.mu.Lock()
    // 过滤器函数非自定义 当有key变动时就把key写入ch
	p.filter[id] = func(key string) { ch <- key }
	p.mu.Unlock()

	select {
	case <-time.After(time.Duration(timeoutSecond) * time.Second):
		return fmt.Errorf("timeout")
	case key := <-ch:
		*keyChanged = key
		return nil
	}
}

Watch 方法的输入参数是超时的秒数,返回值有两种情况:

  1. 将变化的 key 作为返回值返回;
  2. 如果超过时间内没有 key 被修改,则返回超时错误

Watch 的实现中,用唯一 id 表示每个 Watch 调用,然后根据 id 将自身对应的过滤器函数注册到 p.filter 列表。

之所以叫“过滤器”,应该是支持自定义,可以将某些key过滤掉,即使有变动也不通知客户端。

测试

KV数据库服务的注册和启动过程不再赘述,可自行实现或参考完整代码

我们按顺序启动5个client:

  • watch1和watch2分别调用watch接口,监听key的变化。同步的 watch 调用会阻塞,直到有 key 发生变化或者超时;
  • setKey调用Set接口设置KV,服务器会将变化的 key 通过 Watch 方法返回;
  • getKey调用Get接口测试内存数据库有没有正常工作。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// getKey.go
var v string
err = client.Call("KVStoreService.Get", "蜡笔小新主角", &v)
if err != nil {
    log.Fatal(err)
}
fmt.Println("key's v|", v)

// setKey.go
err = client.Call(
		"KVStoreService.Set", [2]string{"蜡笔小新主角", "风间"},
		new(struct{}),
)

// watch1 & watch2
var keyChanged string
err = client.Call("KVStoreService.Watch", 30, &keyChanged)
if err != nil {
    log.Fatal(err)
}

fmt.Println("watch:", keyChanged)

以上。我们就实现了对系统某些状态的监控

反向 RPC

通常的 RPC 是C/S 结构,RPC 的服务端对应网络的服务器,RPC 的客户端也对应网络客户端。但对于一些特殊场景,比如在公司内网提供一个 RPC 服务,但是在外网无法连接到内网的服务器。这种时候我们可以参考类似反向代理的技术,首先从内网主动连接到外网的 TCP 服务器,然后基于 TCP 连接向外网提供 RPC 服务。

todo: 怎么模拟这种情况呢?s能ping通c,但c没办法ping通s…

server

以下是启动反向 RPC 服务的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func main() {
	rpc.Register(new(HelloService))

	for {
		conn, _ := net.Dial("tcp", "localhost:1234")
		if conn == nil {
			time.Sleep(time.Second)
			continue
		}

		rpc.ServeConn(conn)
		conn.Close()
	}
}

反向 RPC 的内网服务将不再主动提供 TCP 监听服务,而是首先主动连接到对方的 TCP 服务器。然后基于每个建立的 TCP 连接向对方提供 RPC 服务。

client

反向 RPC 客户端则需要在一个公共的地址提供一个 TCP 服务,用于接受 RPC 服务器的连接请求:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func main() {
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}

	clientChan := make(chan *rpc.Client)

	go func() {
		for {
			conn, err := listener.Accept()
			if err != nil {
				log.Fatal("Accept error:", err)
			}
			clientChan <- rpc.NewClient(conn)
		}
	}()

	doClientWork(clientChan)
}

当每个连接建立后,基于网络连接构造 RPC 客户端对象并发送到 clientChan 管道,这是个无缓冲通道

客户端执行 RPC 调用的操作在 doClientWork 函数完成:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func doClientWork(clientChan <-chan *rpc.Client) {
	// 从管道取出一个 RPC 客户端对象
    client := <-clientChan
    // 在函数退出前关闭客户端
	defer client.Close()

    // 执行 RPC 调用
	var reply string
	err := client.Call("HelloService.Hello", "hello", &reply)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(reply)
}

上下文信息

有时候,我们希望针对不同客户端提供定制化的 RPC 服务。这就需要通过为每个连接提供独立的 RPC 服务,来实现对上下文特性的支持。

首先改造 HelloService,里面增加了对应连接的 conn 成员:

1
2
3
type HelloService struct {
	conn net.Conn
}

然后为每个连接启动独立的 RPC 服务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func main() {
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("Accept error:", err)
		}

		go func() {
			defer conn.Close()

			p := rpc.NewServer()
			p.Register(&HelloService{conn: conn})
			p.ServeConn(conn)
		} ()
	}
}

Hello 方法中就可以根据 conn 成员识别不同连接的 RPC 调用:

1
2
3
4
func (p *HelloService) Hello(request string, reply *string) error {
	*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
	return nil
}

基于上下文信息,我们可以方便地为 RPC 服务增加简单的登陆状态的验证:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type HelloService struct {
	conn    net.Conn
	isLogin bool
}

func (p *HelloService) Login(request string, reply *string) error {
	if request != "user:password" {
		return fmt.Errorf("auth failed")
	}
	log.Println("login ok")
	p.isLogin = true
	return nil
}

func (p *HelloService) Hello(request string, reply *string) error {
	if !p.isLogin {
		return fmt.Errorf("please login")
	}
	*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
	return nil
}

这样可以要求在客户端连接 RPC 服务时,首先要执行登陆操作,登陆成功后才能正常执行其他的服务。

Licensed under CC BY-NC-SA 4.0
我的玫瑰,种在繁星中的一颗~
Built with Hugo
主题 StackJimmy 设计