数据AMQP方式推送

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)转储功能适用于生活物联网平台与企业服务器之间的消息流转。通过集成和使用AMQP SDK,即可实现身份认证、消息接收的能力。我们推荐使用AMQP的方式推送设备数据(如设备状态数据、设备控制记录等),用户信息数据等。

前提条件

开启设备数据同步,并配置要同步数据的产品。详细参见设置数据同步数据同步

说明

当开启数据同步后,集成AMQP客户端SDK来订阅数据。如果通过控制台关闭数据同步再开启时,客户端SDK需要重新进行连接流程,否则无法正常接收数据。如果切换不同的AMQP客户端,需要把之前的客户端断开,再连接新的客户端。否则新客户端无法正常接收数据。

AMQP SDK使用

  1. 引入依赖。

    AMQP SDK为开源SDK。如果您使用Java开发语言,推荐使用Apache Qpid JMS客户端。在项目中添加Maven依赖,Maven信息如下。

    说明

    目前生活物联网平台仅支持Java语言开发,暂未提供其他语言版本,请以生活物联网平台官网为主。

    <!-- amqp 1.0 qpid client -->
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-jms-client</artifactId>
       <version>0.47.0</version>
     </dependency>
     <!-- util for base64-->
     <dependency>
       <groupId>commons-codec</groupId>
      <artifactId>commons-codec</artifactId>
      <version>1.10</version>
    </dependency>           
  2. 认证身份信息。

    • 身份认证需要使用AppKey和AppSecret,该信息可以从控制台中获取。获取身份信息

    • 认证身份信息需要使用EndPoint、AppKey和AppSecret用于鉴权。

      其中,EndPoint是连接节点,具体取值如下表所示。

      区域

      End Point

      中国内地

      amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671

      新加坡

      amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671

      美国(弗吉尼亚)

      amqps://ilop.iot-amqp.us-east-1.aliyuncs.com:5671

      德国(法兰克福)

      amqps://ilop.iot-amqp.eu-central-1.aliyuncs.com:5671

  3. 接收云端消息。

    首先需要创建消息接收的客户端对象client,并传入上面身份认证的profile信息。当消息接收的客户端和服务端建立连接后,服务端会立即向消息接收的客户端推送已订阅的消息,因此建立连接时需要提供默认消息接收的回调接口,用于处理云端推送的消息。

    完整代码示例如下。

    import java.net.URI;
    import java.util.Hashtable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.naming.Context;
    import javax.naming.InitialContext;
    import org.apache.commons.codec.binary.Base64;
    import org.apache.qpid.jms.JmsConnection;
    import org.apache.qpid.jms.JmsConnectionListener;
    import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class AmqpJavaClientDemo {
    
        private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class);
    
        //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。
        private final static ExecutorService executorService = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(50000));
    
        public static void main(String[] args) throws Exception {
    
            String appKey = "${YourAppkey}"; //在控制台自有App,单击密钥对应的查看,可显示App的App Key和App Secret 创建App时系统
            String appSecret = "${YourAppSecret}";
            String consumerGroupId = "${YourAppkey}";
            long random = xxxxx;
            //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。
            String clientId = "${YourClientId}";
    
            String userName = clientId + "|authMode=appkey"                 
                    + ",signMethod=" + "SHA256"
                    + ",random=" + random
                    + ",appKey=" + appKey
                    + ",groupId=" + consumerGroupId + "|";
            String signContent = "random=" + random;
            String password = doSign(signContent, appSecret, "HmacSHA256");
            String connectionUrlTemplate = "failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)"
                    + "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
    
            Hashtable<String, String> hashtable = new Hashtable<>();
            hashtable.put("connectionfactory.SBCF",connectionUrlTemplate);
            hashtable.put("queue.QUEUE", "default");
            hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
            Context context = new InitialContext(hashtable);
            ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
            Destination queue = (Destination)context.lookup("QUEUE");
            // 创建连接。
            Connection connection = cf.createConnection(userName, password);
            ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);
            // 创建会话。
            // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。
            // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            connection.start();
            // 创建Receiver连接。
            MessageConsumer consumer = session.createConsumer(queue);
            consumer.setMessageListener(messageListener);
        }
    
        private static MessageListener messageListener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    //1.收到消息之后一定要ACK。
                    // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
                    // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
                    // message.acknowledge();
                    //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
                    // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
                    executorService.submit(() -> processMessage(message));
                } catch (Exception e) {
                    logger.error("submit task occurs exception ", e);
                }
            }
        };
    
        /**
         * 在这里处理您收到消息后的具体业务逻辑。
         */
        private static void processMessage(Message message) {
            try {
                byte[] body = message.getBody(byte[].class);
                String content = new String(body);
                String topic = message.getStringProperty("topic");
                String messageId = message.getStringProperty("messageId");
                logger.info("receive message"
                    + ", topic = " + topic
                    + ", messageId = " + messageId
                    + ", content = " + content);
            } catch (Exception e) {
                logger.error("processMessage occurs error ", e);
            }
        }
    
        private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
            /**
             * 连接成功建立。
             */
            @Override
            public void onConnectionEstablished(URI remoteURI) {
                logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
            }
    
            /**
             * 尝试过最大重试次数之后,最终连接失败。
             */
            @Override
            public void onConnectionFailure(Throwable error) {
                logger.error("onConnectionFailure, {}", error.getMessage());
            }
    
            /**
             * 连接中断。
             */
            @Override
            public void onConnectionInterrupted(URI remoteURI) {
                logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
            }
    
            /**
             * 连接中断后又自动重连上。
             */
            @Override
            public void onConnectionRestored(URI remoteURI) {
                logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
            }
    
            @Override
            public void onInboundMessage(JmsInboundMessageDispatch envelope) {}
    
            @Override
            public void onSessionClosed(Session session, Throwable cause) {}
    
            @Override
            public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}
    
            @Override
            public void onProducerClosed(MessageProducer producer, Throwable cause) {}
        };
    
        /**
         * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。
         */
        private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
            SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
            Mac mac = Mac.getInstance(signMethod);
            mac.init(signingKey);
            byte[] rawHmac = mac.doFinal(toSignString.getBytes());
            return Hex.encodeHexString(rawHmac);
        }
    }
    说明

    消息推送失败时,平台会重新推送,重试策略如下。

    • 如果对端不在线或未回复ack消息,则会造成消息堆积,堆积的消息转为离线消息。

    • 离线消息每隔1min重试推送一次(每次推送10条)。对端如果成功接收了消息,则重试策略会继续推送剩余的离线消息(推送失败的消息,下一次继续推送)。

    • 离线消息最多会保存 1 天,如果 1 天后仍然无法推送成功,则会被删除。

    • 离线消息会进入单独的队列,不会影响后续消息的实时推送。

消息格式

  • 物的属性变更消息

    topic:/${productKey}/${deviceName}/thing/event/property/post

    消息字段说明如下。

    参数

    子参数

    子参数

    类型

    含义

    deviceType

    String

    设备所属品类

    gmtCreate

    Long

    数据流转消息产生时间,自1970-1-1起流逝的毫秒值

    iotId

    String

    设备的唯一ID

    productKey

    String

    设备所属产品的唯一标识符

    deviceName

    String

    设备名称

    items

    JSON

    变更的状态列表

    attribute

    String

    发生变更的属性,具体取值由具体情况确定

    value

    具体数据类型由具体情况确定

    变更值

    time

    Long

    设备属性发生变化的时间,自1970-1-1起流逝的毫秒值

    消息示例如下。

    {
        "deviceType": "SmartDoor",
        "iotId": "Xzf15db9xxxxxxxxWR001046b400",
        "productKey": "a17xxxxTYNA",
        "gmtCreate": 153xxxx145304,
        "deviceName": "Xzf15xxxxucTHBgUo6WR",
        "items": {
            "WIFI_Rx_Rate": {
                "value": 74274,
                "time": 1534299145344
            }
        }
    }  
  • 物的事件变更消息

    topic:/${productKey}/${deviceName}/thing/event/{tsl.event.identifier}/post

    消息字段说明如下。

    参数

    子参数

    类型

    含义

    deviceType

    String

    设备所属品类

    iotId

    String

    设备的唯一ID

    productKey

    String

    设备所属产品的唯一标识符

    deviceName

    String

    设备名称

    identifier

    String

    事件标识符,对应事件的identifier

    name

    String

    事件名称

    type

    String

    事件类型

    time

    Long

    设备上报value对应的时间,自1970-1-1起流逝的毫秒值

    value

    JSON

    变更的事件属性列表:key-value键值对

    key

    String

    属性key

    value

    具体数据类型由具体情况确定

    属性取值

    消息示例如下。

    {
        "deviceType": "SmartDoor",
        "identifier": "Doorxxxxication",
        "iotId": "Xzf15db9xxxxxxxxx01046b400",
        "name": "开门通知",
        "time": 1534319108982,
        "type": "info",
        "productKey": "a17xxxxTYNA",
        "deviceName": "Xzf15xxxxucTHBgUo6WR",
        "value": {
            "KeyID": "x8xxxxxkDY",
            "LockType": 3
        }
    }
  • 设备服务返回消息

    topic:/${productKey}/${deviceName}/thing/downlink/reply/message

    消息字段说明如下。

    参数

    类型

    含义

    gmtCreate

    Long

    数据流转消息产生时间,自1970-1-1起流逝的毫秒值

    iotId

    String

    设备的唯一ID

    productKey

    String

    设备所属产品的唯一标识符

    deviceName

    String

    设备名称

    requestId

    String

    阿里云产生和设备通信的信息ID

    code

    Integer

    调用的结果信息

    message

    String

    结果信息说明

    topic

    String

    服务调用下行时使用的topic

    data

    Object

    设备返回的结果,非透传之间返回设备结果,透传则需要经过脚本转换

    消息示例如下。

    {
      "gmtCreate": 151xxxx39881,
      "iotId": "4z819VQHxxxxxxxxxxxx7ee200",
      "productKey": "p1gxxxxBd",
      "deviceName": "xxxxxxxxxx",
      "requestId": "1234",
      "code": 200,
      "message": "success",
      "topic": "/sys/p1gxxxxeUBd/xxxxxxxxxx/thing/service/property/set",
      "data": {}
    }           
  • 物的状态变更消息

    为了提高消息有效性,设备上下线过于频繁时,会对消息进行筛检。

    topic:/as/mqtt/status/{pk}/{dn}

    消息字段说明如下。

    参数

    类型

    含义

    status

    String

    设备状态。online:上线。offline:离线。

    iotId

    String

    设备在平台内的唯一标识。

    offlineReasonCode

    Integer

    设备下线时,返回的错误码。详细说明,请参见设备行为错误码

    productKey

    String

    设备所属产品的唯一标识。

    deviceName

    String

    设备名称。

    lastTime

    String

    该参数为历史存量字段,已无实际意义。

    utcLastTime

    String

    time

    String

    设备上、下线的时间。

    utcTime

    String

    设备上、下线的UTC时间。

    clientIp

    String

    设备公网出口IP。

    消息示例如下。

    {
        "status":"offline",
        "iotId":"4z819VQHk6VSLmmBJfrf00107e****",
        "offlineReasonCode":427,
        "productKey":"al12345****",
        "deviceName":"deviceName1234",
        "time":"2018-08-31 15:32:28.205",
        "utcTime":"2018-08-31T07:32:28.205Z",
        "lastTime":"2018-08-31 15:32:28.195",
        "utcLastTime":"2018-08-31T07:32:28.195Z",
        "clientIp":"192.0.2.1"
    } 
  • 用户绑定变更消息

    用户绑定/解绑设备产生的回流消息,用于同步用户与设备的绑定、解绑。

    topic:/${productKey}/${deviceName}/thing/awss/enrollee/user

    消息字段说明如下。

    参数

    子参数

    类型

    含义

    bind

    bool

    true-绑定;false-解绑

    productKey

    String

    设备所属产品的唯一标识符

    deviceName

    String

    设备名称

    iotId

    String

    设备的唯一ID

    messageCreateTime

    Long

    消息创建时间

    identityInfos

    list

    用户信息列表

    identityId

    String

    用户身份ID

    scopeId

    String

    隔离ID

    tenantId

    String

    租户ID

    owned

    Integer

    拥有标记

    • 0:分享者

    • 1:拥有者

    params

    Map

    扩展参数(暂未使用)

    {
      "bind":true,
      "productKey": "123xxxx569",
      "deviceName": "deviceNamexxxx34",
      "iotId": "",
      "messageCreateTime": 151xxxx9881,
      "identityInfos":[
         {
           "identityId":"50xxxxxxxxxxxx62060259",
           "scopeId":"",
           "tenantId":"1D89B5xxxxxxxxxxxxxxxx861678FF",
           "owned":1
         }
      ],
      "params":{
      }
    }