etcd 服务注册与发现

etcd 是一个分布式一致性的 key-value 存储技术, 被用来做配置共享和服务发现。有着以下优点:

  • 简单: 使用 gRPC 实现了 面向用户的API(v3), v2版本提供 CURL 可访问的 API( HTTP+JSON )
  • 安全: 自动使用 TLS 并可选客户端验证
  • 快速: 10,000 次写操作/秒
  • 可靠: 使用 Raft 算法实现

etcd 安装

获取etcd

运行 etcd

运行单机 etcd 集群

首先安装 goreman

go get github.com/mattn/goreman

编辑 Profile 文件。 vim Procfile

# Use goreman to run `go get github.com/mattn/goreman`
etcd1: bin/etcd --name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
etcd2: bin/etcd --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
etcd3: bin/etcd --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
# in future, use proxy to listen on 2379
#proxy: bin/etcd --name infra-proxy1 --proxy=on --listen-client-urls http://127.0.0.1:2378 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --enable-pprof

启动集群

goreman start

上述脚本生成一个三个 etcd 成员 infra1 infra2 infra3 和一个 etcd 代理 proxy, 共同组成了一个本地的集群.

每一个集群成员 和 代理 都接受 key-value 的读写请求。

etcd 实现共享配置

  • 理论上,应用唯一需要的配置的 ETCD_HOST 环境变量,用于指定ETCD地址。通常使用 docker。
  • 应用启动时从 etcd 获取一次配置信息,(获取该 key 的 value)
  • 同时系统在 etcd 节点上注册一个 Watcher,这样每次配置发生改变之后,应用都能通过 etcd 实时获取到最新配置。

etcd 服务注册与发现

随着 docker 的火爆, 微服务架构中的服务注册和发现变的越来越重要,而 etcd 就是为了为这些情况提供解决方案的。因为 etcd 有以下特性:

  • 强一致和高可用。 采用了 Raft 算法的 etcd 能勾为我们提供强一致性和高可用性的保证
  • 提供了注册服务和监听服务状态的机制 etcd 可以对某个 key 设置 TTL,可以通过监听服务的心跳来检测服务状态。
  • 提供了一种查找和连接服务的机制。当监听到添加了某个服务之后,可以去连接该服务。 有时候我们可以在每一台提供服务的机器上部署一个 proxy 模式的 etcd 节点。 每个服务只要访问自己的 etcd 节点即可。

example

现在假设我们需要开发一套可以水平扩展的长连接服务,需要能动态添加和删除服务器。

etcd

  • 每一台 server 启动成功后,去向 etcd 注册服务,并定时向etcd 发送心跳
  • 同时网关 agent 监听服务目录(对key注册Watcher),当监听到新的服务时,将服务器添加到可用服务器列表中。
  • 当有的 server 关闭后,agent 检测到某个 key 的超时,可以根据自己的策略来决定是否删除掉该server。
  • 客户端连接 server 之前,先向 agent 请求一台 server 地址,agent 可以根据自己的策略给 client 分配服务。 常用的有一致性hash 等算法。 client 更根据分配的 server 地址去连接长连接服务

client 服务注册

type Service struct {
    ProcessId int            // 进程ID , 单机调试时用来标志每一个服务
    info      Serverinfo    // 服务端信息
    KeysAPI   client.KeysAPI // API client, 此处用的是 V2 版本的API,是基于     http 的。 V3版本的是基于grpc的API
}
// workerInfo is the service register information to etcd
type Serverinfo struct {
    Id   int32    `json:"id"`   // 服务器ID
    IP   string   `json:"ip"`   // 对外连接服务的 IP
    Port int32    `json:"port"` // 对外服务端口,本机或者端口映射后得到的
}



// 注册服务
func RegisterService( endpoints []string) {
        cfg := client.Config{
            Endpoints:               endpoints,
            Transport:               client.DefaultTransport,
            HeaderTimeoutPerRequest: time.Second,
        }

    etcdClient, err := client.New(cfg)
    if err != nil {
        log.Fatal("Error: cannot connec to etcd:", err)
    }

    s := &Service{
        ProcessId: os.Getpid(),
        info:      Serverinfo{Id: 1024, IP: "127.0.0.1", Port: 100},
        KeysAPI:   client.NewKeysAPI(etcdClient),
    }
    go s.HeartBeat() // 定时发送心跳
}

func (s *Service) HeartBeat() {
    api := s.KeysAPI
    for {
        key := "lc_server/p_" + strconv.Itoa(s.ProcessId) // 先用 pid 来标识每一个服务, 通常应该用 IP 等来标识。
        // etcd 之所以适合用来做服务发现,是因为它是带目录结构的。 注册一类服务,
        // 只需要 key 在同一个目录下,此处 lc_sercer 目录下,p_{pid} 
        value, _ := json.Marshal(s.info)

        _, err := api.Set(context.Background(), key, string(value), &client.SetOptions{
            TTL: time.Second * 20,
        })    // 调用 API, 设置该 key TTL 为20秒。

        if err != nil {
            log.Println("Error update workerInfo:", err)
        }
        time.Sleep(time.Second * 10)
    }
}

agent 监听服务

type Server struct {
    info   Serverinfo // 服务器信息
    Status int         // 服务器状态
}

// all server
type server_pool struct {
    services map[string]*Server    // server 列表
    client   etcdclient.Client
    mu       sync.RWMutex
}

var (
    Default_pool server_pool
    once          sync.Once
)


func Init( endpoints []string) {
    once.Do(func() { Default_pool.init(endpoints) })
}

func (p *server_pool) init( hosts []string) {
    // init etcd client
    cfg := etcdclient.Config{
        Endpoints: hosts, //
        Transport: etcdclient.DefaultTransport,
    }
    etcdcli, err := etcdclient.New(cfg)
    if err != nil {
        log.Panic(err)
        os.Exit(-1)
    }
    p.client = etcdcli
    // init
    p.services = make(map[string]*Server)

    go p.watcher()
}


// watcher for data change in etcd directory
func (p *server_pool) watcher() error {
    kAPI := etcdclient.NewKeysAPI(p.client)
    w := kAPI.Watcher("lc_server/", &etcdclient.WatcherOptions{Recursive: true})
    // 监听 "lc_server/" 当 "lc_server/" 子目录改变时能收到通知
    for {
        resp, err := w.Next(context.Background())
        if err != nil {
            log.Println(err)
            continue
        }
        if resp.Node.Dir {
            continue
        }
        switch resp.Action {
        case "set", "create", "update", "compareAndSwap":
            p.add_server(resp.Node.Key, resp.Node.Value)
            // 添加或更新 server 信息
        case "delete", "compareAndDelete","expire":
            p.remove_server(resp.PrevNode.Key)
            // 当过期之后删除 server
        }
    }
}

以上是一个简单的长连接服务注册与发现的简单 demo。 服务注册与发现通常被用在微服务架构中,基本思想是通过职能划分,能够做到更合理的调动服务器资源。

上述是一个是注册的长连接服务,消耗的是带宽,还有一些其他服务消耗IO, 内存,CPU,GPU等。 可以通过服务的动态添加合理的配置每中服务的资源数量。

参考文档

如果你觉得本文对你有帮助,欢迎打赏!