Data Ingestion Service (DIS)
Data Ingestion Service (DIS) addresses the challenge of moving data from outside the cloud into the cloud. DIS builds data intake streams for custom applications that process or analyze streaming data. It continuously captures, transmits, and stores terabytes of data every hour from hundreds of thousands of sources, such as logs, social media feeds, website clickstreams, and location-tracking events.
DIS Stream
List DIS Streams
This interface is used to query the list of DIS streams.
Related class: Stream.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
print(list(conn.dis.streams()))
Create DIS Stream
This interface is used to create a DIS stream with the provided parameters.
Related class: Stream.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
attrs = {
    "stream_name": "newstream",
    "partition_count": 3,
    "data_duration": 24
}
result = conn.dis.create_stream(**attrs)
print(result)
Get DIS Stream
This interface is used to get a DIS stream by stream name.
Related class: Stream.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
resp = conn.dis.get_stream(stream_name)
print(resp)
Delete DIS Stream
This interface is used to delete a DIS stream by stream name.
Related class: Stream.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
conn.dis.delete_stream(stream_name)
Update DIS Stream Partition Count
This interface is used to update the partition count of an existing DIS stream.
Related class: Stream.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
new_partition_count = 5
result = conn.dis.update_stream_partition(stream_name, new_partition_count)
print(result)
DIS Consumption App
List Consumption Apps
This interface is used to query the list of DIS consumption apps.
Related class: App.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
print(list(conn.dis.apps()))
Get Consumption App
This interface is used to query a consumption app by app name.
Related class: App.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
app_name = 'test-app'
resp = conn.dis.get_app(app_name)
print(resp)
Create Consumption App
This interface is used to create a consumption app with the provided parameters.
Related class: App.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
attrs = {
    "app_name": "newapp"
}
result = conn.dis.create_app(**attrs)
print(result)
Delete Consumption App
This interface is used to delete a consumption app by app name.
Related class: App.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
app_name = 'newapp'
conn.dis.delete_app(app_name)
List App Consumptions
This interface is used to query the list of consumptions of an app on a given stream.
Related class: App.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
app_name = 'test-app'
print(list(conn.dis.app_consumptions(stream_name, app_name)))
DIS Dump Task
List Dump Tasks
This interface is used to query the list of DIS dump tasks of a stream.
Related class: DumpTask.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
print(list(conn.dis.dump_tasks(stream_name)))
Create Dump Task
This interface is used to create a DIS dump task with the provided parameters.
Related class: DumpTask.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
attrs = {
    "destination_type": "OBS",
    "obs_destination_descriptor": {
        "task_name": "newtask",
        "consumer_strategy": "LATEST",
        "agency_name": "dis_admin_agency",
        "destination_file_type": "text",
        "obs_bucket_path": "obsbucket",
        "file_prefix": "",
        "partition_format": "yyyy/MM/dd/HH/mm",
        "record_delimiter": "|",
        "deliver_time_interval": 30
    }
}
result = conn.dis.create_dump_task(stream_name, **attrs)
print(result)
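The partition_format value in the example above, yyyy/MM/dd/HH/mm, looks like a Java-style date pattern. Assuming DIS interprets it that way (the source does not say so), the following sketch shows the directory layout such a pattern would yield for a given timestamp. The helper partition_path and its token mapping are hypothetical and are not part of the SDK.

```python
from datetime import datetime

# Assumed mapping from Java-style date tokens to strftime directives.
_TOKEN_MAP = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M"}


def partition_path(partition_format: str, when: datetime) -> str:
    """Render a slash-separated pattern such as 'yyyy/MM/dd/HH/mm' for a timestamp."""
    parts = partition_format.split("/")
    # Translate each token; a KeyError signals a token outside the assumed mapping.
    directives = [_TOKEN_MAP[part] for part in parts]
    return when.strftime("/".join(directives))


print(partition_path("yyyy/MM/dd/HH/mm", datetime(2024, 3, 5, 14, 7)))
```

A coarser pattern such as yyyy/MM/dd would, under the same assumption, group dumped files by day instead of by minute.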
Get Dump Task
This interface is used to get a DIS stream dump task by task name.
Related class: DumpTask.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
task_name = 'test-dump-task'
resp = conn.dis.get_dump_task(stream_name, task_name)
print(resp)
Delete Dump Task
This interface is used to delete a DIS stream dump task by task name.
Related class: DumpTask.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
task_name = 'test-dump-task'
conn.dis.delete_dump_task(stream_name, task_name)
Start Dump Task
This interface is used to start one or more dump tasks of a DIS stream.
Related class: DumpTask.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
task_ids = ['task1-id', 'task2-id']
conn.dis.start_dump_task(stream_name, task_ids)
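The task IDs above are hard-coded. One way to build such a list from the objects returned by a dump task listing is sketched below. The helper collect_task_ids is hypothetical, and the attribute name task_id is an assumption about the DumpTask resource that should be checked against the SDK; stand-in objects are used so that the sketch runs without a cloud connection.

```python
from types import SimpleNamespace


def collect_task_ids(tasks, id_attr="task_id"):
    """Return the non-empty IDs found on an iterable of dump task objects.

    id_attr is an assumed attribute name; adjust it to the real resource.
    """
    ids = []
    for task in tasks:
        value = getattr(task, id_attr, None)
        if value:
            ids.append(value)
    return ids


# Stand-in objects instead of real DumpTask resources.
fake_tasks = [
    SimpleNamespace(task_id="task1-id"),
    SimpleNamespace(task_id="task2-id"),
]
print(collect_task_ids(fake_tasks))
```

With a live connection, the result of conn.dis.dump_tasks(stream_name) could be passed to the helper in place of fake_tasks, provided the attribute name matches.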
Pause Dump Task
This interface is used to pause one or more dump tasks of a DIS stream.
Related class: DumpTask.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
task_ids = ['task1-id', 'task2-id']
conn.dis.pause_dump_task(stream_name, task_ids)
Data Management
Upload Data
This interface is used to upload data to a DIS stream.
Related class: Data.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = 'test-stream'
records = [
    {
        "data": "MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTE=",
        "partition_id": "0"
    },
    {
        "data": "aGVsbG8gT1RDCg==",
        "partition_id": "1"
    }
]
result = conn.dis.upload_data(stream_name, records=records)
print(result)
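The data values in the records above appear to be Base64-encoded; the second one decodes to the text "hello OTC" followed by a newline. Assuming the service expects Base64 strings in this field, a record can be built from raw text or bytes as sketched below. The helper make_record is hypothetical and only uses the Python standard library.

```python
import base64


def make_record(payload, partition_id):
    """Build one upload record with a Base64-encoded 'data' field."""
    if isinstance(payload, str):
        payload = payload.encode("utf-8")
    return {
        "data": base64.b64encode(payload).decode("ascii"),
        # The example above passes partition IDs as strings.
        "partition_id": str(partition_id),
    }


records = [make_record("hello OTC\n", 1)]
print(records)
```

The resulting list has the same shape as the records list passed to upload_data in the example above.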
Obtain Data Cursor
This interface is used to query a data cursor for a stream partition.
Related class: Data.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
stream_name = "test-stream"
partition_id = "0"
resp = conn.dis.get_data_cursor(stream_name, partition_id)
print(resp)
Download Data
This interface is used to download data from a DIS stream.
Related class: Data.
import openstack
openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')
partition_cursor = "eyJpdGVyR2VuVGltZSI6MTQ5MDk1MD.."
result = conn.dis.download_data(partition_cursor)
print(result)
# To download the data to a CSV file instead
filename = "my_data.csv"
conn.dis.download_data(partition_cursor, filename=filename)
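If downloaded records carry their payload as Base64 text, as the upload example suggests, they need to be decoded before use. The sketch below assumes each record is a plain dict with a data key; the actual shape of the object returned by download_data is not shown in the source and may differ. The helper decode_payloads is hypothetical.

```python
import base64


def decode_payloads(records):
    """Decode the Base64 'data' field of each record dict into bytes."""
    return [base64.b64decode(record["data"]) for record in records]


# Sample record reusing a payload from the upload example.
sample = [{"data": "aGVsbG8gT1RDCg=="}]
print(decode_payloads(sample))
```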