温馨提示×

Linux下Golang如何使用消息队列

小樊
38
2025-11-06 15:28:29
栏目: 编程语言

在Linux下,Golang可以使用多种消息队列技术。这里我们将介绍两种常用的消息队列:RabbitMQ和Kafka。

  1. RabbitMQ

RabbitMQ是一个基于AMQP协议的开源消息代理。要在Golang中使用RabbitMQ,你需要安装一个名为streadway/amqp的库。你可以使用以下命令安装:

go get github.com/streadway/amqp

以下是一个简单的示例,展示了如何在Golang中使用RabbitMQ发送和接收消息:

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "Hello World!"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
	fmt.Println(" [x] Sent", body)

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			fmt.Printf("Received a message: %s\n", d.Body)
		}
	}()

	fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}
  1. Kafka

Kafka是一个分布式流处理平台,通常用于构建实时数据管道和流应用程序。要在Golang中使用Kafka,你需要安装一个名为confluentinc/confluent-kafka-go的库。你可以使用以下命令安装:

go get github.com/confluentinc/confluent-kafka-go/kafka

以下是一个简单的示例,展示了如何在Golang中使用Kafka发送和接收消息:

package main

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	broker := "localhost:9092"
	topic := "test"

	// Producer configuration
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	failOnError(err, "Failed to create producer")

	// Consumer configuration
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": broker,
		"group.id":          "test-group",
		"auto.offset.reset": "earliest",
	})
	failOnError(err, "Failed to create consumer")

	// Subscribe to the topic
	err = c.SubscribeTopics([]string{topic}, nil)
	failOnError(err, "Failed to subscribe to topic")

	// Produce a message
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("Hello, Kafka!"),
	}, nil)
	failOnError(err, "Failed to produce message")

	// Wait for messages
	n := 0
	for n < 1 {
		msg, err := c.ReadMessage(-1)
		failOnError(err, "Failed to read message")
		fmt.Printf("Received message: %s\n", string(msg.Value))
		n++
	}

	// Close producer and consumer
	p.Close()
	c.Close()
}

func failOnError(err error, msg string) {
	if err != nil {
		panic(fmt.Sprintf("%s: %s", msg, err))
	}
}

这些示例仅用于演示如何在Golang中使用RabbitMQ和Kafka。在实际应用中,你可能需要根据你的需求对这些示例进行修改。

0