<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Kafka 是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料,具有高效能、持久化、多副本備份、橫向擴充套件等特點。本文介紹瞭如何使用 Go 語言傳送和接收 kafka 訊息。
Go 語言中連線 kafka 使用第三方庫:github.com/Shopify/sar…。
go get github.com/Shopify/sarama
sarama
v1.20 之後的版本加入了zstd
壓縮演演算法,需要用到 cgo,在 Windows 平臺編譯時會提示類似如下錯誤:
# github.com/DataDog/zstd exec: "gcc":executable file not found in %PATH%
所以在 Windows 平臺請使用 v1.19 版本的 sarama。
package main import ( "fmt" "github.com/Shopify/sarama" ) // 基於sarama第三方庫開發的kafka client func main() { config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // 傳送完資料需要leader和follow都確認 config.Producer.Partitioner = sarama.NewRandomPartitioner // 新選出一個partition config.Producer.Return.Successes = true // 成功交付的訊息將在success channel返回 // 構造一個訊息 msg := &sarama.ProducerMessage{} msg.Topic = "web_log" msg.Value = sarama.StringEncoder("this is a test log") // 連線kafka client, err := sarama.NewSyncProducer([]string{"192.168.1.7:9092"}, config) if err != nil { fmt.Println("producer closed, err:", err) return } defer client.Close() // 傳送訊息 pid, offset, err := client.SendMessage(msg) if err != nil { fmt.Println("send msg failed, err:", err) return } fmt.Printf("pid:%v offset:%vn", pid, offset) }
package main import ( "fmt" "github.com/Shopify/sarama" ) // kafka consumer func main() { consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil) if err != nil { fmt.Printf("fail to start consumer, err:%vn", err) return } partitionList, err := consumer.Partitions("web_log") // 根據topic取到所有的分割區 if err != nil { fmt.Printf("fail to get list of partition:err%vn", err) return } fmt.Println(partitionList) for partition := range partitionList { // 遍歷所有的分割區 // 針對每個分割區建立一個對應的分割區消費者 pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%vn", partition, err) return } defer pc.AsyncClose() // 非同步從每個分割區消費資訊 go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value) } }(pc) } }
這裡使用官方的etcd/clientv3包來連線etcd並進行相關操作。
go get go.etcd.io/etcd/clientv3
put
命令用來設定鍵值對資料,get
命令用來根據key獲取值。
package main import ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3" ) func main(){ cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"122.51.79.172:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { // handle error! fmt.Printf("connect to etcd failed, err:%vn", err) return } fmt.Println("connect to etcd success") defer cli.Close() // put ctx, cancel := context.WithTimeout(context.Background(), time.Second) _, err = cli.Put(ctx, "coolops", "test") cancel() if err != nil { fmt.Printf("put to etcd failed, err:%vn", err) return } // get ctx, cancel = context.WithTimeout(context.Background(), time.Second) resp, err := cli.Get(ctx, "coolops") cancel() if err != nil { fmt.Printf("get from etcd failed, err:%vn", err) return } for _, ev := range resp.Kvs { fmt.Printf("%s:%sn", ev.Key, ev.Value) } }
watch
用來獲取未來更改的通知。
package main import ( "context" "fmt" "time" "go.etcd.io/etcd/clientv3" ) func main(){ cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{"122.51.79.172:2379"}, DialTimeout: 5 * time.Second, }) if err != nil { // handle error! fmt.Printf("connect to etcd failed, err:%vn", err) return } fmt.Println("connect to etcd success") defer cli.Close() // watch 操作,返回的是一個通道 rch := cli.Watch(context.Background(), "coolops") // <-chan WatchResponse for wresp := range rch { for _, ev := range wresp.Events { fmt.Printf("Type: %s Key:%s Value:%sn", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }
go: finding github.com/coreos/pkg latest # github.com/coreos/etcd/clientv3/balancer/resolver/endpoint E:DEVGopkgmodgithub.comcoreosetcd@v3.3.19+incompatibleclientv3balancerresolverendpointendpoint.go:114:78: undefined: resolver.BuildOption E:DEVGopkgmodgithub.comcoreosetcd@v3.3.19+incompatibleclientv3balancerresolverendpointendpoint.go:182:31: undefined: resolver.ResolveNowOption # github.com/coreos/etcd/clientv3/balancer/picker E:DEVGopkgmodgithub.comcoreosetcd@v3.3.19+incompatibleclientv3balancerpickererr.go:37:44: undefined: balancer.PickOptions E:DEVGopkgmodgithub.comcoreosetcd@v3.3.19+incompatibleclientv3balancerpickerroundrobin_balanced.go:55:54: undefined: balancer.PickOptions
解決: 將go.mod裡的prpc改為1.26.0版本
google.golang.org/grpc v1.26.0
以上就是Go操作Kafka和Etcd方法詳解的詳細內容,更多關於Go操作Kafka Etcd的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45