Downloading Streaming Data

Sample Code

Use the initialized client instance to download data from the DIS stream. The sample code is in the ConsumerDemo.java file under the dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo directory.

The value of streamName must match the Stream Name configured in Step 1: Creating a DIS Stream.

// Configure the stream name.
String streamName = "streamName";
// Configure the ID of the partition to download data from.
String partitionId = "0";
// Configure the sequence number at which the download starts.
String startingSequenceNumber = "0";
// Configure the data download mode.
String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();
try
{
    // Obtain the data cursor.
    GetPartitionCursorRequest request = new GetPartitionCursorRequest();
    request.setStreamName(streamName);
    request.setPartitionId(partitionId);
    request.setStartingSequenceNumber(startingSequenceNumber);
    request.setCursorType(cursorType);
    GetPartitionCursorResult response = dic.getPartitionCursor(request);
    String cursor = response.getPartitionCursor();
    log.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

    GetRecordsRequest recordsRequest = new GetRecordsRequest();
    GetRecordsResult recordResponse = null;
    while (true)
    {
        recordsRequest.setPartitionCursor(cursor);
        // Download at most two records per request.
        recordsRequest.setLimit(2);
        recordResponse = dic.getRecords(recordsRequest);
        // Obtain the cursor for the next batch of data.
        cursor = recordResponse.getNextPartitionCursor();
        for (Record record : recordResponse.getRecords())
        {
            log.info("Get Record [{}], partitionKey [{}], sequenceNumber [{}].",
                new String(record.getData().array()),
                record.getPartitionKey(),
                record.getSequenceNumber());
        }
        // Wait one second before requesting the next batch.
        Thread.sleep(1000);
    }
}
catch (DISClientException e)
{
    log.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e);
}
catch (ResourceAccessException e)
{
    log.error("Failed to access endpoint. Error message [{}]", e.getMessage(), e);
}
catch (Exception e)
{
    log.error(e.getMessage(), e);
}
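
The sample decodes each record with new String(record.getData().array()). Assuming getData() returns a java.nio.ByteBuffer (which the array() call suggests), that expression uses the platform default charset and reads the entire backing array, ignoring the buffer's position and limit. The following is a minimal sketch of a stricter decode. The class RecordDecoding and the method decodeUtf8 are illustrative names and are not part of the DIS SDK, and UTF-8 is an assumption about how the producer encoded the data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordDecoding {
    // Hypothetical helper, not part of the DIS SDK.
    // Decodes only the bytes between position and limit as UTF-8.
    // Working on a duplicate leaves the caller's buffer position untouched.
    public static String decodeUtf8(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] backing = "xxhello world.yy".getBytes(StandardCharsets.UTF_8);
        // A buffer that exposes only the middle 12 bytes of the backing array.
        ByteBuffer slice = ByteBuffer.wrap(backing, 2, 12);
        // Respects position and limit.
        System.out.println(decodeUtf8(slice));
        // Reads the whole backing array, including bytes outside the slice.
        System.out.println(new String(slice.array(), StandardCharsets.UTF_8));
    }
}
```

If the buffer always wraps exactly the record payload, both approaches return the same text; the helper only matters when the buffer is a view onto a larger array or when the default charset differs from the producer's encoding.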

Running the Program

Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program runs successfully, information similar to the following is displayed on the console:

14:55:42.954 [main] INFO com.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
14:55:44.105 [main] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
14:55:45.235 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFO com.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].
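
In the output above, records 0 and 1 are logged at the same timestamp and record 2 about one second later, which is consistent with the sample requesting at most two records per call (setLimit(2)) and sleeping for 1000 ms between calls. The following is a rough in-memory sketch of that cursor-and-limit pattern. It does not call the DIS SDK: CursorPagingSketch, Batch, and this getRecords method are hypothetical stand-ins, and the integer cursor is a simplification of the opaque cursor string that DIS returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CursorPagingSketch {
    // Hypothetical result type: one batch of records plus the cursor for the next call.
    public static final class Batch {
        public final List<String> records;
        public final int nextCursor;

        Batch(List<String> records, int nextCursor) {
            this.records = records;
            this.nextCursor = nextCursor;
        }
    }

    // Returns at most `limit` records starting at `cursor`, and the position to resume from.
    public static Batch getRecords(List<String> partition, int cursor, int limit) {
        int end = Math.min(cursor + limit, partition.size());
        return new Batch(new ArrayList<>(partition.subList(cursor, end)), end);
    }

    public static void main(String[] args) {
        // In-memory stand-in for one partition holding three records.
        List<String> partition = Arrays.asList("r0", "r1", "r2");
        int cursor = 0;
        // Unlike the sample's while (true) loop, this loop stops once the data is exhausted.
        while (cursor < partition.size()) {
            Batch batch = getRecords(partition, cursor, 2);
            System.out.println("batch " + batch.records + ", next cursor " + batch.nextCursor);
            cursor = batch.nextCursor;
        }
    }
}
```

With three records and a limit of two, the sketch produces one batch of two records followed by one batch of a single record, mirroring the grouping visible in the console timestamps.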