利用消费者将消息保存到数据库

上一节我们将消息数据存储到了 GraceMQ, 本节我们将利用消费者对消息进行消费,在获得到消息的时候我们将消息写入到数据库 :

package main

import (
	"fmt"
	"godemo/configs"
	"runtime"
	"time"

	"github.com/cnlesscode/gotool/db"
	"github.com/cnlesscode/graceMessageQueueClient/client"
)

// 全局变量
var addr string = "192.168.1.100:8881,192.168.1.102:8881"
var graceMQConnetionPool *client.GraceMQConnectionPool

type Messages struct {
	Id      int    `gorm:"column:id;primaryKey"`
	Message string `gorm:"column:message"`
}

func main() {
	// 观察协程
	go func() {
		for {
			fmt.Printf("协程数 : %v\n", runtime.NumGoroutine())
			time.Sleep(time.Second * 3)
		}
	}()

	// 初始化 TCP 连接池
	graceMQConnetionPool = client.Init(addr, 100)
	// 初始化数据库连接池
	db.Start(configs.DBConfig)

	// 循环消费消息并写入数据
	// 开启100个协程
	for i := 1; i <= 100; i++ {
		go func() {
			// 执行循环语句实现循环消费
			db := db.Init()
			for {
				res := graceMQConnetionPool.ConsumeMessage("topic1", "default")
				if res.Errcode == 0 && res.Data != "" {
					msg := Messages{
						Message: res.Data,
					}
					err := db.Create(&msg).Error
					if err != nil {
						fmt.Printf("err: %v\n", err)
					}
				}
			}
		}()
	}

	for {
	}
}

总结

通过上面的代码,我们开启利用 go 语言开启 100 个协程对消息进行消费,并将消息存储到数据库,消费10万条数据过程可控( 可以根据服务器承载能力来设置并发消费者数量 ),结果零误差。

至此,一个聊天数据存储实战过程讲解完毕。