本文通过示例代码指导您使用多值模型写入数据。详细示例请参见Aliyun TSDB SDK for Java中的test/example/SampleOfMultiFieldtest/TestHiTSDBClientMultiFieldFeatures

注意:多值模型仅在0.2.0版本及以上的SDK支持。关于最新SDK版本的说明及下载,请参见SDK的《版本说明》。之前版本的SDK所支持的多值模型实际是模拟多值模型,不鼓励继续使用。之前通过 MultiValue 类写入的数据并不能通过最新的 SDK 多值模型相关类 (MultiField) 进行查询。

多值模型的数据写入

示例代码:

        /*
         *  通过 multiFieldPutSync() API。
         *  该 API 支持单点和多点(List)写入。
         *
         *  下面是必须写入时必须提供的信息:
         *  Metric: 数据指标的类别。相当于 influxdb 的 Measurement。
         *  Fields: 数据指标的度量信息(相当于指标下的子类别)。即一个 metric 支持多个 fields。如 metric 为 wind,该 metric 可以有多个 fields:direction, speed, direction, description 和 temperature。
         *  Timestamp: 数据点的时间戳
         *  Tags: 时间线额外的信息,如"型号=ABC123"、"出厂编号=1234567890"等。
         */
        MultiFieldPoint multiFieldPoint = MultiFieldPoint.metric("wind")
                .field("speed", 45.2)
                .field("level", 1.2)
                .field("direction", "S")
                .field("description", "Breeze")
                .tag("sensor", "95D8-7913")
                .tag("city", "hangzhou")
                .tag("province", "zhejiang")
                .timestamp(1537170208L)
                .build();

        // 同步写入
        tsdb.multiFieldPutSync(multiFieldPoint);

多值模型的异步写入

注意:多值模型异步仅在 0.2.1 版本及以上的 SDK 支持。关于最新 SDK 版本的说明及下载,请参见SDK的《版本说明》。

不使用回调方法

默认的TSDBConfig没有设置多值异步写入回调方法。示例代码:
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                // 批次写入时每次提交数据量
                .batchPutSize(500)
                // 单值异步写入线程数
                .batchPutConsumerThreadCount(1)
                // 多值异步写入缓存队列长度
                .multiFieldBatchPutBufferSize(10000)
                // 多值异步写入线程数
                .multiFieldBatchPutConsumerThreadCount(1)
                .config();

        // 特别注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
        tsdb.close();
    }

使用普通回调方法

普通回调方法仅报告当写入成功时的数据点列表,或者当写入失败时的数据点列表以及对应的写入异常。示例代码:
      // 创建一个普通回调方法,用于处理写入成功或者失败后的情况
      AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutCallback() {

            @Override
            public void response(String address, List<MultiFieldPoint> points, Result result) {
                int count = num.addAndGet(points.size());
                System.out.println(result);
                System.out.println("已处理" + count + "个点");
            }
            final AtomicInteger num = new AtomicInteger();
            @Override
            public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
                System.err.println("业务回调出错!" + points.size() + " error!");
                ex.printStackTrace();
            }
        };

       TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                .httpConnectTimeout(90)
                // 批次写入时每次提交数据量
                .batchPutSize(500)
                // 单值异步写入线程数
                .batchPutConsumerThreadCount(1)
                // 多值异步写入缓存队列长度
                .multiFieldBatchPutBufferSize(10000)
                // 多值异步写入线程数
                .multiFieldBatchPutConsumerThreadCount(1)
                // 多值异步写入回调方法
                .listenMultiFieldBatchPut(callback)
                .config();

        // 特别注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
        tsdb.close();
    }

使用摘要回调方法

摘要回调方法,当写入成功时,报告这次写入成功时的数据点列表,以及对应的写入摘要情况,包括成功点数,失败点数;当写入失败时,报告这次写入失败时的数据点列表以及对应的写入异常。示例代码:
      // 创建一个摘要回调方法,用于处理写入成功或者失败后的情况
      AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutSummaryCallback() {

            final AtomicInteger num = new AtomicInteger();
            @Override
            public void response(String address, List<MultiFieldPoint> points, SummaryResult result) {
                int count = num.addAndGet(points.size());
                System.out.println(result);
                System.out.println("已处理" + count + "个点");
            }



            @Override
            public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
                System.err.println("业务回调出错!" + points.size() + " error!");
                ex.printStackTrace();
            }
        };

       TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                .httpConnectTimeout(90)
                // 批次写入时每次提交数据量
                .batchPutSize(500)
                // 单值异步写入线程数
                .batchPutConsumerThreadCount(1)
                // 多值异步写入缓存队列长度
                .multiFieldBatchPutBufferSize(10000)
                // 多值异步写入线程数
                .multiFieldBatchPutConsumerThreadCount(1)
                // 多值异步写入回调方法
                .listenMultiFieldBatchPut(callback)
                .config();

        // 特别注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
        tsdb.close();
    }

使用详情回调方法

详情回调方法,当写入成功时,报告这次写入成功时的数据点列表,以及对应的写入详情,包括写入失败的点以及对应错误信息等;当写入失败时,报告这次写入失败时的数据点列表以及对应的写入异常。示例代码:
      // 创建一个详情回调方法,用于处理写入成功或者失败后的情况
      MultiFieldBatchPutDetailsCallback callback = new MultiFieldBatchPutDetailsCallback() {

            final AtomicInteger num = new AtomicInteger();

            @Override
            public void response(String address, List<MultiFieldPoint> points, DetailsResult result) {
                int count = num.addAndGet(points.size());
                System.out.println(result);
                System.out.println("已处理" + count + "个点");
            }

            @Override
            public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
                System.err.println("业务回调出错!" + points.size() + " error!");
                ex.printStackTrace();
            }
        };

       TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
                .httpConnectTimeout(90)
                // 批次写入时每次提交数据量
                .batchPutSize(500)
                // 单值异步写入线程数
                .batchPutConsumerThreadCount(1)
                // 多值异步写入缓存队列长度
                .multiFieldBatchPutBufferSize(10000)
                // 多值异步写入线程数
                .multiFieldBatchPutConsumerThreadCount(1)
                // 多值异步写入回调方法
                .listenMultiFieldBatchPut(callback)
                .config();

        // 特别注意,TSDB只需初始化一次即可
        TSDB tsdb = TSDBClientFactory.connect(config);

        MultiFieldPoint point = MultiFieldPoint
                .metric("test-test-test")
                .tag("a", "1")
                .tag("b", "2")
                .timestamp(System.currentTimeMillis())
                .field("f1", Math.random())
                .field("f2", Math.random())
                .build();
        tsdb.multiFieldPut(point);

        // 特别注意,在应用生命周期内不用关闭TSDB实例,直到应用结束时关闭即可
        tsdb.close();
    }