TCPTransports/StreamTaskTransport.swift
/* |
Copyright (C) 2018 Apple Inc. All Rights Reserved. |
See LICENSE.txt for this sample’s licensing information |
Abstract: |
Implements `TCPTransport` on a `URLSessionStreamTask`. |
*/ |
import Foundation |
/// An implementation of the `TCPTransport` protocol that uses a `URLSessionStreamTask` |
/// to run the TCP connection. This type supports both connect-by-name and TLS. |
/// Out of the box this type creates a stream task in the global shared session, |
/// but it would be trivial to modify it to use some custom session. |
/// |
/// Note that streams tasks do not give any indication of connection completion, |
/// so you'll never see this type in the `.starting` state. |
/// |
/// For more information on how to to use this type, see the `TCPTransport` |
/// protocol. |
class StreamTaskTransport : TCPTransport { |
/// Creates an object that will communicate over a stream task created by |
/// calling the stream task maker closure. |
/// |
/// The `streamTaskMaker` closure means that the client is in change of |
/// stream task creation, which is useful because it allows the client to |
/// manage the `URLSession` in which the streams are created. |
/// |
/// - Parameters: |
/// - name: A name for the connection; not used internally, but handy while |
/// debuging. |
/// - use: Whether to enable TLS or not. |
/// - queue: The queue on which to operate; delegate callbacks are called |
/// on this queue. |
/// - streamTaskMaker: A closure called to create the stream task for this |
/// connection. The resulting stream task is ‘owned‘ by this object. |
/// You can’t go modifying it after the fact. The stream task must not |
/// be resumed; this object takes care of resuming it. |
required init(name: String, useTLS: Bool, queue: DispatchQueue, streamTaskMaker: @escaping () -> URLSessionStreamTask) { |
dispatchPrecondition(condition: .onQueue(queue)) |
self.name = name |
self.useTLS = useTLS |
self.queue = queue |
self.streamTaskMaker = streamTaskMaker |
} |
/// Creates an object that will communicate with the specified host and |
/// port, optionally using TLS. |
/// |
/// - Parameters: |
/// - hostName: The host name to connect to (can be an IP address, both IPv4 and IPv6). |
/// - port: The port on which to connect. |
/// - useTLS: Whether to enable TLS or not. |
/// - queue: The queue on which to operate; delegate callbacks are called |
/// on this queue. |
convenience init(hostName: String, port: Int, useTLS: Bool, queue: DispatchQueue) { |
dispatchPrecondition(condition: .onQueue(queue)) |
self.init(name: "\(hostName):\(port)", useTLS: useTLS, queue: queue, streamTaskMaker: { () -> URLSessionStreamTask in |
return URLSession.shared.streamTask(withHostName: hostName, port: port) |
}) |
} |
deinit { |
// If this assert fires you’ve released the last reference to this |
// transport without stopping it. |
switch self._state { |
case .initialised: break |
case .started: fatalError() |
case .stopped: break |
} |
} |
/// The value passed to the initialiser. |
let name: String |
/// The value passed to the initialiser. |
let useTLS: Bool |
/// The value passed to the initialiser. |
let queue: DispatchQueue |
/// The value passed to the initialiser. |
let streamTaskMaker: () -> URLSessionStreamTask |
/// See the comments associated with the `TCPTransport` protocol. |
weak var delegate: TCPTransportDelegate? = nil |
/// See the comments associated with the `TCPTransport` protocol. |
var transportState: TCPTransportState { |
return self._state.transportState |
} |
/// See the comments associated with the `TCPTransport` protocol. |
func start() { |
dispatchPrecondition(condition: .onQueue(queue)) |
self.process(event: .start) |
} |
/// See the comments associated with the `TCPTransport` protocol. |
func send(message: String) { |
dispatchPrecondition(condition: .onQueue(queue)) |
self.process(event: .send(message: message)) |
} |
/// See the comments associated with the `TCPTransport` protocol. |
func stop() { |
dispatchPrecondition(condition: .onQueue(queue)) |
self.process(event: .stop(error: nil, notify: false)) |
} |
/// Errors returned by this module. |
/// |
/// - streamError: An error from the stream task; the actual error is saved as an associated value. |
/// - framingError: An error from the line unframer; the actual error is saved as an associated value. |
enum Error : Swift.Error { |
case streamTaskError(Swift.Error) |
case framingError(LineUnframer.Error) |
} |
/// The runtime state of this object. |
/// |
/// - important: This has a leading underscore so that the state event |
/// functions don't actually access it via `state` or `self.state`. |
private var _state: State = .initialised |
} |
extension StreamTaskTransport { |
/// Holds the runtime state of this object. Each state maps to a state in |
/// `TCPTransportState` but many states have an associated value that holds |
/// all the values valid during that states. |
/// |
/// - note: Stream task have no notion of a ‘did connect’ event, so there’s |
/// no ‘starting’ state here. |
private enum State { |
case initialised |
case started(Started) |
case stopped(Error?) |
/// Returns the transport state associated with our state. |
var transportState: TCPTransportState { |
switch self { |
case .initialised: return .initialised |
case .started: return .started |
case .stopped(let error): return .stopped(error) |
} |
} |
} |
/// Holds the values meaningful in the `.started` state. |
private struct Started { |
/// The stream task itself. |
var streamTask: URLSessionStreamTask |
/// Frames lines to send as data. |
var framer: LineFramer |
/// Unframes received data to parse out the lines. |
var unframer: LineUnframer |
} |
/// Events that trigger changes of state. |
/// |
/// - start: The client has called `start()`. |
/// - send: The client has called `send(message:)`. |
/// - stop: The client has called `stop()` (`notify` false) or something has |
/// caused the stream to stop (`notify` true). `error` will be nil if |
/// the stop was orderly (either because the client called `stop()` or |
/// because there was on EOF on the stream, or non-nil otherwise. |
/// - readCompleted: A read operation has completed. |
private enum Event { |
case start |
case send(message: String) |
case readCompleted(data: Data, isEOF: Bool) |
case stop(error: Error?, notify: Bool) |
} |
/// Processes an event in the state machine. All the heavy lifting here is |
/// done in state event functions, which take the state as a parameter and |
/// return a new state as the function result. |
private func process(event: Event) { |
dispatchPrecondition(condition: .onQueue(queue)) |
switch (self._state, event) { |
case (.initialised, .start): |
self._state = self.handleStart() |
case (_, .start): |
fatalError() |
case (.started(let started), .send(let message)): |
self._state = self.handleSend(message: message, started: started) |
case (_, .send): |
fatalError() |
case (.started(let started), .readCompleted(let data, let isEOF)): |
self._state = self.handleReadCompleted(data: data, isEOF: isEOF, started: started) |
case (_, .readCompleted): |
fatalError() |
case (let _state, .stop(let error, let notify)): |
self._state = self.handleStop(error: error, notify: notify, state: _state) |
} |
} |
/// Runs the supplied closure after the current state machine event has |
/// completed. This is primarily used to post delegate events from a |
/// a state event function. |
/// |
/// - warning: It is critical that the closure check the current state |
/// before acting. It’s quite possible for some intervening event to |
/// change the state to invalidate the resaon why the closure was |
/// scheduled in the first place. |
/// |
/// This function is safe to call from any context and can, in fact, be |
/// called on a secondary thread (the thread running the `URLSession`’s |
/// delegate queue) courtesy of the stream task completion closures. |
/// |
/// - Parameter body: The closure to call. This is passed the current state |
/// as a parameter but, unlike a state event function, is not able to |
/// return a new state directly (although it can call `process(event:)` |
/// if necessary). |
private func deferred(_ body: @escaping (State) -> Void) { |
self.queue.async { |
dispatchPrecondition(condition: .onQueue(self.queue)) |
body(self._state) |
} |
} |
/// The state event function for the `.start` event. Transitions the object |
/// from the `.initialised` state to the `.started` state. |
/// |
/// There’s a couple of things to note here: |
/// |
/// * Stream task have no notion of a ‘did connect’ event, so we transition |
/// straight from `.initialised` to `.started` no ‘starting’ state here. |
/// |
/// * We can’t call the delegate directly here because we _return_ the new |
/// state, so the new state isn’t applied until we return. If we called |
/// the delegate directly, it would see us in the wrong state. |
/// |
/// Instead we defer the delegate callback via a deferred closure. That |
/// must check the state to make sure we haven’t failed in the interim. |
/// |
/// - Returns: The new state. |
private func handleStart() -> State { |
dispatchPrecondition(condition: .onQueue(queue)) |
let streamTask = self.streamTaskMaker() |
if self.useTLS { |
streamTask.startSecureConnection() |
} |
streamTask.resume() |
self.deferred { state in |
guard case .started = state else { return } |
self.delegate?.didStart(transport: self) |
} |
return self.setupRead(started: Started( |
streamTask: streamTask, |
framer: LineFramer(), |
unframer: LineUnframer(maxLineLength: 1024) |
)) |
} |
/// The state event function for the `.send` event. Called when the object |
/// is in the `.started` state and typically leaves the object in the same |
/// state (although with various associated values changed). |
/// |
/// - Parameters: |
/// - message: The message to be sent. |
/// - started: The value associated with the `.started` state. |
/// - Returns: The new state. |
private func handleSend(message: String, started: Started) -> State { |
dispatchPrecondition(condition: .onQueue(queue)) |
let started = started |
let messageData = started.framer.data(for: [message]) |
started.streamTask.write(messageData, timeout: 60.0) { (error) in |
if let error = error { |
self.deferred { _ in |
self.process(event: .stop(error: .streamTaskError(error), notify: true)) |
} |
} |
} |
return .started(started) |
} |
/// The state event function for the `.stop` event. Can be called in any |
/// state and transitions the object to the `.stopped` state. |
/// |
/// - Parameters: |
/// - error: If not nil, this holds the error that caused the |
/// transition; if nil, the transition was not due to an error (it was |
/// either triggered by the client calling `stop()` or by EOF on the |
/// network). |
/// - notify: If true, the client is notified of the stop via the |
/// `didStop(transport:)` delegate callback. |
/// - state: The current state. |
/// - Returns: The new state. |
private func handleStop(error: Error?, notify: Bool, state: State) -> State { |
dispatchPrecondition(condition: .onQueue(queue)) |
// If we’re already stopped, do nothing. |
if case .stopped = state { return state } |
// Clean up based on the current state. |
switch state { |
case .initialised: |
break |
case .started(let started): |
started.streamTask.closeRead() |
started.streamTask.closeWrite() |
case .stopped: |
fatalError() |
} |
// Set up a delegate callback, if required. |
if notify { |
self.deferred { _ in |
// We don't need to check the state here because the check above |
// means that only one person can ever schedule this delegate |
// callback. Moreover, if it was scheduled it needs to be |
// called. |
self.delegate?.didStop(transport: self) |
} |
} |
return .stopped(error) |
} |
/// The state event function for the `.readCompleted` event. Called when |
/// the object is in the `.started` state and typically leaves the object in |
/// the same state (although with various associated values changed). |
/// |
/// This code is a little tricky because: |
/// |
/// * The unframer can error, which will stop the stream. |
/// |
/// * The unframer can generate multiple messages, which each have to |
/// delivered to the delegate via a deferred closure. |
/// |
/// * The stream task could be at EOF, which generates a `.stop` event. |
/// This can’t be issued directly because there could be messages in front |
/// of it. Rather, the stop event has to be deferred until after the |
/// message delivery is done. |
/// |
/// - Parameters: |
/// - data: The data that was read; may be empty. |
/// - isEOF: True if this is the last data that can be read. |
/// - started: The value associated with the `.started` state. |
/// - Returns: The new state. |
private func handleReadCompleted(data: Data, isEOF: Bool, started: Started) -> State { |
dispatchPrecondition(condition: .onQueue(queue)) |
var started = started |
// If we received data, run it through the unframer. |
let messages: [String] |
do { |
messages = try started.unframer.lines(for: data) |
} catch let error as LineUnframer.Error { |
return self.handleStop(error: Error.framingError(error), notify: true, state: .started(started)) |
} catch { |
fatalError() |
} |
// Schedule the delivery of the messages to the delegate. When each of |
// deferred closures run they have to check the state of the object to |
// ensure it hasn't stopped in the meantime. |
for message in messages { |
self.deferred { state in |
guard case .started = state else { return } |
self.delegate?.didReceive(message: message, transport: self) |
} |
} |
// If we’re at EOF, schedule a `.stop` event to shut things down. If |
// not, we set up the next read. Both of these have to return an |
// appropriate new state. |
if isEOF { |
self.deferred { state in |
guard case .started = state else { return } |
self.process(event: .stop(error: nil, notify: true)) |
} |
return .started(started) |
} else { |
return self.setupRead(started: started) |
} |
} |
/// Starts a read request on the stream task. Called when the object is in |
/// the `.started` state and leaves the object in the same state. |
/// |
/// - Parameters: |
/// - started: The value associated with the `.started` state. |
/// - Returns: The new state. |
private func setupRead(started: Started) -> State { |
dispatchPrecondition(condition: .onQueue(queue)) |
started.streamTask.readData(ofMinLength: 1, maxLength: 2048, timeout: 60) { (data, isEOF, error) in |
self.deferred { state in |
guard case .started = state else { return } |
if let error = error { |
self.process(event: .stop(error: .streamTaskError(error), notify: true)) |
} else { |
self.process(event: .readCompleted(data: data ?? Data(), isEOF: isEOF)) |
} |
} |
} |
return .started(started) |
} |
} |
Copyright © 2018 Apple Inc. All Rights Reserved. Terms of Use | Privacy Policy | Updated: 2018-05-10