Swiftpack.co - Package - haifengkao/ResumableCombine

ResumableCombine

CI Status Version License Platform

Swift Combine lacks of support for proper backpressure handling. Many of its operators(e.g. sink and assign) just send request(.unlimited) for the first demand request. It renders the Combine's pull mechanism utterly uselesss. This project aims to fix this problem.

rm.sink

Sink that will request one item at a time.

let subscription = [1,2,3,4,5].publisher
    .rm.sink(
        receiveCompletion: { completion in
            print("Completion: \(completion)")
        },
        receiveValue: { value -> Bool in
             print("Receive value: \(value)")

             // return true indicates that we want to request for another demand
             return true
        }
    )

// Receive value: 1
// Receive value: 2
// Receive value: 3
// Receive value: 4
// Receive value: 5

Sink that will request one item then stop.

We can use subscription.resume() to request for additional items.

let subscription = (1 ... Int.max).publisher
    .rm.sink(
        receiveCompletion: { completion in
            print("Completion: \(completion)")
        },
        receiveValue: { value -> Bool in
             print("Receive value: \(value)")

             // return false will stop the demands
             return false
        }
    )
// Receive value: 1

// Receive value: 2
subscription.resume()

// Receive value: 3
subscription.resume()

// Receive value: 4
subscription.resume()

rm.assign

Assign that will request one item at a time.

class SomeObject {
    var value: Int = -1 {
        didSet {
            print(value)
        }
    }
}

let object = SomeObject()
let subscription = [1, 2, 3, 4, 5].publisher.rm.assign(to: \.value, on: object)

// object.value == 1
// object.value == 2
// object.value == 3
// object.value == 4
// object.value == 5

Assign that will request one item then stop.

We can use subscription.resume() to request for additional items.

let subscription = (1 ... Int.max).publisher.rm.assign(to: \.value, on: object, mode: .singleDemandThenStop)

// object.value == 1

// object.value == 2
subscription.resume()

// object.value == 3
subscription.resume()

rm.flatMap

Combine's FlatMap works quite unexpectedly. Despite the resumable sink has stooped the demand. FlatMap continues sending all its values.

let subscription = (1 ... 100).publisher
.flatMap(maxPublishers: .max(1)) { value -> AnyPublisher<Int, Never> in
    print("Receive flatMap:", value)
    return AnyPublisher([10].publisher) // sends single value then complete
}.rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)

    return false // stop requesting new demands
}
// Receive flatMap: 1
// Receive value:  10
// Receive flatMap: 2
// Receive flatMap: 3
// Receive flatMap: 4
// Receive flatMap: 5
// Receive flatMap: 6
// Receive flatMap: 7
// Receive flatMap: 8
// Receive flatMap: 9
// Receive flatMap: 10
// ...
// Receive flatMap: 100

If we let the publisher inside flatMap send 2 values, FlatMap will send 2 values, despite the resumable sink only requests single demand.

let subscription = (1 ... 100).publisher
.flatMap(maxPublishers: .max(1)) { value -> AnyPublisher<Int, Never> in
    print("Receive flatMap:", value)
    return AnyPublisher([10, 20].publisher) // sends 2 values then complete
}.rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)

    return false // stop requesting new demands
}

// Receive flatMap: 1
// Receive value:  10
// Receive flatMap: 2

ResumbableCombine provides rm.flatMap to fix these problems

let subscription = (1 ... 100).publisher
.rm.flatMap(maxPublishers: .max(1)) { value -> AnyPublisher<Int, Never> in
    print("Receive flatMap:", value)
    return AnyPublisher([10].publisher) // sends single value then complete
}.rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)

    return false // stop requesting new demands
}

// Receive flatMap: 1
// Receive value:  10
let subscription = (1 ... 100).publisher
.rm.flatMap(maxPublishers: .max(1)) { value -> AnyPublisher<Int, Never> in
    print("Receive flatMap:", value)
    return AnyPublisher([10, 20].publisher) // sends 2 values then complete
}.rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)

    return false // stop requesting new demands
}

// Receive flatMap: 1
// Receive value:  10

rm.assert

ResumableCombine provides an assert function to check if the downstream sends large demands. It's useful because many Swift Combine operators request unlimited demand, it will invalid the whole backpressure mechanism. When it happens, we can use asser(maxDemand:) to detect it.

//  rm.sink will pass the maxDemand test
let subscription = (1 ... 100).publisher
.rm.assert(maxDemand: .max(1), "rm.sink handle backpressure gracefully, will not assert")
.rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)
    return true
}
//  Swift Combine's sink will not pass, because it sends unlimited demands
let subscription = (1 ... 100).publisher
.rm.assert(maxDemand: .max(1), "sink handle backpressure awkwardly, it shall not pass")
.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)
    return true
}

Another assert function assert(accumulatedDemand:) will check if any downstream request large volume of total demands.

//  rm.flatMap will pass the accumulatedDemand test
let subscription = (1 ... 100).publisher
.rm.flatMap(maxPublishers: .max(1)) { value -> AnyPublisher<Int, Never> in
    print("Receive flatMap:", value)
    return AnyPublisher([10].publisher)
}.assert(accumulatedDemand: .max(1), "rm.flatMap handle backpressure gracefully, will not assert")
.rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)
    return false // stop after signle demand
}
//  Swift Combine's flatMap will not pass the accumulatedDemand test
let subscription = (1 ... 100).publisher
.rm.flatMap(maxPublishers: .max(1)) { value -> AnyPublisher<Int, Never> in
    print("Receive flatMap:", value)
    return AnyPublisher([10].publisher)
}.assert(accumulatedDemand: .max(1), "flatMap handle backpressure awkwardly, it shall not pass")
.rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) -> Bool in
    print("Receive value: ", value)
    return false // stop after signle demand
}

assert(minInterval:) will assert if downstream sends new demands in a very fast speed.

let subscription = (1 ... 100).publisher
    .rm.assert(minInterval: .milliseconds(10), "will not assert")
    .rm.flatMap(maxPublishers: .max(1)) { _ in
        return [1].publisher
    }
    .rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) in
    print("Receive value: ", value)
    return false
}
// Swift Combine's flatMap requests single demand at a time
// but it will send all the requests in a while loop. 
// It will trigger the assert
let subscription = (1 ... 100).publisher
    .rm.assert(minInterval: .milliseconds(10), "will assert")
    .flatMap(maxPublishers: .max(1)) { _ in
        return [1].publisher
    }
    .rm.sink { (completion) in
    print(completion)
} receiveValue: { (value) in
    print("Receive value: ", value)
    return false
}

Requirements

ios 13

Installation

ResumableCombine is available through CocoaPods. To install it, simply add the following line to your Podfile:

pod 'ResumableCombine'

Author

Hai Feng Kao, haifeng@cocoaspice.in

License

ResumableCombine is available under the MIT license. See the LICENSE file for more info.

Github

link
Stars: 5

Dependencies

Used By

Total: 0