package com.njq.junit;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Minimal RocketMQ producer demo: starts a {@link DefaultMQProducer}, attempts to
 * create a topic, sends one message synchronously and prints the send result.
 */
public class RocketProducer {

    /** Producer group name passed to the {@link DefaultMQProducer} constructor. */
    private static final String PRODUCER_GROUP = "SELF_TEST_P_GROUP";

    /**
     * NameServer address. Replace with the real address; multiple addresses are
     * separated by ';'. It may also be supplied through an environment variable
     * instead of being hard-coded.
     */
    private static final String NAMESRV_ADDR = "111.11.11.111:9876";

    /** Send timeout in milliseconds; raised to work around send-timeout failures. */
    private static final int SEND_TIMEOUT_MILLIS = 15000;

    /**
     * Timestamp pattern for the console output.
     * In SimpleDateFormat, 'MM' is month, 'mm' is minute and 'HH' is the 24-hour clock.
     */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        producer.setSendMsgTimeout(SEND_TIMEOUT_MILLIS);

        try {
            try {
                producer.start();
            } catch (MQClientException e1) {
                e1.printStackTrace();
                System.out.println("start失败:" + e1.toString());
                // Nothing below can work without a started producer.
                return;
            }

            // Added so that the topic can be created up front and startup does not fail.
            // NOTE(review): this creates "abc-test" while the message below is sent to
            // "threezto-test" — confirm which topic is intended.
            try {
                producer.createTopic("123", "abc-test", 4);
            } catch (MQClientException e) {
                // Best effort: a failed topic creation does not prevent the send attempt.
                e.printStackTrace();
            }

            SimpleDateFormat sdf = new SimpleDateFormat(TIMESTAMP_PATTERN);
            try {
                Message message = new Message("threezto-test", "tag", "key",
                        ("美丽中国-2").getBytes(RemotingHelper.DEFAULT_CHARSET));
                System.out.println("开始发送:" + sdf.format(new Date()));
                SendResult sendResult = producer.send(message);
                System.out.println("发送完成:" + sdf.format(new Date()));
                System.out.println("发送的消息ID:" + sendResult.getMsgId() + "--- 发送消息的状态:" + sendResult.getSendStatus());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // Always release the producer, including on the early-return path.
            producer.shutdown();
        }
    }
}
package com.njq.junit;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Pull-consumer demo: for every message queue of topic "TopicTest", pulls at most
 * one message starting at the locally remembered offset and prints it.
 */
public class QueueConsumer {

    /** Next offset to pull from, keyed by message queue. Kept in memory only. */
    private static final Map<MessageQueue, Long> offsetTable = new HashMap<>();

    /**
     * Runs the demo.
     *
     * @param args unused
     * @throws Exception if starting the consumer fails, or if a pull fails with
     *                   anything other than {@link MQClientException}
     */
    public static void main(String[] args) throws Exception {
        offsetTable.clear();
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pullConsumer");
        consumer.setNamesrvAddr("111.11.11.111:9876");
        consumer.start();
        try {
            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
            for (MessageQueue mq : mqs) {
                System.out.println("当前获取的消息的归属队列是: " + mq.getQueueId());
                // Blocks when no message is available yet (the original author noted a
                // 10 s default, consumerPullTimeoutMillis — verify against the client version).
                // consumer.pull(mq, null, offset, 32) is the alternative call the original considered.
                PullResultExt pullResult = (PullResultExt) consumer.pullBlockIfNotFound(mq, null,
                        getMessageQueueOffset(mq), 1);
                // Remember where the next pull for this queue should start.
                putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                        for (MessageExt m : messageExtList) {
                            System.out.println(m.getKeys());
                            // Decode explicitly as UTF-8 rather than with the platform default charset.
                            System.out.println("收到了消息:"
                                    + new String(m.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                        }
                        break;
                    case NO_MATCHED_MSG:
                    case NO_NEW_MSG:
                    case OFFSET_ILLEGAL:
                    default:
                        // Nothing to print for these statuses.
                        break;
                }
            }
        } catch (MQClientException e) {
            e.printStackTrace();
        } finally {
            // Release the consumer on every exit path.
            consumer.shutdown();
        }
    }

    /**
     * Records the offset the next pull from {@code mq} should start at.
     *
     * @param mq     the message queue
     * @param offset the next begin offset
     */
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offsetTable.put(mq, offset);
    }

    /**
     * Returns the remembered offset for {@code mq}.
     *
     * @param mq the message queue
     * @return the stored offset, or 0 when none has been stored yet
     */
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offsetTable.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }
}
package com.njq.junit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * Push-consumer demo: subscribes to every tag of topic "threezto-test" and prints
 * each received message to standard output.
 */
public class RocketConsumer {

    /** Consumer group name. */
    private static final String CONSUMER_GROUP = "CID_LRW_DEV_SUBS";

    /** NameServer address. */
    private static final String ADDR = "111.11.11.111:9876";

    /**
     * Configures and starts the consumer.
     *
     * @param args unused
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setVipChannelEnabled(false);
        consumer.setNamesrvAddr(ADDR);
        // Where consumption starts from.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // "*" consumes every message of the topic; a tag expression such as
        // "tag1||tag2||tag3" can be used instead to filter by tag.
        consumer.subscribe("threezto-test", "*");
        // Listener that prints every delivered message.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    String topic = messageExt.getTopic();
                    String tag = messageExt.getTags();
                    // Decode explicitly as UTF-8 rather than with the platform default charset.
                    String msg = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("*********************************");
                    System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + msg + ", tag:" + tag + ", topic:" + topic);
                    System.out.println("*********************************");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started....");
    }
}