java 连接ACTIVE MQ 进行队列读写操作的例子

MQ 用过好多年了,不过以前用 WEBSPHERE MQ 比较多,IBM 的,商业版。其实开源的 ACTIVE MQ 也算不错了,最近刚好用到,写了个简单的测试例子,在项目中用到的测试例子而已。
1. 向队列中保存消息
2. 从队列中获取消息

向队列中保存消息
程序代码 程序代码

public class SaveMsg {

    static final String BROKER_URL = "tcp://192.168.1.111:61616";
    static ActiveMQConnectionFactory factory;
    protected static final Logger log = LogManager.getLogger(SaveMsg.class);
    

    static {
        factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
        factory.setExceptionListener(
                new ExceptionListener() {
                    @Override
                    public void onException(JMSException ex) {
                       log.warn("监听到ACTIVEMQ JSM 异常" + ex);
                    }
                }
        );
    }

    final ExecutorService es = Executors.newSingleThreadExecutor();
    String _DESTINATION;
    

    public static void main(String[] args) throws Exception {
        SaveMsg mi = new SaveMsg();      
        for (int i=0; i<100; i++) {            
            mi.sendRoomChat(8888, 80001, 80001, "<FONT style=\\\"FONT-FAMILY:宋体;FONT-SIZE:12px; COLOR:#000000\\\">收到来自android的测试信息" + i + "</FONT>");  
            log.debug("send message,index is :" + i);
        }
        mi.conn.close();
        
      
    }

    public SaveMsg() throws Exception {
        _initConn();
      
    }
  
    public static boolean stat = true;

    private boolean isRunning() {
        return stat;
    }
    Connection conn;

    private void _initConn() throws JMSException {
        conn = factory.createConnection();
        conn.start();
    }
  
    /**
     *
     * @param roomId 房间ID
     * @param srcId 消息发送ID
     * @param toId 消息接收者
     * @param data 聊天消息
     */
    public void sendRoomChat(int roomId, int srcId, int toId, String data) {
        Session session;
        try {
            session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue("live.test.rsp");//amq.projId.mode.trans
            // 创建消息制作者
            MessageProducer producer = session.createProducer(destination);
            {//非持久
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
            MapMessage msg = session.createMapMessage();
            {//数据封装
                msg.setInt("relay_cmd_id", 17);//
                msg.setInt("vcbid", roomId);//房间ID
                msg.setInt("srcid", srcId);//当前用户ID
                msg.setInt("toid", toId);//目标用户ID
                msg.setInt("msgtype", 0);//文字聊天消息
                msg.setInt("isprivate", 0);//是否私聊(1为私聊)
                msg.setString("content", data);
            }
            log.debug("android发送给pc的消息内容为:"+msg);
            producer.send(msg);
            session.close();
        } catch (JMSException ex) {
            log.warn("send android message to pc error:", ex);
        }
    }

    
}


从队列中获取消息
例子如下:
程序代码 程序代码


import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.DestinationSource;

/**
*
* @author Administrator
*/
public class ReadMsg {
    
      
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.111:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
      
        //第一种情况
        System.out.println("***********************");
        while (true) {
            Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("live.test.rsp");
            MessageConsumer consumer = session.createConsumer(destination);
            MapMessage message = (MapMessage) consumer.receive();
            //session.commit();
            
            System.out .println("收到消息:" +message.getString("content"));
            session.close();            
        }
      
    }
    
}



这个进程不断的从队列中获取数据, 最简单的方式实现进程间的通信。说白了,MQ 也是ESB 的核心。

除非申明,文章均为一号门原创,转载请注明本文地址,谢谢!
文章来自: 本站原创
引用通告: 查看所有引用 | 我要引用此文章
Tags: active mq
相关日志:
评论: 0 | 引用: 0 | 查看次数: -
发表评论
昵 称:
密 码: 游客发言不需要密码.
内 容:
验证码: 验证码
选 项:
虽然发表评论不用注册,但是为了保护您的发言权,建议您注册帐号.