A simple example of using Flume to collect Log4j logs and forward them to Kafka:
Step 1: Configure and start Flume. The Flume configuration file is as follows:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = 192.168.1.12:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
kafka.producer.type = sync
kafka.partitioner.class = org.apache.flume.plugins.SinglePartition

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
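With the configuration saved to a file (the name conf/flume-kafka.conf below is my assumption; the agent name a1 comes from the config above), the agent can be started from the Flume installation directory with a command along these lines:

bin/flume-ng agent --conf conf --conf-file conf/flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console

The -Dflume.root.logger=INFO,console option echoes the agent's own log to the console, which helps confirm that the avro source is actually listening on port 4444.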
Step 2: Test project:
Create a project and add flume-ng-log4jappender-1.6.0-jar-with-dependencies.jar to its classpath.
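If the project is built with Maven rather than a manually copied jar, a dependency along the following lines should pull in the appender (the groupId and the jar-with-dependencies classifier reflect my understanding of how Flume 1.6.0 publishes this module; verify them against Maven Central):

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
    <!-- assumed classifier: matches the standalone jar-with-dependencies artifact named above -->
    <classifier>jar-with-dependencies</classifier>
</dependency>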
1. Code:
package com.ls.flume;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class FlumeLogAppender {

    private static final Log logger = LogFactory.getLog(FlumeLogAppender.class);

    public static void main(String[] args) {
        int i = 0;
        // Emit a numbered log message every 10 seconds; the log4j Flume appender
        // configured below forwards each message to the Flume avro source.
        while (true) {
            logger.info("Hello world! This is a test message " + i);
            System.out.println(i++);
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
2. log4j.properties
log4j.category.com.ls = INFO,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = localhost
log4j.appender.flume.Port = 4444
log4j.appender.flume.UnsafeMode = false
log4j.appender.flume.layout = org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %p [%c:%L] - %m%n
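Once the Flume agent and the test class are both running, the messages should show up in the Kafka topic test. One quick check is Kafka's console consumer; for the 0.8.x-era brokers that Flume 1.6 targets the consumer connects through ZooKeeper (the 2181 port below is an assumption), while newer brokers accept --bootstrap-server 192.168.1.12:9092 instead:

bin/kafka-console-consumer.sh --zookeeper 192.168.1.12:2181 --topic test --from-beginning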
Step 3: Errors encountered:
At runtime, SLF4J prints the following warning; it does not affect the delivery of log messages:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Solution: add slf4j-nop-1.7.12.jar to the classpath.
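For a Maven build, the same fix is the dependency below (org.slf4j:slf4j-nop:1.7.12 is the published NOP binding; any other SLF4J binding, e.g. slf4j-log4j12, would silence the warning equally well):

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.12</version>
</dependency>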