Swiftpack.co - Package - dehesa/Conbini

Swift 5.2 - macOS 10.15+ - iOS 13+ - tvOS 13+ - watchOS 6+ - MIT License

Conbini provides convenience Publishers, operators, and Subscribers to squeeze the most out of Apple's Combine framework.

Usage

To use this library, you need to:

    Add Conbini to your project through SPM.

    // swift-tools-version:5.2
    import PackageDescription
    
    let package = Package(
        /* Your package name, supported platforms, and generated products go here */
        dependencies: [
            .package(url: "https://github.com/dehesa/Conbini.git", .upToNextMinor(from: "0.5.0"))
        ],
        targets: [
            .target(name: /* Your target name here */, dependencies: ["Conbini"])
        ]
    )
    

    Import Conbini in the file that needs it.

    import Conbini
    

Operators

Publisher Operators:

    retry(on:intervals:)

    Attempts to recreate a failed subscription with the upstream publisher a given number of times, waiting the specified number of seconds between failed attempts.

    let publisher = apiCallPublisher.retry(on: queue, intervals: [0.5, 2, 5])
    // Same functionality as retry(3), but waiting 0.5, 2, and 5 seconds respectively after each failed attempt.
    

    This operator accepts any scheduler conforming to Scheduler (e.g. DispatchQueue, RunLoop, etc.). You can also optionally tweak the tolerance and scheduler options.

    then(maxDemand:_:)

    Ignores all values and executes the provided publisher once a successful completion is received. If a failed completion is emitted, it is forwarded downstream.

    let publisher = setConfigurationOnServer.then {
        subscribeToWebsocket.publisher
    }
    

    This operator optionally lets you control backpressure with its maxDemand parameter. The parameter behaves like flatMap's maxPublishers, which specifies the maximum demand requested from upstream at any given time.
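To make the comparison with flatMap's maxPublishers concrete, here is a minimal sketch that uses only standard Combine (no Conbini). The subject names and the `output` array are made up for illustration. With `.max(1)`, flatMap requests a single inner publisher from upstream and only asks for the next one once the current one completes.

```swift
import Combine

// Two hypothetical inner publishers, driven by hand.
let first = PassthroughSubject<String, Never>()
let second = PassthroughSubject<String, Never>()
var output: [String] = []

let cancellable = [first, second].publisher
    .flatMap(maxPublishers: .max(1)) { $0 }   // At most one inner publisher is subscribed at a time.
    .sink { output.append($0) }

second.send("ignored")              // `second` has no subscriber yet, so this value is dropped.
first.send("a")
first.send(completion: .finished)   // Frees the slot; flatMap now requests `second` from upstream.
second.send("b")
```

Raising the limit (e.g. `.max(2)`) would let both subjects be subscribed from the start, so the first `send` on `second` would be delivered as well.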

    handleEnd(_:)

    Executes (only once) the provided closure when the publisher completes (whether successfully or with a failure) or when the publisher gets cancelled.

    It performs the same operation that the standard handleEvents(receiveSubscription:receiveOutput:receiveCompletion:receiveCancel:receiveRequest:) would perform if you add similar closures to receiveCompletion and receiveCancel.

    let publisher = upstream.handleEnd { (completion) in
        switch completion {
        case .none: break // The publisher got cancelled.
        case .finished: break // The publisher finished successfully.
        case .failure(let error): break // The publisher generated an error.
        }
    }
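For comparison, the equivalent wiring with the standard handleEvents operator could look like the sketch below. It uses only Combine; the `End` enum, the `ends` log, and the `observed(_:)` helper are hypothetical names introduced here to mirror the three cases of the handleEnd closure. Note that the same bookkeeping has to be repeated in two closures.

```swift
import Combine

// Possible outcomes, mirroring the three cases handled by handleEnd.
enum End: Equatable { case cancelled, finished, failed }

var ends: [End] = []

// Hypothetical helper: records how the pipeline ended using plain handleEvents.
func observed<P: Publisher>(_ upstream: P) -> Publishers.HandleEvents<P> {
    upstream.handleEvents(
        receiveCompletion: { completion in
            switch completion {
            case .finished: ends.append(.finished)
            case .failure: ends.append(.failed)
            }
        },
        receiveCancel: { ends.append(.cancelled) }
    )
}

// A publisher that finishes on its own.
let finishing = observed(Just(1)).sink { _ in }

// A subject that never completes; the subscription is cancelled by hand.
let subject = PassthroughSubject<Int, Never>()
let pending = observed(subject).sink { _ in }
pending.cancel()
```

After running this, `ends` holds one entry per pipeline: a successful completion for `Just` and a cancellation for the subject.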
    

    asyncMap(_:)

    It transforms elements received from upstream (similar to map), but the result is returned through a promise instead of the return statement. Furthermore, the promise can be called multiple times, effectively transforming one upstream value into many outputs.

    let publisher = [1, 10, 100].publisher.asyncMap { (value, isCancelled, promise) in
        queue.asyncAfter(deadline: ....) {
            guard !isCancelled else { return } // Stop if the subscription has been cancelled.
            promise(newValue1, .continue)
            promise(newValue2, .continue)
            promise(newValue3, .finished)
        }
    }
    

    This operator also provides a try variant accepting a result (instead of a value).

Subscriber Operators:

    result(onEmpty:_:)

    It subscribes to the receiving publisher and executes the provided closure when a value is received. In case of failure, the handler is executed with such failure.

    let cancellable = serverRequest.result { (result) in
        switch result {
        case .success(let value): ...
        case .failure(let error): ...
        }
    }
    

    The operator lets you optionally generate an error (which will be consumed by your handler) for cases where upstream completes without a value.

    sink(fixedDemand:)

    It subscribes upstream and requests exactly fixedDemand values (after which the subscriber completes). The subscriber may receive anywhere from zero to fixedDemand values before completing, but never more than that.

    let cancellable = upstream.sink(fixedDemand: 5, receiveCompletion: { (completion) in ... }) { (value) in ... }
    

    sink(maxDemand:)

    It subscribes upstream requesting maxDemand values and always keeping the same backpressure.

    let cancellable = upstream.sink(maxDemand: 3) { (value) in ... }
    

Publishers

    Deferred...

    These publishers accept a closure that is executed once a greater-than-zero demand is requested. There are several flavors:

      DeferredValue emits a single value and then completes.

      The value is not provided/cached; instead, a closure generates it. The closure is executed once a subscription with a positive demand is received.

      let publisher = DeferredValue<Int,CustomError> {
          return intenseProcessing()
      }
      

      A Try variant is also offered, enabling you to throw from within the closure. It loses the concrete error type (i.e. it gets converted to Swift.Error).

      DeferredResult offers the same functionality as DeferredValue, but the closure generates a Result instead.

      let publisher = DeferredResult {
          guard someExpression else { return .failure(CustomError()) }
          return .success(someValue)
      }
      

      DeferredComplete offers the same functionality as DeferredValue, but the closure only generates a completion event.

      let publisher = DeferredComplete {
          return errorOrNil
      }
      

      A Try variant is also offered, enabling you to throw from within the closure; but it loses the concrete error type (i.e. gets converted to Swift.Error).

      DeferredPassthrough

      It is similar to wrapping a Passthrough subject in a Deferred closure, with the difference that the Passthrough given to the closure is already wired into the publisher chain and can start sending values right away. Also, the memory management is taken care of and every new subscriber receives a new subject (closure re-execution).

      let publisher = DeferredPassthrough { (subject) in
          subject.send(something)
          subject.send(somethingElse)
          subject.send(completion: .finished)
      }
      

    There are several reasons for these publishers to exist instead of using other Combine-provided publishers such as Just, Future, or Deferred:

    • Future publishers execute their provided closure right away (upon initialization) and then cache the returned value. That value is then forwarded to any future subscription.
      Deferred... closures wait for a subscription and a greater-than-zero demand before executing. This also means the closure will re-execute for every new subscriber.
    • Deferred is the most similar in functionality, but it only accepts a publisher.
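The difference in timing can be observed with standard Combine alone. The sketch below is only an illustration (the `log` array and constant names are made up): Future runs its closure at initialization and replays the cached value, while Deferred re-runs its closure for each subscriber. Keep in mind that the standard Deferred reacts to the subscription itself, whereas the Deferred... publishers described above additionally wait for a positive demand.

```swift
import Combine

var log: [String] = []

// Future runs its closure immediately, at initialization.
let future = Future<Int, Never> { promise in
    log.append("future ran")
    promise(.success(1))
}

// Deferred waits for a subscriber before building its inner publisher.
let deferred = Deferred { () -> Just<Int> in
    log.append("deferred ran")
    return Just(2)
}

// Nothing has subscribed yet, but the Future closure has already run.
let logBeforeSubscribing = log

let c1 = deferred.sink { _ in }   // Runs the Deferred closure.
let c2 = deferred.sink { _ in }   // Runs it again for the second subscriber.
let c3 = future.sink { _ in }     // Replays the cached value; the closure does not run again.
```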

    DelayedRetry

    It provides the functionality of the retry(on:intervals:) operator.

    Then

    It provides the functionality of the then operator.

    HandleEnd

    It provides the functionality of the handleEnd(_:) operator.

Extra Functionality:

    Publishers.BufferingStrategy

    It has been extended with a .fatalError(message:file:line:) option to stop execution if the buffer is filled. This is useful during development and debugging and for cases when you are sure the buffer will never be filled.

    publisher.buffer(size: 10, prefetch: .keepFull, whenFull: .fatalError())
    

Subscribers

    FixedSink

    It requests a fixed amount of values upon subscription and, once it has received them all, it completes/cancels the pipeline. The values are requested through backpressure, so no more than the allowed amount of values are generated upstream.

    let subscriber = FixedSink(demand: 5) { (value) in ... }
    upstream.subscribe(subscriber)
    

    GraduatedSink

    It requests a fixed amount of values upon subscription and always keeps the same demand by asking for one more value upon input reception. The standard Subscribers.Sink requests an .unlimited amount of values upon subscription. This might not be what we want, since sometimes control over in-flight values is desirable (e.g. allowing only n in-flight API calls at the same time).

    let subscriber = GraduatedSink(maxDemand: 3) { (value) in ... }
    upstream.subscribe(subscriber)
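The demand pattern described above (an initial batch, then one more value per input received) can be sketched with a hand-written subscriber on top of standard Combine. This is not Conbini's implementation; `SteadyDemandSubscriber` and its properties are hypothetical names used only to show where the demand is expressed.

```swift
import Combine

// Hypothetical subscriber for illustration only; not Conbini's implementation.
final class SteadyDemandSubscriber<Input>: Subscriber {
    typealias Failure = Never

    let initialDemand: Int
    private(set) var received: [Input] = []
    private(set) var didFinish = false

    init(initialDemand: Int) { self.initialDemand = initialDemand }

    func receive(subscription: Subscription) {
        // Ask for a bounded batch instead of .unlimited.
        subscription.request(.max(initialDemand))
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        received.append(input)
        // One value has been consumed, so ask for exactly one more.
        return .max(1)
    }

    func receive(completion: Subscribers.Completion<Never>) {
        didFinish = true
    }
}

let steadySubscriber = SteadyDemandSubscriber<Int>(initialDemand: 3)
(1...10).publisher.subscribe(steadySubscriber)
```

With a synchronous upstream such as a sequence publisher all ten values still arrive, but the upstream is never asked for more than the subscriber is ready to take.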
    

The names for these subscribers are not very good/accurate. Any suggestion is appreciated.

Testing

Conbini provides convenience subscribers to ease code testing. These subscribers make the test wait until a specific expectation is fulfilled (or make the test fail in a negative case). Furthermore, if a timeout elapses or an expectation is not fulfilled, the affected test line will be correctly marked in red in Xcode.

    expectsCompletion(timeout:on:)

    It subscribes to a publisher making the running test wait for a successful completion while ignoring all emitted values.

    publisherChain.expectsCompletion(timeout: 0.8, on: test)
    

    expectsFailure(timeout:on:)

    It subscribes to a publisher making the running test wait for a failed completion while ignoring all emitted values.

    publisherChain.expectsFailure(timeout: 0.8, on: test)
    

    expectsOne(timeout:on:)

    It subscribes to a publisher making the running test wait for a single value and a successful completion. If more than one value is emitted or the publisher fails, the subscription gets cancelled and the test fails.

    let emittedValue = publisherChain.expectsOne(timeout: 0.8, on: test)
    

    expectsAll(timeout:on:)

    It subscribes to a publisher making the running test wait for zero or more values and a successful completion.

    let emittedValues = publisherChain.expectsAll(timeout: 0.8, on: test)
    

    expectsAtLeast(values:timeout:on:)

    It subscribes to a publisher making the running test wait for at least the provided amount of values. Once the provided amount of values is received, the publisher gets cancelled and the values are returned.

    let emittedValues = publisherChain.expectsAtLeast(values: 5, timeout: 0.8, on: test)
    

    This operator/subscriber accepts an optional closure to check every value received.

    let emittedValues = publisherChain.expectsAtLeast(values: 5, timeout: 0.8, on: test) { (value) in
        XCTAssert...
    }
    

Quirks

The testing conveniences depend on XCTest, which is not available on regular execution. That is why Conbini is offered in two flavors:

  • import Conbini includes all code except the testing conveniences.
  • import ConbiniForTesting includes the testing functionality only.

The rule of thumb is to use import Conbini in your regular code (e.g. within your framework or app) and write import ConbiniForTesting within your test target files.
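In a package manifest, that split might look like the sketch below. The package and target names (MyLibrary, MyLibraryTests) are placeholders, and the manifest assumes that the package is named Conbini and that ConbiniForTesting is exposed as a library product, as the release notes suggest. Since that product name differs from the package name, it is referenced explicitly with .product(name:package:).

```swift
// swift-tools-version:5.2
import PackageDescription

let package = Package(
    name: "MyLibrary", // Placeholder name.
    platforms: [.macOS(.v10_15), .iOS(.v13), .tvOS(.v13), .watchOS(.v6)],
    products: [
        .library(name: "MyLibrary", targets: ["MyLibrary"])
    ],
    dependencies: [
        .package(url: "https://github.com/dehesa/Conbini.git", .upToNextMinor(from: "0.5.0"))
    ],
    targets: [
        // Regular code only links the main product.
        .target(name: "MyLibrary", dependencies: ["Conbini"]),
        // Test code additionally links the testing conveniences (assumed product name).
        .testTarget(name: "MyLibraryTests", dependencies: [
            "MyLibrary",
            .product(name: "ConbiniForTesting", package: "Conbini")
        ])
    ]
)
```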

References

This framework name references both the Combine framework and the helpful Japanese convenience stores 😄

Releases

Better Backpressure & Custom Subscribers - 2020-02-14 16:14:50

This release includes two new useful custom subscribers and a bunch of interesting operators:

  • retry on delayed intervals.
  • handleEnd to clean up resources in any completion case (i.e. successful/failure completion or cancellation).
  • sink using the new custom subscribers.

Also, there is better support for backpressure all around the framework. Many operators/publishers now accept optional "demand" related parameters and work great with standard backpressure mechanisms (such as buffer).

Enhanced FlatMaps & Standard Naming For Deferred Publishers - 2019-11-14 21:56:31

The sequential flatMap operator/publisher has been re-engineered to accept a transform closure, making it much more useful.

Conbini For Testing - 2019-10-28 11:48:15

Conbini has been separated into two products:

  • import Conbini for regular usage in your framework or application.
  • import ConbiniForTesting including the testing conveniences (which depend on XCTest).

First Release - 2019-10-24 00:56:34

Initial release gathering the first batch of publishers/subscribers.