admin管理员组文章数量:1642335
错误信息如下:
2020-03-13 23:56:29,230 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
2020-03-13 23:56:29,230 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
2020-03-13 23:56:29,230 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1
2020-03-13 23:56:29,235 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - com.atguigu.MySink.process(MySink.java:44)] Header:{timestamp=1584114989234}atguigu:atguigu:4:go!
2020-03-13 23:56:29,236 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: rollback() begin() called when transaction is COMPLETED!
at com.googlemon.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:163)
at com.atguigu.MySink.process(MySink.java:48)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
^C2020-03-13 23:56:34,238 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: rollback() called when transaction is COMPLETED!
at com.googlemon.base.Preconditions.checkState(Preconditions.java:172)
at org.apache.flume.channel.BasicTransactionSemantics.rollback(BasicTransactionSemantics.java:163)
at com.atguigu.MySink.process(MySink.java:48)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
2020-03-13 23:56:34,240 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:78)] Stopping lifecycle supervisor 12
2020-03-13 23:56:34,243 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:149)] Component type: CHANNEL, name: c1 stopped
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:155)] Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1584114989230
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:161)] Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1584114994243
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 1000
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 9
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 10
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 10
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 1
2020-03-13 23:56:34,244 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:177)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 1
2020-03-13 23:56:35,245 (agent-shutdown-hook) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.stop(PollingPropertiesFileConfigurationProvider.java:84)] Configuration provider stopping
定位问题:
当事务完成后没有被关闭。
发生原因:
http://flume.apache/releases/content/1.9.0/FlumeDeveloperGuide.html#sink
复制官网的这个自定义的Sink
并没有在 finally代码块 关闭事务
.
- 官网源代码如下:
public class MySink extends AbstractSink implements Configurable {
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
// Send the Event to the external repository.
// storeSomeData(e);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
// Log exception, handle individual exceptions as needed
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error)t;
}
}
//+++++++++++++++++++++++++++++++++++++++++++++++++
// 这里没有对事务txn进行close()处理
// 因此添加上close即可。
finally {
if (txn != null) {
//关闭事务
txn.close();
}
}
//+++++++++++++++++++++++++++++++++++++++++++++++
return status;
}
}
本文标签: langJavaflumeIllegalStateExceptioncompleted
版权声明:本文标题:Flume产生java.lang.IllegalStateException: begin() called when transaction is COMPLETED! 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dongtai/1729332692a1196596.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论