etcd学习笔记(etcdv3, gRPC服务发现和负载均衡)

etcd 和 grpc 负载均衡学习笔记

etcd

etcd 是一个 分布式的 key-value 数据库

数据模型

etcd 设计时主要考虑两个目标

  • 可靠的存储不频繁更新的数据
  • 提供可靠的监控查询 (监控键值对的变化)

为了提供轻量级的快照和监控键值对的历史记录, etcd 暴露键值对的历史版本。一个持久化、多版本、分布式的数据模型适合这些案例

etcd 存储了一个多版本持久化的 store 中存取数据,每一次对 key 的修改不会替换原来的值,而是生成一个新的结构,所有过去的版本的数值在这次修改后仍然可以被获取并且监听。为了防止这个 store 随着时间越来越大要存储的数据越来越大,store 会将老版本的数据被压缩到一个叫 shed 的地方。

逻辑视图

store 的逻辑视图是一个扁平的二进制 键空间,键空间根据字符做了排序,因此查询的消耗很低

键空间的维护开很多版本,每一次对对这个键值的原子修改操作都在健空间生成一份新的修订,所有之前版本修订都保持不变。所有旧的版本数据仍然可以被获取到,但是当 store 被压缩到 shed 中的时候, 修订将会被移除。

一个 key 的生命周期一直有一个 generation, 如果新建一个之前不存在的 key , 那么这个 key 的generation 是 1, 每一次对这个 key 的修改都会将 增加这个 key 的 generation。 删除一个 key 将产生一个 key 的墓碑, 一旦压缩发生后,所有在压缩之前发生的修订都会被删除。

物理视图

etcd 将物理数据存储在 b+树中, 每一次修订只包括了这次和上一次修订的变化。 每次修定对应树中的多个 key。

键值对的 key 是一个三元组 (major, sub, type )。 其中 major 对应每一次修订, sub 区分修订中的不同key。 type 是一些特殊值的前缀,value 记录了和上一次修订的差值。 压缩则移除过期的键值对

etcd 在内存中也建立了一份 b树索引, 这样能够让对key 的查询变得高效。 b数 的key 就是 store 暴露给用户的 key

硬件推荐

  • cpu 需求比较低,通常两到四核就够用了,如果负载很高,可以考虑用 8或16核的机器
  • 内存 内存消耗也不高,8G基本已经够用
  • 磁盘 磁盘对于 etcd 的性能和稳定性至关重要,首先是写入延迟对于 etcd 的性能影响很大。一般情况下硬盘的带宽能满足我们的需求, 如果可以,最好使用 固态硬盘, 不论对于固态硬盘还是机械硬盘, RAID 0 都能提升效率。对于至少三个节点的机器来说,复制和备份都不太重要。 etcd 本身就能保证高可用性
  • 对于多节点的 etcd 来说, 低延迟的网络能够提升写入速率, 高带宽的网络可以减少故障恢复的时间,因此最好将 etcd 集群部署在一个数据中心,内网带宽越高越好

etcd 网关

什么是 etcd 网关

etcd 网关是一个简单的 TCP 代理, 网关是无状态和透明的,既不会检查客户端的请求也不会修改集群的响应。 网关支持远端多个 etcd 节点, 可以用简单的 轮询方式依次访问每一个节点,如果某个节点出错了, 网关可以对客户端隐藏失败细节。 以后可能会支持概率轮询等其他方式。

什么时候应该使用 etcd 网关模式

当一台机器上有多个应用程序都需要使用 etcd 的时候,每一个程序都需要知道 etcd 的地址, 如果 etcd 的地址修改了,那么每一个应用都需要修改 etcd 的地址。 如果在该机器上部署一个 etcd 网关节点,当 etcd 远端地址修改的时候,只用修改每一台机器上的 etcd 远端地址就行了。

什么时候不应该使用网关模式

  • 追求性能 网关模式并没有为了考虑提升性能而设计。 不提供缓存,合并,和批处理等操作。 将来可能会开发带缓存的代理网关。
  • 运行一个集群管理系统的时候 高级的集群管理系统 如k8s 等原生支持服务发现功能, 应用可以通过 系统管理的 DNS 或者虚拟 IP 来访问 etcd 节点。

grpc 负载均衡与服务发现

为了构建高可用,高性能的后段服务,通常采用负载均衡、服务发现、容错处理等机制。通常有以下三种负载均衡方式

代理模式 (Proxy Model)

通常使用一个独立的客户端来代理所有的请求,通常采用专门的硬件或者 HAproxy 等软件来实现。客户端不直接请求服务端,而是向代理发送请求,代理再将所有的请求按照某种策略如轮询等方式发送给服务端并将服务端的结果返回给客户端。 代理模式通常具备健康检查能力,可以移除故障的服务端实例。缺点在于:

  • 代理模式需要额外的资源,因为它需要复制请求和响应。
  • 在客户端和服务端之间增加了一级,有一定的性能损耗并且会增加延迟。

进程内负载均衡(Balancing-aware Client)

这种厚客户端将负载均衡的逻辑部分更多的放到了客户端来做, 客户端可以包含多种负载均衡的策略(轮询、随机等)来从服务列表中选取。 客户端静态的配置了服务端列表,或者客户端通过命名解析系统维护服务端。这种情况下就是客户端负责从列表中选取服务。客户端直接连接服务端,开销比第一种模式低。该方案的问题在于:

  • 开发成本, 开发和维护多版本、多语言的客户端的成本很高。
  • 一些算法需要客户端和服务端之间维持通信,客户端会变的更加复杂

外部负载均衡

这种情况是前两种模式的折中,将负载均衡的逻辑独立出来交给单独的均衡器。 均衡器负责和后端服务保持通信,收集服务的负载和状态信息。 客户端通过均衡器来选择服务列表。相比第一种方案,客户端仍然是直接连接服务端,不会有额外的开销。 客户端也不需要为不同语言和不同版本来维护不同的库。

结构

gRPC 并没有直接实现负载均衡和服务发现的功能,但是已经提供了自己的设计思路。已经为命名解析和负载均衡提供了接口。 我们可以用 etcd 方便的实现 gRPC 的负载均衡。和服务发现。

  • 启动的时候 gRPC 客户端发起一个命名解析请求。 命名将被解析成一个或者多个 IP 地址, 每个地址将被标明他是一个负载均衡器的地址还是后端服务的地址,每个地址还标明客户端将使用的负载均衡策略
  • 客户端实例化负载均衡策略(如果其中命名解析器返回的任何一个地址是负载均衡的地址,客户端将会使用 grpclb 策略, 否则 客户端将会使用服务器配置的负载均衡策略。 如果服务没有配置负载均衡策略,客户端将默认使用第一个可用的服务器地址)
  • 负载均衡策略将为每一个服务器地址创建一个子通道
    • 对于除了 grpclb 之外的, 除了均衡器的地址之外,所有后端服务都有一个子通道。
    • 对于grpclb 而言
      • 客户端从解析器返回的均衡器地址中打开一个流,客户端向均衡器请求一组可以直连的服务器地址
      • gRPC 后端服务向均衡器报告自己的负载情况
      • 均衡器返回一组服务列表, grpclb 再为每一个后端服务创建一个子通道
  • 对于每一个 RPC 请求,负载均衡策略决定 RPC 请求将被送到哪一个子通道
    • 对于grpclb 而言, 客户端将会按照grpclb 返回的后端服务顺序依次发送每一个请求。

demo

我们用 grpc 可以很方便的实现服务发现和域名解析, 此处使用的是外部负载均衡

客户端

import (
    "github.com/coreos/etcd/clientv3"
    etcdnaming "github.com/coreos/etcd/clientv3/naming"

    "google.golang.org/grpc"
)

...

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b))

在上面的例子中,客户端代码很简洁,用 etcd 实现命名解析,并用轮询的方式实现负载均衡。 服务的名字叫 “my-service”

服务端

  • 可以用手动的方式注册服务

    ETCDCTL_API=3 etcdctl put my-service/1.2.3.4:5678 '{"Addr":"1.2.3.4:5678","Metadata":"..."}'
    
  • 只要往 etcd 中写入 以 “my-service”为前缀的key,客户端就会根据 value 中的 Addr 进行拨号

    r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})
    
  • 需要删除服务时, 将 etcd 中的 key 删除即可

  • 通常在生产环境中,我们需要当服务启动时将服务添加到可用列表中, 当服务挂掉之后要及时的从可用列表中删除, 当服务端不可用时,负载均衡器需要及时感知。 通常的策略是 启动时申请一个key, 然后不断的更新 key 的租约。

type Service struct {
    Addr         string `json:"Addr"`
    Metadate     string `json:"Metadate"`
}

// Register
func Register(name string, host string, port int, addr string, interval time.Duration, ttl int) error {
    service  := Service{
        Addr: fmt.Sprintf("%s:%d", host, port),
        Metadate: "...",
    }
    bts, err := json.Marshal(service)
    if err != nil{
        return err
    }

    serviceValue := string(bts)
    serviceKey = fmt.Sprintf("%s/%s", name, serviceValue)

    // get endpoints for register dial address
    client, err := etcd3.New(etcd3.Config{
        Endpoints: []string{addr},
    })
    if err != nil {
        return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
    }

    go func() {
        // invoke self-register with ticker
        ticker := time.NewTicker(interval)
        for {
            // minimum lease TTL is ttl-second
            resp, _ := client.Grant(context.TODO(), int64(ttl))
            // should get first, if not exist, set it
            _, err := client.Get(context.Background(), serviceKey)
            if err != nil {
                if err == rpctypes.ErrKeyNotFound {
                    if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                        log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                    }
                } else {
                    log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
                }
            } else {
                // refresh set to true for not notifying the watcher
                if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                    log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())

                }
            }
            select {
            case <-stopSignal:
                return
            case <-ticker.C:
            }
        }
    }()

    return nil
}

// UnRegister delete registered service from etcd
func UnRegister() error {
    stopSignal <- true
    stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
    var err error
    if _, err := client.Delete(context.Background(), serviceKey); err != nil {
        log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
    } else {
        log.Printf("grpclb: deregister '%s' ok.", serviceKey)
    }
    return err
}

写完之后,本地运行多个服务端,可以看到 lb 将 RPC 请求依次转发到每一个服务端。 动态的添加或删除服务端进程,客服端能通过 lb 感知到服务端的变化。

参考文档

如果你觉得本文对你有帮助,欢迎打赏!