上一节我们将消息数据存储到了 GraceMQ, 本节我们将利用消费者对消息进行消费,在获得到消息的时候我们将消息写入到数据库 :
package main
import (
"fmt"
"godemo/configs"
"runtime"
"time"
"github.com/cnlesscode/gotool/db"
"github.com/cnlesscode/graceMessageQueueClient/client"
)
// 全局变量
var addr string = "192.168.1.100:8881,192.168.1.102:8881"
var graceMQConnetionPool *client.GraceMQConnectionPool
type Messages struct {
Id int `gorm:"column:id;primaryKey"`
Message string `gorm:"column:message"`
}
func main() {
// 观察协程
go func() {
for {
fmt.Printf("协程数 : %v\n", runtime.NumGoroutine())
time.Sleep(time.Second * 3)
}
}()
// 初始化 TCP 连接池
graceMQConnetionPool = client.Init(addr, 100)
// 初始化数据库连接池
db.Start(configs.DBConfig)
// 循环消费消息并写入数据
// 开启100个协程
for i := 1; i <= 100; i++ {
go func() {
// 执行循环语句实现循环消费
db := db.Init()
for {
res := graceMQConnetionPool.ConsumeMessage("topic1", "default")
if res.Errcode == 0 && res.Data != "" {
msg := Messages{
Message: res.Data,
}
err := db.Create(&msg).Error
if err != nil {
fmt.Printf("err: %v\n", err)
}
}
}
}()
}
for {
}
}
通过上面的代码,我们开启利用 go 语言开启 100 个协程对消息进行消费,并将消息存储到数据库,消费10万条数据过程可控( 可以根据服务器承载能力来设置并发消费者数量 ),结果零误差。
至此,一个聊天数据存储实战过程讲解完毕。