etcd 是一个分布式一致性的 key-value 存储技术, 被用来做配置共享和服务发现。有着以下优点:
- 简单: 使用 gRPC 实现了 面向用户的API(v3), v2版本提供 CURL 可访问的 API( HTTP+JSON )
- 安全: 自动使用 TLS 并可选客户端验证
- 快速: 10,000 次写操作/秒
- 可靠: 使用 Raft 算法实现
etcd 安装
获取etcd
- 最简单的方式是从 Github release page 下载编译好的二进制文件。
- etcd 是用 go 开发的, 也可以从 https://github.com/coreos/etcd 处直接下载源码, 下载完成之后在源码目录执行 ./build 编译项目
运行 etcd
运行单机 etcd 集群
首先安装 goreman
go get github.com/mattn/goreman
编辑 Profile 文件。 vim Procfile
# Use goreman to run `go get github.com/mattn/goreman`
etcd1: bin/etcd --name infra1 --listen-client-urls http://127.0.0.1:2379 --advertise-client-urls http://127.0.0.1:2379 --listen-peer-urls http://127.0.0.1:12380 --initial-advertise-peer-urls http://127.0.0.1:12380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
etcd2: bin/etcd --name infra2 --listen-client-urls http://127.0.0.1:22379 --advertise-client-urls http://127.0.0.1:22379 --listen-peer-urls http://127.0.0.1:22380 --initial-advertise-peer-urls http://127.0.0.1:22380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
etcd3: bin/etcd --name infra3 --listen-client-urls http://127.0.0.1:32379 --advertise-client-urls http://127.0.0.1:32379 --listen-peer-urls http://127.0.0.1:32380 --initial-advertise-peer-urls http://127.0.0.1:32380 --initial-cluster-token etcd-cluster-1 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --initial-cluster-state new --enable-pprof
# in future, use proxy to listen on 2379
#proxy: bin/etcd --name infra-proxy1 --proxy=on --listen-client-urls http://127.0.0.1:2378 --initial-cluster 'infra1=http://127.0.0.1:12380,infra2=http://127.0.0.1:22380,infra3=http://127.0.0.1:32380' --enable-pprof
启动集群
goreman start
上述脚本生成一个三个 etcd 成员 infra1 infra2 infra3 和一个 etcd 代理 proxy, 共同组成了一个本地的集群.
每一个集群成员 和 代理 都接受 key-value 的读写请求。
etcd 实现共享配置
- 理论上,应用唯一需要的配置的 ETCD_HOST 环境变量,用于指定ETCD地址。通常使用 docker。
- 应用启动时从 etcd 获取一次配置信息,(获取该 key 的 value)
- 同时系统在 etcd 节点上注册一个 Watcher,这样每次配置发生改变之后,应用都能通过 etcd 实时获取到最新配置。
etcd 服务注册与发现
随着 docker 的火爆, 微服务架构中的服务注册和发现变的越来越重要,而 etcd 就是为了为这些情况提供解决方案的。因为 etcd 有以下特性:
- 强一致和高可用。 采用了 Raft 算法的 etcd 能勾为我们提供强一致性和高可用性的保证
- 提供了注册服务和监听服务状态的机制 etcd 可以对某个 key 设置 TTL,可以通过监听服务的心跳来检测服务状态。
- 提供了一种查找和连接服务的机制。当监听到添加了某个服务之后,可以去连接该服务。 有时候我们可以在每一台提供服务的机器上部署一个 proxy 模式的 etcd 节点。 每个服务只要访问自己的 etcd 节点即可。
example
现在假设我们需要开发一套可以水平扩展的长连接服务,需要能动态添加和删除服务器。
- 每一台 server 启动成功后,去向 etcd 注册服务,并定时向etcd 发送心跳
- 同时网关 agent 监听服务目录(对key注册Watcher),当监听到新的服务时,将服务器添加到可用服务器列表中。
- 当有的 server 关闭后,agent 检测到某个 key 的超时,可以根据自己的策略来决定是否删除掉该server。
- 客户端连接 server 之前,先向 agent 请求一台 server 地址,agent 可以根据自己的策略给 client 分配服务。 常用的有一致性hash 等算法。 client 更根据分配的 server 地址去连接长连接服务
client 服务注册
type Service struct {
ProcessId int // 进程ID , 单机调试时用来标志每一个服务
info Serverinfo // 服务端信息
KeysAPI client.KeysAPI // API client, 此处用的是 V2 版本的API,是基于 http 的。 V3版本的是基于grpc的API
}
// workerInfo is the service register information to etcd
type Serverinfo struct {
Id int32 `json:"id"` // 服务器ID
IP string `json:"ip"` // 对外连接服务的 IP
Port int32 `json:"port"` // 对外服务端口,本机或者端口映射后得到的
}
// 注册服务
func RegisterService( endpoints []string) {
cfg := client.Config{
Endpoints: endpoints,
Transport: client.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
etcdClient, err := client.New(cfg)
if err != nil {
log.Fatal("Error: cannot connec to etcd:", err)
}
s := &Service{
ProcessId: os.Getpid(),
info: Serverinfo{Id: 1024, IP: "127.0.0.1", Port: 100},
KeysAPI: client.NewKeysAPI(etcdClient),
}
go s.HeartBeat() // 定时发送心跳
}
func (s *Service) HeartBeat() {
api := s.KeysAPI
for {
key := "lc_server/p_" + strconv.Itoa(s.ProcessId) // 先用 pid 来标识每一个服务, 通常应该用 IP 等来标识。
// etcd 之所以适合用来做服务发现,是因为它是带目录结构的。 注册一类服务,
// 只需要 key 在同一个目录下,此处 lc_sercer 目录下,p_{pid}
value, _ := json.Marshal(s.info)
_, err := api.Set(context.Background(), key, string(value), &client.SetOptions{
TTL: time.Second * 20,
}) // 调用 API, 设置该 key TTL 为20秒。
if err != nil {
log.Println("Error update workerInfo:", err)
}
time.Sleep(time.Second * 10)
}
}
agent 监听服务
type Server struct {
info Serverinfo // 服务器信息
Status int // 服务器状态
}
// all server
type server_pool struct {
services map[string]*Server // server 列表
client etcdclient.Client
mu sync.RWMutex
}
var (
Default_pool server_pool
once sync.Once
)
func Init( endpoints []string) {
once.Do(func() { Default_pool.init(endpoints) })
}
func (p *server_pool) init( hosts []string) {
// init etcd client
cfg := etcdclient.Config{
Endpoints: hosts, //
Transport: etcdclient.DefaultTransport,
}
etcdcli, err := etcdclient.New(cfg)
if err != nil {
log.Panic(err)
os.Exit(-1)
}
p.client = etcdcli
// init
p.services = make(map[string]*Server)
go p.watcher()
}
// watcher for data change in etcd directory
func (p *server_pool) watcher() error {
kAPI := etcdclient.NewKeysAPI(p.client)
w := kAPI.Watcher("lc_server/", &etcdclient.WatcherOptions{Recursive: true})
// 监听 "lc_server/" 当 "lc_server/" 子目录改变时能收到通知
for {
resp, err := w.Next(context.Background())
if err != nil {
log.Println(err)
continue
}
if resp.Node.Dir {
continue
}
switch resp.Action {
case "set", "create", "update", "compareAndSwap":
p.add_server(resp.Node.Key, resp.Node.Value)
// 添加或更新 server 信息
case "delete", "compareAndDelete","expire":
p.remove_server(resp.PrevNode.Key)
// 当过期之后删除 server
}
}
}
以上是一个简单的长连接服务注册与发现的简单 demo。 服务注册与发现通常被用在微服务架构中,基本思想是通过职能划分,能够做到更合理的调动服务器资源。
上述是一个是注册的长连接服务,消耗的是带宽,还有一些其他服务消耗IO, 内存,CPU,GPU等。 可以通过服务的动态添加合理的配置每中服务的资源数量。