温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Executor容错安全性实例分析

发布时间:2021-12-16 16:32:37 来源:亿速云 阅读:112 作者:iii 栏目:云计算

这篇文章主要介绍“Executor容错安全性实例分析”,在日常操作中,相信很多人在Executor容错安全性实例分析问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Executor容错安全性实例分析”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

sparkstreaming会不断的接收数据、不断的产生job、不断的提交job。所以有一个至关重要的问题就是数据安全性。由于sparkstreaming是基于sparkcore的,如果我们可以确保数据安全可靠的话(sparkstreaming生产job的时候里面是基于RDD),即使运行的时候出现错误或者故障,也可以基于RDD的容错的能力自动进行恢复。所以要确保数据的安全性。

对于executor的安全容错主要是数据的安全容错。Executor计算时候的安全容错是借助spark core的RDD的,所以天然是安全的。

数据安全性的一种方式是存储一份副本,另一种方式是不做副本,但是数据源支持重放(也就是可以反复的读取数据源的数据),如果之前读取的数据出现问题,可以重新读取数据。

做副本的方式可以借助blockmanager做备份。Blockmanager存储数据的时候有很多storagelevel,Receiver接收数据后,存储的时候指定storagelevel为MEMORY_AND_DISK_SER_2的方式。Blockmanager早存储的时候会先考虑memory,只有memory不够的时候才会考虑disk,一般memory都是够的。所以至少两个executor上都会有数据,假设一个executor挂掉,就会马上切换到另一个executor。

ReceiverSupervisorImpl在存储数据的时候会有两种方式,一种是WAL的方式,究竟是不是WAL得方式是通过配置修改的。默认是false。如果用WAL的方式必须有checkpoint的目录,因为WAL的数据是放在checkpoint的目录之下的。

def enableReceiverLog(conf: SparkConf): Boolean = {
  conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
}

Storagelevel是在构建inputDstream的时候传入的,默认就是MEMORY_AND_DISK_SER_2。

* @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */

def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

现在来看ReceiverSupervisorImpl在存储数据的另一种方式(副本方式)。注释中说的很清楚,根据指定的storagelevel把接收的blocks交给blockmanager。也就是通过blockmanager来存储。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */

private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)

Blockmanager存储的时候会分为多种不同的数据类型,ArrayBufferBlock,IteratorBlock,ByteBufferBlock。

Blockmanager存储数据前面已经讲过了。Receiver在接收到数据后除了在自己这个executor上面存储,还会在另外一个executor上存储。如果一个executor出现问题会瞬间切换到另一个executor。

WAL的方式原理:在具体的目录下会做一份日志,假设后续处理的过程中出了问题,可以基于日志恢复,日志是写在checkpoint下。在生产环境下checkpoint是在HDFS上,这样日志就会有三份副本。

下面就是用WAL存储数据的类,先写日志再交给blockmanager存储。

/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */

private[streaming] class WriteAheadLogBasedBlockHandler(

如果采用WAL的方式,存储数据的时候就不需要有两份副本,这样太浪费内存,如果storagelevel.replication大于1就会打印警告日志。

private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }

  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap, false, 1)
}

这里采用两条线程的线程池,使得blockmanager存储数据和write ahead log可以并发的执行。

// For processing futures used in parallel block storing into block manager and write ahead log
// # threads = 2, so that both writing to BM and WAL can proceed in parallel
implicit private val executionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

这个是把日志写入WAL中

// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
  writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

负责读写WAL的是WriteAheadLog,这是一个抽象类,负责写入、读取、清除数据的功能。在写入数据后会返回一个句柄,以供读取数据使用。

看一下具体写入数据的实现。如果失败并且失败次数小于最大的失败次数就会重试。确实是返回了一个句柄。

/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */

def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
  var fileSegment: FileBasedWriteAheadLogSegment = null
  var failures = 0
  var lastException: Exception = null
  var succeeded = false
  while (!succeeded && failures < maxFailures) {
    try {
      fileSegment = getLogWriter(time).write(byteBuffer)
      if (closeFileAfterWrite) {
        resetWriter()
      }
      succeeded = true
    catch {
      case ex: Exception =>
        lastException = ex
        logWarning("Failed to write to write ahead log")
        resetWriter()
        failures += 1
    }
  }
  if (fileSegment == null) {
    logError(s"Failed to write to write ahead log after $failures failures")
    throw lastException
  }
  fileSegment
}

下面就是把数据写入HDFS的代码

/** Write the bytebuffer to the log file */
def write(data: ByteBuffer): FileBasedWriteAheadLogSegment = synchronized {
  assertOpen()
  data.rewind() // Rewind to ensure all data in the buffer is retrieved
  val lengthToWrite = data.remaining()
  val segment = new FileBasedWriteAheadLogSegment(path, nextOffset, lengthToWrite)
  stream.writeInt(lengthToWrite)
  if (data.hasArray) {
    stream.write(data.array())
  } else {
    // If the buffer is not backed by an array, we transfer using temp array
    // Note that despite the extra array copy, this should be faster than byte-by-byte copy
    while (data.hasRemaining) {
      val array = new Array[Byte](data.remaining)
      data.get(array)
      stream.write(array)
    }
  }
  flush()
  nextOffset stream.getPos()
  segment
}

不管是WAL还是直接交给blockmanager都是采用副本的方式。还有一种是数据源支持数据存放,典型的就是kafka。Kafka已经成为了数据存储系统,它天然具有容错和数据副本。

Kafka有receiver和direct的方式。Receiver的方式其实是交给zookeper来管理matadata的(偏移量offset),如果数据处理失败后,kafka会基于offset重新读取数据。为什么可以重新读取?如果程序崩溃或者数据没处理完是不会给zookeper发ack。Zookeper就认为这个数据没有被消费。实际生产环境下越来越多的使用directAPI的方式,直接去操作kafka并且是自己管理offset。这就可以保证有且只有一次的容错处理。DirectKafkaInputDstream,它会去看最新的offset,并把这个内容放入batch中。

获取最新的offset,通过最新的offset减去上一个offset就可以确定读哪些数据,也就是一个batch中的数据。

@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else {
    o.right.get
  }
}

容错的弊端就是消耗性能,占用时间。也不是所有情况都不能容忍数据丢失。有些情况下可以不进行容错来提高性能。

假如一次处理1000个block,但是有1个block出错,就需要把1000个block进行重新读取或者恢复,这也有性能问题。

到此,关于“Executor容错安全性实例分析”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI