Frequent "too many open files" exceptions with AsyncHBase


While using asynchbase we frequently hit "too many open files" exceptions; after the job restarted automatically, it immediately failed again. Environment and error log:

  • Flink version: 1.16.0
  • asynchbase version: 1.8.2
2023-03-08 16:15:39
org.jboss.netty.channel.ChannelException: Failed to create a selector.
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52)
at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143)
at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81)
at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
at org.hbase.async.HBaseClient.defaultChannelFactory(HBaseClient.java:707)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:507)
at org.hbase.async.HBaseClient.<init>(HBaseClient.java:496)
at com.topgame.function.HbaseDimTrackerAsyncFunc.open(HbaseDimTrackerAsyncFunc.java:37)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:214)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341)
... 25 more

Monitoring the job's file descriptor usage showed that whenever it threw the following error and restarted automatically, the descriptor count spiked. The error log:

java.io.IOException: Could not perform checkpoint 5 for operator async wait operator (2/9)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1238)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:488)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator async wait operator (2/9)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:345)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:228)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:213)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:726)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:363)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1281)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1269)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1226)
... 22 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:131)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.copy(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:308)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:106)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:75)
at org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:65)
at org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:79)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:230)
... 33 more

Analyzing the descriptors showed that the vast majority (15.4k out of 15.9k) looked like the entries below, which suggests that descriptors opened before the restart were never released:

lsof -U 

COMMAND    PID USER   FD      TYPE             DEVICE  SIZE/OFF       NODE NAME
java    168258 yarn *648u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *652r     FIFO                0,9       0t0  850723636 pipe
java    168258 yarn *653w     FIFO                0,9       0t0  850723636 pipe
java    168258 yarn *654u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *655r     FIFO                0,9       0t0  850723637 pipe
java    168258 yarn *656w     FIFO                0,9       0t0  850723637 pipe
java    168258 yarn *657u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *658r     FIFO                0,9       0t0  850723638 pipe
java    168258 yarn *659w     FIFO                0,9       0t0  850723638 pipe
java    168258 yarn *660u  a_inode               0,10         0       8670 [eventpoll]
java    168258 yarn *661w     FIFO                0,9       0t0  850723639 pipe
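The same descriptor count can also be tracked from inside the JVM instead of shelling out to lsof. A minimal Linux-only sketch (reads `/proc/self/fd`; not part of the original job):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FdCount {
    /** Number of open file descriptors of this JVM, read from /proc (Linux only). */
    static long openFds() throws IOException {
        // try-with-resources matters: the listing stream itself holds a directory fd
        try (Stream<Path> fds = Files.list(Paths.get("/proc/self/fd"))) {
            return fds.count();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("open fds: " + openFds());
    }
}
```

Exporting this value as a gauge metric makes the post-restart jump visible directly in the job's dashboards.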

The relevant pom.xml dependency:

<dependency>
    <groupId>org.hbase</groupId>
    <artifactId>asynchbase</artifactId>
    <version>1.8.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>

The key code is as follows:


SingleOutputStreamOperator<ArrayList<HashMap<String, String>>> asyncFunc = AsyncDataStream
    .orderedWaitWithRetry(
        source,
        new HbaseDimensionAsyncFunc(),
        60,                 // timeout
        TimeUnit.SECONDS,
        300,                // capacity: max in-flight async requests
        AsyFixedRetry()     // project-local retry strategy helper
    )
    .setParallelism(9)
    .uid("asyncFunc");
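`AsyFixedRetry()` above is a project-local helper whose body is not shown. In Flink 1.16 a fixed-delay retry strategy is normally assembled from the built-in `AsyncRetryStrategies` builder, roughly like this (the attempt count and delay are illustrative, not taken from the original job):

```java
// Requires org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy,
// org.apache.flink.streaming.util.retryable.AsyncRetryStrategies and
// org.apache.flink.streaming.util.retryable.RetryPredicates.
// Retry up to 3 times, 1 s apart, whenever the async call completes exceptionally.
AsyncRetryStrategy<ArrayList<HashMap<String, String>>> retryStrategy =
    new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder<ArrayList<HashMap<String, String>>>(3, 1000L)
        .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
        .build();
```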

public class HbaseDimensionAsyncFunc extends RichAsyncFunction<Tuple2<String, ArrayList<HashMap<String, String>>>, ArrayList<HashMap<String, String>>> {
    private HBaseClient client = null;

    // open() is called once in the RichAsyncFunction lifecycle; it initializes the HBase client
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        client = new HBaseClient(PropUtils.getValue("bigdata.hosts"));
    }

    // asyncInvoke() is the core async logic; it issues the HBase lookups
    @Override
    public void asyncInvoke(Tuple2<String, ArrayList<HashMap<String, String>>> o, ResultFuture<ArrayList<HashMap<String, String>>> resultFuture) throws Exception {
        String uuid = o.f0;
        List<GetRequest> requests = generateRequests(uuid);

        // query HBase through the async client library
        CompletableFuture<List<KeyValue>> future = client.get(requests);

        // handle the async query result
        future.thenApplyAsync(keyValues -> {
            // business logic: post-process the query result
            ArrayList<HashMap<String, String>> result = processResult(keyValues);
            return result;
        }).thenAcceptAsync(result -> {
            // hand the processed result back to the Flink operator
            resultFuture.complete(Collections.singletonList(result));
        }).exceptionally(t -> {
            // propagate failures so the operator can retry or time out instead of hanging
            resultFuture.completeExceptionally(t);
            return null;
        });
    }
}

Reading the code, the job does indeed leak HBaseClient resources on failover: open() creates a fresh client on every (re)start, but the old clients are never shut down.
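The `[eventpoll]` plus pipe pattern in the lsof output is exactly what an unclosed NIO Selector leaves behind: each HBaseClient builds a Netty worker pool, and every worker opens an epoll Selector. A minimal demo of the mechanism (plain JDK, no Netty, Linux only; not taken from the job):

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class SelectorLeakDemo {
    /** Open-fd count of this JVM via /proc (Linux only). */
    static long openFds() throws IOException {
        try (Stream<Path> fds = Files.list(Paths.get("/proc/self/fd"))) {
            return fds.count();
        }
    }

    public static void main(String[] args) throws IOException {
        long before = openFds();
        Selector selector = Selector.open(); // holds an [eventpoll] fd plus wakeup fds
        long during = openFds();
        selector.close();                    // closing the Selector releases them again
        long after = openFds();
        System.out.println("fds held by one Selector: " + (during - before));
        System.out.println("fds released by close(): " + (during - after));
    }
}
```

Multiply that per-Selector cost by the worker pool size and by every restart that leaks a client, and the 15.4k leaked descriptors above follow directly.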

The fix is to override the close() method in HbaseDimensionAsyncFunc and release the HBaseClient there.

When releasing the resources, also keep the following in mind:

  1. Flush buffered data: before closing the connection, flush any buffered writes so the data is persisted correctly rather than lost or corrupted.
  2. Close in order: make sure all in-flight operations have finished and all data has been written out before calling close() on the resource.
  3. Cancel Futures: when using Java's Future interface, cancel tasks that have not yet completed via cancel(), passing a boolean that indicates whether a running task may be interrupted. This stops the task cleanly and releases its resources.
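Putting these points together, a close() override along these lines should do it (a sketch, not compiled against the job; `shutdown()` is asynchbase's documented way to flush buffered RPCs and release the client's Netty resources, and `Deferred.join()` blocks until it completes):

```java
@Override
public void close() throws Exception {
    super.close();
    if (client != null) {
        // shutdown() first flushes any buffered edits (points 1 and 2 above),
        // then tears down the Netty channel factory behind the client,
        // releasing its selectors and pipes; join() blocks until that is done.
        client.shutdown().join();
        client = null;
    }
}
```

With this in place, each failover releases the old client's descriptors before open() creates a new one, and the per-restart fd spike disappears.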

Author: hnbian
Copyright: unless otherwise stated, all articles on this blog are licensed under CC BY 4.0. Please credit hnbian when reposting!