Downloading Streaming Data

Sample Code

Use the initialized client instance to obtain data through the DIS stream. The sample code is available in the ConsumerDemo.java file under the dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo directory.

The value of streamName must be the same as that of Stream Name configured in Step 1: Creating a DIS Stream.

//Configure the stream name.
String streamName = "streamName";
// Configure the ID of the partition for data download.
String partitionId = "0";
//Configure the sequence number for data download.
String startingSequenceNumber = "0";
//Configure the data download mode.
String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();
try
{
//Obtain data cursors.
      GetPartitionCursorRequest request = new GetPartitionCursorRequest();
      request.setStreamName(streamName);
      request.setPartitionId(partitionId);
      request.setStartingSequenceNumber(startingSequenceNumber);
      request.setCursorType(cursorType);
      GetPartitionCursorResult response = dic.getPartitionCursor(request);
      String cursor = response.getPartitionCursor();
            log.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
            GetRecordsRequest recordsRequest = newGetRecordsRequest();
      GetRecordsResult recordResponse = null;
      while (true)
      {
          recordsRequest.setPartitionCursor(cursor);
          recordsRequest.setLimit(2);
          recordResponse = dic.getRecords(recordsRequest);
//Obtain the next-batch data cursors.
          cursor = recordResponse.getNextPartitionCursor();
                    for (Record record : recordResponse.getRecords())
          {
              log.info("Get Record [{}], partitionKey [{}], sequenceNumber [{}].",
                  new String(record.getData().array()),
                  record.getPartitionKey(),
                  record.getSequenceNumber());
          }
          Thread.sleep(1000);
      }
  }
catch (DISClientException e)
{
    log.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e);
}
catch (ResourceAccessException e)
{
    log.error("Failed to access endpoint. Error message [{}]", e.getMessage(), e);
}
catch (Exception e)
{
    log.error(e.getMessage(), e);
}

Running the Program

Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program runs successfully, the information similar to the following is displayed on the console:

14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].