温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

ActiveMQ要入门什么

发布时间:2021-10-20 09:16:09 来源:亿速云 阅读:118 作者:柒染 栏目:大数据

本篇文章给大家分享的是有关ActiveMQ要入门什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

1. 发布消息

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class BookProducer implements Runnable{
    public static final String BROKER_URL = "tcp://localhost:61616";
    @Override
    public void run() {
        try {
            //1.创建连接工厂,指定ip和端口
            ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
            //2.使用连接工厂创建一个连接对象
            Connection connection = factory.createConnection();
            //3.开启连接(JMS会话)
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //使用会话创建目的地
            /**
             * ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。
             * ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,
                  而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
             */
            Destination destination = session.createQueue("book-broker")
            //创建生产者/消费者
            MessageProducer producer = session.createProducer(destination);
            // MessageConsumer consumer = session.createConsumer(destination);
            //consumer.receive();
            /**
             * 创建消息,支持的消息类型:
             *  TextMessage
             *  MapMessage
             *  ObjectMessage:对象需要实现序列化接口
             *  BytesMessage
             *  StreamMessage
             */
            Message message = session.createTextMessage("我是一个香蕉.......");
            //发送消息
            producer.send(message);
            //释放资源
            producer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

2. 接收消息

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

public class BookConsumer implements Runnable {
    @Override
    public void run() {
        try {
            var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection();
            connection.start();
            /**
             * connection.createSession(boolean transacted, int acknowledgeMode);
             * transacted:是否使用事务
             * acknowledgeMode:应答模式
             *     AUTO_ACKNOWLEDGE:自动应答
             *          对于同步消费者,receive方法调用返回,且没有异常发生时,将自动对收到的消息予以确认.
             *          对于异步消息,当onMessage方法返回,且没有异常发生时,即对收到的消息自动确认.
             *     CLIENT_ACKNOWLEDGE:客户端手动应答
             *          这种方式要求客户端使用javax.jms.Message.acknowledge()方法完成确认.
             *     DUPS_OK_ACKNOWLEDGE:延时//批量通知
             *          这种确认方式允许JMS不必急于确认收到的消息,允许在收到多个消息之后一次完成确认,
                       与Auto_AcKnowledge相比,这种确认方式在某些情况下可能更有效,
                       因为没有确认,当系统崩溃或者网络出现故障的时候,消息可以被重新传递.
             *     使用事务消息确认模式:
             *     SESSION_TRANSACTED
             */
            var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            var consumer = session.createConsumer(session.createQueue("tmall-queue"));
            var message = ((TextMessage)consumer.receive()).getText();
            System.out.println(message);
            session.close();
            connection.close();
    }
}

      或者设置监听器接收(消费者不用一直在线,监听到消息自动接收)

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

public class BookConsumer implements Runnable {
    @Override
    public void run() {
        try {
            var connection = new ActiveMQConnectionFactory(BookProducer.BROKER_URL).createConnection();
            connection.start();
            var session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            var consumer = session.createConsumer(session.createQueue("tmall-queue"));
            consumer.setMessageListener(message -> {
                try {
                    System.out.println(((TextMessage) message).getText());
                }catch (JMSException e){
                    e.printStackTrace();
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

3. java内嵌ActiveMQ,自动启动一个ActiveMQ,不需要Linux启动

import org.apache.activemq.broker.BrokerService;

public class Broker {
    //导入依赖compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.9'
    public void producter(){
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);//设置Broker的服务是否应该公开给JMX
        try {
            brokerService.addConnector("tcp://localhost:61616");
            brokerService.start(); 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

以上就是ActiveMQ要入门什么,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI