Go 语言操作 etcd

标签:etcd首次发布:2023-12-08最近修改:2024-06-14

安装

将 etcd 的客户端库添加到项目的 go.mod 文件中。

bash
go get go.etcd.io/etcd/client/v3

put、get 和 delete 操作

put命令用来设置键值对数据,get命令用来根据 key 获取值。

go
import (    "context"    "fmt"    clientV3 "go.etcd.io/etcd/client/v3"    "log"    "time")func main() {    // 连接etcd    cli, err := clientV3.New(clientV3.Config{        Endpoints:   []string{"192.168.11.10:2379"},        DialTimeout: 5 * time.Second,    })    if err != nil {        log.Fatalf("Failed to connect to etcd: %v", err)    }    defer func(cli *clientV3.Client) {        _ = cli.Close()    }(cli)    // 设置键值    ctx, cancel := context.WithTimeout(context.Background(), time.Second)    _, err = cli.Put(ctx, "age", "20")        // 执行put()后立即取消上下文    cancel()    if err != nil {        log.Fatalf("Failed to set key-value to etcd: %v", err)    }    // 获取键值    ctx, cancel = context.WithTimeout(context.Background(), time.Second)    resp, err := cli.Get(ctx, "age")    cancel()    if err != nil {        log.Fatalf("Failed to get key-value from etcd: %v", err)    }    for _, ev := range resp.Kvs {        fmt.Printf("%s:%s\n", ev.Key, ev.Value) // age:20    }    // 删除键值    ctx, cancel = context.WithTimeout(context.Background(), time.Second)    delResp, err := cli.Delete(ctx, "age")    cancel()    if err != nil {        log.Fatalf("Failed to delete key-value from etcd: %v", err)    }    fmt.Println(delResp.Deleted) // 1}

需要注意的是,Get 方法返回的是一个 *clientv3.GetResponse 类型的对象,而不是直接返回键值对。GetResponse 是一个结构体,其中包含了与请求相关的多个字段,如下所示(可能不是最新的定义,但应该反映了基本的概念):

go
type GetResponse struct {    Header *ResponseHeader    Kvs    []*mvccpb.KeyValue    More   bool    Count  int64}

在这个结构体中:

  • Header 包含了响应的一些元数据,如版本号、响应时间戳等。
  • Kvs 是一个 KeyValue 类型的切片,每个 KeyValue 表示一个键值对。
  • More 表示是否还有更多的数据可以通过后续的请求获取。
  • Count 表示查询匹配到的总键值对数量。

因此,当执行 Get 操作时,即使是请求单个键的值,您也可能得到一个包含多个 KeyValue 对象的切片,因为 etcd 允许一次请求多个键,或是请求一个键前缀匹配的所有键。每个 KeyValue 结构体包含了如下字段:

go
type KeyValue struct {    Key            []byte    CreateRevision int64    ModRevision    int64    Version        int64    Value          []byte    Lease          int64}

在这个结构体中,一般通常关心的是 KeyValue 字段,它们分别表示键和值。由于它们是 []byte 类型,所以有时候需要将它们转换为字符串或其他适当的类型来使用。

这就是为什么需要遍历 resp.Kvs 来获取每个键值对的原因。即使是请求的是单个键,按照上面的结构体定义,也需要从 Kvs 数组中取出相应的 KeyValue 结构体,并从中提取 KeyValue

由于 Kvs 是一个 KeyValue 数组,所以 get 支持同时获取多个键值对,因此可以使用一些前缀匹配来获取多个键值对。

go
resp, err := cli.Get(ctx, "a", clientV3.WithPrefix())

事务

事务是在一系列条件都满足的情况下执行一组操作,如果条件不满足,则可以执行另外一组操作。etcd 的客户端库进行事务操作是通过Txn接口实现的。etcd 的事务是原子的,要么全部执行,要么一个都不执行。如果事务中的某个条件失败了,那么"Then"子句中的操作将不会执行,而是执行"Else"子句中的操作。

但是,etcd 的事务并不像关系型数据库中的事务那样支持回滚能力。在关系型数据库中,如果事务失败或者调用回滚,所有已经执行的操作都会撤销,数据库状态回到事务开始前的状态。但是在 etcd 中,如果一个事务的操作部分成功了,这时候不能自动回滚已经成功的部分。

go
func main() {    cli, err := clientV3.New(clientV3.Config{        Endpoints:   []string{"192.168.11.10:2379"},        DialTimeout: 5 * time.Second,    })    if err != nil {        log.Fatalf("Failed to connect to etcd: %v", err)    }    defer func(cli *clientV3.Client) {        _ = cli.Close()    }(cli)    // 创建事务上下文    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)    defer cancel()    // 开始一个事务    txn := cli.Txn(ctx)    // 事务操作    txnResp, err := txn.        If(            // 如果条件: 检查键 "username" 的版本是否为 0, 为 0 表示键不存在            clientV3.Compare(clientV3.Version("username"), "=", 0),        ).        Then(            // 则操作: 设置一个键值对            clientV3.OpPut("exit", "false"),        ).        Else(            // 否则操作: 获取键 "username" 的值            clientV3.OpGet("username"),        ).        Commit() // 提交事务    if err != nil {        fmt.Printf("Failed to execute txn: %v", err)    } else {        if txnResp.Succeeded {            fmt.Println("执行了 'Then' 分支")            getResp, err := cli.Get(ctx, "username")            if err != nil {                fmt.Printf("Failed to get the value for 'username': %v", err)            } else {                for _, kv := range getResp.Kvs {                    fmt.Printf("%s:%s\n", kv.Key, kv.Value)                }            }        } else {            fmt.Println("执行了 'Else' 分支")            for _, resp := range txnResp.Responses {                if getResp := resp.GetResponseRange(); getResp != nil {                    for _, kv := range getResp.Kvs {                        fmt.Printf("%s:%s\n", kv.Key, kv.Value)                    }                }            }        }    }}

运行测试结果如下:

text
PS D:\GolandProjects\etcd\txn> go run .\main.go执行了'Then'分支exit:false

当在 etcd 中设置 username 后的测试结果如下:

text
PS D:\GolandProjects\etcd\txn> go run .\main.go执行了'Else'分支username:bing

需要注意的是:

  1. 如果事务执行的是OpPut操作,其不返回任何与它存储的键值对相关的信息,所以需要使用 Get 方法来得到事务的执行结果。这意味着在事务提交后,需要额外发起一个 Get 请求来检查 Put 操作的结果,例如查询键的当前值。
  2. 如果事务执行的是OpGet操作,它会返回相关的键值对信息。在事务提交后,可以直接从事务响应中读取这些信息。OpGet 操作在事务中用于条件判断或获取键的当前值。
  3. 如果事务执行的是OpDelete操作,它不会返回被删除键的值信息,但它会返回一些操作的元信息,比如删除操作影响的键的数量(通过 Deleted 字段)。如果需要检查特定键是否真的被删除,可能需要在事务之后进行一个 Get 操作来确认键不再存在。

watch

使用 etcd 客户端的 Watch 方法可以实现对特定键或键前缀的变化进行监听。Watch 方法返回一个 WatchChan 类型的通道,你可以从这个通道接收 WatchResponse的结构体,结构体的类型声明如下:

go
// 一个只读的通道type WatchChan <-chan WatchResponsetype WatchResponse struct {    Header pb.ResponseHeader    Events []*Event    CompactRevision int64    Canceled bool    Created bool}

每个 WatchResponse 包含以下信息:

  • Header: 包含响应的元数据,例如响应的修订版本号。
  • Events: 一系列发生的事件,每个 Event 包含了对应的键值更改信息。
  • CompactRevision: 如果这个字段非零,它表示了修订历史在这个版本之前已经被压缩,也就是说旧的事件历史已经不可用。
  • Canceled: 如果这个字段为 true,它表示监听被取消了,这通常是因为调用了 Watch 方法的取消函数。
  • Created: 如果这个字段为 true,它表示一个新的 watch 流已经被创建。

Watch 的典型应用场景是应用于系统配置的热加载,我们可以在系统读取到存储在 etcd key 中的配置后,用 Watch 监听 key 的变化。在单独的 goroutine 中接收 WatchChan 发送过来的数据,并将更新应用到系统设置的配置变量中,这样系统就实现了配置变量的热加载。

go
import (    "context"    "encoding/json"    "fmt"    clientV3 "go.etcd.io/etcd/client/v3"    "log"    "time")type Login struct {    Username string `json:"username"`    Password string `json:"password"`}func watchConfig(clt *clientV3.Client, key string, ss interface{}) {    watchCh := clt.Watch(context.TODO(), key)    go func() {        for res := range watchCh {            value := res.Events[0].Kv.Value            if err := json.Unmarshal(value, ss); err != nil {                fmt.Println("Watch config err:", err)                continue            }            fmt.Printf("Config changed:%#v\n", ss)        }    }()}func main() {    client, err := clientV3.New(clientV3.Config{        Endpoints:   []string{"192.168.11.10:2379"},        DialTimeout: 5 * time.Second,    })    if err != nil {        log.Fatal(err)    }    defer func(client *clientV3.Client) {        _ = client.Close()    }(client)        var login Login    // 开始监听配置变更    watchConfig(client, "login", &login)        // 为了演示和调试,需运行一段时间    select {    case <-time.After(5 * time.Minute):        fmt.Println("Finished watching config changes")    }}

当对 login 配置做修改的时候:

text
[root@centos ~]# etcdctl put login '{"username":"abc","password":"000"}'OK[root@centos ~]# etcdctl put login '{"username":"xyz","password":"111"}'OK

应用程序能检查到配置的变更,运行结果如下:

text
PS D:\GolandProjects\etcd\watch> go run main.goConfig changed:&main.Login{Username:"abc", Password:"000"}Config changed:&main.Login{Username:"xyz", Password:"111"}

租约

etcd 的租约(Lease)是 etcd 中一种很重要的机制,用于实现键值对的自动过期删除。这在分布式系统中非常有用,尤其是在处理失效节点或服务实例的时候。

租约的工作原理

一个租约有一个关联的时间周期,称为 TTL(Time-To-Live)。客户端可以申请一个租约,并将这个租约附加到一个或多个键值对上。一旦租约到期(如果没有通过 keep-alive 机制进行续约),与其关联的所有键值对都会自动从 etcd 中删除。

租约的用途

租约的概念在服务发现和配置管理等场景中非常实用。例如,在服务发现中,服务实例在启动时向 etcd 注册自己的存在,并维持一个租约。如果服务实例崩溃或无法续租,它的注册信息将会在租约到期时自动清除,这样客户端就不会尝试连接到失效的服务实例。

租约的操作

在 etcd 中与租约相关的操作通常包括:

  • 创建租约:客户端请求创建一个具有指定 TTL 的租约。
  • 续约租约:客户端定期发送请求以续约租约,从而防止租约到期。
  • 撤销租约:客户端可以主动撤销租约,这将删除租约并清除所有关联的键值对。
  • 将键值对附加到租约:在设置键值对时,可以指定一个租约 ID,使该键值对成为租约的一部分。

租约的好处

  • 容错性:如果客户端因为崩溃或网络分区而无法续租,相关键值对会自动清理,这有助于系统自愈。
  • 减少网络开销:客户端不需要定期发送删除请求来清理不再需要的键值对,只需维持租约即可。
  • 灵活性:租约可以用于实现锁和领导选举等分布式原语,服务实例可以使用租约来标示它们的存活状态。

实践

使用 go 语言来演示常见的租约操作。

go
func main() {    // etcd连接    cli, err := clientV3.New(clientV3.Config{        Endpoints:   []string{"192.168.11.10:2379"},        DialTimeout: 5 * time.Second,    })    if err != nil {        log.Fatal(err)    }    defer func(cli *clientV3.Client) {        _ = cli.Close()    }(cli)    // 申请租约    lease := clientV3.NewLease(cli)    leaseResp, err := lease.Grant(context.TODO(), 5) // TTL设置为5秒    if err != nil {        log.Fatalf("申请租约失败: %v", err)    }    fmt.Printf("租约创建成功, 租约ID: %v\n", leaseResp.ID)    // 将租约附加到键值对上    if _, err = cli.Put(context.TODO(), "username", "abc", clientV3.WithLease(leaseResp.ID)); err != nil {        log.Fatalf("设置键值对失败: %v", err)    }    fmt.Println("键值对设置成功,并附加了租约")    // 续约租约    keepAliveChan, err := lease.KeepAlive(context.Background(), leaseResp.ID)    if err != nil {        log.Fatalf("租约续约失败: %v", err)    }    // 在单独的goroutine中处理续约响应    go func() {        for {            select {            case ka, ok := <-keepAliveChan:                if !ok {                    fmt.Println("租约续约通道关闭,租约可能已经失效")                    return                } else if ka == nil {                    fmt.Println("租约已失效")                    return                } else {                    fmt.Printf("收到续约响应: 租约ID=%v, TTL=%d\n", ka.ID, ka.TTL)                }            }        }    }()    // 为了展示租约的工作过程,等待10秒    time.Sleep(10 * time.Second)}
  • lease.KeepAlive方法的作用是自动维护租约的活跃状态。当调用这个方法时,etcd 客户端会周期性地向 etcd 集群发送“续约”请求,这个周期通常比租约的 TTL 小很多,以确保即使某个续约请求失败,租约也不会意外过期。

  • lease.KeepAlive方法返回一个通道(keepAliveChan),这个通道用于接收续约响应。etcd 客户端会在后台监听这个通道,每当有新的续约响应到来时,它会将响应发送到这个通道,只要读取这个通道就能知道续约的结果。

  • 这里的 select 语句是非阻塞的,因为它在一个循环里。如果keepAliveChan中没有消息,循环就会继续等待,直到有消息到来。如果通道关闭或者出现错误、循环就会退出。

代码的运行结果如下:

text
PS D:\GolandProjects\etcd\lease> go run main.go租约创建成功, 租约ID: 7587875183241020472键值对设置成功,并附加了租约收到续约响应: 租约ID=7587875183241020472, TTL=5收到续约响应: 租约ID=7587875183241020472, TTL=5收到续约响应: 租约ID=7587875183241020472, TTL=5收到续约响应: 租约ID=7587875183241020472, TTL=5收到续约响应: 租约ID=7587875183241020472, TTL=5

从输出结果来看,大约两秒钟收到一次续约响应。其实这个行为与 etcd 客户端的内部实现有关。在 etcd 中,客户端会根据所提供的 TTL 值自动决定续约间隔。etcd 客户端尝试在租约过期前续约,通常是租约TTL的1/3到1/2。这是为了确保网络延迟或短暂的服务中断不会导致租约无法及时续约而意外过期。

因此,这个续约间隔并不是固定的,它会随着设置的 TTL 的大小而变化,以保持上述比例。并且这个机制是自动的,无需开发者手动干预。