admin管理员组文章数量:1550528
在这种情况下,我们有3个kafka主题(每个主题有50个分区),它们具有不同的消息,而所有这些消息都具有“用户名”字段,
topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute
我们定义了一个包装器类,
MessageWrapper{
List list01;
List list02;
List list03;
}
我们有一个flatMap,可以将原始消息“转换”为元组3,
String field --> username
Integer field --> type
MessageWrapper field --> the wrapper object
所有3个流均由类似的flatMap()函数处理,
public void flatMap(Message01 value, Collector> out)
throws Exception {
String name = value.getUsername();
if (!StringUtils.isBlank(name)) {
MessageWrapper wrapper = new MessageWrapper();
List list = new ArrayList<>();
list.add(value);
wrapper.setList01(list);
out.collect(new Tuple3<>(name, 1, wrapper));
}
}
在flatMap()之后,我们合并了这3个流,
stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
.process(
new ProcessWindowFunction, MessageWrapper, Tuple, TimeWindow>() {
@Override
public void process(Tuple key,
ProcessWindowFunction, MessageWrapper, Tuple, TimeWindow>.Context ctx,
Iterable> elements,
Collector out) throws Exception {
// merge all entities which have same username, to get a big fat wrapper object
MessageWrapper w = new MessageWrapper();
for (Tuple3 t3 : elements) {
MessageWrapper ret = t3.f2;
Integer type = t3.f1;
if (type == 1) {
// add to list01
} else if (type == 2) {
// add to list02
} else if (type == 3) {
// add to list03
}
}
if (all 3 lists are not empty) {
out.collect(ret);
}
}
});
目前我们使用20个taskmanager,每个4核+ 16G,总共80个插槽,我们使用50个并行度。
我们总是会遇到由于过多的gc导致taskmanager无法响应的问题,
Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost".
如果我们将时间范围从5分钟减少到1分钟,一切都很好。 据此看来,flink集群似乎没有足够的资源,但是80核+ 320G可以容纳几百万条消息(每条消息的大小约为5KB),应该足够吗?
任何人都可以在这里找到一些启示吗? 或者代码中可能存在一些问题?
版权声明:本文标题:Flink服务器无响应,apache-flink 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/dianzi/1727247676a1104832.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论