Flume Service Configuration Guide
This section applies to MRS 3.x or later clusters.
This configuration guide describes how to configure common Flume services. For non-common Source, Channel, and Sink configuration, see the user manual provided by the Flume community.
Note
Parameters in bold in the following tables are mandatory.
The value of BatchSize of the Sink must be less than that of transactionCapacity of the Channel.
Only some parameters of Source, Channel, and Sink are displayed on the Flume configuration tool page. For details, see the following configurations.
The Customer Source, Customer Channel, and Customer Sink displayed on the Flume configuration tool page need to be configured based on self-developed code. The following common configurations are not displayed.
Common Source Configurations
Avro Source
An Avro source listens to the Avro port, receives data from the external Avro client, and places data into configured channels. Common configurations are as follows:
Parameter
Default Value
Description
channels
-
Specifies the channel connected to the source. Multiple channels can be configured.
type
avro
Specifies the type of the avro source, which must be avro.
bind
-
Specifies the listening host name/IP address.
port
-
Specifies the bound listening port. Ensure that this port is not occupied.
threads
-
Specifies the maximum number of source threads.
compression-type
none
Specifies the message compression format, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.
compression-level
6
Specifies the data compression level, which ranges from 1 to 9. The larger the value is, the higher the compression rate is.
ssl
false
Specifies whether to use SSL encryption. If this parameter is set to true, the values of keystore and keystore-password must be specified.
truststore-type
JKS
Specifies the Java trust store type, which can be set to JKS or PKCS12.
Note
Different passwords are used to protect the key store and private key of JKS, while the same password is used to protect the key store and private key of PKCS12.
truststore
-
Specifies the Java trust store file.
truststore-password
-
Specifies the Java trust store password.
keystore-type
JKS
Specifies the keystore type set after SSL is enabled, which can be set to JKS or PKCS12.
Note
Different passwords are used to protect the key store and private key of JKS, while the same password is used to protect the key store and private key of PKCS12.
keystore
-
Specifies the keystore file path set after SSL is enabled. This parameter is mandatory if SSL is enabled.
keystore-password
-
Specifies the keystore password set after SSL is enabled. This parameter is mandatory if SSL is enabled.
trust-all-certs
false
Specifies whether to disable the check for the SSL server certificate. If this parameter is set to true, the SSL server certificate of the remote source is not checked. Do not set this parameter to true in a production environment.
exclude-protocols
SSLv3
Specifies the excluded protocols. The entered protocols must be separated by spaces. The default value is SSLv3.
ipFilter
false
Specifies whether to enable the IP address filtering.
ipFilter.rules
-
Specifies the IP filtering rules. Multiple rules are separated by commas (,), and each rule matches a host name or an IP address. When ipFilter is set to true, two rule types are available: allow and deny. The configuration format is as follows:
ipFilterRules=allow:ip:127.*, allow:name:localhost, deny:ip:*
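As an illustration of how the parameters above fit together, the following is a minimal sketch of an Avro source definition. The agent name server follows the memory file channel example in this guide; the component names r1 and c1, the address, the port, the keystore path, and the password placeholder are assumptions to be replaced with site-specific values.

```properties
# Hypothetical component names: source r1 writes to channel c1.
server.sources = r1
server.channels = c1

server.sources.r1.type = avro
server.sources.r1.channels = c1
# Placeholder address and port; choose a port that is not occupied.
server.sources.r1.bind = 192.168.0.10
server.sources.r1.port = 21154
server.sources.r1.threads = 4
server.sources.r1.compression-type = none

# SSL is optional. When ssl is true, keystore and keystore-password are required.
server.sources.r1.ssl = true
server.sources.r1.keystore-type = JKS
# Placeholder path and password.
server.sources.r1.keystore = /opt/flume/conf/server.jks
server.sources.r1.keystore-password = <keystore password>
```

The Avro client that sends data to this source is expected to use the same compression-type and SSL settings.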
SpoolDir Source
SpoolDir Source monitors and transmits new files that have been added to directories in real-time mode. Common configurations are as follows:
Parameter
Default Value
Description
channels
-
Specifies the channel connected to the source. Multiple channels can be configured.
type
spooldir
Specifies the type of the spooling source, which must be set to spooldir.
spoolDir
-
Specifies the directory monitored by the SpoolDir source. The user running Flume must have read, write, and execute permissions on the directory.
monTime
0 (Disabled)
Specifies the thread monitoring threshold. When the update time exceeds the threshold, the source is restarted. Unit: second
fileSuffix
.COMPLETED
Specifies the suffix added after file transmission is complete.
deletePolicy
never
Specifies the source file deletion policy after file transmission is complete. The value can be either never or immediate. never indicates that the source file is not deleted after file transmission is complete, while immediate indicates that the source file is immediately deleted after file transmission is complete.
ignorePattern
^$
Specifies the regular expression of the files to be ignored. The default value is ^$, which matches only an empty file name, so no file is ignored by default.
includePattern
^.*$
Specifies the regular expression of the files to be included. This parameter can be used together with ignorePattern. If a file matches both ignorePattern and includePattern, the file is ignored. In addition, when a file starts with a period (.), the file will not be filtered.
trackerDir
.flumespool
Specifies the metadata storage path during data transmission.
batchSize
1000
Specifies the number of events written to the channel in batches.
decodeErrorPolicy
FAIL
Specifies the decoding error policy.
Note
If a decoding error occurs in the file, set decodeErrorPolicy to REPLACE or IGNORE. Flume will skip the decoding error and continue to collect subsequent logs.
deserializer
LINE
Specifies the file parser. The value can be either LINE or BufferedLine.
When the value is set to LINE, characters read from the file are transcoded one by one.
When the value is set to BufferedLine, one line or multiple lines of characters read from the file are transcoded in batches, which delivers better performance.
deserializer.maxLineLength
2048
Specifies the maximum length of a line that can be parsed.
deserializer.maxBatchLine
1
Specifies the maximum number of lines that are parsed together. If this parameter is set to more than one line, maxLineLength must be increased by the corresponding multiple.
Note
When configuring the Interceptor, take the multi-line combination into consideration to avoid data loss. If the Interceptor cannot process combined lines, set this parameter to 1.
selector.type
replicating
Specifies the selector type. The value can be either replicating or multiplexing. replicating indicates that data is replicated and then transferred to each channel so that each channel receives the same data, while multiplexing indicates that a channel is selected based on the value of the header in the event and each channel has different data.
interceptors
-
Specifies the interceptor. Multiple interceptors are separated by spaces.
inputCharset
UTF-8
Specifies the encoding format of a read file. The encoding format must be the same as that of the data source file that has been read. Otherwise, an error may occur during character parsing.
fileHeader
false
Specifies whether to add the file name (including the file path) to the event header.
fileHeaderKey
-
Specifies the key under which the file name (including the file path) is stored in the event header, in <key,value> form. Parameters fileHeaderKey and fileHeader must be used together. The following is an example when fileHeader is set to true:
Define fileHeaderKey as file. When the /root/a.txt file is read, the header contains file=/root/a.txt.
basenameHeader
false
Specifies whether to add the file name (excluding the file path) to the event header.
basenameHeaderKey
-
Specifies the key under which the file name (excluding the file path) is stored in the event header, in <key,value> form. Parameters basenameHeaderKey and basenameHeader must be used together. The following is an example when basenameHeader is set to true:
Define basenameHeaderKey as file. When the a.txt file is read, the header contains file=a.txt.
pollDelay
500
Specifies the delay for polling new files in the monitoring directory. Unit: milliseconds
recursiveDirectorySearch
false
Specifies whether to monitor new files in the subdirectory of the configured directory.
consumeOrder
oldest
Specifies the consumption order of files in a directory. The options are as follows: random, youngest, and oldest. If this parameter is set to oldest or youngest, the sequence of files to be read is determined by the last modification time of files in the monitored directory. If there are a large number of files in the directory, it takes a long time to search for the oldest or youngest files. If this parameter is set to random, an earlier created file may not be read for a long time.
maxBackoff
4000
Specifies the maximum time to wait between consecutive attempts to write to a channel if the channel is full. The source starts with a short wait time and increases it exponentially after each failed attempt until this value is reached. If data still cannot be written, the data write fails and an exception is thrown. Unit: second
emptyFileEvent
true
Specifies whether to collect empty file information and send it to the sink end. The default value is true, indicating that empty file information is sent to the sink end. This parameter is valid only for HDFS Sink. Taking HDFS Sink as an example, if this parameter is set to true and an empty file exists in the spoolDir directory, an empty file with the same name will be created in the hdfs.path directory of HDFS.
Note
SpoolDir Source ignores the last line feed character of each event when data is read line by line. Therefore, the last line feed character is not included in the data volume counters of Flume.
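The following sketch shows one way the SpoolDir parameters above might be combined. The component names r1 and c1 and the monitored directory are assumptions; the header key basename is a hypothetical choice, and the remaining values are the defaults or options listed in the table.

```properties
# Hypothetical component names: source r1 writes to channel c1.
server.sources = r1
server.channels = c1

server.sources.r1.type = spooldir
server.sources.r1.channels = c1
# Placeholder directory; the user running Flume needs read, write, and execute permissions on it.
server.sources.r1.spoolDir = /var/log/app/spool
server.sources.r1.fileSuffix = .COMPLETED
server.sources.r1.deletePolicy = never
server.sources.r1.batchSize = 1000

# Skip undecodable characters instead of failing the file.
server.sources.r1.decodeErrorPolicy = IGNORE
server.sources.r1.inputCharset = UTF-8
server.sources.r1.deserializer = LINE
server.sources.r1.deserializer.maxLineLength = 2048

# Store the full path under "file" and the bare file name under "basename" in each event header.
server.sources.r1.fileHeader = true
server.sources.r1.fileHeaderKey = file
server.sources.r1.basenameHeader = true
server.sources.r1.basenameHeaderKey = basename
```

With these settings, an event read from /var/log/app/spool/a.txt would carry the headers file=/var/log/app/spool/a.txt and basename=a.txt.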
Kafka Source
A Kafka source consumes data from Kafka topics. Multiple sources can consume data of the same topic, and the sources consume different partitions of the topic. Common configurations are as follows:
Parameter
Default Value
Description
channels
-
Specifies the channel connected to the source. Multiple channels can be configured.
type
org.apache.flume.source.kafka.KafkaSource
Specifies the type of the Kafka source, which must be set to org.apache.flume.source.kafka.KafkaSource.
kafka.bootstrap.servers
-
Specifies the bootstrap address port list of Kafka. If Kafka has been installed in the cluster and the configuration has been synchronized to the server, you do not need to set this parameter on the server. The default value is the list of all brokers in the Kafka cluster. This parameter must be configured on the client. Use commas (,) to separate multiple values of IP address:Port number. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).
kafka.topics
-
Specifies the list of subscribed Kafka topics, which are separated by commas (,).
kafka.topics.regex
-
Specifies the subscribed topics that comply with regular expressions. kafka.topics.regex has a higher priority than kafka.topics and will overwrite kafka.topics.
monTime
0 (Disabled)
Specifies the thread monitoring threshold. When the update time exceeds the threshold, the source is restarted. Unit: second
nodatatime
0 (Disabled)
Specifies the alarm threshold. An alarm is triggered when the duration that Kafka does not release data to subscribers exceeds the threshold. Unit: second. This parameter can be configured in the properties.properties file.
batchSize
1000
Specifies the number of events written to the channel in batches.
batchDurationMillis
1000
Specifies the maximum duration of topic data consumption at a time, expressed in milliseconds.
keepTopicInHeader
false
Specifies whether to save topics in the event header. If the parameter value is true, topics configured in Kafka Sink become invalid.
setTopicHeader
true
If this parameter is set to true, the topic name defined in topicHeader is stored in the header.
topicHeader
topic
When setTopicHeader is set to true, this parameter specifies the name of the header in which the name of the received topic is stored. If this property is used together with topicHeader of Kafka Sink, be careful not to send messages back to the same topic in a loop.
useFlumeEventFormat
false
By default, the bytes of a Kafka message are placed directly into the body of the event. If this parameter is set to true, the Avro binary format of Flume is used to read events. When used together with the parameter of the same name in KafkaSink or with the parseAsFlumeEvent parameter in KafkaChannel, any Flume headers set at the data source are retained.
keepPartitionInHeader
false
Specifies whether to save partition IDs in the event header. If the parameter value is true, Kafka Sink writes data to the corresponding partition.
kafka.consumer.group.id
flume
Specifies the Kafka consumer group ID. Sources or agents that have the same ID are in the same consumer group.
kafka.security.protocol
SASL_PLAINTEXT
Specifies the Kafka security protocol. The parameter value must be set to PLAINTEXT in a common cluster. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).
Other Kafka Consumer Properties
-
Specifies other Kafka configurations. This parameter can be set to any consumption configuration supported by Kafka, and the .kafka prefix must be added to the configuration.
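A minimal sketch of a Kafka source for a security-mode cluster is shown below. The component names r1 and c1, the broker addresses, and the topic names are assumptions; the port and security protocol follow the matching rule stated above (21007 with SASL_PLAINTEXT).

```properties
# Hypothetical component names: source r1 writes to channel c1.
server.sources = r1
server.channels = c1

server.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
server.sources.r1.channels = c1
# Placeholder broker addresses. Port 21007 pairs with SASL_PLAINTEXT;
# in a common cluster, use port 9092 with PLAINTEXT instead.
server.sources.r1.kafka.bootstrap.servers = 192.168.0.11:21007,192.168.0.12:21007
server.sources.r1.kafka.security.protocol = SASL_PLAINTEXT
# Placeholder topic names, separated by commas.
server.sources.r1.kafka.topics = topic1,topic2
server.sources.r1.kafka.consumer.group.id = flume
server.sources.r1.batchSize = 1000
server.sources.r1.batchDurationMillis = 1000
```

Sources that share the same kafka.consumer.group.id divide the partitions of the subscribed topics among themselves.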
Taildir Source
A Taildir source monitors file changes in a directory and automatically reads the file content. In addition, it can transmit data in real time. Common configurations are as follows:
Parameter
Default Value
Description
channels
-
Specifies the channel connected to the source. Multiple channels can be configured.
type
TAILDIR
Specifies the type of the taildir source, which must be set to TAILDIR.
filegroups
-
Specifies the group name of a collection file directory. Group names are separated by spaces.
filegroups.<filegroupName>.parentDir
-
Specifies the parent directory. The value must be an absolute path.
filegroups.<filegroupName>.filePattern
-
Specifies the file path relative to the parent directory of the file group. Directories can be included and regular expressions are supported. It must be used together with parentDir.
positionFile
-
Specifies the metadata storage path during data transmission.
headers.<filegroupName>.<headerKey>
-
Specifies the key-value of an event when data of a group is being collected.
byteOffsetHeader
false
Specifies whether each event header contains the event location information in the source file. If the parameter value is true, the location information is saved in the byteoffset variable.
maxBatchCount
Long.MAX_VALUE
Specifies the maximum number of batches that can be consecutively read from a file. If the source reads multiple files in the monitored directory and one of the files is written at a rapid rate, other files may fail to be processed, because the source stays in an endless read loop on the fast-growing file. In this case, set this parameter to a smaller value.
skipToEnd
false
Specifies whether Flume can locate the latest location of a file and read the latest data after restart. If the parameter value is true, Flume locates and reads the latest file data after restart.
idleTimeout
120000
Specifies the idle duration during file reading, expressed in milliseconds. If the file content does not change within this duration, the file is closed. If data is written to the file after it is closed, the file is reopened and the data is read.
writePosInterval
3000
Specifies the interval for writing metadata to a file, expressed in milliseconds.
batchSize
1000
Specifies the number of events written to the channel in batches.
monTime
0 (Disabled)
Specifies the thread monitoring threshold. When the update time exceeds the threshold, the source is restarted. Unit: second
fileHeader
false
Specifies whether to add the file name (including the file path) to the event header.
fileHeaderKey
file
Specifies the key under which the file name (including the file path) is stored in the event header, in <key,value> form. Parameters fileHeaderKey and fileHeader must be used together. The following is an example when fileHeader is set to true:
Define fileHeaderKey as file. When the /root/a.txt file is read, the header contains file=/root/a.txt.
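The file group parameters are easier to follow with a concrete layout. The following sketch defines a single file group; the component names r1 and c1, the group name f1, the header key logtype, and all paths are assumptions.

```properties
# Hypothetical component names: source r1 writes to channel c1.
server.sources = r1
server.channels = c1

server.sources.r1.type = TAILDIR
server.sources.r1.channels = c1
# One file group named f1; additional group names would be separated by spaces.
server.sources.r1.filegroups = f1
# parentDir must be an absolute path; filePattern is relative to it.
server.sources.r1.filegroups.f1.parentDir = /var/log/app
server.sources.r1.filegroups.f1.filePattern = .*log
# Every event collected from group f1 carries the header logtype=app.
server.sources.r1.headers.f1.logtype = app
# Placeholder path for the read-position metadata.
server.sources.r1.positionFile = /opt/flume/taildir/position.json
server.sources.r1.writePosInterval = 3000
server.sources.r1.idleTimeout = 120000
server.sources.r1.batchSize = 1000
```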
Http Source
An HTTP source receives data from an external HTTP client and sends the data to the configured channels. Common configurations are as follows:
Parameter
Default Value
Description
channels
-
Specifies the channel connected to the source. Multiple channels can be configured.
type
http
Specifies the type of the http source, which must be set to http.
bind
-
Specifies the listening host name/IP address.
port
-
Specifies the bound listening port. Ensure that this port is not occupied.
handler
org.apache.flume.source.http.JSONHandler
Specifies the message parsing method of an HTTP request. Two formats are supported: JSON (org.apache.flume.source.http.JSONHandler) and BLOB (org.apache.flume.sink.solr.morphline.BlobHandler).
handler.*
-
Specifies handler parameters.
exclude-protocols
SSLv3
Specifies the excluded protocols. The entered protocols must be separated by spaces. The default value is SSLv3.
include-cipher-suites
-
Specifies the included cipher suites. The entered cipher suites must be separated by spaces. If this parameter is left empty, all supported cipher suites are included by default.
enableSSL
false
Specifies whether SSL is enabled in HTTP. If this parameter is set to true, the values of keystore and keystorePassword must be specified.
keystore-type
JKS
Specifies the keystore type, which can be JKS or PKCS12.
keystore
-
Specifies the keystore path set after SSL is enabled in HTTP.
keystorePassword
-
Specifies the keystore password set after SSL is enabled in HTTP.
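As a sketch under stated assumptions, an HTTP source that accepts JSON requests could be declared as follows. The component names r1 and c1, the address, and the port are placeholders.

```properties
# Hypothetical component names: source r1 writes to channel c1.
server.sources = r1
server.channels = c1

server.sources.r1.type = http
server.sources.r1.channels = c1
# Placeholder address and port; choose a port that is not occupied.
server.sources.r1.bind = 192.168.0.10
server.sources.r1.port = 21155
server.sources.r1.handler = org.apache.flume.source.http.JSONHandler
# Set enableSSL to true only together with keystore and keystorePassword.
server.sources.r1.enableSSL = false
```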
Thrift Source
Thrift Source monitors the thrift port, receives data from the external Thrift clients, and puts the data into the configured channel. Common configurations are as follows:
Parameter
Default Value
Description
channels
-
Specifies the channel connected to the source. Multiple channels can be configured.
type
thrift
Specifies the type of the thrift source, which must be set to thrift.
bind
-
Specifies the listening host name/IP address.
port
-
Specifies the bound listening port. Ensure that this port is not occupied.
threads
-
Specifies the maximum number of worker threads that can be run.
kerberos
false
Specifies whether Kerberos authentication is enabled.
agent-keytab
-
Specifies the path of the keytab file used by the server. The machine-machine account must be used. You are advised to use flume/conf/flume_server.keytab in the Flume service installation directory.
agent-principal
-
Specifies the principal of the security user used by the server. The principal must be a machine-machine account. You are advised to use the default user of Flume: flume_server/hadoop.<system domain name>@<system domain name>
Note
flume_server/hadoop.<system domain name> is the username. All letters in the system domain name contained in the username are lowercase letters. For example, Local Domain is set to 9427068F-6EFA-4833-B43E-60CB641E5B6C.COM, and the username is flume_server/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com.
compression-type
none
Specifies the message compression format, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.
ssl
false
Specifies whether to use SSL encryption. If this parameter is set to true, the values of keystore and keystore-password must be specified.
keystore-type
JKS
Specifies the keystore type set after SSL is enabled.
keystore
-
Specifies the keystore file path set after SSL is enabled. This parameter is mandatory if SSL is enabled.
keystore-password
-
Specifies the keystore password set after SSL is enabled. This parameter is mandatory if SSL is enabled.
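The Kerberos-related parameters can be sketched as follows. The component names r1 and c1, the address, and the port are assumptions; the keytab location and principal follow the recommendations in the table, with the placeholders in angle brackets left to be filled in for the actual cluster.

```properties
# Hypothetical component names: source r1 writes to channel c1.
server.sources = r1
server.channels = c1

server.sources.r1.type = thrift
server.sources.r1.channels = c1
# Placeholder address and port; choose a port that is not occupied.
server.sources.r1.bind = 192.168.0.10
server.sources.r1.port = 21156
server.sources.r1.threads = 4

# Kerberos authentication with the default Flume machine-machine account.
server.sources.r1.kerberos = true
server.sources.r1.agent-keytab = <Flume service installation directory>/flume/conf/flume_server.keytab
# The system domain name in the username part must be written in lowercase.
server.sources.r1.agent-principal = flume_server/hadoop.<system domain name>@<system domain name>
```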
Common Channel Configurations
Memory Channel
A memory channel uses memory as the cache. Events are stored in memory queues. Common configurations are as follows:
Parameter
Default Value
Description
type
-
Specifies the type of the memory channel, which must be set to memory.
capacity
10000
Specifies the maximum number of events cached in a channel.
transactionCapacity
1000
Specifies the maximum number of events that can be put into or taken from the channel in a single transaction.
Note
The parameter value must be greater than the batchSize of the source and sink.
The value of transactionCapacity must be less than or equal to that of capacity.
channelfullcount
10
Specifies the channel full count. When the count reaches the threshold, an alarm is reported.
keep-alive
3
Specifies the waiting time of the Put and Take threads when the transaction or channel cache is full. Unit: second
byteCapacity
80% of the maximum JVM memory
Specifies the maximum total bytes of all event bodies in a channel. The default value is 80% of the maximum JVM memory (indicated by -Xmx). Unit: bytes
byteCapacityBufferPercentage
20
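Specifies the percentage of byteCapacity that is reserved as a buffer to account for data in event headers, because byteCapacity counts only event bodies. Unit: %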
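The sizing constraints in the notes above can be made concrete with a short sketch. The component name c1 is an assumption; the values are the defaults from the table.

```properties
# Hypothetical component name: channel c1.
server.channels = c1

server.channels.c1.type = memory
# Constraint from the notes above:
# batchSize of source and sink < transactionCapacity <= capacity
server.channels.c1.capacity = 10000
server.channels.c1.transactionCapacity = 1000
# Seconds that Put and Take threads wait when the transaction or channel cache is full.
server.channels.c1.keep-alive = 3
server.channels.c1.byteCapacityBufferPercentage = 20
```

With these values, any source or sink attached to c1 would need a batchSize below 1000.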
File Channel
A file channel uses local disks as the cache. Events are stored in the folder specified by dataDirs. Common configurations are as follows:
Parameter
Default Value
Description
type
-
Specifies the type of the file channel, which must be set to file.
checkpointDir
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint
Note
This path changes if a custom data path is configured.
Specifies the checkpoint storage directory.
dataDirs
${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data
Note
This path changes if a custom data path is configured.
Specifies the data cache directory. Multiple directories can be configured to improve performance. The directories are separated by commas (,).
maxFileSize
2146435071
Specifies the maximum size of a single cache file, expressed in bytes.
minimumRequiredSpace
524288000
Specifies the minimum idle space in the cache, expressed in bytes.
capacity
1000000
Specifies the maximum number of events cached in a channel.
transactionCapacity
10000
Specifies the maximum number of events that can be put into or taken from the channel in a single transaction.
Note
The parameter value must be greater than the batchSize of the source and sink.
The value of transactionCapacity must be less than or equal to that of capacity.
channelfullcount
10
Specifies the channel full count. When the count reaches the threshold, an alarm is reported.
useDualCheckpoints
false
Specifies the backup checkpoint. If this parameter is set to true, the backupCheckpointDir parameter value must be set.
backupCheckpointDir
-
Specifies the path of the backup checkpoint.
checkpointInterval
30000
Specifies the interval between checkpoints, expressed in milliseconds.
keep-alive
3
Specifies the waiting time of the Put and Take threads when the transaction or channel cache is full. Unit: second
use-log-replay-v1
false
Specifies whether to enable the old replay logic.
use-fast-replay
false
Specifies whether to enable fast replay.
checkpointOnClose
true
Specifies whether a checkpoint is created when the channel is closed.
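A file channel with a backup checkpoint might be configured as in the following sketch. The component name c1 and all directory paths are assumptions; the two data directories illustrate the comma-separated dataDirs format.

```properties
# Hypothetical component name: channel c1.
server.channels = c1

server.channels.c1.type = file
# Placeholder directories; ideally on different disks.
server.channels.c1.checkpointDir = /srv/flume/checkpoint
server.channels.c1.dataDirs = /srv/data1/flume/data,/srv/data2/flume/data
# Constraint from the notes above:
# batchSize of source and sink < transactionCapacity <= capacity
server.channels.c1.capacity = 1000000
server.channels.c1.transactionCapacity = 10000
# A backup checkpoint requires backupCheckpointDir to be set.
server.channels.c1.useDualCheckpoints = true
server.channels.c1.backupCheckpointDir = /srv/flume/checkpoint_backup
server.channels.c1.keep-alive = 3
```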
Memory File Channel
A memory file channel uses both memory and local disks as its cache and supports message persistence. It provides performance similar to that of a memory channel and better than that of a file channel. This channel is currently experimental and not recommended for use in production. Common configurations are as follows:
Parameter
Default Value
Description
type
org.apache.flume.channel.MemoryFileChannel
Specifies the type of the memory file channel, which must be set to org.apache.flume.channel.MemoryFileChannel.
capacity
50000
Specifies the maximum number of events cached in a channel.
transactionCapacity
5000
Specifies the maximum number of events processed by a transaction.
Note
The parameter value must be greater than the batchSize of the source and sink.
The value of transactionCapacity must be less than or equal to that of capacity.
subqueueByteCapacity
20971520
Specifies the maximum size of events that can be stored in a subqueue, expressed in bytes.
A memory file channel uses both queues and subqueues to cache data. Events are stored in a subqueue, and subqueues are stored in a queue.
subqueueByteCapacity and subqueueInterval determine the size of events that can be stored in a subqueue. subqueueByteCapacity specifies the capacity of a subqueue, and subqueueInterval specifies the duration that a subqueue can store events. Events in a subqueue are sent to the destination only after the subqueue reaches the upper limit of subqueueByteCapacity or subqueueInterval.
Note
The value of subqueueByteCapacity must be greater than the number of events specified by batchSize.
subqueueInterval
2000
Specifies the maximum duration that a subqueue can store events, expressed in milliseconds.
keep-alive
3
Specifies the waiting time of the Put and Take threads when the transaction or channel cache is full.
Unit: second
dataDir
-
Specifies the cache directory for local files.
byteCapacity
80% of the maximum JVM memory
Specifies the channel cache capacity.
Unit: bytes
compression-type
none
Specifies the message compression format, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.
channelfullcount
10
Specifies the channel full count. When the count reaches the threshold, an alarm is reported.
The following is a configuration example of a memory file channel:
server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel
server.channels.c1.dataDir = /opt/flume/mfdata
server.channels.c1.subqueueByteCapacity = 20971520
server.channels.c1.subqueueInterval = 2000
server.channels.c1.capacity = 500000
server.channels.c1.transactionCapacity = 40000
Kafka Channel
A Kafka channel uses a Kafka cluster as the cache. Kafka provides high availability and replication, so that if Flume or a Kafka broker crashes, the cached data remains available to be consumed by sinks. Common configurations are as follows:
Parameter
Default Value
Description
type
-
Specifies the type of the Kafka channel, which must be set to org.apache.flume.channel.kafka.KafkaChannel.
kafka.bootstrap.servers
-
Specifies the bootstrap address port list of Kafka.
If Kafka has been installed in the cluster and the configuration has been synchronized to the server, you do not need to set this parameter on the server. The default value is the list of all brokers in the Kafka cluster. This parameter must be configured on the client. Use commas (,) to separate multiple values of IP address:Port number. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).
kafka.topic
flume-channel
Specifies the Kafka topic used by the channel to cache data.
kafka.consumer.group.id
flume
Specifies the ID of the consumer group used to obtain data from Kafka. This parameter cannot be left blank.
parseAsFlumeEvent
true
Specifies whether data is parsed into Flume events.
migrateZookeeperOffsets
true
Specifies whether to search for offsets in ZooKeeper and submit them to Kafka when there is no offset in Kafka.
kafka.consumer.auto.offset.reset
latest
Specifies where to consume if there is no offset record, which can be set to earliest, latest, or none. earliest indicates that the offset is reset to the initial point, latest indicates that the offset is set to the latest position, and none indicates that an exception is thrown if there is no offset.
kafka.producer.security.protocol
SASL_PLAINTEXT
Specifies the Kafka producer security protocol. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).
Note
If the parameter is not displayed, click + in the lower left corner of the dialog box to display all parameters.
kafka.consumer.security.protocol
SASL_PLAINTEXT
Specifies the Kafka consumer security protocol. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).
pollTimeout
500
Specifies the maximum timeout interval for the consumer to invoke the poll function. Unit: milliseconds
ignoreLongMessage
false
Specifies whether to discard oversized messages.
messageMaxLength
1000012
Specifies the maximum length of a message written by Flume to Kafka.
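The following is a minimal sketch of a Kafka channel for a security-mode cluster. The component name c1 and the broker addresses are assumptions; the other values are the defaults from the table, and the port and protocols follow the matching rule stated above.

```properties
# Hypothetical component name: channel c1.
server.channels = c1

server.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# Placeholder broker addresses. Port 21007 pairs with SASL_PLAINTEXT;
# in a common cluster, use port 9092 with PLAINTEXT instead.
server.channels.c1.kafka.bootstrap.servers = 192.168.0.11:21007,192.168.0.12:21007
server.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
server.channels.c1.kafka.consumer.security.protocol = SASL_PLAINTEXT
server.channels.c1.kafka.topic = flume-channel
server.channels.c1.kafka.consumer.group.id = flume
server.channels.c1.kafka.consumer.auto.offset.reset = latest
server.channels.c1.parseAsFlumeEvent = true
```

The producer and consumer security protocols are set separately, so both need to match the port used in kafka.bootstrap.servers.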
Common Sink Configurations
HDFS Sink
An HDFS sink writes data into HDFS. Common configurations are as follows:
Parameter
Default Value
Description
channel
-
Specifies the channel connected to the sink.
type
hdfs
Specifies the type of the hdfs sink, which must be set to hdfs.
hdfs.path
-
Specifies the data storage path in HDFS. The value must start with hdfs://hacluster/.
monTime
0 (Disabled)
Specifies the thread monitoring threshold. When the update time exceeds the threshold, the sink is restarted. Unit: second
hdfs.inUseSuffix
.tmp
Specifies the suffix of the HDFS file to which data is being written.
hdfs.rollInterval
30
Specifies the interval for file rolling, expressed in seconds.
hdfs.rollSize
1024
Specifies the size for file rolling, expressed in bytes.
hdfs.rollCount
10
Specifies the number of events for file rolling.
Note
Parameters rollInterval, rollSize, and rollCount can be configured at the same time. The file is rolled as soon as any one of the conditions is met.
hdfs.idleTimeout
0
Specifies the timeout interval for closing idle files automatically, expressed in seconds.
hdfs.batchSize
1000
Specifies the number of events written into HDFS in batches.
hdfs.kerberosPrincipal
-
Specifies the Kerberos principal of HDFS authentication. This parameter is mandatory in a secure mode, but not required in a common mode.
hdfs.kerberosKeytab
-
Specifies the Kerberos keytab of HDFS authentication. This parameter is not required in a common mode. In a secure mode, the user running Flume must have the permission to access the keyTab path specified in the jaas.conf file.
hdfs.fileCloseByEndEvent
true
Specifies whether to close the HDFS file when the last event of the source file is received.
hdfs.batchCallTimeout
-
Specifies the timeout control duration when events are written into HDFS in batches. Unit: milliseconds
If this parameter is not specified, the timeout duration is controlled when each event is written into HDFS. When the value of hdfs.batchSize is greater than 0, configure this parameter to improve the performance of writing data into HDFS.
Note
The value of hdfs.batchCallTimeout depends on hdfs.batchSize. A greater hdfs.batchSize requires a larger hdfs.batchCallTimeout. If the value of hdfs.batchCallTimeout is too small, writing events to HDFS may fail.
serializer.appendNewline
true
Specifies whether to add a line feed character (\n) after an event is written to HDFS. If a line feed character is added, the data volume counters used by the line feed character will not be calculated by HDFS sinks.
hdfs.filePrefix
over_%{basename}
Specifies the file name prefix after data is written to HDFS.
hdfs.fileSuffix
-
Specifies the file name suffix after data is written to HDFS.
hdfs.inUsePrefix
-
Specifies the prefix of the HDFS file to which data is being written.
hdfs.fileType
DataStream
Specifies the HDFS file format, which can be set to SequenceFile, DataStream, or CompressedStream.
Note
If the parameter is set to SequenceFile or DataStream, output files are not compressed, and the codeC parameter cannot be configured. If the parameter is set to CompressedStream, output files are compressed, and the codeC parameter must also be configured.
hdfs.codeC
-
Specifies the file compression format, which can be set to gzip, bzip2, lzo, lzop, or snappy.
hdfs.maxOpenFiles
5000
Specifies the maximum number of HDFS files that can be opened. If the number of opened files reaches this value, the earliest opened files are closed.
hdfs.writeFormat
Writable
Specifies the file write format, which can be set to Writable or Text.
hdfs.callTimeout
10000
Specifies the timeout for each write of events into HDFS, expressed in milliseconds.
hdfs.threadsPoolSize
-
Specifies the number of threads used by each HDFS sink for HDFS I/O operations.
hdfs.rollTimerPoolSize
-
Specifies the number of threads used by each HDFS sink to schedule timed file rolling.
hdfs.round
false
Specifies whether to round down the timestamp value. If this parameter is set to true, all time-based escape sequences (except %t) are affected.
hdfs.roundUnit
second
Specifies the unit to which the timestamp value is rounded down, which can be set to second, minute, or hour.
hdfs.useLocalTimeStamp
true
Specifies whether to enable the local timestamp. The recommended parameter value is true.
hdfs.closeTries
0
Specifies the maximum number of times the HDFS sink attempts to rename a file after it initiates a close. If the parameter is set to the default value 0, the sink keeps trying until the file is successfully renamed.
hdfs.retryInterval
180
Specifies the interval between consecutive requests to close the HDFS file, expressed in seconds.
Note
Each close request involves multiple RPC round trips to the NameNode, so a value that is too small may overload the NameNode. When the parameter is set to 0, the sink does not retry closing the file if the first attempt fails, and the file may be left open or keep the .tmp file name extension.
hdfs.failcount
10
Specifies the threshold for the number of failed writes to HDFS. If the number of times that the sink fails to write data to HDFS exceeds the parameter value, an alarm indicating abnormal data transmission is reported.
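The following is a minimal sketch of how the HDFS sink parameters above might be combined. The agent name server, the sink name k1, the channel name c1, the target path, and all numeric values are assumptions for illustration only. hdfs.path is the standard target directory parameter of the Flume HDFS sink.

```properties
# Hypothetical agent "server", sink "k1", and channel "c1".
server.sinks.k1.type = hdfs
server.sinks.k1.channel = c1
# Assumed target directory. The time escape sequences need a timestamp,
# which hdfs.useLocalTimeStamp = true provides.
server.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H
server.sinks.k1.hdfs.useLocalTimeStamp = true
# Round timestamps down to the hour so that each directory covers one hour.
server.sinks.k1.hdfs.round = true
server.sinks.k1.hdfs.roundUnit = hour
# CompressedStream requires hdfs.codeC to be set as well.
server.sinks.k1.hdfs.fileType = CompressedStream
server.sinks.k1.hdfs.codeC = gzip
# A larger batch needs a larger batch timeout (illustrative values).
server.sinks.k1.hdfs.batchSize = 1000
server.sinks.k1.hdfs.batchCallTimeout = 30000
# Keep retrying the rename, and wait 180 seconds between close attempts.
server.sinks.k1.hdfs.closeTries = 0
server.sinks.k1.hdfs.retryInterval = 180
```

The value of hdfs.batchSize in this sketch must stay below the transactionCapacity of channel c1.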
Avro Sink
An Avro sink converts events into Avro events and sends them to the listening port of the configured host. Common configurations are as follows:
¶ Parameter
Default Value
Description
channel
-
Specifies the channel connected to the sink.
type
-
Specifies the type of the avro sink, which must be set to avro.
hostname
-
Specifies the bound host name or IP address.
port
-
Specifies the bound listening port. Ensure that this port is not occupied.
batch-size
1000
Specifies the number of events sent in a batch.
client.type
DEFAULT
Specifies the client instance type. Set this parameter based on the communication protocol used by the configured model. The options are as follows:
DEFAULT: The client instance of the AvroRPC type is returned.
OTHER: NULL is returned.
THRIFT: The client instance of the Thrift RPC type is returned.
DEFAULT_LOADBALANCING: The client instance of the LoadBalancing RPC type is returned.
DEFAULT_FAILOVER: The client instance of the Failover RPC type is returned.
ssl
false
Specifies whether to use SSL encryption. If this parameter is set to true, the values of keystore and keystore-password must be specified.
truststore-type
JKS
Specifies the Java trust store type, which can be set to JKS or PKCS12.
Note
Different passwords are used to protect the key store and private key of JKS, while the same password is used to protect the key store and private key of PKCS12.
truststore
-
Specifies the Java trust store file.
truststore-password
-
Specifies the Java trust store password.
keystore-type
JKS
Specifies the keystore type set after SSL is enabled.
keystore
-
Specifies the keystore file path set after SSL is enabled. This parameter is mandatory if SSL is enabled.
keystore-password
-
Specifies the keystore password after SSL is enabled. This parameter is mandatory if SSL is enabled.
connect-timeout
20000
Specifies the timeout for the first connection, expressed in milliseconds.
request-timeout
20000
Specifies the maximum timeout for a request after the first request, expressed in milliseconds.
reset-connection-interval
0
Specifies the interval between a connection failure and a second connection, expressed in seconds. If the parameter is set to 0, the system continuously attempts to perform a connection.
compression-type
none
Specifies the compression type of the batch data, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed. This parameter value must be the same as that of the AvroSource compression-type.
compression-level
6
Specifies the compression level of batch data, which can be set to 1 to 9. A larger value indicates a higher compression rate.
exclude-protocols
SSLv3
Specifies the excluded protocols. The entered protocols must be separated by spaces. The default value is SSLv3.
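As an illustration of the parameters above, an Avro sink with SSL and compression enabled could be configured roughly as follows. The agent name client, the sink and channel names, the host, the port, and the file paths are hypothetical, and the passwords are placeholders.

```properties
# Hypothetical agent "client", sink "k1", and channel "c1".
client.sinks.k1.type = avro
client.sinks.k1.channel = c1
# Assumed address of the downstream Avro source.
client.sinks.k1.hostname = 192.168.0.10
client.sinks.k1.port = 21154
client.sinks.k1.batch-size = 1000
# Must match the compression-type of the receiving Avro source.
client.sinks.k1.compression-type = deflate
client.sinks.k1.compression-level = 6
# With ssl = true, keystore and keystore-password are mandatory.
client.sinks.k1.ssl = true
client.sinks.k1.keystore-type = JKS
client.sinks.k1.keystore = /path/to/keystore.jks
client.sinks.k1.keystore-password = <keystore-password>
client.sinks.k1.truststore-type = JKS
client.sinks.k1.truststore = /path/to/truststore.jks
client.sinks.k1.truststore-password = <truststore-password>
client.sinks.k1.exclude-protocols = SSLv3
```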
HBase Sink
An HBase sink writes data into HBase. Common configurations are as follows:
¶ Parameter
Default Value
Description
channel
-
Specifies the channel connected to the sink.
type
-
Specifies the type of the HBase sink, which must be set to hbase.
table
-
Specifies the HBase table name.
columnFamily
-
Specifies the HBase column family.
monTime
0 (Disabled)
Specifies the thread monitoring threshold. When the time since the last update exceeds the threshold, the sink is restarted. Unit: seconds
batchSize
1000
Specifies the number of events written into HBase in batches.
kerberosPrincipal
-
Specifies the Kerberos principal used for HBase authentication. This parameter is mandatory in security mode and is not required in common mode.
kerberosKeytab
-
Specifies the Kerberos keytab used for HBase authentication. This parameter is not required in common mode. In security mode, the Flume running user must have permission to access the keytab path specified in the jaas.conf file.
coalesceIncrements
true
Specifies whether to coalesce multiple increment operations on the same HBase cell within the same processing batch. Setting this parameter to true improves performance.
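A minimal HBase sink sketch based on the parameters above is shown below. The agent, sink, and channel names, the table name, the column family, the principal, and the keytab path are all hypothetical.

```properties
# Hypothetical agent "server", sink "k1", and channel "c1".
server.sinks.k1.type = hbase
server.sinks.k1.channel = c1
# Assumed table and column family, which must already exist in HBase.
server.sinks.k1.table = flume_events
server.sinks.k1.columnFamily = cf
server.sinks.k1.batchSize = 1000
server.sinks.k1.coalesceIncrements = true
# Required in security mode only. Omit both lines in common mode.
server.sinks.k1.kerberosPrincipal = flume_hbase
server.sinks.k1.kerberosKeytab = /path/to/user.keytab
```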
Kafka Sink
A Kafka sink writes data into Kafka. Common configurations are as follows:
¶ Parameter
Default Value
Description
channel
-
Specifies the channel connected to the sink.
type
-
Specifies the type of the kafka sink, which must be set to org.apache.flume.sink.kafka.KafkaSink.
kafka.bootstrap.servers
-
Specifies the list of Kafka bootstrap server addresses and ports. If Kafka has been installed in the cluster and the configuration has been synchronized to the server, you do not need to set this parameter on the server, where the default value is the list of all brokers in the Kafka cluster. This parameter must be configured on the client. If there are multiple values, use commas (,) to separate the values. Ports and security protocols must match as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).
monTime
0 (Disabled)
Specifies the thread monitoring threshold. When the time since the last update exceeds the threshold, the sink is restarted. Unit: seconds
kafka.producer.acks
1
Specifies how many replica acknowledgements are required before a write is considered successful. The value 0 indicates that no acknowledgement needs to be received, the value 1 indicates that the system waits only for the acknowledgement from the leader, and the value -1 indicates that the system waits for the acknowledgements of all replicas. If this parameter is set to -1, data loss can be avoided in some leader failure scenarios.
kafka.topic
-
Specifies the topic to which data is written. This parameter is mandatory.
flumeBatchSize
1000
Specifies the number of events written into Kafka in batches.
kafka.security.protocol
SASL_PLAINTEXT
Specifies the Kafka security protocol. The parameter value must be set to PLAINTEXT in a common cluster. The rules for matching ports and security protocols must be as follows: port 21007 matches the security mode (SASL_PLAINTEXT), and port 9092 matches the common mode (PLAINTEXT).
ignoreLongMessage
false
Specifies whether to discard oversized messages.
messageMaxLength
1000012
Specifies the maximum length of a message written by Flume to Kafka.
defaultPartitionId
-
Specifies the Kafka partition ID to which the events of a channel are transferred. The partitionIdHeader value overwrites this parameter value. By default, if this parameter is left blank, events are distributed by the Kafka Producer's partitioner (by a specified key or a partitioner customized by kafka.partitioner.class).
partitionIdHeader
-
Specifies the name of the event header field whose value is used as the target partition ID. When this parameter is set, the sink reads that field from the event header and sends the message to the corresponding partition of the topic. If the value is not a valid partition, EventDeliveryException is thrown. If the header value exists, it overwrites the defaultPartitionId parameter.
Other Kafka Producer Properties
-
Specifies other Kafka configurations. This parameter can be set to any producer configuration supported by Kafka, and the kafka.producer. prefix must be added to the configuration name (as in kafka.producer.acks).
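The following sketch shows one way the Kafka sink parameters above could be combined for a security-mode cluster. The agent, sink, and channel names, the broker host names, the topic, and the header name are hypothetical. The linger.ms line assumes that producer properties are passed through with the kafka.producer. prefix, as in the Flume community Kafka sink.

```properties
# Hypothetical agent "server", sink "k1", and channel "c1".
server.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
server.sinks.k1.channel = c1
server.sinks.k1.kafka.topic = flume_topic
# Port 21007 pairs with SASL_PLAINTEXT (security mode).
# In common mode, use port 9092 with PLAINTEXT instead.
server.sinks.k1.kafka.bootstrap.servers = broker1:21007,broker2:21007
server.sinks.k1.kafka.security.protocol = SASL_PLAINTEXT
# Wait for the acknowledgements of all replicas.
server.sinks.k1.kafka.producer.acks = -1
server.sinks.k1.flumeBatchSize = 1000
# Discard messages longer than messageMaxLength instead of sending them.
server.sinks.k1.ignoreLongMessage = true
server.sinks.k1.messageMaxLength = 1000012
# Assumed header name that carries the target partition ID.
server.sinks.k1.partitionIdHeader = partition_id
# Example of an additional Kafka producer property (assumed prefix).
server.sinks.k1.kafka.producer.linger.ms = 5
```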
Thrift Sink
A Thrift sink converts events to Thrift events and sends them to the listening port of the configured host. Common configurations are as follows:
¶ Parameter
Default Value
Description
channel
-
Specifies the channel connected to the sink.
type
thrift
Specifies the type of the thrift sink, which must be set to thrift.
hostname
-
Specifies the bound host name or IP address.
port
-
Specifies the bound listening port. Ensure that this port is not occupied.
batch-size
1000
Specifies the number of events sent in a batch.
connect-timeout
20000
Specifies the timeout for the first connection, expressed in milliseconds.
request-timeout
20000
Specifies the maximum timeout for a request after the first request, expressed in milliseconds.
kerberos
false
Specifies whether Kerberos authentication is enabled.
client-keytab
-
Specifies the path of the client keytab file. The Flume running user must have the access permission on the authentication file.
client-principal
-
Specifies the principal of the security user used by the client.
server-principal
-
Specifies the principal of the security user used by the server.
compression-type
none
Specifies the compression type of data sent by Flume, which can be set to none or deflate. none indicates that data is not compressed, while deflate indicates that data is compressed.
maxConnections
5
Specifies the maximum size of the connection pool for Flume to send data.
ssl
false
Specifies whether to use SSL encryption.
truststore-type
JKS
Specifies the Java trust store type.
truststore
-
Specifies the Java trust store file.
truststore-password
-
Specifies the Java trust store password.
reset-connection-interval
0
Specifies the interval between a connection failure and a second connection, expressed in seconds. If the parameter is set to 0, the system continuously attempts to perform a connection.
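A Thrift sink with Kerberos authentication enabled might look like the following sketch. The agent, sink, and channel names, the host, the port, the keytab path, and the principals are hypothetical placeholders.

```properties
# Hypothetical agent "client", sink "k1", and channel "c1".
client.sinks.k1.type = thrift
client.sinks.k1.channel = c1
# Assumed address of the downstream Thrift source.
client.sinks.k1.hostname = 192.168.0.10
client.sinks.k1.port = 21155
client.sinks.k1.batch-size = 1000
client.sinks.k1.connect-timeout = 20000
client.sinks.k1.request-timeout = 20000
client.sinks.k1.maxConnections = 5
# Kerberos settings. The Flume running user must be able to read the keytab.
client.sinks.k1.kerberos = true
client.sinks.k1.client-keytab = /path/to/user.keytab
client.sinks.k1.client-principal = <client-principal>
client.sinks.k1.server-principal = <server-principal>
```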
Precautions¶
What are the reliability measures of Flume?
Use the transaction mechanisms between Source and Channel as well as between Channel and Sink.
Configure the failover and load_balance mechanisms for Sink Processor. The following shows a load balancing example.
server.sinkgroups=g1
server.sinkgroups.g1.sinks=k1 k2
server.sinkgroups.g1.processor.type=load_balance
server.sinkgroups.g1.processor.backoff=true
server.sinkgroups.g1.processor.selector=random
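For comparison, a failover configuration for the same sink group could be sketched as follows. The priority and maxpenalty parameters come from the Flume community failover sink processor, and the values are illustrative assumptions.

```properties
server.sinkgroups = g1
server.sinkgroups.g1.sinks = k1 k2
server.sinkgroups.g1.processor.type = failover
# The sink with the higher priority value is used first.
# k2 takes over only when k1 fails.
server.sinkgroups.g1.processor.priority.k1 = 10
server.sinkgroups.g1.processor.priority.k2 = 5
# Upper limit of the backoff period for a failed sink, in milliseconds.
server.sinkgroups.g1.processor.maxpenalty = 10000
```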
What are the precautions for the aggregation and cascading of multiple Flume agents?
The Avro or Thrift protocol can be used for cascading.
When the aggregation end contains multiple nodes, evenly distribute the agents and do not aggregate all agents on a single node.
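A minimal sketch of cascading two agents over Avro is shown below. The agent names client and server, the component names, the host, and the port are hypothetical. The sink of the collecting agent points at the address on which the source of the aggregating agent listens.

```properties
# Collecting agent (hypothetical name "client"): the Avro sink sends events downstream.
client.sinks.k1.type = avro
client.sinks.k1.channel = c1
client.sinks.k1.hostname = 192.168.0.10
client.sinks.k1.port = 21154
client.sinks.k1.compression-type = none

# Aggregating agent (hypothetical name "server"): the Avro source receives the events.
server.sources.r1.type = avro
server.sources.r1.channels = c1
server.sources.r1.bind = 192.168.0.10
server.sources.r1.port = 21154
# Must be the same as the compression-type of the sending Avro sink.
server.sources.r1.compression-type = none
```

When the aggregation end has several nodes, each collecting agent can point its sink at a different aggregation node, or use a sink group, so that the load is spread evenly.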