1
\$\begingroup\$

I am trying to develop an Observable queue that executes given Observable chains one at a time. The code will be used to encode videos on an iPhone and only one queue will be needed per launch, therefore it is declared as static. It seems to work but I am not sure I covered all the edge cases.

Below is the implementation as well as the test. I use Swift 2 with the latest supported RxSwift pod.

class SomeObject {}

// Implementation
extension SomeObject {
    // Will be created once per runtime
    static let queueSubject: PublishSubject<Observable<Int>> = PublishSubject()

    // Will be called once per runtime
    static func consumeQueue() {

        let queueName = "someQueue"
        let queue = dispatch_queue_create(queueName, DISPATCH_QUEUE_SERIAL)
        let scheduler = SerialDispatchQueueScheduler(queue: queue, internalSerialQueueName: queueName)
        let lock = NSLock()

        lock.unlock()

        _ = queueSubject
            .observeOn(scheduler)
            .doOnNext({ _ in
                print("queue locking")
                lock.lock()
                print("queue locked")
            })
            .flatMap({
                return $0.observeOn(scheduler).subscribeOn(scheduler)
            })
            .doOnNext({
                print("queue unlocking")
                lock.unlock()
                print("queue unlocked")
            })
            .subscribeOn(scheduler)
            .subscribe(
                onNext: { _ in
                    print("queue onNext")
                },
                onError: { _ in
                    print("queue error")
                },
                onCompleted: {
                    print("queue completed")
                },
                onDisposed: {}
        )
    }
}

// Test
extension SomeObject {
    static func testQueue() {
        SomeObject.consumeQueue()

        let test1 = Observable<Int>.timer(1.0, scheduler: MainScheduler.instance)
                .doOnCompleted({
                    print("test 1")
                })

        let test2 = Observable<Int>.timer(1.0, scheduler: MainScheduler.instance)
                .doOnCompleted({
                    print("test 2")
                })

        SomeObject.queueSubject.onNext(test1)
        SomeObject.queueSubject.onNext(test2)
    }
}
\$\endgroup\$
6
  • \$\begingroup\$ Why don't you upgrade to Swift 3, do it before it's too late! \$\endgroup\$ Commented Jun 1, 2017 at 12:15
  • \$\begingroup\$ Eventually it will happen but current code base is quite large and have some heavy dependencies written in Swift 2. It is impossible to migrate to Swift 3 in a week. A month long migration is scheduled to next autumn. \$\endgroup\$ Commented Jun 1, 2017 at 16:12
  • \$\begingroup\$ This code doesn't compile, even in Swift 2. Where is the definition of SomeObject? (Only two extensions are here.) Where is the definition of encodeLock? What is lock supposed to be for? (It isn't being used.) \$\endgroup\$ Commented Sep 18, 2017 at 1:50
  • \$\begingroup\$ @DanielT. I fixed the typos occurred during reducing the irrelevant code. It should compile now. \$\endgroup\$ Commented Sep 19, 2017 at 11:09
  • \$\begingroup\$ What is the expected output? I'm getting queue locking\nqueue locked\nqueue locking then a crash due to a deadlock. \$\endgroup\$ Commented Sep 19, 2017 at 11:41

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.