Background:
Spark Streaming splits its work into Jobs by Batch Duration. Often, though, we need to compute over the past day or even the past week of data, and that makes state management unavoidable. Because Spark Streaming generates a Job for every Batch Duration, and each Job works on RDDs, the question is how to maintain state across batches. The updateStateByKey and mapWithState methods handle this core step.
Source code analysis:
1. Neither updateStateByKey nor mapWithState is defined on DStream itself; both become available through an implicit conversion to PairDStreamFunctions.
object DStream {

  // `toPairDStreamFunctions` was in SparkContext before 1.3 and users had to
  // `import StreamingContext._` to enable it. Now we move it here to make the compiler find
  // it automatically. However, we still keep the old function in StreamingContext for backward
  // compatibility and forward to the following function directly.
  implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
    PairDStreamFunctions[K, V] = {
    new PairDStreamFunctions[K, V](stream)
  }
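The implicit-conversion mechanism can be tried out without Spark. The following is a minimal sketch under stated assumptions: `ToyStream`, `ToyPairFunctions` and `sumByKey` are hypothetical names invented for illustration and are not part of Spark. The point is only that a conversion placed in the companion object of the stream type is found automatically, which is the same arrangement as `toPairDStreamFunctions` in `object DStream`.

```scala
import scala.language.implicitConversions

object ImplicitEnrichmentSketch {
  // Hypothetical stand-in for DStream: it has no key-based methods of its own.
  final case class ToyStream[T](batch: Seq[T])

  object ToyStream {
    // Because this conversion lives in the companion object of ToyStream,
    // the compiler finds it without any import at the call site.
    implicit def toToyPairFunctions[K, V](stream: ToyStream[(K, V)]): ToyPairFunctions[K, V] =
      new ToyPairFunctions[K, V](stream)
  }

  // Hypothetical stand-in for PairDStreamFunctions: only meaningful for pairs.
  final class ToyPairFunctions[K, V](self: ToyStream[(K, V)]) {
    def sumByKey(implicit num: Numeric[V]): Map[K, V] =
      self.batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
  }

  def demo(): Map[String, Int] = {
    val words = ToyStream(Seq(("a", 1), ("b", 2), ("a", 3)))
    // ToyStream has no sumByKey; the call compiles through the implicit conversion.
    words.sumByKey
  }
}
```

A stream whose elements are not pairs would not compile with `sumByKey`, which is also why the key-based methods are kept out of the base stream class.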
updateStateByKey:
1. updateStateByKey is implemented in PairDStreamFunctions as follows. updateFunc updates the existing state of each key using the new values for that key, and the method returns a DStream.
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  // defaultPartitioner
  updateStateByKey(updateFunc, defaultPartitioner())
}
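To make the `(Seq[V], Option[S]) => Option[S]` signature concrete, here is a small sketch of two update functions written in plain Scala. The names `runningCount`, `countUntil` and the `limit` parameter are hypothetical and chosen for illustration; the second function shows the documented rule that returning None eliminates the key.

```scala
object UpdateFunctionSketch {
  // Running count per key: add this batch's values to the previous state.
  val runningCount: (Seq[Int], Option[Int]) => Option[Int] =
    (newValues, previous) => Some(previous.getOrElse(0) + newValues.sum)

  // Same idea, but return None once the total reaches a (hypothetical) limit,
  // which tells updateStateByKey to drop the key from the state.
  def countUntil(limit: Int): (Seq[Int], Option[Int]) => Option[Int] =
    (newValues, previous) => {
      val total = previous.getOrElse(0) + newValues.sum
      if (total >= limit) None else Some(total)
    }
}
```

With a pair stream named, say, `pairs` of type `DStream[(String, Int)]`, a call would look like `pairs.updateStateByKey[Int](UpdateFunctionSketch.runningCount)`. Note that updateStateByKey requires a checkpoint directory to be set on the StreamingContext.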
2. defaultPartitioner:
private[streaming] def defaultPartitioner(numPartitions: Int = self.ssc.sc.defaultParallelism) = {
  new HashPartitioner(numPartitions)
}
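For intuition about what a hash partitioner does with a key, the idea can be sketched in a few lines of plain Scala. This is a sketch of the technique (non-negative modulo of the key's hash code, with null keys sent to partition 0), not a copy of Spark's class; `partitionFor` is a hypothetical name.

```scala
object HashPartitionSketch {
  // Non-negative remainder, so that negative hash codes still map to a valid index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // A null key goes to partition 0; any other key is placed by its hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

Because the same key always lands in the same partition index, the state for a key and the new values for that key can be brought together partition by partition.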
3. The partitioner controls how each RDD is partitioned.
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of the key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. If `this` function returns None, then
 *                   corresponding state key-value pair will be eliminated.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S],
    partitioner: Partitioner
  ): DStream[(K, S)] = ssc.withScope {
  val cleanedUpdateF = sparkContext.clean(updateFunc)
  val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
    iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
  }
  updateStateByKey(newUpdateFunc, partitioner, true)
}
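The step that turns the per-key function into an iterator-based one (newUpdateFunc above) can be isolated as a small helper. This is a minimal sketch; `lift` is a hypothetical name, and closure cleaning is left out because it only matters inside Spark.

```scala
object LiftUpdateFunctionSketch {
  // Turn a per-key function into one that processes a whole partition iterator,
  // dropping every key for which the per-key function returns None.
  def lift[K, V, S](
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)] =
    iterator =>
      iterator.flatMap { case (key, values, state) =>
        updateFunc(values, state).map(newState => (key, newState))
      }
}
```

The `flatMap` over an `Option` is what implements the elimination rule: a None result contributes no output element, so the key vanishes from the new state.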
4. rememberPartitioner: the overload above passes true for it.
/**
 * Return a new "state" DStream where the state for each key is updated by applying
 * the given function on the previous state of the key and the new values of each key.
 * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
 * @param updateFunc State update function. Note, that this function may generate a different
 *                   tuple with a different key than the input key. Therefore keys may be removed
 *                   or added in this way. It is up to the developer to decide whether to
 *                   remember the partitioner despite the key being changed.
 * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
 *                    DStream
 * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
 * @tparam S State type
 */
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    rememberPartitioner: Boolean
  ): DStream[(K, S)] = ssc.withScope {
  new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, rememberPartitioner, None)
}
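5. In StateDStream the storage level is MEMORY_ONLY_SER: the state RDDs are kept in memory in serialized form, not on disk. Serialization reduces the memory footprint, which matters because the state can be very large.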
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
    parent: DStream[(K, V)],
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
    partitioner: Partitioner,
    preservePartitioning: Boolean,
    initialRDD : Option[RDD[(K, S)]]
  ) extends DStream[(K, S)](parent.ssc) {

  super.persist(StorageLevel.MEMORY_ONLY_SER)
The source of computeUsingPreviousRDD is as follows:
private [this] def computeUsingPreviousRDD (
  parentRDD : RDD[(K, V)], prevStateRDD : RDD[(K, S)]) = {
  // Define the function for the mapPartition operation on cogrouped RDD;
  // first map the cogrouped tuple to tuples of required type,
  // and then apply the update function
  val updateFuncLocal = updateFunc
  val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
    val i = iterator.map(t => {
      val itr = t._2._2.iterator
      val headOption = if (itr.hasNext) Some(itr.next()) else None
      (t._1, t._2._1.toSeq, headOption)
    })
    updateFuncLocal(i)
  }
  // On every batch, cogroup traverses the data of all partitions of prevStateRDD.
  val cogroupedRDD = parentRDD.cogroup(prevStateRDD, partitioner)
  val stateRDD = cogroupedRDD.mapPartitions(finalFunc, preservePartitioning)
  Some(stateRDD)
}
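Therefore updateStateByKey is not recommended when the amount of state is large: the cost of every batch grows with the total number of keys in the state, not with the number of keys that received new data.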
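The cost argument can be checked with a small model on plain Scala Maps. This is a sketch under stated assumptions: `step` is a hypothetical function that imitates one cogroup-based batch, and the returned counter simply records how many times the update function was called.

```scala
object CogroupStateModel {
  // One batch step in the style of computeUsingPreviousRDD, on plain Maps:
  // every key of the previous state is visited, with or without new values.
  def step[K, V, S](
      prevState: Map[K, S],
      batch: Seq[(K, V)],
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): (Map[K, S], Int) = {
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val allKeys = prevState.keySet ++ grouped.keySet
    var calls = 0 // number of update function invocations in this batch
    val next = allKeys.toSeq.flatMap { key =>
      calls += 1
      updateFunc(grouped.getOrElse(key, Seq.empty[V]), prevState.get(key))
        .map(s => key -> s)
    }.toMap
    (next, calls)
  }
}
```

With three keys in the state and a batch that touches only one of them, the update function still runs three times. That per-batch full pass is what becomes expensive as the state grows.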
Figure: updateStateByKey implementation.
mapWithState:
1. mapWithState returns a MapWithStateDStream. State is maintained and updated per key: a single function is applied to the key-value data to maintain the state.
/**
 * :: Experimental ::
 * Return a `MapWithStateDStream` by applying a function to every key-value element of
 * `this` stream, while maintaining some state data for each unique key. The mapping function
 * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
 * transformation can be specified using `StateSpec` class. The state data is accessible in
 * as a parameter of type `State` in the mapping function.
 *
 * Example of using `mapWithState`:
 * {{{
 *    // A mapping function that maintains an integer state and return a String
 *    // (Note: the state here can be viewed as a table that records all the
 *    // historical state being maintained.)
 *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
 *      // Use state.exists(), state.get(), state.update() and state.remove()
 *      // to manage state, and return the necessary string
 *    }
 *
 *    val spec = StateSpec.function(mappingFunction).numPartitions(10)
 *
 *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
 * }}}
 *
 * @param spec          Specification of this transformation
 * @tparam StateType    Class type of the state data
 * @tparam MappedType   Class type of the mapped data
 */
@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  new MapWithStateDStreamImpl[K, V, StateType, MappedType](
    self,
    // StateSpecImpl wraps the operations of StateSpec.
    spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
  )
}
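The body of the mapping function is left empty in the Scaladoc example. The sketch below fills it in with a running sum, using a minimal stand-in for the state handle so that it runs without Spark. `ToyState` is hypothetical and models only the calls named in the Scaladoc (plus `getOption` for inspection); it is not Spark's `State` class.

```scala
object MappingFunctionSketch {
  // Hypothetical, minimal stand-in for Spark's State[S].
  final class ToyState[S](private var value: Option[S]) {
    def exists(): Boolean = value.isDefined
    def get(): S = value.get
    def update(newState: S): Unit = { value = Some(newState) }
    def remove(): Unit = { value = None }
    def getOption(): Option[S] = value
  }

  // Keeps a running sum per key and emits a readable line for each record.
  def mappingFunction(key: String, value: Option[Int], state: ToyState[Int]): Option[String] = {
    val previous = if (state.exists()) state.get() else 0
    val sum = previous + value.getOrElse(0)
    state.update(sum) // write the new state back for this key
    Some(s"$key -> $sum")
  }
}
```

In a real job the function would take Spark's `State[Int]` instead of the stand-in and be wrapped with `StateSpec.function(...)`, as the Scaladoc example shows. The design point is that the function sees one key at a time, together with a handle to that key's state only.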
2. The source of MapWithStateDStream is as follows:
/**
 * :: Experimental ::
 * DStream representing the stream of data generated by `mapWithState` operation on a
 * [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]].
 * Additionally, it also gives access to the stream of state snapshots, that is, the state data of
 * all keys after a batch has updated them.
 *
 * @tparam KeyType Class of the key
 * @tparam ValueType Class of the value
 * @tparam StateType Class of the state data
 * @tparam MappedType Class of the mapped data
 */
@Experimental
sealed abstract class MapWithStateDStream[KeyType, ValueType, StateType, MappedType: ClassTag](
    ssc: StreamingContext) extends DStream[MappedType](ssc) {

  /** Return a pair DStream where each RDD is the snapshot of the state of all the keys. */
  def stateSnapshots(): DStream[(KeyType, StateType)]
}

/** Internal implementation of the `MapWithStateDStream` */
private[streaming] class MapWithStateDStreamImpl[
    KeyType: ClassTag, ValueType: ClassTag, StateType: ClassTag, MappedType: ClassTag](
    dataStream: DStream[(KeyType, ValueType)],
    spec: StateSpecImpl[KeyType, ValueType, StateType, MappedType])
  extends MapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream.context) {

  private val internalStream =
    new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)

  override def slideDuration: Duration = internalStream.slideDuration

  override def dependencies: List[DStream[_]] = List(internalStream)

  // The computation is delegated to InternalMapWithStateDStream.
  override def compute(validTime: Time): Option[RDD[MappedType]] = {
    internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
  }
3. Updating the historical state:
/**
 * A DStream that allows per-key state to be maintains, and arbitrary records to be generated
 * based on updates to the state. This is the main DStream that implements the `mapWithState`
 * operation on DStreams.
 *
 * @param parent Parent (key, value) stream that is the source
 * @param spec Specifications of the mapWithState operation
 * @tparam K Key type
 * @tparam V Value type
 * @tparam S Type of the state maintained
 * @tparam E Type of the mapped data
 */
private[streaming]
class InternalMapWithStateDStream[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    parent: DStream[(K, V)], spec: StateSpecImpl[K, V, S, E])
  extends DStream[MapWithStateRDDRecord[K, S, E]](parent.context) {

  // The in-memory data structure is updated continuously.
  persist(StorageLevel.MEMORY_ONLY)
4. InternalMapWithStateDStream.compute:
  /** Method that generates a RDD for the given time */
  override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {
    // Get the previous state or create a new empty state RDD
    val prevStateRDD = getOrCompute(validTime - slideDuration) match {
      case Some(rdd) =>
        if (rdd.partitioner != Some(partitioner)) {
          // If the RDD is not partitioned the right way, let us repartition it using the
          // partition index as the key. This is to ensure that state RDD is always partitioned
          // before creating another state RDD using it
          MapWithStateRDD.createFromRDD[K, V, S, E](
            rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)
        } else {
          rdd
        }
      case None =>
        MapWithStateRDD.createFromPairRDD[K, V, S, E](
          spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),
          partitioner,
          validTime
        )
    }

    // Create the RDD for this batch time.
    // Compute the new state RDD with previous state RDD and partitioned data RDD
    // Even if there is no data RDD, use an empty one to create a new state RDD
    val dataRDD = parent.getOrCompute(validTime).getOrElse {
      context.sparkContext.emptyRDD[(K, V)]
    }
    val partitionedDataRDD = dataRDD.partitionBy(partitioner)
    val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>
      (validTime - interval).milliseconds
    }
    Some(new MapWithStateRDD(
      prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
  }
}
5. MapWithStateRDD: an RDD that holds the data of the mapWithState operation together with the logic for updating that data. Each partition of the RDD is represented by a single MapWithStateRDDRecord.
/**
 * RDD storing the keyed states of `mapWithState` operation and corresponding mapped data.
 * Each partition of this RDD has a single record of type `MapWithStateRDDRecord`. This contains a
 * `StateMap` (containing the keyed-states) and the sequence of records returned by the mapping
 * function of `mapWithState`.
 * @param prevStateRDD The previous MapWithStateRDD on whose StateMap data `this` RDD
 *                     will be created
 * @param partitionedDataRDD The partitioned data RDD which is used update the previous StateMaps
 *                           in the `prevStateRDD` to create `this` RDD
 * @param mappingFunction  The function that will be used to update state and return new data
 * @param batchTime        The time of the batch to which this RDD belongs to. Use to update
 * @param timeoutThresholdTime The time to indicate which keys are timeout
 */
private[streaming] class MapWithStateRDD[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    private var prevStateRDD: RDD[MapWithStateRDDRecord[K, S, E]],
    private var partitionedDataRDD: RDD[(K, V)],
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long]
  ) extends RDD[MapWithStateRDDRecord[K, S, E]](
    partitionedDataRDD.sparkContext,
    List(
      new OneToOneDependency[MapWithStateRDDRecord[K, S, E]](prevStateRDD),
      new OneToOneDependency(partitionedDataRDD))
  ) {

  @volatile private var doFullScan = false

  require(prevStateRDD.partitioner.nonEmpty)
  require(partitionedDataRDD.partitioner == prevStateRDD.partitioner)

  override val partitioner = prevStateRDD.partitioner

  override def checkpoint(): Unit = {
    super.checkpoint()
    doFullScan = true
  }

  override def compute(
      partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {

    val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]
    val prevStateRDDIterator = prevStateRDD.iterator(
      stateRDDPartition.previousSessionRDDPartition, context)
    val dataIterator = partitionedDataRDD.iterator(
      stateRDDPartition.partitionedDataRDDPartition, context)

    val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else None
    val newRecord = MapWithStateRDDRecord.updateRecordWithData(
      prevRecord,
      dataIterator,
      mappingFunction,
      batchTime,
      timeoutThresholdTime,
      removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    )
    Iterator(newRecord)
  }
6. updateRecordWithData: an RDD itself is immutable, but it can still be used to process changing data.
  def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](
    prevRecord: Option[MapWithStateRDDRecord[K, S, E]],
    dataIterator: Iterator[(K, V)],
    mappingFunction: (Time, K, Option[V], State[S]) => Option[E],
    batchTime: Time,
    timeoutThresholdTime: Option[Long],
    removeTimedoutData: Boolean
  ): MapWithStateRDDRecord[K, S, E] = {
    // Create a new state map by cloning the previous one (if it exists) or by creating an empty one
    val newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() }

    val mappedData = new ArrayBuffer[E]
    val wrappedState = new StateImpl[S]()

    // Call the mapping function on each record in the data iterator, and accordingly
    // update the states touched, and collect the data returned by the mapping function
    dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) {
        // Iterate over all data of the current batch, apply the user-defined function
        // to it, and update the newStateMap data structure.
        // newStateMap holds the historical state.
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
    }

    // Get the timed out state records, call the mapping function on each and collect the
    // data returned
    if (removeTimedoutData && timeoutThresholdTime.isDefined) {
      newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>
        wrappedState.wrapTimingOutState(state)
        val returned = mappingFunction(batchTime, key, None, wrappedState)
        mappedData ++= returned
        newStateMap.remove(key)
      }
    }

    // From the RDD's point of view, the partition represented by MapWithStateRDDRecord
    // is unchanged; only the data inside it has changed.
    MapWithStateRDDRecord(newStateMap, mappedData)
  }
}
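The two loops of updateRecordWithData can be modelled on a mutable map to see which keys are touched. This is a simplified sketch: `Store`, `applyBatch`, `expire` and `merge` are hypothetical names, the store keeps `(state, last update time)` per key, and the strict "older than the threshold" comparison in `expire` is an assumption about how getByTime selects keys.

```scala
import scala.collection.mutable

object StateMapModel {
  // Hypothetical stand-in for StateMap: key -> (state, last update time in ms).
  type Store[K, S] = mutable.Map[K, (S, Long)]

  // Applies one batch: only keys that appear in `batch` are looked up and rewritten.
  // Returns the number of records processed.
  def applyBatch[K, S](store: Store[K, S], batch: Seq[(K, S)], batchTime: Long)(
      merge: (Option[S], S) => S): Int = {
    var processed = 0
    batch.foreach { case (key, value) =>
      val previous = store.get(key).map(_._1)
      store.put(key, (merge(previous, value), batchTime))
      processed += 1
    }
    processed
  }

  // Removes and returns the keys whose last update is older than the threshold,
  // which is the role getByTime plays in the timeout branch above.
  def expire[K, S](store: Store[K, S], thresholdTime: Long): Set[K] = {
    val expired = store.iterator.collect {
      case (k, (_, updated)) if updated < thresholdTime => k
    }.toSet
    expired.foreach(k => store.remove(k))
    expired
  }
}
```

Unlike the cogroup-based model of updateStateByKey, a batch with one record performs one lookup and one write, regardless of how many other keys the store holds. The full pass over the store happens only in `expire`, mirroring the fact that timed-out data is removed only when a full scan is enabled.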
Figure: mapWithState implementation.
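Summary:
updateStateByKey cogroups each new batch with the whole previous state RDD and calls the update function for every key on every batch, so its cost grows with the total size of the state. mapWithState keeps a StateMap in each partition and calls the mapping function only for the keys present in the current batch (plus timed-out keys when a full scan is enabled), which makes it the better fit when the state is large.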
Notes:
1. DT大数据梦工厂 WeChat official account: DT_Spark
2. IMF 8 p.m. big data hands-on YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains
Reposted from http://blog.csdn.net/snail_gesture/article/details/51510588