Consume Messages with SDK
<p class="shortdesc">After messages have been sent successfully, consumers can consume them. This article shows how to consume messages with the SDK, using the Java SDK over the TCP protocol as an example.</p>
<section><div class="tasklabel"><h2 class="doc-tairway">Procedure</h2></div><ol class="ol steps"><li class="li step stepexpand">
<span class="ph cmd">Set the parameters as described in the comments of the following sample code, then run the
sample code to consume messages.</span>
<div class="itemgroup info">
<pre class="pre codeblock"><code>public static Consumer initConsumer() throws FCException {
    // Prepare the required parameters.
    Properties properties = new Properties();
    // In a real system, read these values from a configuration file instead of hard-coding them as in this demo.
    properties.setProperty(FCConstant.NAME_SERVER_ADDRESS, "NAME_SERVER_ADDR-demo"); // Required: the name server address of your environment.
    properties.setProperty(FCConstant.CONSUMER_ID, Constant.CID); // Required: the consumer ID created in the console.
    // Access key and secret key, obtained from the console.
    properties.setProperty(FCConstant.ACCESS_KEY, "ACCESSKEY-demo");
    properties.setProperty(FCConstant.SECRET_KEY, "SECRETKEY-demo");
    // Instance name, typically host IP + random string; this demo uses a random UUID.
    properties.setProperty(FCConstant.INSTANCE_NAME, UUID.randomUUID().toString());
    // Number of consumer threads.
    properties.setProperty("CONSUMER_THREAD_NUM", "10");
    Consumer consumer = FCFactory.createConsumer(properties);
    // Set where consumption starts the first time this consumer runs. Here it starts from the last message.
    // If you do not set this value, messages are consumed from the start of the queue by default.
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    return consumer;
}

public static void subscribe(final Consumer consumer, FCMessageFilter messageFilter, String topic) throws FCException {
    // "SSS" is milliseconds; lowercase "s" would repeat the seconds field.
    final SimpleDateFormat smf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
    // Subscribe to messages of the topic (the topic must be created in the console).
    consumer.subscribe(topic, messageFilter, new FCMessageListener() {
        // In PUSH consumption mode, the client package starts a background thread that continuously pulls messages from MQCP (quasi-real-time, millisecond delay).
        // The business system implements FCMessageListener to process the messages it receives.
        @Override
        public FCConsumeStatus pushMessage(List<FCMessage> messageList) {
            // When messages arrive, iterate over messageList to obtain each one.
            // By default messageList holds one message, so messages are pushed to the client one at a time.
            for (FCMessage msg : messageList) {
                // Each element of messageList is an FCMessage object.
                // It is recommended to decouple pulling messages from the messaging platform from processing them:
                // in the listener, only parse and store the message, then return FCConsumeStatus.CONSUME_OK.
                // The backend can then process the stored messages asynchronously.
                log.info(consumer + "--------【{}】:{},\ntime: {}", new Object[]{new String(msg.getConent()), msg.getMsgId(), smf.format(new Date())});
            }
            return FCConsumeStatus.CONSUME_OK;
        }
    });
}
</code></pre>
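<p>The sample comments suggest reading the property values from a configuration file and building the instance name from the host IP plus a random string. The following is a minimal sketch of that idea using only the JDK. The key names (<code>nameServerAddress</code>, <code>consumerId</code>, and so on) and the class <code>ConsumerConfigSketch</code> are hypothetical and not part of the SDK; in real code the loaded values would be copied into the <code>FCConstant</code> keys shown above.</p>

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.UUID;

class ConsumerConfigSketch {
    // Hypothetical key names for this sketch; real code would map them to the FCConstant keys.
    static final String[] REQUIRED_KEYS = {"nameServerAddress", "consumerId", "accessKey", "secretKey"};

    // Loads settings in java.util.Properties format and fails fast if a required one is missing.
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required setting: " + key);
            }
        }
        return props;
    }

    // Builds an instance name from the host IP plus a random string.
    static String instanceName() {
        String host;
        try {
            host = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            host = "unknown-host"; // fallback when the local address cannot be resolved
        }
        return host + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Inline text stands in for a configuration file; a FileReader would be used in practice.
        String config = String.join("\n",
                "nameServerAddress=NAME_SERVER_ADDR-demo",
                "consumerId=CID-demo",
                "accessKey=ACCESSKEY-demo",
                "secretKey=SECRETKEY-demo");
        Properties props = load(new StringReader(config));
        props.setProperty("instanceName", instanceName());
        System.out.println("consumerId=" + props.getProperty("consumerId"));
        System.out.println("instanceName=" + props.getProperty("instanceName"));
    }
}
```

<p>Failing fast on a missing setting surfaces configuration mistakes at startup rather than when the consumer first tries to connect.</p>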
</div>
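<div class="itemgroup info">
<p>The comments in the sample recommend that the listener only store incoming messages and return, leaving the actual processing to the backend. One possible way to do this, sketched here with JDK classes only, is to hand each message body to a single worker thread through a bounded queue. The class <code>HandOffSketch</code> and its methods are hypothetical stand-ins, not SDK types: <code>onMessages</code> plays the role of the body of <code>pushMessage</code>, and returning <code>true</code> corresponds to returning <code>FCConsumeStatus.CONSUME_OK</code>.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HandOffSketch {
    // One worker thread with a bounded queue, so a slow backend cannot exhaust memory.
    private final ExecutorService worker = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for the listener body: queue each message and return immediately.
    // Returns false when the queue is full; a real listener would then have to
    // decide how to report the failure (the SDK's status for that is not assumed here).
    boolean onMessages(List<String> bodies) {
        try {
            for (String body : bodies) {
                worker.execute(() -> handle(body));
            }
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    // Placeholder business logic, run on the worker thread.
    private void handle(String body) {
        processed.add(body.toUpperCase());
    }

    // Stops accepting work, waits up to five seconds for queued messages, and returns the results.
    List<String> shutdownAndGetResults() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        HandOffSketch sketch = new HandOffSketch();
        boolean accepted = sketch.onMessages(Arrays.asList("hello", "world"));
        System.out.println("accepted=" + accepted);
        System.out.println(sketch.shutdownAndGetResults());
    }
}
```

<p>Note the trade-off: once the listener has acknowledged a message, anything still waiting in the in-memory queue is lost if the process crashes. Storing the message durably before acknowledging it avoids that, at the cost of extra latency.</p>
</div>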
</li><li class="li step stepexpand">
<span class="ph cmd">Check the consumer status in the console. If the consumer is shown as online, it has started successfully. For more information, see <a class="xref" href="https://pinganyun.com/ssr/help/middleware/pamq/manual.common_operations.subscribe_manage.consumer_state" target="_blank">Consumer Status</a>.</span>
</li></ol></section>