Skip to content

Commit c594e28

Browse files
committed
Fixes issues with locking strategy for subjects. ReactiveX#936
1 parent 2d63fac commit c594e28

File tree

10 files changed

+1224
-84
lines changed

10 files changed

+1224
-84
lines changed

Rx.xcodeproj/project.pbxproj

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,9 @@
676676
C8941BE71BD56B0700A0E874 /* BlockingObservable+Operators.swift in Sources */ = {isa = PBXBuildFile; fileRef = C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */; };
677677
C89461751BC6C1210055219D /* ObservableConvertibleType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C849BE2A1BAB5D070019AD27 /* ObservableConvertibleType.swift */; };
678678
C89461761BC6C1220055219D /* ObservableConvertibleType.swift in Sources */ = {isa = PBXBuildFile; fileRef = C849BE2A1BAB5D070019AD27 /* ObservableConvertibleType.swift */; };
679+
C89579381DE1FD8C00DC9C9C /* Anomalies.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89579351DE1FD8000DC9C9C /* Anomalies.swift */; };
680+
C895793A1DE1FD8D00DC9C9C /* Anomalies.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89579351DE1FD8000DC9C9C /* Anomalies.swift */; };
681+
C895793B1DE1FD8E00DC9C9C /* Anomalies.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89579351DE1FD8000DC9C9C /* Anomalies.swift */; };
679682
C89CDB361BCB0DD7002063D9 /* ShareReplay1.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */; };
680683
C89CDB371BCB0DD7002063D9 /* ShareReplay1.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */; };
681684
C89CDB381BCB0DD7002063D9 /* ShareReplay1.swift in Sources */ = {isa = PBXBuildFile; fileRef = C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */; };
@@ -1689,6 +1692,7 @@
16891692
C88FA53F1C25C4CC00CCFEA4 /* RxTests.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxTests.framework; sourceTree = BUILT_PRODUCTS_DIR; };
16901693
C8941BDE1BD5695C00A0E874 /* BlockingObservable.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = BlockingObservable.swift; sourceTree = "<group>"; };
16911694
C8941BE31BD56B0700A0E874 /* BlockingObservable+Operators.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = "BlockingObservable+Operators.swift"; sourceTree = "<group>"; };
1695+
C89579351DE1FD8000DC9C9C /* Anomalies.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; name = Anomalies.swift; path = Tests/RxSwiftTests/Anomalies.swift; sourceTree = SOURCE_ROOT; };
16921696
C89CDB351BCB0DD7002063D9 /* ShareReplay1.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; lineEnding = 0; path = ShareReplay1.swift; sourceTree = "<group>"; xcLanguageSpecificationIdentifier = xcode.lang.swift; };
16931697
C8A56AD71AD7424700B4673B /* RxSwift.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = RxSwift.framework; sourceTree = BUILT_PRODUCTS_DIR; };
16941698
C8B144FA1BD2D44500267DCE /* ConcurrentMainScheduler.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentMainScheduler.swift; sourceTree = "<group>"; };
@@ -2357,6 +2361,7 @@
23572361
C83509011C38706D0027C24C /* Tests */ = {
23582362
isa = PBXGroup;
23592363
children = (
2364+
C89579351DE1FD8000DC9C9C /* Anomalies.swift */,
23602365
C83509021C38706D0027C24C /* AnonymousObservable+Test.swift */,
23612366
C83509031C38706D0027C24C /* AssumptionsTest.swift */,
23622367
C83509041C38706D0027C24C /* BagTest.swift */,
@@ -3510,6 +3515,7 @@
35103515
files = (
35113516
033C2EF61D081C460050C015 /* UIScrollView+RxTests.swift in Sources */,
35123517
C835092A1C38706E0027C24C /* CLLocationManager+RxTests.swift in Sources */,
3518+
C89579381DE1FD8C00DC9C9C /* Anomalies.swift in Sources */,
35133519
C83509661C38706E0027C24C /* RxTest.swift in Sources */,
35143520
C83509611C38706E0027C24C /* ObserverTests.swift in Sources */,
35153521
C83509471C38706E0027C24C /* PrimitiveMockObserver.swift in Sources */,
@@ -3600,6 +3606,7 @@
36003606
C83509F31C38755D0027C24C /* AssumptionsTest.swift in Sources */,
36013607
C83509ED1C3875580027C24C /* MySubject.swift in Sources */,
36023608
C83509C61C3875220027C24C /* NSObject+RxTests.swift in Sources */,
3609+
C895793B1DE1FD8E00DC9C9C /* Anomalies.swift in Sources */,
36033610
C83509BB1C38750D0027C24C /* Control+RxTests.swift in Sources */,
36043611
C83509EE1C3875580027C24C /* Observable.Extensions.swift in Sources */,
36053612
C83509BD1C38750D0027C24C /* ControlPropertyTests.swift in Sources */,
@@ -3721,6 +3728,7 @@
37213728
C83509CC1C3875230027C24C /* NSLayoutConstraint+RxTests.swift in Sources */,
37223729
C8350A0C1C38755E0027C24C /* Observable+CreationTest.swift in Sources */,
37233730
C8350A1C1C38756B0027C24C /* Observable+StandardSequenceOperatorsTest.swift in Sources */,
3731+
C895793A1DE1FD8D00DC9C9C /* Anomalies.swift in Sources */,
37243732
C83509CA1C3875230027C24C /* Driver+Test.swift in Sources */,
37253733
C83509E41C3875580027C24C /* MockDisposable.swift in Sources */,
37263734
C83509D51C38753E0027C24C /* RxObjCRuntimeState.swift in Sources */,

RxSwift/Observables/Implementations/ShareReplay1.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,13 @@ final class ShareReplay1<Element>
7777
}
7878

7979
func on(event: Event<E>) {
80-
_lock.lock(); defer { _lock.unlock() }
81-
_synchronized_on(event)
80+
_synchronized_on(event).on(event)
8281
}
8382

84-
func _synchronized_on(event: Event<E>) {
83+
func _synchronized_on(_ event: Event<E>) -> Bag<AnyObserver<Element>> {
84+
_lock.lock(); defer { _lock.unlock() }
8585
if _stopped {
86-
return
86+
return Bag()
8787
}
8888

8989
switch event {
@@ -96,6 +96,6 @@ final class ShareReplay1<Element>
9696
_connection = nil
9797
}
9898

99-
_observers.on(event)
99+
return _observers
100100
}
101-
}
101+
}

RxSwift/Observables/Implementations/ShareReplay1WhileConnected.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,22 @@ final class ShareReplay1WhileConnected<Element>
7171
}
7272

7373
func on(event: Event<E>) {
74-
_lock.lock(); defer { _lock.unlock() }
75-
_synchronized_on(event)
74+
_synchronized_on(event).on(event)
7675
}
7776

78-
func _synchronized_on(event: Event<E>) {
77+
func _synchronized_on(_ event: Event<E>) -> Bag<AnyObserver<Element>> {
78+
_lock.lock(); defer { _lock.unlock() }
7979
switch event {
8080
case .Next(let element):
8181
_element = element
82-
_observers.on(event)
82+
return _observers
8383
case .Error, .Completed:
8484
_element = nil
8585
_connection?.dispose()
8686
_connection = nil
8787
let observers = _observers
8888
_observers = Bag()
89-
observers.on(event)
89+
return observers
9090
}
9191
}
92-
}
92+
}

RxSwift/Subjects/BehaviorSubject.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,13 @@ public final class BehaviorSubject<Element>
8181
- parameter event: Event to send to the observers.
8282
*/
8383
public func on(event: Event<E>) {
84-
_lock.lock(); defer { _lock.unlock() }
85-
_synchronized_on(event)
84+
_synchronized_on(event).on(event)
8685
}
8786

88-
func _synchronized_on(event: Event<E>) {
87+
func _synchronized_on(event: Event<E>) -> Bag<AnyObserver<Element>> {
88+
_lock.lock(); defer { _lock.unlock() }
8989
if _stoppedEvent != nil || _disposed {
90-
return
90+
return Bag()
9191
}
9292

9393
switch event {
@@ -97,7 +97,7 @@ public final class BehaviorSubject<Element>
9797
_stoppedEvent = event
9898
}
9999

100-
_observers.on(event)
100+
return _observers
101101
}
102102

103103
/**
@@ -158,4 +158,4 @@ public final class BehaviorSubject<Element>
158158
_stoppedEvent = nil
159159
}
160160
}
161-
}
161+
}

RxSwift/Subjects/PublishSubject.swift

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,25 +59,29 @@ final public class PublishSubject<Element>
5959
- parameter event: Event to send to the observers.
6060
*/
6161
public func on(event: Event<Element>) {
62-
_lock.lock(); defer { _lock.unlock() }
63-
_synchronized_on(event)
62+
_synchronized_on(event).on(event)
6463
}
6564

66-
func _synchronized_on(event: Event<E>) {
65+
func _synchronized_on(event: Event<E>) -> Bag<AnyObserver<Element>> {
66+
_lock.lock(); defer { _lock.unlock() }
67+
6768
switch event {
6869
case .Next(_):
6970
if _disposed || _stopped {
70-
return
71+
return Bag()
7172
}
7273

73-
_observers.on(event)
74+
return _observers
7475
case .Completed, .Error:
7576
if _stoppedEvent == nil {
7677
_stoppedEvent = event
7778
_stopped = true
78-
_observers.on(event)
79+
let observers = _observers
7980
_observers.removeAll()
81+
return observers
8082
}
83+
84+
return Bag()
8185
}
8286
}
8387

@@ -136,4 +140,4 @@ final public class PublishSubject<Element>
136140
_observers.removeAll()
137141
_stoppedEvent = nil
138142
}
139-
}
143+
}

RxSwift/Subjects/ReplaySubject.swift

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,29 +105,30 @@ class ReplayBufferBase<Element>
105105
}
106106

107107
override func on(event: Event<Element>) {
108-
_lock.lock(); defer { _lock.unlock() }
109-
_synchronized_on(event)
108+
_synchronized_on(event).on(event)
110109
}
111110

112-
func _synchronized_on(event: Event<E>) {
111+
func _synchronized_on(event: Event<E>) -> Bag<AnyObserver<Element>> {
112+
_lock.lock(); defer { _lock.unlock() }
113113
if _disposed {
114-
return
114+
return Bag()
115115
}
116116

117117
if _stoppedEvent != nil {
118-
return
118+
return Bag()
119119
}
120120

121121
switch event {
122122
case .Next(let value):
123123
addValueToBuffer(value)
124124
trim()
125-
_observers.on(event)
125+
return _observers
126126
case .Error, .Completed:
127127
_stoppedEvent = event
128128
trim()
129-
_observers.on(event)
129+
let observers = _observers
130130
_observers.removeAll()
131+
return observers
131132
}
132133
}
133134

@@ -260,4 +261,4 @@ final class ReplayAll<Element> : ReplayManyBase<Element> {
260261
override func trim() {
261262

262263
}
263-
}
264+
}

Sources/AllTestz/Anomalies.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../Tests/RxSwiftTests/Anomalies.swift

0 commit comments

Comments
 (0)