1. flume installation
```bash
[root@10 app]# tar xzf apache-flume-1.7.0-bin.tar.gz
[root@10 app]# mv apache-flume-1.7.0-bin /mnt/app/flume
[root@10 app]# chown -R wisdom.wisdom /mnt/app/flume
[root@10 app]# mkdir -p /mnt/{data,log}/flume
[root@10 app]# chown -R wisdom.wisdom /mnt/{data,log}/flume
```
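A quick sanity check is to print the version from the unpacked tree (this assumes a working JDK is already visible to the shell; step 2 sets JAVA_HOME for the agent itself):

```bash
[root@10 app]# /mnt/app/flume/bin/flume-ng version
```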
2. flume environment configuration
```bash
[root@10 app]# cp /mnt/app/flume/conf/{flume-env.sh,flume-env.sh.bak}
[root@10 app]# cat > /mnt/app/flume/conf/flume-env.sh <<'EOF'
> export JAVA_HOME=/mnt/app/java
> export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
> export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true"
> FLUME_CLASSPATH="/home/flume/flume/lib"
EOF
```
Note the quoted heredoc delimiter (<<'EOF'): it stops the shell from expanding $JAVA_OPTS while writing the file, so the second export correctly appends to the first when the agent starts.
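To confirm the file was written with the literal $JAVA_OPTS reference intact:

```bash
[root@10 app]# grep JAVA_OPTS /mnt/app/flume/conf/flume-env.sh
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true"
```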
3. flume log configuration
```bash
[root@10 ~]# cat /mnt/app/flume/conf/log4j.properties |grep -v ^#|grep -v ^$
flume.root.logger=INFO,LOGFILE
flume.log.dir=/mnt/log/flume          # <- the key change: write logs under /mnt/log/flume
flume.log.file=flume.log
log4j.logger.org.apache.flume.lifecycle = INFO
log4j.logger.org.jboss = WARN
log4j.logger.org.mortbay = INFO
log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN
log4j.logger.org.apache.hadoop = INFO
log4j.logger.org.apache.hadoop.hive = ERROR
log4j.rootLogger=${flume.root.logger}
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
log4j.appender.DAILY=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.DAILY.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.DAILY.rollingPolicy.ActiveFileName=${flume.log.dir}/${flume.log.file}
log4j.appender.DAILY.rollingPolicy.FileNamePattern=${flume.log.dir}/${flume.log.file}.%d{yyyy-MM-dd}
log4j.appender.DAILY.layout=org.apache.log4j.PatternLayout
log4j.appender.DAILY.layout.ConversionPattern=%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %x - %m%n
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n
```
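Once an agent is running (step 5), the RollingFileAppender writes and rotates flume.log under the directory configured above; tailing it is the quickest way to watch startup:

```bash
[root@10 ~]# tail -f /mnt/log/flume/flume.log
```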
4. flume configuration file parameters
```bash
[root@10 app]# su - wisdom
[wisdom@10 ~]$ vim /mnt/app/flume/conf/test.conf
```
```bash
# Name the agent's sources, channels, and sinks
producer.sources = s_test
producer.channels = c_test
producer.sinks = r_test

# Define the channel (file-backed)
producer.channels.c_test.type = file
producer.channels.c_test.checkpointDir = /mnt/data/flume/test/filechannel/checkpointDir
producer.channels.c_test.dataDirs = /mnt/data/flume/test/filechannel/dataDirs
producer.channels.c_test.transactionCapacity = 40000
producer.channels.c_test.capacity = 2000000
producer.channels.c_test.maxFileSize = 2146435071
producer.channels.c_test.minimumRequiredSpace = 524288000
producer.channels.c_test.checkpointInterval = 20000

# Define the sink (write to Kafka)
producer.sinks.r_test.type = org.apache.flume.sink.kafka.KafkaSink
producer.sinks.r_test.kafka.bootstrap.servers = 10.0.3.40:9092,10.0.3.41:9092,10.0.3.42:9092
producer.sinks.r_test.kafka.topic = index-test
producer.sinks.r_test.kafka.flumeBatchSize = 100
producer.sinks.r_test.kafka.producer.acks = 1
producer.sinks.r_test.kafka.producer.compression.type = snappy
producer.sinks.r_test.kafka.producer.max.request.size = 10000000

# Define the source (tail files matching the pattern)
producer.sources.s_test.type = TAILDIR
producer.sources.s_test.filegroups = f1
producer.sources.s_test.filegroups.f1 = /mnt/log/test/^test.log$
producer.sources.s_test.positionFile = /mnt/data/flume/test/filesource/test.json

# Bind the source and sink to the channel, forming a single pipeline
producer.sinks.r_test.channel = c_test
producer.sources.s_test.channels = c_test
```
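The file channel and the TAILDIR position file live under /mnt/data/flume, which step 1 already chown'd to wisdom; pre-creating the per-agent subdirectories as that user avoids first-start permission surprises (a small convenience step, using the paths from the config above):

```bash
[wisdom@10 ~]$ mkdir -p /mnt/data/flume/test/filechannel/{checkpointDir,dataDirs}
[wisdom@10 ~]$ mkdir -p /mnt/data/flume/test/filesource
```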
5. flume agent startup
```bash
[wisdom@10 ~]$ /mnt/app/flume/bin/flume-ng agent -n producer --conf /mnt/app/flume/conf -f /mnt/app/flume/conf/test.conf &
```
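To verify the pipeline end to end, append a line to the tailed file and consume the topic (the Kafka install path below is an assumption; adjust it to your cluster):

```bash
[wisdom@10 ~]$ echo "hello flume" >> /mnt/log/test/test.log
[wisdom@10 ~]$ /mnt/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.40:9092 --topic index-test --from-beginning
```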
- Supplement
```bash
/mnt/app/flume2es/bin/flume-ng agent -n producer -f /mnt/app/flume2es/conf/test2.properties --conf /mnt/app/flume2es/conf -Dflume.root.logger=debug,console
```
This second agent reads data from Kafka via flume and writes it into Elasticsearch (ES), with debug logging sent to the console.
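The original test2.properties is not shown; the sketch below is one plausible shape for it, using the Kafka source and Elasticsearch sink that ship with flume 1.7 (the ES host, cluster name, and index are placeholders, and the ElasticSearchSink additionally needs compatible ES client jars on FLUME_CLASSPATH):

```bash
# Kafka -> memory channel -> Elasticsearch (illustrative values only)
producer.sources = s_kafka
producer.channels = c_mem
producer.sinks = r_es

producer.sources.s_kafka.type = org.apache.flume.source.kafka.KafkaSource
producer.sources.s_kafka.kafka.bootstrap.servers = 10.0.3.40:9092,10.0.3.41:9092,10.0.3.42:9092
producer.sources.s_kafka.kafka.topics = index-test
producer.sources.s_kafka.kafka.consumer.group.id = flume2es

producer.channels.c_mem.type = memory
producer.channels.c_mem.capacity = 10000
producer.channels.c_mem.transactionCapacity = 1000

producer.sinks.r_es.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
producer.sinks.r_es.hostNames = 10.0.3.50:9300
producer.sinks.r_es.clusterName = elasticsearch
producer.sinks.r_es.indexName = index-test
producer.sinks.r_es.batchSize = 100

producer.sources.s_kafka.channels = c_mem
producer.sinks.r_es.channel = c_mem
```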
Notes from experience using flume: