Data Ingestion Service (DIS)

Data Ingestion Service (DIS) addresses the challenge of transmitting data from outside the cloud into the cloud. DIS builds data intake streams for custom applications that process or analyze streaming data. It continuously captures, transmits, and stores terabytes of data per hour from hundreds of thousands of sources, such as logs, social media feeds, website clickstreams, and location-tracking events.

DIS Stream

List DIS Streams

This interface is used to query the list of DIS streams (resource: Stream).

import openstack


openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

print(list(conn.dis.streams()))

Create DIS Stream

This interface is used to create a DIS stream with the given parameters (resource: Stream).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

attrs = {
    "stream_name": "newstream",
    "partition_count": 3,
    "data_duration": 24
}
result = conn.dis.create_stream(**attrs)
print(result)
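Before calling create_stream, it can help to assemble and sanity-check the attributes in one place. The sketch below uses a hypothetical helper, build_stream_attrs, which is not part of the SDK. It only checks that the values are non-empty and positive; it does not encode the actual DIS limits on partition count or retention, which should be taken from the DIS documentation. It also assumes, as the example value 24 suggests, that data_duration is given in hours.

```python
def build_stream_attrs(stream_name: str, partition_count: int,
                       data_duration: int = 24) -> dict:
    """Return keyword arguments for conn.dis.create_stream().

    Hypothetical helper: these are basic sanity checks only,
    not the service's documented limits.
    """
    if not stream_name:
        raise ValueError("stream_name must not be empty")
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    if data_duration < 1:
        # Assumption: data_duration is the retention period in hours.
        raise ValueError("data_duration must be positive")
    return {
        "stream_name": stream_name,
        "partition_count": partition_count,
        "data_duration": data_duration,
    }


if __name__ == "__main__":
    attrs = build_stream_attrs("newstream", 3)
    print(attrs)
    # With a live connection: conn.dis.create_stream(**attrs)
```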

Get DIS Stream

This interface is used to get a DIS stream by stream name (resource: Stream).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
resp = conn.dis.get_stream(stream_name)
print(resp)

Delete DIS Stream

This interface is used to delete a DIS stream by stream name (resource: Stream).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
conn.dis.delete_stream(stream_name)

Update DIS Stream Partition Count

This interface is used to update the partition count of an existing DIS stream (resource: Stream).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
new_partition_count = 5
result = conn.dis.update_stream_partition(stream_name, new_partition_count)
print(result)

DIS Consumption App

List Consumption Apps

This interface is used to query the list of DIS consumption apps (resource: App).

import openstack


openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

print(list(conn.dis.apps()))

Get Consumption App

This interface is used to query a consumption app by app name (resource: App).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

app_name = 'test-app'
resp = conn.dis.get_app(app_name)
print(resp)

Create Consumption App

This interface is used to create a consumption app with the given parameters (resource: App).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

attrs = {
    "app_name": "newapp"
}
result = conn.dis.create_app(**attrs)
print(result)

Delete Consumption App

This interface is used to delete a consumption app by app name (resource: App).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

app_name = 'newapp'
conn.dis.delete_app(app_name)

List App Consumptions

This interface is used to list the consumptions of an app on a given stream (resource: App).

import openstack


openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
app_name = 'test-app'

print(list(conn.dis.app_consumptions(stream_name, app_name)))
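A common reason to list consumptions is to see roughly how far an app lags behind each partition. The sketch below works on plain dictionaries and is hypothetical throughout: it assumes each entry carries a partition_id, a latest_offset for the partition and a sequence_number for the app's committed position. Check which fields app_consumptions() actually returns before relying on these names; consumption_lag is not an SDK function.

```python
from typing import Dict, Iterable, Mapping


def consumption_lag(states: Iterable[Mapping]) -> Dict[str, int]:
    """Map partition_id -> approximate number of records not yet consumed.

    Hypothetical field names per entry: 'partition_id', 'latest_offset'
    and 'sequence_number' (the app's last committed position).
    """
    lag = {}
    for state in states:
        latest = int(state["latest_offset"])
        committed = int(state["sequence_number"])
        # Clamp at zero in case the checkpoint is ahead of the sampled offset.
        lag[str(state["partition_id"])] = max(latest - committed, 0)
    return lag


if __name__ == "__main__":
    sample = [
        {"partition_id": "0", "latest_offset": 120, "sequence_number": 100},
        {"partition_id": "1", "latest_offset": 50, "sequence_number": 50},
    ]
    print(consumption_lag(sample))
```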

DIS Dump Task

List Dump Tasks

This interface is used to query the list of dump tasks of a DIS stream (resource: DumpTask).

import openstack


openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'

print(list(conn.dis.dump_tasks(stream_name)))

Create Dump Task

This interface is used to create a dump task for a DIS stream with the given parameters (resource: DumpTask).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
attrs = {
    "destination_type": "OBS",
    "obs_destination_descriptor": {
        "task_name": "newtask",
        "consumer_strategy": "LATEST",
        "agency_name": "dis_admin_agency",
        "destination_file_type": "text",
        "obs_bucket_path": "obsbucket",
        "file_prefix": "",
        "partition_format": "yyyy/MM/dd/HH/mm",
        "record_delimiter": "|",
        "deliver_time_interval": 30
    }
}
result = conn.dis.create_dump_task(stream_name, **attrs)
print(result)
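The partition_format value above uses Java-style date tokens. As a rough illustration of what a yyyy/MM/dd/HH/mm pattern produces, the sketch below translates those tokens into Python strftime codes and appends the result to the bucket path. The helper names are hypothetical, and the sketch does not model which timestamp DIS uses for the directory or how file_prefix is applied; treat it as an approximation, not the service's own naming logic.

```python
import re
from datetime import datetime

# Java-style tokens that may appear in partition_format, mapped to strftime codes.
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M"}
_TOKEN_RE = re.compile("|".join(_TOKENS))  # matches yyyy, MM, dd, HH or mm


def to_strftime(partition_format: str) -> str:
    """Translate e.g. 'yyyy/MM/dd/HH/mm' into '%Y/%m/%d/%H/%M'."""
    # Single-pass substitution, so replaced text is never re-scanned.
    return _TOKEN_RE.sub(lambda m: _TOKENS[m.group(0)], partition_format)


def example_obs_dir(obs_bucket_path: str, partition_format: str,
                    when: datetime) -> str:
    """Hypothetical: the directory a dump made at `when` might land in."""
    subdir = when.strftime(to_strftime(partition_format))
    return obs_bucket_path.rstrip("/") + "/" + subdir


if __name__ == "__main__":
    print(example_obs_dir("obsbucket", "yyyy/MM/dd/HH/mm",
                          datetime(2024, 5, 1, 13, 7)))
```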

Get Dump Task

This interface is used to get a dump task of a DIS stream by task name (resource: DumpTask).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
task_name = 'test-dump-task'
resp = conn.dis.get_dump_task(stream_name, task_name)
print(resp)

Delete Dump Task

This interface is used to delete a dump task of a DIS stream by task name (resource: DumpTask).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
task_name = 'test-dump-task'
conn.dis.delete_dump_task(stream_name, task_name)

Start Dump Task

This interface is used to start one or more dump tasks of a DIS stream, identified by task ID (resource: DumpTask).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
task_ids = ['task1-id', 'task2-id']
conn.dis.start_dump_task(stream_name, task_ids)

Pause Dump Task

This interface is used to pause one or more dump tasks of a DIS stream, identified by task ID (resource: DumpTask).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
task_ids = ['task1-id', 'task2-id']
conn.dis.pause_dump_task(stream_name, task_ids)

Data Management

Upload Data

This interface is used to upload data to a DIS stream (resource: Data). In this example, the data field of each record holds Base64-encoded content.

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = 'test-stream'
records = [
    {
        "data": "MTExMTExMTExMTExMTExMTExMTExMTExMTExMTExMTE=",
        "partition_id": "0"
    },
    {
        "data": "aGVsbG8gT1RDCg==",
        "partition_id": "1"
    }
]
result = conn.dis.upload_data(stream_name, records=records)
print(result)
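The data values in the upload example are Base64 strings; for instance, aGVsbG8gT1RDCg== decodes to "hello OTC" followed by a newline. The sketch below builds such a records list from raw payloads with the standard base64 module. make_records is a hypothetical helper, not an SDK function, and spreading records round-robin over partition IDs "0" to "n-1" is an illustrative choice that mirrors the string partition IDs used in the example above.

```python
import base64
from typing import Iterable, List, Union


def make_records(payloads: Iterable[Union[str, bytes]],
                 partition_count: int) -> List[dict]:
    """Build records for conn.dis.upload_data(stream_name, records=...)."""
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    records = []
    for i, payload in enumerate(payloads):
        raw = payload.encode("utf-8") if isinstance(payload, str) else payload
        records.append({
            # Base64-encode the raw bytes and store the result as text.
            "data": base64.b64encode(raw).decode("ascii"),
            # Round-robin assignment over partitions "0" .. str(partition_count - 1).
            "partition_id": str(i % partition_count),
        })
    return records


if __name__ == "__main__":
    print(make_records(["hello OTC\n", "second record"], partition_count=2))
```

Hashing a business key to pick the partition, instead of round-robin, would keep related records in the same partition; which strategy fits depends on how consumers read the stream.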

Obtain Data Cursor

This interface is used to query a data cursor for a partition of a DIS stream (resource: Data).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

stream_name = "test-stream"
partition_id = "0"

resp = conn.dis.get_data_cursor(stream_name, partition_id)
print(resp)
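A data cursor marks a read position within one partition. A typical consumer obtains a cursor once and then reads repeatedly, continuing from the cursor returned with each batch. The sketch below shows that loop with the fetch step passed in as a function, so it runs without a cloud connection. It assumes each batch is a dict with a records list and a next_partition_cursor string; that shape is an assumption, so check what conn.dis.download_data() actually returns before adapting it. read_partition and fake_fetch are hypothetical names.

```python
from typing import Callable, Iterator, Optional


def read_partition(first_cursor: str,
                   fetch: Callable[[str], dict],
                   max_batches: int = 10) -> Iterator[dict]:
    """Yield records batch by batch, following the cursor chain.

    `fetch(cursor)` is assumed to return a dict with a 'records' list
    and a 'next_partition_cursor' string (hypothetical shape).
    """
    cursor: Optional[str] = first_cursor
    for _ in range(max_batches):
        if not cursor:
            break
        batch = fetch(cursor)
        yield from batch.get("records", [])
        cursor = batch.get("next_partition_cursor")


# Offline stand-in for a real download call (hypothetical data).
_FAKE_PAGES = {
    "c0": {"records": [{"data": "aGVsbG8gT1RDCg=="}], "next_partition_cursor": "c1"},
    "c1": {"records": [], "next_partition_cursor": "c2"},
}


def fake_fetch(cursor: str) -> dict:
    return _FAKE_PAGES.get(cursor, {"records": [], "next_partition_cursor": None})


if __name__ == "__main__":
    print(list(read_partition("c0", fake_fetch)))
```

The max_batches bound keeps the sketch from polling forever; a long-running consumer would more likely sleep between empty batches and keep following the cursor.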

Download Data

This interface is used to download data from a DIS stream using a partition cursor (resource: Data).

import openstack

openstack.enable_logging(True)
conn = openstack.connect(cloud='otc')

partition_cursor = "eyJpdGVyR2VuVGltZSI6MTQ5MDk1MD.."
result = conn.dis.download_data(partition_cursor)
print(result)

# To download the data to a CSV file instead, pass a filename
filename = "my_data.csv"
conn.dis.download_data(partition_cursor, filename=filename)
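If the downloaded records still carry Base64 data, they can also be decoded and written to CSV by hand with the standard csv module. This is a hypothetical alternative for illustration only: write_records_csv is not an SDK function, it assumes records are dicts with a data field plus optional sequence_number and partition_id fields, and it does not reproduce whatever layout the filename option of download_data() writes.

```python
import base64
import csv
import io
from typing import IO, Iterable, Mapping


def write_records_csv(records: Iterable[Mapping], out: IO[str]) -> int:
    """Decode each record's Base64 'data' field and write one CSV row per record.

    Column names (sequence_number, partition_id, data) are assumptions.
    Returns the number of data rows written, excluding the header.
    """
    writer = csv.writer(out)
    writer.writerow(["sequence_number", "partition_id", "data"])
    count = 0
    for rec in records:
        text = base64.b64decode(rec["data"]).decode("utf-8", errors="replace")
        writer.writerow([rec.get("sequence_number", ""),
                         rec.get("partition_id", ""), text])
        count += 1
    return count


if __name__ == "__main__":
    buf = io.StringIO()
    write_records_csv([{"data": "aGVsbG8gT1RDCg==", "partition_id": "1"}], buf)
    print(buf.getvalue())
```

To write to disk, open the file with open("my_data.csv", "w", newline="", encoding="utf-8") and pass the file object as out; newline="" lets the csv module control line endings.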