java 连接ACTIVE MQ 进行队列读写操作的例子
By:Roy.LiuLast updated:2016-04-14
MQ 用过好多年了,不过以前用 WEBSPHERE MQ 比较多,IBM 的,商业版。其实开源的 ACTIVE MQ 也算不错了,最近刚好用到,写了个简单的测试例子,在项目中用到的测试例子而已。
1. 向队列中保存消息
2. 从队列中获取消息
向队列中保存消息
从队列中获取消息
例子如下:
这个进程不断的从队列中获取数据, 最简单的方式实现进程间的通信。说白了,MQ 也是ESB 的核心。
1. 向队列中保存消息
2. 从队列中获取消息
向队列中保存消息
public class SaveMsg { static final String BROKER_URL = "tcp://192.168.1.111:61616"; static ActiveMQConnectionFactory factory; protected static final Logger log = LogManager.getLogger(SaveMsg.class); static { factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL); factory.setExceptionListener( new ExceptionListener() { @Override public void onException(JMSException ex) { log.warn("监听到ACTIVEMQ JSM 异常" + ex); } } ); } final ExecutorService es = Executors.newSingleThreadExecutor(); String _DESTINATION; public static void main(String[] args) throws Exception { SaveMsg mi = new SaveMsg(); for (int i=0; i<100; i++) { mi.sendRoomChat(8888, 80001, 80001, "收到来自android的测试信息" + i + ""); log.debug("send message,index is :" + i); } mi.conn.close(); } public SaveMsg() throws Exception { _initConn(); } public static boolean stat = true; private boolean isRunning() { return stat; } Connection conn; private void _initConn() throws JMSException { conn = factory.createConnection(); conn.start(); } /** * * @param roomId 房间ID * @param srcId 消息发送ID * @param toId 消息接收者 * @param data 聊天消息 */ public void sendRoomChat(int roomId, int srcId, int toId, String data) { Session session; try { session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建一个消息队列 Destination destination = session.createQueue("live.test.rsp");//amq.projId.mode.trans // 创建消息制作者 MessageProducer producer = session.createProducer(destination); {//非持久 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } MapMessage msg = session.createMapMessage(); {//数据封装 msg.setInt("relay_cmd_id", 17);// msg.setInt("vcbid", roomId);//房间ID msg.setInt("srcid", srcId);//当前用户ID msg.setInt("toid", toId);//目标用户ID msg.setInt("msgtype", 0);//文字聊天消息 msg.setInt("isprivate", 0);//是否私聊(1为私聊) msg.setString("content", data); } log.debug("android发送给pc的消息内容为:"+msg); producer.send(msg); session.close(); } catch (JMSException ex) { log.warn("send android message to pc error:", ex); } } }
从队列中获取消息
例子如下:
import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.DestinationSource; /** * * @author Administrator */ public class ReadMsg { public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.111:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); //第一种情况 System.out.println("***********************"); while (true) { Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("live.test.rsp"); MessageConsumer consumer = session.createConsumer(destination); MapMessage message = (MapMessage) consumer.receive(); //session.commit(); System.out .println("收到消息:" +message.getString("content")); session.close(); } } }
这个进程不断的从队列中获取数据, 最简单的方式实现进程间的通信。说白了,MQ 也是ESB 的核心。
From:一号门
Previous:Spring4 + Quartz Scheduler 执行定时任务例子
COMMENTS