


22/06/23 12:08:58 ERROR hdfs.HDFSEventSink: process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at com.googlemon.base.Preconditions.checkNotNull(
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(
	at org.apache.flume.formatter.output.BucketPath.escapeString(
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(
	at org.apache.flume.sink.DefaultSinkProcessor.process(
	at org.apache.flume.SinkRunner$
22/06/23 12:08:58 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(
	at org.apache.flume.sink.DefaultSinkProcessor.process(
	at org.apache.flume.SinkRunner$
Caused by: java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
	at com.googlemon.base.Preconditions.checkNotNull(
	at org.apache.flume.formatter.output.BucketPath.replaceShorthand(
	at org.apache.flume.formatter.output.BucketPath.escapeString(
	at org.apache.flume.sink.hdfs.HDFSEventSink.process(
	... 3 more
22/06/23 12:08:58 WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds
22/06/23 12:09:02 INFO avro.ReliableSpoolingFileEventReader: Last read was never committed - resetting mark position.


1. 22/06/23 12:08:58 ERROR hdfs.HDFSEventSink: process failed    sink进程失败. 

2. java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null           flume的日志头信息中缺少timestamp. 

3. 22/06/23 12:08:58 ERROR flume.SinkRunner: Unable to deliver event.      sinkRunner组件无法发送event. 

4. 22/06/23 12:08:58 WARN source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 4000 milliseconds      channel堆积event满了,现在source无法写入。4秒后再试. 

从以上可知:   应该是sink的问题。跟timestamp的配置有关.    source和channel没有问题. 

以上是数据采集的目录,从这里可以看到,有一部分文件已经标识为 COMPLETED了,即source没有问题,但后面有几个文件没有变为COMPLETED,与前面的信息配置分析可知,这是因为channel满了,无法继续读取数据导致. 



a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /tmp/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = = hdfs://node1:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 10
a3.sinks.k3.hdfs.roundUnit = minute
#a3.sinks.k3.hdfs.useLocalTimeStamp = true      #关键是这一句被 注释了

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3 = c3

#a3.sinks.k3.hdfs.useLocalTimeStamp = true    这一句话被 注释了, 无法生成timestamp. 

2. 改好后,运行又出现一错误: 

 java.URISyntaxException: Illegal character in scheme name at index 0: = hdfs://node1:8020/flume/upload/20220623/12/upload-.1655957677405.tmp

这是说hdfs的路径配置的第0个字母的  = 不合法.   然后到配置文件中一查,发现多写了一个  =   . 



  生成了大量的上传文件, 这是因为 时间间隔没有配置。 再修改sink的配置信息如下:

a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /tmp/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://node1:8020/flume/upload/%Y%m%d/%H
a3.sinks.k3.hdfs.filePrefix = upload-
a3.sinks.k3.hdfs.round = true
a3.sinks.k3.hdfs.roundValue = 10
a3.sinks.k3.hdfs.roundUnit = minute
a3.sinks.k3.hdfs.useLocalTimeStamp = true
a3.sinks.k3.hdfs.batchSize = 1000
a3.sinks.k3.hdfs.fileType = DataStream
a3.sinks.k3.hdfs.rollInterval = 600
a3.sinks.k3.hdfs.rollSize = 134217700
a3.sinks.k3.hdfs.rollCount = 0
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 1000

# Bind the source and sink to the channel
a3.sources.r3.channels = c3 = c3




本文标签: 几个问题数据采集flume