生活随笔
收集整理的這篇文章主要介紹了
RxSwift之深入解析map操作符的底层实现
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、map 操作符的使用
map 操作符將源 Observable 的每個元素轉換一遍,然后返回含有轉換結果的 Observable:
Observable < Int > . of ( 1 , 2 , 3 , 4 , 5 , 6 ) . subscribe ( onNext
: { ( val
) in print ( val
) } ) . disposed ( by
: disposeBag
)
1 ,
2 ,
3 ,
4 ,
5 ,
6
Observable < Int > . of ( 1 , 2 , 3 , 4 , 5 , 6 ) . map { $
0 + 10 } . subscribe ( onNext
: { ( val
) in print ( val
) } ) . disposed ( by
: disposeBag
)
11 ,
12 ,
13 ,
14 ,
15 ,
16
說明: map操作符,操作序列每個元素加 10 后作為新元素,構成新的序列。 那么,map 是如何給序列重新設置新值的呢?
二、map 源碼分析
① map 函數的定義
map 閉包就像加工的機器,設定好加工程序 $0+10 就會對 of 中的每一個元素加工產出新的零件,首先來看一下 map 源碼都處理了哪些業務:
extension ObservableType { public func map < R
> ( _ transform
: @escaping
( E
) throws - > R
) - > Observable < R
> { return self . asObservable ( ) . composeMap ( transform
) }
}
分析: transform 逃逸閉包,轉換邏輯交給業務層; 可以看到 map 函數是一個帶閉包參數的 ObservableType 的擴展函數,內部調用了 composeMap 并傳入外部的閉包以便內部調用。我們猜測,該處閉包會被保留在內部,在訂閱時被使用,那么根據斷點繼續探索,看看外界的閉包最終會保留在何處。 composeMap 所在類,如下所示: source 向 _map 函數傳入了 self 即為當前的序列對象;
public class Observable < Element > : ObservableType { public typealias E
= Element . . . . . . internal func composeMap
< R
> ( _ transform
: @escaping
( Element ) throws - > R
) - > Observable < R
> { return _map ( source
: self , transform
: transform
) }
}
可以看到,ObservableType 的子類 Observable 實現 composeMap 方法,返回 Observable 類型的對象,在內部調用了 _map 方法:
internal func _map
< Element , R
> ( source
: Observable < Element > , transform
: @escaping
( Element ) throws - > R
) - > Observable < R
> { return Map ( source
: source
, transform
: transform
)
}
繼續向 Map 內部傳入序列,及業務層閉包,一直強調序列和業務層閉包,主要由于結構復雜,以免被遺忘,后續和訂閱難以被聯系在一起。
② Map 類
查看 Map 類,如下: Map 繼承自 Producer,而 Producer 繼承自 Observable,提供了連接序列和觀察者的方法對象 sink,及發送序列元素到觀察者,再返回到訂閱; run 方法會在父類 Producer 類中方法調用,父類指針指向子類對象。
final private class Map < SourceType , ResultType > : Producer < ResultType > { typealias Transform = ( SourceType ) throws - > ResultType private let _source
: Observable < SourceType > private let _transform
: Transform init ( source
: Observable < SourceType > , transform
: @escaping
Transform ) { self . _source
= source
self . _transform
= transform#
if TRACE_RESOURCES _ = increment ( & _numberOfMapOperators
)
#endif
} override func composeMap
< R
> ( _ selector
: @escaping
( ResultType ) throws - > R
) - > Observable < R
> { let originalSelector
= self . _transform
return Map < SourceType , R
> ( source
: self . _source
, transform
: { ( s
: SourceType ) throws - > R
in let r
: ResultType = try originalSelector ( s
) return try selector ( r
) } ) } override func run
< O
: ObserverType > ( _ observer
: O
, cancel
: Cancelable ) - > ( sink
: Disposable , subscription
: Disposable ) where O
. E
== ResultType { let sink
= MapSink ( transform
: self . _transform
, observer
: observer
, cancel
: cancel
) let subscription
= self . _source
. subscribe ( sink
) return ( sink
: sink
, subscription
: subscription
) }
}
③ 訂閱
繼續斷點運行就到達訂閱,該處方法與 RxSwift 之深入解析核心邏輯 Observable 的底層原理中的訂閱方法為同一方法:
extension ObservableType { public func subscribe ( onNext
: ( ( E
) - > Void ) ? = nil , onError
: ( ( Swift . Error ) - > Void ) ? = nil , onCompleted
: ( ( ) - > Void ) ? = nil , onDisposed
: ( ( ) - > Void ) ? = nil ) - > Disposable { let disposable
: Disposable if let disposed
= onDisposed
{ disposable
= Disposables . create ( with
: disposed
) } else { disposable
= Disposables . create ( ) } #
if DEBUG let synchronizationTracker
= SynchronizationTracker ( ) #endif
let callStack
= Hooks . recordCallStackOnError
? Hooks . customCaptureSubscriptionCallstack ( ) : [ ] let observer
= AnonymousObserver < E
> { event
in #
if DEBUG synchronizationTracker
. register ( synchronizationErrorMessage
: . default ) defer { synchronizationTracker
. unregister ( ) } #endif
switch event
{ case . next ( let value
) : onNext
? ( value
) case . error ( let error
) : if let onError
= onError
{ onError ( error
) } else { Hooks . defaultErrorHandler ( callStack
, error
) } disposable
. dispose ( ) case . completed
: onCompleted
? ( ) disposable
. dispose ( ) } } return Disposables . create ( self . asObservable ( ) . subscribe ( observer
) , disposable
) }
}
self.asObservable().subscribe(observer) 此處調用的則是 Producer 中的 subscribe 方法,該處方法實現邏輯如下:
class Producer < Element > : Observable < Element > { override init ( ) { super . init ( ) } override func subscribe
< O
: ObserverType > ( _ observer
: O
) - > Disposable where O
. E
== Element { if ! CurrentThreadScheduler . isScheduleRequired
{ let disposer
= SinkDisposer ( ) let sinkAndSubscription
= self . run ( observer
, cancel
: disposer
) disposer
. setSinkAndSubscription ( sink
: sinkAndSubscription
. sink
, subscription
: sinkAndSubscription
. subscription
) return disposer
} else { return CurrentThreadScheduler . instance
. schedule ( ( ) ) { _ in let disposer
= SinkDisposer ( ) let sinkAndSubscription
= self . run ( observer
, cancel
: disposer
) disposer
. setSinkAndSubscription ( sink
: sinkAndSubscription
. sink
, subscription
: sinkAndSubscription
. subscription
) return disposer
} } }
}
④ run 方法
繼續查看內部 self.run 方法調用,它的繼承鏈與 RxSwift之深入解析核心邏輯Observable的底層原理 中的繼承鏈有所不同,它們的繼承鏈對比如下: RxSwift 核心邏輯中的 Producer 的子類是 AnonymousObservable,run方法在此類實現; Map 源碼中 Producer 的子類是 Map,run 方法在該處被實現。 run 方法的實現如下: MapSink 方法和 RxSwift 核心邏輯中的 AnnonymousObservableSink 類似; self._source 此處為訂閱時保存的閉包; .subscribe(sink)Producer 類的方法,傳入 sink 用來調用 sink 中的 on 方法。
override func run
< O
: ObserverType > ( _ observer
: O
, cancel
: Cancelable ) - > ( sink
: Disposable , subscription
: Disposable ) where O
. E
== ResultType { let sink
= MapSink ( transform
: self . _transform
, observer
: observer
, cancel
: cancel
) let subscription
= self . _source
. subscribe ( sink
) return ( sink
: sink
, subscription
: subscription
)
}
MapSink 中保留的是觀察者,Map 中保留的為可觀察序列 Observable,通過 Observable 來觸發觀察者的方法調用,subscribe 方法中調用的 sinkAndSubscription = self.run(observer, cancel: disposer):
final private class ObservableSequence < S
: Sequence > : Producer < S
. Iterator . Element > { fileprivate
let _elements
: Sfileprivate
let _scheduler
: ImmediateSchedulerType init ( elements
: S
, scheduler
: ImmediateSchedulerType ) { self . _elements
= elements
self . _scheduler
= scheduler
} override func run
< O
: ObserverType > ( _ observer
: O
, cancel
: Cancelable ) - > ( sink
: Disposable , subscription
: Disposable ) where O
. E
== E
{ let sink
= ObservableSequenceSink ( parent
: self , observer
: observer
, cancel
: cancel
) let subscription
= sink
. run ( ) return ( sink
: sink
, subscription
: subscription
) }
}
ObservableSequence 是繼承自 Producer 的方法,內部創建了 ObservableSequenceSink 對象并傳入了當前 Observable 對象和 observer 對象,最后調用 run() 方法,此處內部為變量序列并調用觀察者閉包方法,向外界發送消息。ObservableSequence 類繼承自 Sink,由此可知會調用 Sink 中的 forwardOn 方法,實現如下:
final private class ObservableSequenceSink < S
: Sequence , O
: ObserverType > : Sink < O
> where S
. Iterator . Element == O
. E
{ typealias Parent = ObservableSequence < S
> private let _parent
: Parent init ( parent
: Parent , observer
: O
, cancel
: Cancelable ) { self . _parent
= parent
super . init ( observer
: observer
, cancel
: cancel
) } func run ( ) - > Disposable { return self . _parent
. _scheduler
. scheduleRecursive ( self . _parent
. _elements
. makeIterator ( ) ) { iterator
, recurse
in var mutableIterator
= iterator
if let next
= mutableIterator
. next ( ) { self . forwardOn ( . next ( next
) ) recurse ( mutableIterator
) } else { self . forwardOn ( . completed
) self . dispose ( ) } } }
}
_elements 是由 of 創建時保留的序列集合,此處對序列元素進行遍歷,并調用 forwardOn 方法發送元素。forwardOn 的實現如下,_observer 是上面傳入的 MapSink 對象:
class Sink < O
: ObserverType > : Disposable { fileprivate
let _observer
: Ofileprivate
let _cancel
: Cancelable fileprivate
var _disposed
= AtomicInt ( 0 ) #
if DEBUG fileprivate
let _synchronizationTracker
= SynchronizationTracker ( ) #endif
init ( observer
: O
, cancel
: Cancelable ) {
#
if TRACE_RESOURCES _ = Resources . incrementTotal ( )
#endif
self . _observer
= observer
self . _cancel
= cancel
} final func forwardOn ( _ event
: Event < O
. E
> ) { #
if DEBUG self . _synchronizationTracker
. register ( synchronizationErrorMessage
: . default ) defer { self . _synchronizationTracker
. unregister ( ) } #endif
if isFlagSet ( & self . _disposed
, 1 ) { return } self . _observer
. on ( event
) }
}
可以看到,此處調用了 sink 的 on 方法,self._observer.on(event)。繼續追蹤 MapSink 類的 on 方法實現:
final private class MapSink < SourceType , O
: ObserverType > : Sink < O
> , ObserverType { typealias Transform = ( SourceType ) throws - > ResultType typealias ResultType = O
. E
typealias Element = SourceType private let _transform
: Transform init ( transform
: @escaping
Transform , observer
: O
, cancel
: Cancelable ) { self . _transform
= transform
super . init ( observer
: observer
, cancel
: cancel
) } func on ( _ event
: Event < SourceType > ) { switch event
{ case . next ( let element
) : do { let mappedElement
= try self . _transform ( element
) self . forwardOn ( . next ( mappedElement
) ) } catch let e
{ self . forwardOn ( . error ( e
) ) self . dispose ( ) } case . error ( let error
) : self . forwardOn ( . error ( error
) ) self . dispose ( ) case . completed
: self . forwardOn ( . completed
) self . dispose ( ) } }
}
至此,就容易理解了,這里的 on 和 RxSwift 核心邏輯中的不同: RxSwift 核心邏輯中此處由業務層 onNext 來觸發; 元素處理: let mappedElement = try self._transform(element) 調用外界閉包獲取新值; self.forwardOn(.next(mappedElement)) 通過 forwardOn 將新值發送至訂閱者。 最終會調用 ObserverBase 中的 on 方法,再調用觀察者 observer 的 onCore 方法,向觀察者發送元素。在由觀察者調用業務層訂閱時實現的閉包將序列元素發送到了業務層,到此 map 就完成了對源序列的修改。
三、總結
map 是對 sink 做了一層封裝,根據業務層的 map 設置在 ObservableSequenceSink 中處理了序列元素再發送至 forwardOn 直至 Observer 對象,由此完成了對元素的加工處理。 RxSwift 源碼比較繁瑣,復雜的邏輯帶來的是高效的開發,高效的運行,因此對 RxSwfit 源碼還需要進一步地理解和分析。
總結
以上是生活随笔 為你收集整理的RxSwift之深入解析map操作符的底层实现 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。