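Flink Restart Strategies

Why configure a restart strategy?

When a task fails, Flink needs to restart the failed task and any other affected tasks in order to bring the job back to a normal state.

Restart strategies and failover strategies together control how tasks are restarted. The restart strategy decides whether and when the failed/affected tasks may be restarted. The failover strategy decides which tasks must be restarted to recover the job.

NOTE: A restart strategy should be used together with checkpointing, because a restarted job recovers Flink's internal State from the most recent checkpoint.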

Using a RestartStrategy

Configuring via the configuration file

Settings in the configuration file apply to both DataSet and DataStream jobs.

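If checkpointing has not been enabled (no call to enableCheckpointing()), restart-strategy defaults to: none

If checkpointing has been enabled and no restart strategy is configured, restart-strategy defaults to: fixed-delay with delay=1s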
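
The two defaulting rules above can be sketched as a small lookup. This is only an illustration of the rule as stated here: the object DefaultRestartStrategy and its method resolve are hypothetical names, not part of the Flink API.

```scala
object DefaultRestartStrategy {
  // Hypothetical helper (not a Flink API): returns the name of the restart
  // strategy that applies, following the two rules described in the text.
  //   configured           - value of restart-strategy, if the user set one
  //   checkpointingEnabled - whether checkpointing was enabled on the job
  def resolve(configured: Option[String], checkpointingEnabled: Boolean): String =
    configured.getOrElse(if (checkpointingEnabled) "fixed-delay" else "none")
}
```

An explicitly configured value always wins; the checkpointing flag only matters when nothing was configured.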

# Three restart strategies are available; each accepts the aliases listed here
restart-strategy: none, off, disable | fixeddelay, fixed-delay | failurerate, failure-rate

Configuring via ExecutionConfig

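import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Option 1: limit the number of restart attempts (DataStream job)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between two restart attempts
  ))

// Option 2: limit the failure rate (DataSet job; a separate snippet from option 1)
// The job is declared failed once more than 3 failures occur within a 5-minute interval.
// With 5 min = 300 s and one attempt every 10 s, at most 30 restarts fit into one interval.
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
batchEnv.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // maximum number of failures allowed within the interval; tune as needed
  Time.of(5, TimeUnit.MINUTES), // interval over which the failure rate is measured
  Time.of(10, TimeUnit.SECONDS) // delay between two consecutive restart attempts
))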

Restart strategies

fixed-delay
# when restart-strategy: fixed-delay (example values)
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 2 s

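For example:
===> With delay=2s and attempts=3, Flink waits 2 seconds before each restart attempt. Each attempt either succeeds or fails and leads to the next attempt; if the job has still not recovered after 3 attempts, it is declared failed.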
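
The fixed-delay behaviour can be sketched with a small simulation. This is a simplified model and not Flink's implementation: FixedDelaySketch, FixedDelay and run are hypothetical names, the job is represented by a function from the restart count to success or failure, and the delay is only accumulated rather than actually slept.

```scala
object FixedDelaySketch {
  // Simplified model of a fixed-delay strategy (hypothetical, not a Flink class).
  final case class FixedDelay(maxAttempts: Int, delayMs: Long) {
    // A restart is allowed while fewer than maxAttempts restarts have been used.
    def canRestart(restartsSoFar: Int): Boolean = restartsSoFar < maxAttempts
  }

  // Runs `job` (argument: number of restarts so far) until it succeeds or the
  // attempts are exhausted. Returns (succeeded, restartsUsed, totalDelayMs).
  def run(strategy: FixedDelay)(job: Int => Boolean): (Boolean, Int, Long) = {
    var restarts = 0
    var waitedMs = 0L
    var succeeded = job(0) // initial execution
    while (!succeeded && strategy.canRestart(restarts)) {
      waitedMs += strategy.delayMs // a real scheduler would wait here
      restarts += 1
      succeeded = job(restarts)
    }
    (succeeded, restarts, waitedMs)
  }
}
```

With FixedDelay(3, 2000), a job that always fails uses all 3 restarts and then gives up, while a job that recovers on its second restart stops early.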
failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3  
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

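For example:
===> With failure-rate-interval=5min, max-failures-per-interval=3 and delay=10s, Flink waits 10 seconds before each restart attempt; if more than 3 failures occur within one 5-minute interval, the job is declared failed.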
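
The failure-rate check can be sketched as a sliding window over failure timestamps. This is a simplified model under stated assumptions, not Flink's implementation: FailureRateSketch, FailureRate and onFailure are hypothetical names, and time is passed in explicitly as milliseconds so the behaviour is deterministic.

```scala
import scala.collection.mutable

object FailureRateSketch {
  // Simplified model of a failure-rate strategy (hypothetical, not a Flink class).
  final class FailureRate(maxFailuresPerInterval: Int, intervalMs: Long) {
    private val failures = mutable.Queue.empty[Long]

    // Records a failure at time `nowMs` and reports whether a restart is
    // still allowed, i.e. whether the number of failures inside the current
    // interval stays within maxFailuresPerInterval.
    def onFailure(nowMs: Long): Boolean = {
      failures.enqueue(nowMs)
      // Forget failures that have dropped out of the measuring interval.
      while (failures.nonEmpty && nowMs - failures.head > intervalMs) {
        failures.dequeue()
      }
      failures.size <= maxFailuresPerInterval
    }
  }
}
```

With 3 failures allowed per 5 minutes, a fourth failure inside the same interval stops the restarts, whereas failures spread further apart never exhaust the budget.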
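no-restart

No restart strategy is used: the job fails directly without any restart attempt.

fallback-restart

The restart strategy defined at cluster level is used. If no strategy is defined there, Flink falls back to fixed-delay.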

Failover strategies

Official reference: https://ci.apache/projects/flink/flink-docs-release-1.10/dev/task_failure_recovery.html

The failover strategy can be set in flink-conf.yaml.

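Restart All Failover Strategy
  • Restarts every task in the job to recover from a task failure.
Restart Pipelined Region Failover Strategy
  • Groups tasks into regions connected by pipelined data exchanges and restarts only the affected regions, which is much cheaper than restarting all tasks. The execution mode determines which exchanges are pipelined, and therefore the extent of a region:
    env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)

Failover strategy           Value of jobmanager.execution.failover-strategy
Restart all                 full
Restart pipelined region    region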
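
The difference between the two failover strategies can be sketched by treating the job as a graph. This is a deliberately simplified model and not Flink's scheduler: RegionSketch, regions and tasksToRestart are hypothetical names, tasks are plain strings, and a region is taken to be a set of tasks connected by pipelined edges. Real Flink may restart further regions as well, for example the consumers of a restarted region.

```scala
object RegionSketch {
  // Groups tasks into regions: tasks linked (directly or transitively) by a
  // pipelined edge end up in the same region.
  def regions(tasks: Seq[String], pipelinedEdges: Seq[(String, String)]): Set[Set[String]] = {
    // Undirected adjacency built from the pipelined edges.
    val neighbours: Map[String, Set[String]] =
      pipelinedEdges
        .flatMap { case (a, b) => Seq(a -> b, b -> a) }
        .groupBy(_._1)
        .map { case (task, pairs) => task -> pairs.map(_._2).toSet }

    // Collects every task reachable from `start` over pipelined edges.
    def component(start: String): Set[String] = {
      @annotation.tailrec
      def loop(frontier: List[String], seen: Set[String]): Set[String] = frontier match {
        case Nil => seen
        case task :: rest =>
          val unseen = neighbours.getOrElse(task, Set.empty[String]) -- seen
          loop(rest ++ unseen.toList, seen ++ unseen)
      }
      loop(List(start), Set(start))
    }

    tasks.map(component).toSet
  }

  // Returns the tasks to restart for a failed task under the given strategy
  // value ("full" or "region", as in jobmanager.execution.failover-strategy).
  def tasksToRestart(failed: String,
                     strategy: String,
                     tasks: Seq[String],
                     pipelinedEdges: Seq[(String, String)]): Set[String] =
    strategy match {
      case "full"   => tasks.toSet
      case "region" => regions(tasks, pipelinedEdges).find(_.contains(failed)).getOrElse(Set(failed))
      case other    => throw new IllegalArgumentException(s"unknown failover strategy: $other")
    }
}
```

In a job where source, map and sink are pipelined and a fourth task is reached only through a blocking exchange, a failure in map restarts three tasks under "region" and all four under "full".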

A simple checkpointing example

package com.shufang.state.chekpoint

import com.shufang.broadcast.People
import com.shufang.entities.WorkPeople
import com.shufang.source.MyUDFPeopleSource
import org.apache.flink.api.common.ExecutionMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.functions.co.{BroadcastProcessFunction, KeyedBroadcastProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object CheckPointDemo {

  def main(args: Array[String]): Unit = {

    // Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Determines the extent of a region for the region failover strategy
    env.getConfig.setExecutionMode(ExecutionMode.PIPELINED)
    /**
     * --------------------------------------Checkpoint configuration-----------------------------------------------
     */
    env.enableCheckpointing(1000) // trigger a checkpoint every 1 s
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // EXACTLY_ONCE is the default
    env.getCheckpointConfig.setCheckpointInterval(1000) // same 1 s interval; redundant with enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointTimeout(6000) // a checkpoint that does not complete within 6 s is aborted and discarded
    // true:  an error during checkpointing fails the task
    // false: the task keeps running; the checkpoint is declined and the previous one stays the latest
    env.getCheckpointConfig.setFailOnCheckpointingErrors(false) // default is true
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) // only 1 checkpoint may be in progress at a time; the next one waits until it completes or times out
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION) // delete the externalized checkpoint when the job is cancelled (RETAIN_ON_CANCELLATION keeps it)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(0) // minimum pause before the next checkpoint may be triggered; must be >= 0


    /** --------------------------------------Restart strategy configuration----------------------------------------------------
     * When a task failure happens,
     * Flink needs to restart the failed task and other affected tasks to recover the job to a normal state.
     * The default restart strategy depends on the checkpoint settings:
     * if checkpointing is not enabled, the restart strategy is: no restart!
     * if checkpointing is enabled, the default is: the fixed-delay strategy with Integer.MAX_VALUE restart attempts
     *
     * restart-strategy can be set in flink-conf.yaml or through env.setRestartStrategy()
     */


    /*env.setRestartStrategy(
      RestartStrategies.failureRateRestart(
        10,
        Time.minutes(5),
        Time.seconds(10))
    )*/

    //env.setRestartStrategy(new RestartStrategies.FallbackRestartStrategyConfiguration) // falls back to the cluster-level strategy, fixed-delay by default

    /*env.setRestartStrategy(
      new RestartStrategies.FailureRateRestartStrategyConfiguration(
      10,
      Time.minutes(5),
      Time.seconds(10)))*/

    //env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration())

    //env.setRestartStrategy(new RestartStrategies.FixedDelayRestartStrategyConfiguration(5,Time.seconds(4)))

    //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.seconds(4)))

    val config = new RestartStrategies.FailureRateRestartStrategyConfiguration(3, Time.minutes(5), Time.seconds(10))
    env.setRestartStrategy(config)

    val ds: DataStream[WorkPeople] = env.addSource(new MyUDFPeopleSource)

    val ds1: DataStream[(Int, Char)] = env.fromElements((1, '男'), (2, '女'))

    val describer = new MapStateDescriptor[Int, Char]("genderInfo", classOf[Int], classOf[Char])

    val bcStream: BroadcastStream[(Int, Char)] = ds1.broadcast(describer)

    val resultStream: DataStream[People] = ds.connect(bcStream).process(
      new BroadcastProcessFunction[WorkPeople, (Int, Char), People] {
        override def processElement(value: WorkPeople,
                                    ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#ReadOnlyContext,
                                    out: Collector[People]): Unit = {
          val gender: Char = ctx.getBroadcastState(describer).get(value.genderCode).charValue()
          out.collect(People(value.id, value.name, gender, value.address, value.price))
        }

        override def processBroadcastElement(value: (Int, Char), ctx: BroadcastProcessFunction[WorkPeople, (Int, Char), People]#Context, out: Collector[People]): Unit = {
          ctx.getBroadcastState(describer).put(value._1, value._2)

        }
      }
    )


    ds.print("before:")
    resultStream.print("after:")


    env.execute("checkpoint")
  }
}
