Uploading Streaming Data
Sample Code
Use the initialized client instance to upload your streaming data to DIS. The sample code is available in the ProducerDemo.java file in the dis-sdk-demo\src\com\bigdata\dis\sdk\demo directory.
The value of streamName must match the Stream Name configured in Step 1: Creating a DIS Stream.
The code for uploading streaming data is as follows:
// Initialize an asynchronous sending client.
DISConfig disConfig = new DISConfig().setAK("xxxx").setSK("xxxx").setProjectId("xxxx").setRegion("xxxx").setEndpoint("xxxx");
DISProducer producer = new DISProducer(disConfig);

// Configure the stream name.
String streamName = "streamName";

// Configure the data to be uploaded.
String message = "hello world.";
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
putRecordsRequestEntry.setData(buffer);
putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));

log.info("========== BEGIN PUT ============");

// The latch is counted down once per record, whether the upload succeeds or fails.
int count = 10;
CountDownLatch cd = new CountDownLatch(count);
for (int i = 0; i < count; i++)
{
    // Assign a new random partition key before each send.
    putRecordsRequestEntry.setPartitionKey(String.valueOf(ThreadLocalRandom.current().nextInt(1000000)));
    try
    {
        producer.putRecordAsync(streamName, putRecordsRequestEntry, new AsyncHandler<PutRecordsResultEntry>()
        {
            @Override
            public void onSuccess(PutRecordsResultEntry result)
            {
                log.info(result.toString());
                cd.countDown();
            }

            @Override
            public void onError(Exception exception)
            {
                log.error(exception.getMessage(), exception);
                cd.countDown();
            }
        });
    }
    catch (Exception e)
    {
        // The request could not be submitted, so no callback will fire for it.
        log.error(e.getMessage(), e);
        cd.countDown();
    }
}

// Block until every record has been acknowledged or has failed.
cd.await();
log.info("========== PUT OVER ============");

// Release the client's resources.
producer.close();
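The sample relies on a CountDownLatch to keep the main thread alive until every asynchronous callback has fired. The following is a minimal, JDK-only sketch of that waiting pattern, not DIS SDK code: the Callback interface and the fakeSendAsync method are hypothetical stand-ins for AsyncHandler and putRecordAsync, and the 10-second timeout is an assumption added so that a lost callback cannot block the program forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncLatchSketch {

    /** Hypothetical stand-in for the SDK's AsyncHandler; not part of the DIS SDK. */
    interface Callback<T> {
        void onSuccess(T result);
        void onError(Exception exception);
    }

    /** Hypothetical stand-in for an asynchronous send; it completes on a worker thread. */
    static void fakeSendAsync(ExecutorService pool, String partitionKey, Callback<String> callback) {
        pool.submit(() -> {
            try {
                if (partitionKey.isEmpty()) {
                    throw new IllegalArgumentException("empty partition key");
                }
                callback.onSuccess("sent with key " + partitionKey);
            } catch (Exception e) {
                callback.onError(e);
            }
        });
    }

    /** Submits count simulated records and returns how many callbacks reported success. */
    static int sendAll(int count) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                // Same key generation as the sample: a random integer rendered as a string.
                String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
                fakeSendAsync(pool, key, new Callback<String>() {
                    @Override
                    public void onSuccess(String result) {
                        succeeded.incrementAndGet();
                        latch.countDown();
                    }

                    @Override
                    public void onError(Exception exception) {
                        // Count down on failure too, otherwise await() would never return.
                        latch.countDown();
                    }
                });
            }
            // Bounded wait (an assumption of this sketch; the sample waits indefinitely).
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
        } finally {
            pool.shutdown();
        }
        return succeeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("succeeded: " + sendAll(10));
    }
}
```

The point of the pattern is that both the success path and the error path count the latch down exactly once per record, so the main thread resumes only after all records are accounted for.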
Running the Program
Right-click the program and choose Run As > 1 Java Application from the shortcut menu. If the program runs successfully, information similar to the following is displayed on the console:
17:27:49.130 [main] INFO com.bigdata.dis.sdk.DISConfig - get from classLoader
17:27:49.142 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Starting Kafka producer I/O thread.
17:27:49.145 [main] INFO DISProducerDemo - ========== BEGIN PUT ============
17:27:49.202 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - begin to send : 1
17:27:49.203 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batch size: 10, 120
17:27:50.197 [pool-2-thread-1] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
17:27:50.197 [pool-2-thread-1] INFO com.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
17:27:51.531 [pool-2-thread-1] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - batches success. dis-alAR-nb, 10
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry [partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]
17:27:51.533 [main] INFO DISProducerDemo - ========== PUT OVER ============
17:27:51.571 [Sender Thread] DEBUG com.bigdata.dis.sdk.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
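In the output above, each PutRecordsResultEntry line reports errorCode=null and errorMessage=null, which suggests that the record was accepted. As a rough sketch, captured console lines could be scanned for entries whose errorCode is not null. The class below is a hypothetical helper, not part of the DIS SDK, and it assumes the result lines keep the exact layout shown in the sample output; the value EXAMPLE_CODE in the usage is a made-up placeholder, not a real DIS error code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PutResultLogCheck {

    // Matches the toString() layout seen in the sample console output.
    private static final Pattern RESULT = Pattern.compile(
        "PutRecordsResultEntry \\[partitionId=([^,]+), sequenceNumber=([^,]+), "
            + "errorCode=([^,]+), errorMessage=(.*)\\]");

    /** Returns the result lines whose errorCode field is anything other than "null". */
    static List<String> failedLines(List<String> lines) {
        List<String> failed = new ArrayList<>();
        for (String line : lines) {
            Matcher m = RESULT.matcher(line);
            if (m.find() && !"null".equals(m.group(3))) {
                failed.add(line);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            // Copied from the sample output above.
            "17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry "
                + "[partitionId=shardId-0000000000, sequenceNumber=76, errorCode=null, errorMessage=null]",
            // Invented line with a placeholder error code, for illustration only.
            "17:27:51.532 [pool-2-thread-1] INFO DISProducerDemo - PutRecordsResultEntry "
                + "[partitionId=null, sequenceNumber=null, errorCode=EXAMPLE_CODE, errorMessage=example failure]",
            // Lines that are not result entries are ignored.
            "17:27:51.533 [main] INFO DISProducerDemo - ========== PUT OVER ============");
        System.out.println("failed entries: " + failedLines(lines).size());
    }
}
```

Inspecting the PutRecordsResultEntry object directly in the onSuccess callback is likely more robust than parsing logs; the log scan is only useful when the console output is all that is available.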