Consume Messages with SDK

<p class="shortdesc">After messages have been sent successfully, consumers need to consume them. This article describes how to consume messages with the SDK, using the Java SDK over the TCP protocol as an example.</p> <section><div class="tasklabel"><h2 class="doc-tairway">Procedure</h2></div><ol class="ol steps"><li class="li step stepexpand"> <span class="ph cmd">Set the parameters as described in the code comments below, then run the sample code to consume messages.</span> <div class="itemgroup info"> <pre class="pre codeblock"><code>// Standard-library imports used by this sample. The FC SDK imports and the
// logger (log) depend on your SDK package and logging setup and are omitted here.
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.UUID;

public static Consumer initConsumer() throws FCException {
    // Prepare the required parameters.
    // The business system can read these values from a configuration file
    // instead of hard-coding them as in this demo.
    Properties properties = new Properties();
    // Required: name server address of the environment.
    properties.setProperty(FCConstant.NAME_SERVER_ADDRESS, "NAME_SERVER_ADDR-demo");
    // Required: consumer ID, created in the console.
    properties.setProperty(FCConstant.CONSUMER_ID, Constant.CID);
    // Access key and secret key (AK/SK), obtained from the console.
    properties.setProperty(FCConstant.ACCESS_KEY, "ACCESSKEY-demo");
    properties.setProperty(FCConstant.SECRET_KEY, "SECRETKEY-demo");
    // Instance name. The recommended format is host IP + random string;
    // this demo uses a random UUID only.
    properties.setProperty(FCConstant.INSTANCE_NAME, UUID.randomUUID().toString());
    properties.setProperty("CONSUMER_THREAD_NUM", "10");
    Consumer consumer = FCFactory.createConsumer(properties);
    // Set where consumption starts when the consumer starts for the first time.
    // Here, consumption starts from the last offset.
    // If you do not set this value, messages are consumed from the start of the queue by default.
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    return consumer;
}

public static void subscribe(final Consumer consumer, FCMessageFilter messageFilter, String topic) throws FCException {
    // "SSS" is the millisecond field; lowercase "s" would repeat the seconds.
    final SimpleDateFormat smf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
    // Subscribe to the topic (the topic must be created in the console first).
    consumer.subscribe(topic, messageFilter, new FCMessageListener() {
        // In PUSH consumption mode, the client package starts a background thread that
        // continuously pulls messages from MQCP (near real time, millisecond-level delay).
        // The business system implements FCMessageListener to process the received messages.
        @Override
        public FCConsumeStatus pushMessage(List&lt;FCMessage&gt; messageList) {
            // When messages arrive, traverse messageList to obtain each message.
            // The default size of messageList is 1, so messages are pushed to the client one by one.
            for (FCMessage msg : messageList) {
                // Each element of messageList is an FCMessage object.
                // It is recommended to decouple pulling messages from the messaging platform
                // from the business logic that processes them: in the listener, only parse and
                // store the message, then return FCConsumeStatus.CONSUME_OK.
                // The stored messages can then be processed asynchronously in the background.
                log.info(consumer + "--------【{}】:{},\ntime: {}",
                        new Object[]{new String(msg.getConent()), msg.getMsgId(), smf.format(new Date())});
            }
            return FCConsumeStatus.CONSUME_OK;
        }
    });
}
</code></pre> </div> </li><li class="li step stepexpand"> <span class="ph cmd">Check the consumer status in the console. If the consumer is online, it has started successfully. For more information, see <a class="xref" href="https://pinganyun.com/ssr/help/middleware/pamq/manual.common_operations.subscribe_manage.consumer_state" target="_blank">Consumer Status</a>.</span> </li></ol></section>
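<p>The comments in the sample code recommend that the listener only parse and store each message and return quickly, leaving the actual processing to a background task. The following is a minimal sketch of such a hand-off using only the Java standard library. The names <code>MessageHandoff</code> and <code>BodyHandler</code> are hypothetical and not part of the SDK, and the sketch assumes message bodies are UTF-8 text. In a real listener, <code>offer</code> would be called from <code>pushMessage</code>; what to return when the queue is full depends on the statuses the SDK defines, which this article does not list.</p>

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical callback for the business logic (not an SDK type). */
interface BodyHandler {
    void handle(String body);
}

/** Hypothetical helper: moves message bodies from the listener thread to a worker thread. */
class MessageHandoff {
    private final BlockingQueue<byte[]> queue;
    private final Thread worker;

    MessageHandoff(int capacity, BodyHandler handler) {
        // A bounded queue keeps memory use predictable if processing falls behind.
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.worker = new Thread(() -> {
            try {
                while (true) {
                    byte[] body = queue.take(); // blocks until a message is available
                    handler.handle(new String(body, StandardCharsets.UTF_8));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop on shutdown
            }
        }, "message-worker");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    /**
     * Intended to be called from the listener. Waits up to 100 ms for space in the
     * queue and returns false if the queue is still full, so the caller can decide
     * how to report the failure.
     */
    boolean offer(byte[] body) throws InterruptedException {
        return queue.offer(body, 100, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        worker.interrupt();
    }
}

class MessageHandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(2);
        MessageHandoff handoff = new MessageHandoff(16, body -> {
            System.out.println("processed: " + body);
            done.countDown();
        });
        // In a real consumer these calls would sit inside pushMessage(...).
        handoff.offer("order-1".getBytes(StandardCharsets.UTF_8));
        handoff.offer("order-2".getBytes(StandardCharsets.UTF_8));
        done.await(2, TimeUnit.SECONDS);
        handoff.shutdown();
    }
}
```

<p>A single worker thread preserves the order in which messages were handed off. If ordering does not matter, the worker could be replaced with a thread pool at the cost of that guarantee.</p>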