Go 语言版本客户端工具

FirstMQ Go 客户端工具

我们为您准备了 Go 语言的客户端工具包 firstMQClient,工具包的核心是一个 TCP 连接池,以此为基础,您可以快速实现对 FirstMQ 的操作。

安装 firstMQClient

go get github.com/cnlesscode/firstMQClient

功能实现

1. 全局初始化连接池

New 函数参数

1. 集群接口地址 ( 主服务器的 8803 端口 )
2. 连接池容量( 初始化 TCP 连接数量 )
3. 用途( 字符串形式,用于单例模式实例化识别)

示例 :

// Build the pool with the three New arguments: the cluster address,
// a capacity of 2 connections, and the purpose key "CreateTopic".
mqPool, err := New(addr, 2, "CreateTopic")
if err != nil {
	// Pool creation failed; the example simply aborts with the error text.
	panic(err.Error())
}

2. 查看话题

// TestTopicList sends an Action 4 request (the topic-list example) through a
// pool of capacity 1 and prints either the error or the returned data.
//
// Run with: go test -v -run=TestTopicList
func TestTopicList(t *testing.T) {
	pool, poolErr := New(addr, 1, "test")
	if poolErr != nil {
		panic(poolErr.Error())
	}

	// Only the action code is set; every other Message field keeps its zero value.
	resp, sendErr := pool.Send(Message{Action: 4})
	if sendErr != nil {
		fmt.Printf("err: %v\n", sendErr)
		return
	}
	fmt.Printf("response.Data: %v\n", resp.Data)
}

3. 创建话题

// TestCreateATopic sends an Action 3 request (the create-topic example) for
// the topic "test" and prints either the error or the response data.
//
// Run with: go test -v -run=TestCreateATopic
func TestCreateATopic(t *testing.T) {
	mqPool, err := New(addr, 2, "CreateTopic")
	if err != nil {
		panic(err.Error())
	}

	response, err := mqPool.Send(Message{Action: 3, Topic: "test"})
	if err != nil {
		fmt.Printf("err: %v\n", err)
		return
	}
	// Print the payload verbatim. Passing it to Printf as the format string
	// would misinterpret any '%' it contains (and is rejected by go vet).
	fmt.Print(response.Data)
}

4. 生产消息 - 单条

// TestProductAMessage sends a single Action 1 request (the produce-message
// example) to the topic "test", prints the outcome, and then blocks forever.
//
// Run with: go test -v -run=TestProductAMessage
func TestProductAMessage(t *testing.T) {
	mqPool, err := New(addr, 1, "ProductAMessage")
	if err != nil {
		panic(err.Error())
	}

	response, err := mqPool.Send(Message{
		Action: 1,
		Topic:  "test",
		Data:   []byte("a test message ..."),
	})
	if err != nil {
		fmt.Printf("err: %v\n", err)
	} else {
		// Print the payload verbatim. Passing it to Printf as the format
		// string would misinterpret any '%' it contains (and fails go vet).
		fmt.Print(response.Data)
	}

	// Intentionally never returns: the example stays alive until it is
	// interrupted or the go test timeout fires.
	// NOTE(review): presumably kept so the pooled connection can be observed
	// after the send — confirm before removing.
	for {
		time.Sleep(time.Second * 5)
	}
}

5. 生产消息 - 批量

// TestProductMessages produces messages in bulk: 10 rounds, each starting
// 10000 goroutines that send one Action 1 message to the topic "test" through
// a pool of capacity 1000. A background goroutine prints the goroutine count
// every 5 seconds, and the function blocks forever after the last round.
//
// Run with: go test -v -run=TestProductMessages
func TestProductMessages(t *testing.T) {
	mqPool, err := New(addr, 1000, "ProductMessages")
	if err != nil {
		panic(err.Error())
	}

	// Monitor: periodically report how many goroutines are alive.
	go func() {
		for {
			time.Sleep(time.Second * 5)
			fmt.Printf("协程数 : %v\n", runtime.NumGoroutine())
		}
	}()

	const (
		rounds   = 10
		perRound = 10000
	)
	for i := 0; i < rounds; i++ {
		var (
			wg     sync.WaitGroup
			mu     sync.Mutex // guards failed
			failed int
		)
		// Start perRound goroutines that write concurrently.
		for ii := 1; ii <= perRound; ii++ {
			wg.Add(1)
			go func(iin int) {
				defer wg.Done()
				_, sendErr := mqPool.Send(Message{
					Action: 1,
					Topic:  "test",
					Data:   []byte(strconv.Itoa(iin) + " test message ..."),
				})
				// Count failures instead of dropping them, so a round that
				// lost messages is not reported as a clean write.
				if sendErr != nil {
					mu.Lock()
					failed++
					mu.Unlock()
				}
			}(i*perRound + ii) // globally unique sequence number of the message
		}
		wg.Wait()
		if failed > 0 {
			fmt.Printf("err: %v of %v sends failed in round %v\n", failed, perRound, i+1)
		}
		fmt.Printf("第%v次写入完成\n", i+1)
	}

	// Intentional endless loop: keeps the process (and the monitor goroutine)
	// alive until the run is interrupted or the go test timeout fires.
	for {
		time.Sleep(time.Second * 5)
	}
}

6. 消费消息

// TestConsumeMessage loops forever sending Action 2 requests (the
// consume-message example) for topic "test" and consumer group "default".
// Each successful response is printed and its data recorded as a map key; on
// an error it prints the error and the number of distinct payloads recorded
// so far, then waits 10 seconds before retrying.
//
// Run with: go test -v -run=TestConsumeMessage
func TestConsumeMessage(t *testing.T) {
	pool, poolErr := New(addr, 100, "ConsumeMessage")
	if poolErr != nil {
		panic(poolErr.Error())
	}

	seen := make(map[string]int) // distinct payloads received so far
	received := 1                // 1-based counter of successful responses
	for {
		resp, sendErr := pool.Send(Message{
			Action:        2,
			Topic:         "test",
			ConsumerGroup: "default",
		})
		if sendErr != nil {
			fmt.Printf("err: %v\n", sendErr)
			fmt.Printf("len(mp): %v\n", len(seen))
			time.Sleep(10 * time.Second)
			continue
		}
		fmt.Printf("step: %v\n", received)
		fmt.Printf("response.Data: %v\n", resp.Data)
		seen[resp.Data] = 1
		received++
	}
}

7. 创建消费者组

// TestCreateConsumeGroup sends an Action 7 request (the create-consumer-group
// example) for topic "test" and group "default", then prints either the error
// or the response data.
//
// Run with: go test -v -run=TestCreateConsumeGroup
func TestCreateConsumeGroup(t *testing.T) {
	// The purpose key identifies the pool instance, so this test uses its own
	// key rather than "ConsumeMessage", which TestConsumeMessage registers
	// with a different capacity.
	// NOTE(review): assumes the key is only a client-side lookup id — confirm.
	mqPool, err := New(addr, 1, "CreateConsumeGroup")
	if err != nil {
		panic(err.Error())
	}

	response, err := mqPool.Send(Message{
		Action:        7,
		Topic:         "test",
		ConsumerGroup: "default",
	})
	if err != nil {
		fmt.Printf("err: %v\n", err)
		return
	}
	// Print the payload verbatim. Passing it to Printf as the format string
	// would misinterpret any '%' it contains (and is rejected by go vet).
	fmt.Print(response.Data)
}

8. 查看集群服务器列表

// TestServerList sends an Action 10 request (the cluster-server-list example),
// prints the raw response data, and then decodes that data as JSON into a
// firstKV.FirstMQAddrs value and prints the result.
//
// Run with: go test -v -run=TestServerList
func TestServerList(t *testing.T) {
	// The purpose key identifies the pool instance, so this test uses its own
	// key rather than "ConsumeMessage", which TestConsumeMessage registers
	// with a different capacity.
	// NOTE(review): assumes the key is only a client-side lookup id — confirm.
	mqPool, err := New(addr, 1, "ServerList")
	if err != nil {
		panic(err.Error())
	}

	response, err := mqPool.Send(Message{Action: 10})
	if err != nil {
		fmt.Printf("err: %v\n", err)
		return
	}
	fmt.Printf("response.Data: %v\n", response.Data)

	list := firstKV.FirstMQAddrs{}
	if err := json.Unmarshal([]byte(response.Data), &list); err != nil {
		// Report why decoding failed instead of a bare "not ok".
		fmt.Printf("not ok: %v\n", err)
		return
	}
	fmt.Printf("list: %v\n", list)
}