
Flume Configuration Parameter Description

Scenario

This section describes how to configure the sources, channels, and sinks of Flume, and modify the configuration items of each module.

NOTE:

Some configurations require encrypted values. For details on how to encrypt information, see Using the Encryption Tool of the Flume Client.

Common Source Configurations

  • Avro Source

    An Avro source listens on a configured port, receives data from external Avro clients, and places the data into the configured channels. Common configurations are as follows.

    Table 1 Common configurations of an Avro source

    | Parameter | Default Value | Description |
    |---|---|---|
    | channels | - | Channel connected to the source. Multiple channels can be configured, separated by spaces. To define the flow within a single agent, link the sources and sinks through a channel. A source instance can specify multiple channels, but a sink instance can specify only one channel. The format is as follows: <Agent>.sources.<Source>.channels = <channel1> <channel2> <channel3>... and <Agent>.sinks.<Sink>.channel = <channel1> |
    | type | avro | Type, which is set to avro. The type of each source is fixed. |
    | bind | - | Host name or IP address to bind to. |
    | port | - | Port to bind to. |
    | ssl | false | Whether to use SSL encryption. The value can be true or false. |
    | truststore-type | JKS | Java truststore type. Enter JKS or another supported Java truststore type. |
    | truststore | - | Java truststore file. |
    | truststore-password | - | Java truststore password. |
    | keystore-type | JKS | Keystore type. Enter JKS or another supported Java keystore type. |
    | keystore | - | Keystore file. |
    | keystore-password | - | Keystore password. |
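
    The following is a minimal configuration sketch of an Avro source, assuming an agent named server with a source s1 and a channel c1 (the names, IP address, and port are illustrative):

    server.sources.s1.type = avro
    server.sources.s1.bind = 192.168.0.100
    server.sources.s1.port = 21154
    server.sources.s1.channels = c1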

  • Spooling Source

    A Spooling source monitors and transmits new files that have been added to directories in quasi-real-time mode. Common configurations are as follows.

    Table 2 Common configurations of a Spooling source

    | Parameter | Default Value | Description |
    |---|---|---|
    | channels | - | Channel connected to the source. Multiple channels can be configured. |
    | type | spooldir | Type, which is set to spooldir. |
    | monTime | 0 (disabled) | Thread monitoring threshold. When the update time (seconds) exceeds the threshold, the source is restarted. |
    | spoolDir | - | Directory to monitor. |
    | fileSuffix | .COMPLETED | Suffix added after file transmission is complete. |
    | deletePolicy | never | Source file deletion policy after file transmission is complete. The value can be either never or immediate. |
    | ignorePattern | ^$ | Regular expression of files to be ignored. |
    | trackerDir | .flumespool | Metadata storage directory during transmission. |
    | batchSize | 1000 | Source transmission granularity. |
    | decodeErrorPolicy | FAIL | Decode error policy. The options are FAIL, REPLACE, and IGNORE. FAIL: throw an exception and fail to parse. REPLACE: replace unrecognizable characters with other characters (typically U+FFFD). IGNORE: discard character strings that cannot be parsed. NOTE: If a decode error occurs in a file, set decodeErrorPolicy to REPLACE or IGNORE; Flume will skip the error and continue to collect subsequent logs. |
    | deserializer | LINE | File parser. The value can be either LINE or BufferedLine. When the value is set to LINE, characters read from the file are transcoded one by one. When the value is set to BufferedLine, one or more lines read from the file are transcoded in batches, which delivers better performance. |
    | deserializer.maxLineLength | 2048 | Maximum length for parsing by line. The value ranges from 0 to 2,147,483,647. |
    | deserializer.maxBatchLine | 1 | Maximum number of lines parsed at a time. If this is set to more than one line, maxLineLength must be scaled accordingly. For example, if maxBatchLine is set to 2, set maxLineLength to 4096 (2048 x 2). |
    | selector.type | replicating | Selector type. The value can be either replicating or multiplexing. replicating sends the same content to every channel; multiplexing sends content selectively to certain channels according to the configured distribution rules. |
    | interceptors | - | Interceptor. For details about configuration, see the Flume User Guide. |

    NOTE:

    The Spooling source ignores the last line feed character of each event when data is read by line. Therefore, Flume does not calculate the data volume counters used by the last line feed character.
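
    The following is a minimal configuration sketch of a Spooling source, assuming an agent named server with a source s1 and a channel c1 (the names and the directory are illustrative):

    server.sources.s1.type = spooldir
    server.sources.s1.spoolDir = /var/log/input
    server.sources.s1.fileSuffix = .COMPLETED
    server.sources.s1.batchSize = 1000
    server.sources.s1.channels = c1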

  • Kafka Source

    A Kafka source consumes data from Kafka topics. Multiple sources can consume data of the same topic, and the sources consume different partitions of the topic. Common configurations are as follows.

    Table 3 Common configurations of a Kafka source

    | Parameter | Default Value | Description |
    |---|---|---|
    | channels | - | Channel connected to the source. Multiple channels can be configured. |
    | type | org.apache.flume.source.kafka.KafkaSource | Type, which is set to org.apache.flume.source.kafka.KafkaSource. |
    | monTime | 0 (disabled) | Thread monitoring threshold. When the update time (seconds) exceeds the threshold, the source is restarted. |
    | nodatatime | 0 (disabled) | Alarm threshold. An alarm is triggered when the duration (seconds) for which Kafka releases no data to subscribers exceeds the threshold. |
    | batchSize | 1000 | Number of events written into a channel at a time. |
    | batchDurationMillis | 1000 | Maximum duration of topic data consumption at a time, in milliseconds. |
    | keepTopicInHeader | false | Whether to save topics in the event header. If topics are saved, topics configured in Kafka sinks become invalid. The value can be true or false. |
    | keepPartitionInHeader | false | Whether to save partition IDs in the event header. If partition IDs are saved, Kafka sinks write data to the corresponding partitions. The value can be true or false. |
    | kafka.bootstrap.servers | - | Comma-separated list of broker addresses. |
    | kafka.consumer.group.id | - | Kafka consumer group ID. |
    | kafka.topics | - | Comma-separated list of subscribed Kafka topics. |
    | kafka.topics.regex | - | Subscribed topics that match the regular expression. kafka.topics.regex has a higher priority than kafka.topics and overrides it. |
    | kafka.security.protocol | SASL_PLAINTEXT | Security protocol of Kafka. The value must be set to PLAINTEXT for clusters in which Kerberos authentication is disabled. |
    | Other Kafka consumer properties | - | Other Kafka configurations. Any consumer configuration supported by Kafka can be set, as long as the kafka. prefix is added to the property name. |
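
    The following is a minimal configuration sketch of a Kafka source, assuming an agent named server with a source s1 and a channel c1 (the names, broker addresses, topic, and consumer group are illustrative):

    server.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
    server.sources.s1.kafka.bootstrap.servers = broker1:21007,broker2:21007
    server.sources.s1.kafka.topics = topic1
    server.sources.s1.kafka.consumer.group.id = flume-group
    server.sources.s1.batchSize = 1000
    server.sources.s1.channels = c1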

  • Taildir Source

    A Taildir source monitors file changes in a directory and automatically reads the file content. In addition, it can transmit data in real time. Common configurations are as follows.

    Table 4 Common configurations of a Taildir source

    | Parameter | Default Value | Description |
    |---|---|---|
    | channels | - | Channel connected to the source. Multiple channels can be configured. |
    | type | taildir | Type, which is set to taildir. |
    | filegroups | - | Group names of the collection file directories. Group names are separated by spaces. |
    | filegroups.<filegroupName>.parentDir | - | Parent directory. The value must be an absolute path. |
    | filegroups.<filegroupName>.filePattern | - | File path relative to the parent directory. Directories can be included and regular expressions are supported. This parameter must be used together with parentDir. |
    | positionFile | - | Metadata storage directory during transmission. |
    | headers.<filegroupName>.<headerKey> | - | Key-value pair added to events when data of the group is being collected. |
    | byteOffsetHeader | false | Whether each event header should contain the location of the event in the source file. The location information is saved in the byteoffset variable. |
    | skipToEnd | false | Whether Flume locates the latest position of a file and reads only the latest data after restart. |
    | idleTimeout | 120000 | Idle period during file reading, in milliseconds. If the file data does not change within this period, the source closes the file. If data is written into the file after it is closed, the source reopens the file and reads the data. |
    | writePosInterval | 3000 | Interval for writing metadata to a file, in milliseconds. |
    | batchSize | 1000 | Number of events written into a channel in a batch. |
    | monTime | 0 (disabled) | Thread monitoring threshold. When the update time (seconds) exceeds the threshold, the source is restarted. |
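
    The following is a minimal configuration sketch of a Taildir source, assuming an agent named server with a source s1 and a channel c1 (the names, paths, and file pattern are illustrative):

    server.sources.s1.type = taildir
    server.sources.s1.filegroups = f1
    server.sources.s1.filegroups.f1.parentDir = /var/log/app
    server.sources.s1.filegroups.f1.filePattern = .*\.log
    server.sources.s1.positionFile = /opt/flume/taildir_position.json
    server.sources.s1.channels = c1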

  • HTTP Source

    An HTTP source receives data from an external HTTP client and sends the data to the configured channels. Common configurations are as follows.

    Table 5 Common configurations of an HTTP source

    | Parameter | Default Value | Description |
    |---|---|---|
    | channels | - | Channel connected to the source. Multiple channels can be configured. |
    | type | http | Type, which is set to http. |
    | bind | - | Name or IP address of the bound host. |
    | port | - | Bound port. |
    | handler | org.apache.flume.source.http.JSONHandler | Message parsing method of an HTTP request. The following methods are supported: org.apache.flume.source.http.JSONHandler (JSON) and org.apache.flume.sink.solr.morphline.BlobHandler (BLOB). |
    | handler.* | - | Handler parameters. |
    | enableSSL | false | Whether SSL is enabled in HTTP. |
    | keystore | - | Keystore path after SSL is enabled in HTTP. |
    | keystorePassword | - | Keystore password after SSL is enabled in HTTP. |
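
    The following is a minimal configuration sketch of an HTTP source, assuming an agent named server with a source s1 and a channel c1 (the names, IP address, and port are illustrative):

    server.sources.s1.type = http
    server.sources.s1.bind = 192.168.0.100
    server.sources.s1.port = 21156
    server.sources.s1.handler = org.apache.flume.source.http.JSONHandler
    server.sources.s1.channels = c1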

  • OBS Source

    An OBS source monitors and transmits new files that have been added to specified buckets in quasi-real-time mode. Common configurations are as follows.

    Table 6 Common configurations of an OBS source

    | Parameter | Default Value | Description |
    |---|---|---|
    | channels | - | Channel connected to the source. Multiple channels can be configured. |
    | type | org.apache.flume.source.s3.OBSSource | Type, which is set to org.apache.flume.source.s3.OBSSource. |
    | bucketName | - | OBS bucket name. |
    | prefix | - | Monitored OBS path in the specified bucket. The path cannot start with a slash (/). If this parameter is not set, the root directory of the bucket is monitored by default. |
    | accessKey | - | User AK information. |
    | secretKey | - | User SK information in ciphertext. |
    | backingDir | - | Metadata storage directory during transmission. |
    | endPoint | - | OBS access address. The address must be in the same region as MRS. The value can be either a domain name or an IP address. |
    | basenameHeader | false | Whether to save file names in the event header. false indicates that file names are not saved. |
    | basenameHeaderKey | basename | Name of the field (also called the key) that the event header uses to save a file name. |
    | batchSize | 1000 | Source transmission granularity. |
    | decodeErrorPolicy | FAIL | Decode error policy. NOTE: If a decode error occurs in a file, set decodeErrorPolicy to REPLACE or IGNORE; Flume will skip the error and continue to collect subsequent logs. |
    | deserializer | LINE | File parser. The value can be either LINE or BufferedLine. When the value is set to LINE, characters read from the file are transcoded one by one. When the value is set to BufferedLine, one or more lines read from the file are transcoded in batches, which delivers better performance. |
    | deserializer.maxLineLength | 2048 | Maximum length for parsing by line. |
    | deserializer.maxBatchLine | 1 | Maximum number of lines parsed at a time. If this is set to more than one line, maxLineLength must be scaled accordingly. |
    | selector.type | replicating | Selector type. The value can be either replicating or multiplexing. |
    | interceptors | - | Interceptor. |
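
    The following is a minimal configuration sketch of an OBS source, assuming an agent named server with a source s1 and a channel c1 (the bucket, path, directory, and credential placeholders are illustrative; the SK must be provided in ciphertext as noted above):

    server.sources.s1.type = org.apache.flume.source.s3.OBSSource
    server.sources.s1.bucketName = my-bucket
    server.sources.s1.prefix = input/
    server.sources.s1.accessKey = <AK>
    server.sources.s1.secretKey = <encrypted SK>
    server.sources.s1.backingDir = /opt/flume/.obsspool
    server.sources.s1.endPoint = <OBS endpoint in the same region as MRS>
    server.sources.s1.channels = c1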

Common Channel Configurations

  • Memory Channel

    A memory channel uses memory as the cache. Events are stored in memory queues. Common configurations are as follows.

    Table 7 Common configurations of a memory channel

    | Parameter | Default Value | Description |
    |---|---|---|
    | type | - | Type, which is set to memory. |
    | capacity | 10000 | Maximum number of events cached in the channel. |
    | transactionCapacity | 1000 | Maximum number of events accessed each time. |
    | channelfullcount | 10 | Channel full count. When the count reaches the threshold, an alarm is reported. |
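
    The following is a minimal configuration sketch of a memory channel, assuming an agent named server with a channel c1 (the names are illustrative):

    server.channels.c1.type = memory
    server.channels.c1.capacity = 10000
    server.channels.c1.transactionCapacity = 1000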

  • File Channel

    A file channel uses local disks as the cache. Events are stored in the folder specified by dataDirs. Common configurations are as follows.

    Table 8 Common configurations of a file channel

    | Parameter | Default Value | Description |
    |---|---|---|
    | type | - | Type, which is set to file. |
    | checkpointDir | ${BIGDATA_DATA_HOME}/flume/checkpoint | Checkpoint storage directory. |
    | dataDirs | ${BIGDATA_DATA_HOME}/flume/data | Data cache directory. Multiple directories, separated by commas (,), can be configured to improve performance. |
    | maxFileSize | 2146435071 | Maximum size of a single cache file, in bytes. |
    | minimumRequiredSpace | 524288000 | Minimum idle space in the cache, in bytes. |
    | capacity | 1000000 | Maximum number of events cached in the channel. |
    | transactionCapacity | 10000 | Maximum number of events accessed each time. |
    | channelfullcount | 10 | Channel full count. When the count reaches the threshold, an alarm is reported. |
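
    The following is a minimal configuration sketch of a file channel, assuming an agent named server with a channel c1 (the names are illustrative; the directories use the defaults listed above):

    server.channels.c1.type = file
    server.channels.c1.checkpointDir = ${BIGDATA_DATA_HOME}/flume/checkpoint
    server.channels.c1.dataDirs = ${BIGDATA_DATA_HOME}/flume/data
    server.channels.c1.capacity = 1000000
    server.channels.c1.transactionCapacity = 10000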

  • Memory File Channel

    A memory file channel uses both memory and local disks as its cache and supports message persistence. It provides performance similar to that of a memory channel and better performance than a file channel. Common configurations are as follows.

    Table 9 Common configurations of a memory file channel

    | Parameter | Default Value | Description |
    |---|---|---|
    | type | org.apache.flume.channel.MemoryFileChannel | Type, which is set to org.apache.flume.channel.MemoryFileChannel. |
    | capacity | 50000 | Channel cache: maximum number of events cached in the channel. |
    | transactionCapacity | 5000 | Transaction cache: maximum number of events processed by a transaction. The value must be greater than the batchSize of the source and sink, and less than or equal to the value of capacity. |
    | subqueueByteCapacity | 20971520 | Maximum size (bytes) of events that can be stored in a subqueue. A memory file channel uses both queues and subqueues to cache data: events are stored in subqueues, and subqueues are stored in a queue. subqueueByteCapacity and subqueueInterval together determine how many events a subqueue holds: subqueueByteCapacity specifies the capacity of a subqueue, and subqueueInterval specifies how long a subqueue can keep accumulating events. Events in a subqueue are sent to the destination only after the subqueue reaches the subqueueByteCapacity or subqueueInterval limit. NOTE: The value of subqueueByteCapacity must be greater than the total size of the events specified by batchSize. |
    | subqueueInterval | 2000 | Maximum duration (milliseconds) for which a subqueue can store events. |
    | keep-alive | 3 | Waiting time (seconds) of the Put and Take threads when the transaction or channel cache is full. |
    | dataDir | - | Cache directory for local files. |
    | byteCapacity | 80% of the maximum JVM memory | Channel cache capacity, in bytes. |
    | compression-type | None | Message compression format. The value can be either None or Snappy. When the format is Snappy, event message bodies compressed in the Snappy format can be decompressed. |
    | channelfullcount | 10 | Channel full count. When the count reaches the threshold, an alarm is reported. |

    The following is a configuration example of a memory file channel:

    server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel
    server.channels.c1.dataDir = /opt/flume/mfdata
    server.channels.c1.subqueueByteCapacity = 20971520
    server.channels.c1.subqueueInterval = 2000
    server.channels.c1.capacity = 500000
    server.channels.c1.transactionCapacity = 40000

  • Kafka Channel

    A Kafka channel uses a Kafka cluster as the cache. Kafka provides high availability and data replication, so data is not lost if Flume or a Kafka broker crashes before sinks have consumed the data. Common configurations are as follows.

    Table 10 Common configurations of a Kafka channel

    | Parameter | Default Value | Description |
    |---|---|---|
    | type | - | Type, which is set to org.apache.flume.channel.kafka.KafkaChannel. |
    | kafka.bootstrap.servers | - | List of brokers in the Kafka cluster. |
    | kafka.topic | flume-channel | Kafka topic used by the channel to cache data. |
    | kafka.consumer.group.id | flume | Kafka consumer group ID. |
    | parseAsFlumeEvent | true | Whether data is parsed into Flume events. |
    | migrateZookeeperOffsets | true | Whether to look up offsets in ZooKeeper and submit them to Kafka when there is no offset in Kafka. |
    | kafka.consumer.auto.offset.reset | latest | Position from which data is consumed when there is no offset. The value can be latest, earliest, or none. |
    | kafka.producer.security.protocol | SASL_PLAINTEXT | Kafka producer security protocol. |
    | kafka.consumer.security.protocol | SASL_PLAINTEXT | Kafka consumer security protocol. |
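
    The following is a minimal configuration sketch of a Kafka channel, assuming an agent named server with a channel c1 (the names and broker addresses are illustrative):

    server.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    server.channels.c1.kafka.bootstrap.servers = broker1:21007,broker2:21007
    server.channels.c1.kafka.topic = flume-channel
    server.channels.c1.kafka.consumer.group.id = flume
    server.channels.c1.parseAsFlumeEvent = true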

Common Sink Configurations

  • HDFS Sink

    An HDFS sink writes data into HDFS. Common configurations are as follows.

    Table 11 Common configurations of an HDFS sink

    | Parameter | Default Value | Description |
    |---|---|---|
    | channel | - | Channel connected to the sink. |
    | type | hdfs | Type, which is set to hdfs. |
    | monTime | 0 (disabled) | Thread monitoring threshold. When the update time (seconds) exceeds the threshold, the sink is restarted. |
    | hdfs.path | - | HDFS path. |
    | hdfs.inUseSuffix | .tmp | Suffix of the HDFS file being written. |
    | hdfs.rollInterval | 30 | Interval for file rolling, in seconds. |
    | hdfs.rollSize | 1024 | Size for file rolling, in bytes. |
    | hdfs.rollCount | 10 | Number of events for file rolling. |
    | hdfs.idleTimeout | 0 | Timeout interval (seconds) for automatically closing idle files. |
    | hdfs.batchSize | 1000 | Number of events written into HDFS at a time. |
    | hdfs.kerberosPrincipal | - | Kerberos username for HDFS authentication. This parameter is not required for a cluster in which Kerberos authentication is disabled. |
    | hdfs.kerberosKeytab | - | Kerberos keytab for HDFS authentication. This parameter is not required for a cluster in which Kerberos authentication is disabled. |
    | hdfs.fileCloseByEndEvent | true | Whether the file is closed when the last event is received. |
    | hdfs.batchCallTimeout | - | Timeout (milliseconds) for each batch of events written into HDFS. If this parameter is not specified, the timeout applies to each individual event written into HDFS. When the value of hdfs.batchSize is greater than 0, configure this parameter to improve HDFS write performance. NOTE: The value of hdfs.batchCallTimeout depends on hdfs.batchSize: a greater hdfs.batchSize requires a larger hdfs.batchCallTimeout. If the value is too small, writing events to HDFS may fail. |
    | serializer.appendNewline | true | Whether to add a line feed character (\n) after an event is written to HDFS. If a line feed character is added, its data volume is not included in the counters of the HDFS sink. |
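
    The following is a minimal configuration sketch of an HDFS sink, assuming an agent named server with a sink k1 and a channel c1 (the names and path are illustrative):

    server.sinks.k1.type = hdfs
    server.sinks.k1.hdfs.path = /flume/data
    server.sinks.k1.hdfs.rollInterval = 30
    server.sinks.k1.hdfs.batchSize = 1000
    server.sinks.k1.channel = c1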

  • Avro Sink

    An Avro sink converts events into Avro events and sends them to the listening port of the configured host. Common configurations are as follows.

    Table 12 Common configurations of an Avro sink

    | Parameter | Default Value | Description |
    |---|---|---|
    | channel | - | Channel connected to the sink. |
    | type | - | Type, which is set to avro. |
    | hostname | - | Name or IP address of the bound host. |
    | port | - | Listening port. |
    | batch-size | 1000 | Number of events sent in a batch. |
    | ssl | false | Whether to use SSL encryption. |
    | truststore-type | JKS | Java truststore type. |
    | truststore | - | Java truststore file. |
    | truststore-password | - | Java truststore password. |
    | keystore-type | JKS | Keystore type. |
    | keystore | - | Keystore file. |
    | keystore-password | - | Keystore password. |
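
    The following is a minimal configuration sketch of an Avro sink, assuming an agent named server with a sink k1 and a channel c1 (the names, IP address, and port are illustrative):

    server.sinks.k1.type = avro
    server.sinks.k1.hostname = 192.168.0.101
    server.sinks.k1.port = 21154
    server.sinks.k1.batch-size = 1000
    server.sinks.k1.channel = c1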

  • HBase Sink

    An HBase sink writes data into HBase. Common configurations are as follows.

    Table 13 Common configurations of an HBase sink

    | Parameter | Default Value | Description |
    |---|---|---|
    | channel | - | Channel connected to the sink. |
    | type | - | Type, which is set to hbase. |
    | table | - | HBase table name. |
    | monTime | 0 (disabled) | Thread monitoring threshold. When the update time (seconds) exceeds the threshold, the sink is restarted. |
    | columnFamily | - | HBase column family. |
    | batchSize | 1000 | Number of events written into HBase at a time. |
    | kerberosPrincipal | - | Kerberos username for HBase authentication. This parameter is not required for a cluster in which Kerberos authentication is disabled. |
    | kerberosKeytab | - | Kerberos keytab for HBase authentication. This parameter is not required for a cluster in which Kerberos authentication is disabled. |
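
    The following is a minimal configuration sketch of an HBase sink, assuming an agent named server with a sink k1 and a channel c1 (the names, table, and column family are illustrative):

    server.sinks.k1.type = hbase
    server.sinks.k1.table = flume_table
    server.sinks.k1.columnFamily = cf
    server.sinks.k1.batchSize = 1000
    server.sinks.k1.channel = c1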

  • Kafka Sink

    A Kafka sink writes data into Kafka. Common configurations are as follows.

    Table 14 Common configurations of a Kafka sink

    | Parameter | Default Value | Description |
    |---|---|---|
    | channel | - | Channel connected to the sink. |
    | type | - | Type, which is set to org.apache.flume.sink.kafka.KafkaSink. |
    | kafka.bootstrap.servers | - | Comma-separated list of Kafka brokers. |
    | monTime | 0 (disabled) | Thread monitoring threshold. When the update time (seconds) exceeds the threshold, the sink is restarted. |
    | kafka.topic | default-flume-topic | Topic where data is written. |
    | flumeBatchSize | 1000 | Number of events written into Kafka at a time. |
    | kafka.security.protocol | SASL_PLAINTEXT | Security protocol of Kafka. The value must be set to PLAINTEXT for clusters in which Kerberos authentication is disabled. |
    | Other Kafka producer properties | - | Other Kafka configurations. Any producer configuration supported by Kafka can be set, as long as the kafka. prefix is added to the property name. |
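
    The following is a minimal configuration sketch of a Kafka sink, assuming an agent named server with a sink k1 and a channel c1 (the names, broker addresses, and topic are illustrative):

    server.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    server.sinks.k1.kafka.bootstrap.servers = broker1:21007,broker2:21007
    server.sinks.k1.kafka.topic = default-flume-topic
    server.sinks.k1.flumeBatchSize = 1000
    server.sinks.k1.channel = c1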

  • OBS Sink

    An OBS sink writes data into OBS. As OBS sink and HDFS sink use the same file system interface, their parameter configurations are similar. The following table provides common configurations of an OBS sink:

    Table 15 Common configurations of an OBS sink

    | Parameter | Default Value | Description |
    |---|---|---|
    | channel | - | Channel connected to the sink. |
    | type | hdfs | Type, which is set to hdfs. |
    | monTime | 0 (disabled) | Thread monitoring threshold. When the update time (seconds) exceeds the threshold, the sink is restarted. |
    | hdfs.path | - | OBS path in the s3a://AK:SK@Bucket/Path/ format, for example, s3a://AK:SK@obs-nemon-sink/obs-sink/. |
    | hdfs.inUseSuffix | .tmp | Suffix of the OBS file being written. |
    | hdfs.rollInterval | 30 | Interval for file rolling, in seconds. |
    | hdfs.rollSize | 1024 | Size for file rolling, in bytes. |
    | hdfs.rollCount | 10 | Number of events for file rolling. |
    | hdfs.idleTimeout | 0 | Timeout interval (seconds) for automatically closing idle files. |
    | hdfs.batchSize | 1000 | Number of events written into OBS at a time. |
    | hdfs.calltimeout | 10000 | Timeout interval for interaction with OBS, in milliseconds. Set this parameter to a value as large as possible, for example, 1000000, because some operations (such as OBS renaming) copy files and can take a long time. |
    | hdfs.fileCloseByEndEvent | true | Whether the file is closed when the last event is received. |
    | hdfs.batchCallTimeout | - | Timeout (milliseconds) for each batch of events written into OBS. If this parameter is not specified, the timeout applies to each individual event written into OBS. When the value of hdfs.batchSize is greater than 0, configure this parameter to improve OBS write performance. NOTE: The value of hdfs.batchCallTimeout depends on hdfs.batchSize: a greater hdfs.batchSize requires a larger hdfs.batchCallTimeout. If the value is too small, writing events to OBS may fail. |
    | serializer.appendNewline | true | Whether to add a line feed character (\n) after an event is written to OBS. If a line feed character is added, its data volume is not included in the counters of the OBS sink. |
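
    The following is a minimal configuration sketch of an OBS sink, assuming an agent named server with a sink k1 and a channel c1 (the bucket, path, and AK/SK placeholders are illustrative):

    server.sinks.k1.type = hdfs
    server.sinks.k1.hdfs.path = s3a://<AK>:<SK>@my-bucket/obs-sink/
    server.sinks.k1.hdfs.calltimeout = 1000000
    server.sinks.k1.hdfs.batchSize = 1000
    server.sinks.k1.channel = c1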