I am trying to develop an Observable queue that executes given Observable chains one at a time. The code will be used to encode videos on an iPhone, and only one queue is needed per launch, so it is declared as static. It seems to work, but I am not sure I have covered all the edge cases.
Below is the implementation as well as the test. I use Swift 2 with the latest supported RxSwift pod.
class SomeObject {}

// Implementation
extension SomeObject {
    // Will be created once per runtime
    static let queueSubject: PublishSubject<Observable<Int>> = PublishSubject()

    // Will be called once per runtime
    static func consumeQueue() {
        let queueName = "someQueue"
        let queue = dispatch_queue_create(queueName, DISPATCH_QUEUE_SERIAL)
        let scheduler = SerialDispatchQueueScheduler(queue: queue, internalSerialQueueName: queueName)
        let lock = NSLock()
        lock.unlock()
        _ = queueSubject
            .observeOn(scheduler)
            .doOnNext({ _ in
                print("queue locking")
                lock.lock()
                print("queue locked")
            })
            .flatMap({
                return $0.observeOn(scheduler).subscribeOn(scheduler)
            })
            .doOnNext({
                print("queue unlocking")
                lock.unlock()
                print("queue unlocked")
            })
            .subscribeOn(scheduler)
            .subscribe(
                onNext: { _ in
                    print("queue onNext")
                },
                onError: { _ in
                    print("queue error")
                },
                onCompleted: {
                    print("queue completed")
                },
                onDisposed: {}
            )
    }
}

// Test
extension SomeObject {
    static func testQueue() {
        SomeObject.consumeQueue()
        let test1 = Observable<Int>.timer(1.0, scheduler: MainScheduler.instance)
            .doOnCompleted({
                print("test 1")
            })
        let test2 = Observable<Int>.timer(1.0, scheduler: MainScheduler.instance)
            .doOnCompleted({
                print("test 2")
            })
        SomeObject.queueSubject.onNext(test1)
        SomeObject.queueSubject.onNext(test2)
    }
}
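
To make the intended behaviour concrete, here is a minimal sketch of "one job at a time" without any lock. It is plain Swift with no RxSwift dependency, and it uses current Swift syntax rather than Swift 2. `SerialJobQueue`, `Job`, and `enqueue` are hypothetical names chosen for illustration, not part of RxSwift. The sketch assumes every call happens on the same thread or serial queue, so it does no synchronization of its own.

```swift
// Hypothetical helper (not RxSwift API): runs asynchronous jobs strictly one at a time.
// Assumption: all calls are made from a single thread or serial queue.
final class SerialJobQueue {
    // A job receives a `done` callback and must call it exactly once when it finishes.
    typealias Job = (_ done: @escaping () -> Void) -> Void

    private var pending: [Job] = []
    private var isRunning = false

    // Adds a job; it starts right away only if no other job is in flight.
    func enqueue(_ job: @escaping Job) {
        pending.append(job)
        startNextIfIdle()
    }

    // Starts the oldest pending job unless one is already running.
    private func startNextIfIdle() {
        guard !isRunning, !pending.isEmpty else { return }
        isRunning = true
        let job = pending.removeFirst()
        job { [weak self] in
            guard let self = self else { return }
            // The finished job hands control to the next one in line.
            self.isRunning = false
            self.startNextIfIdle()
        }
    }
}
```

The point of the sketch is that the next job is started from the previous job's completion callback, so nothing ever blocks a thread while waiting for its turn.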

SomeObject? (Only two extensions are here.) Where is the definition of encodeLock? What is lock supposed to be for? (It isn't being used.)

queue locking\nqueue locked\nqueue locking then a crash due to a deadlock.