• Data Ingestion Service

dis
  1. Help Center
  2. Data Ingestion Service
  3. User Guide
  4. DIS SDK Usage Guide
  5. Uploading Streaming Data

Uploading Streaming Data

Sample Code

Use the initialized client instance to upload your streaming data to DIS. The sample code is available in the ProducerDemo.java file in the dis-sdk-demo\src\com\bigdata\dis\sdk\demo directory.

The value of streamName must be the same as that of Stream Name configured in Step 1: Creating a DIS Stream.

The code for uploading streaming data is as follows:

//Initialize an asynchronous sending client.
DISConfig disConfig = new DISConfig().setAK("xxxx").setSK("xxxx").setProjectId("xxxx").setRegion("xxxx").setEndpoint("xxxx");
DISProducer producer = new DISProducer(disConfig);

//Configure the stream name.
String streamName = "streamName";
//Configure the data to be uploaded.
String message = "hello world.";
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
putRecordsRequestEntry.setData(buffer);
putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
log.info("========== BEGIN PUT ============");

int count = 10;
CountDownLatch cd = new CountDownLatch(count);
for (int i = 0; i < count; i++)
{
    putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
    try
    {
        producer.putRecordAsync(streamName, putRecordsRequestEntry, new AsyncHandler<PutRecordsResultEntry>()
        {
            @Override
            public void onSuccess(PutRecordsResultEntry result)
            {
                log.info(result.toString());
                cd.countDown();
            }
            
            @Override
            public void onError(Exception exception)
            {
                log.error(exception.getMessage(), exception);
                cd.countDown();
            }
        });
    }
    catch (Exception e)
    {
        log.error(e.getMessage(), e);
        cd.countDown();
    }
}

cd.await();
log.info("========== PUT OVER ============");
producer.close();

Running the Program

Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program runs successfully, the information similar to the following is displayed on the console:

17:27:49.130 [main] INFO  com.bigdata.dis.sdk.DISConfig - get from classLoader
17:27:49.142 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Starting Kafka producer I/O thread.
17:27:49.145 [main] INFO  DISProducerDemo - ========== BEGIN PUT ============
17:27:49.202 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - begin to send : 1
17:27:49.203 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batch size: 10, 120
17:27:50.197 [pool-2-thread-1] INFO  com.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
17:27:50.197 [pool-2-thread-1] INFO  com.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
17:27:51.531 [pool-2-thread-1] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batches success. dis-alAR-nb, 10
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO  DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.533 [main] INFO  DISProducerDemo - ========== PUT OVER ============
17:27:51.571 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.