Equipment access

Create the product, select the parameters (read the documentation for the parameters), use either the default topic or the custom topic, and then select the feature definition (the custom Topic model). The custom topic may not use the object model, but you need to select the data parsing, add the parsing script, and submit and publish the device. When reporting data using a custom topic, add it at the end of the topic link? _sn=default flag, which the iot platform will call parsing script to format the reported bytecode data and return a JSON to be consumed by the business service area in this topic. The device developer can download the SDK and certificate to the device by themselves, and also report data according to the way in which the MQTT module of Node is used to simulate the device in the document for the convenience of testing. Testing can also be done using on-line testing in monitoring operations or equipment simulations.

Rules engines flow data

The data reported by the device needs to be transferred to the business service or other storage for business analysis or operation. At this time, the rule engine is needed to be transferred, which can be generally divided into two categories. One is the way that the server subscribes to AMQP flow and MNS, and the other is transferred to other specified cloud products, such as RokcetMQ, RDS, DataHub, etc Connections to ecS cannot be transferred to cloud products other than the specified cloud products.

Create a consumer group, create a subscription, select the type of report, that is, choose which topic to report. The next step is to integrate the client.

Rely on

<! -- amqp 1.0 qPID client --> <dependency> <groupId>org.apache.qpid</groupId> <artifactId> qpID-jM-client </artifactId> The < version > 0.47.0 < / version > < / dependency > <! -- util for base64--> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> The < version > 1.10 < / version > < / dependency >Copy the code

Client code, preferably using the accesskey of the child account.

import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpJavaClientDemo {

    private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);

    private final static ExecutorService executorService = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
        new LinkedBlockingQueue(50000));

    public static void main(String[] args) throws Exception {
        String accessKey = "${YourAccessKey}";
        String accessSecret = "${YourAccessSecret}";
        String consumerGroupId = "${YourConsumerGroupId}";
        String iotInstanceId = "${YourIotInstanceId}";
        long timeStamp = System.currentTimeMillis();
        String signMethod = "hmacsha1";
        String clientId = "${YourClientId}";

        String userName = clientId + "|authMode=aksign"
            + ",signMethod=" + signMethod
            + ",timestamp=" + timeStamp
            + ",authId=" + accessKey
            + ",iotInstanceId=" + iotInstanceId
            + ",consumerGroupId=" + consumerGroupId
            + "|";
        String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;
        String password = doSign(signContent,accessSecret, signMethod);
        String connectionUrl = "failover:(amqps://${YourHost}:5671?amqp.idleTimeout=80000)"
            + "?failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        // 创建连接。
        Connection connection = cf.createConnection(userName, password);
        ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
        // 创建会话。
        // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
        // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建Receiver连接。
        MessageConsumer consumer = session.createConsumer(queue);

    private static MessageListener messageListener = new MessageListener() {
        public void onMessage(final Message message) {
            try {
                // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
                // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
                // message.acknowledge();
                // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
                executorService.submit(new Runnable() {
                    public void run() {
            } catch (Exception e) {
                logger.error("submit task occurs exception ", e);

     * 在这里处理您收到消息后的具体业务逻辑。
    private static void processMessage(Message message) {
        try {
            byte[] body = message.getBody(byte[].class);
            String content = new String(body);
            String topic = message.getStringProperty("topic");
            String messageId = message.getStringProperty("messageId");
  "receive message"
                + ",\n topic = " + topic
                + ",\n messageId = " + messageId
                + ",\n content = " + content);
        } catch (Exception e) {
            logger.error("processMessage occurs error ", e);

    private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
         * 连接成功建立。
        public void onConnectionEstablished(URI remoteURI) {
  "onConnectionEstablished, remoteUri:{}", remoteURI);

         * 尝试过最大重试次数之后,最终连接失败。
        public void onConnectionFailure(Throwable error) {
            logger.error("onConnectionFailure, {}", error.getMessage());

         * 连接中断。
        public void onConnectionInterrupted(URI remoteURI) {
  "onConnectionInterrupted, remoteUri:{}", remoteURI);

         * 连接中断后又自动重连上。
        public void onConnectionRestored(URI remoteURI) {
  "onConnectionRestored, remoteUri:{}", remoteURI);

        public void onInboundMessage(JmsInboundMessageDispatch envelope) {}

        public void onSessionClosed(Session session, Throwable cause) {}

        public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}

        public void onProducerClosed(MessageProducer producer, Throwable cause) {}

     * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Base64.encodeBase64String(rawHmac);
