1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::io;
use std::marker;

use BindServer;
use super::Pipeline;
use super::lift::{LiftBind, LiftTransport};
use simple::LiftProto;

use streaming::{self, Message};
use streaming::pipeline::StreamingPipeline;
use tokio_core::reactor::Handle;
use tokio_service::Service;
use futures::{stream, Stream, Sink, Future, IntoFuture, Poll};

// Body-stream type plugged into the streaming pipeline when lifting this
// simple (body-less) protocol: a `futures::stream::Empty` whose item type is
// `()` and whose error type is `E`.
type MyStream<E> = stream::Empty<(), E>;

/// A pipelined server protocol.
///
/// The `T` parameter is the type of the I/O object used to communicate; a
/// value of that type is handed to `bind_transport`.
///
/// For simple protocols, the `Self` type is often a unit struct. In more
/// advanced cases, `Self` may contain configuration information that is used
/// for setting up the transport in `bind_transport`.
pub trait ServerProto<T: 'static>: 'static {
    /// Request messages.
    type Request: 'static;

    /// Response messages.
    type Response: 'static;

    /// Errors produced by the service.
    ///
    /// The type must be constructible from an `io::Error`.
    type Error: From<io::Error> + 'static;

    /// The message transport, which works with I/O objects of type `T`.
    ///
    /// The transport is a `Stream` of requests and a `Sink` of responses,
    /// both of which fail with `io::Error`.
    ///
    /// An easy way to build a transport is to use `tokio_core::io::Framed`
    /// together with a `Codec`; in that case, the transport type is
    /// `Framed<T, YourCodec>`. See the crate docs for an example.
    type Transport: 'static +
        Stream<Item = Self::Request, Error = io::Error> +
        Sink<SinkItem = Self::Response, SinkError = io::Error>;

    /// A future for initializing a transport from an I/O object.
    ///
    /// The future's error type must be `io::Error` (not `Self::Error`), so in
    /// simple cases `Result<Self::Transport, io::Error>` often suffices.
    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;

    /// Build a transport from the given I/O object, using `self` for any
    /// configuration.
    ///
    /// An easy way to build a transport is to use `tokio_core::io::Framed`
    /// together with a `Codec`; in that case, `bind_transport` is just
    /// `io.framed(YourCodec)`. See the crate docs for an example.
    fn bind_transport(&self, io: T) -> Self::BindTransport;
}

// Any simple pipelined `ServerProto` can be bound as a server: the protocol
// and the service are both wrapped ("lifted") and then handed to the
// `StreamingPipeline` implementation of `BindServer`, using `MyStream` as the
// body type.
impl<T, P> BindServer<Pipeline, T> for P
    where T: 'static,
          P: ServerProto<T>
{
    type ServiceRequest = P::Request;
    type ServiceResponse = P::Response;
    type ServiceError = P::Error;

    fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
        where S: Service<Request = Self::ServiceRequest,
                         Response = Self::ServiceResponse,
                         Error = Self::ServiceError> + 'static
    {
        // View `self` as its streaming counterpart and adapt the service so
        // that it accepts and produces streaming `Message`s.
        let lifted_proto = LiftProto::from_ref(self);
        let lifted_service = LiftService(service);

        // Delegate to the streaming pipeline binding.
        BindServer::<StreamingPipeline<MyStream<P::Error>>, T>::bind_server(
            lifted_proto, handle, io, lifted_service
        )
    }
}

// Presents a simple pipelined protocol `P` as a streaming pipelined protocol
// whose request and response body chunk types are both `()`.
impl<T: 'static, P: ServerProto<T>> streaming::pipeline::ServerProto<T> for LiftProto<P> {
    // Message heads and the error type are taken unchanged from `P`.
    type Request = P::Request;
    type Response = P::Response;
    type Error = P::Error;

    // The simple protocol carries no bodies.
    type RequestBody = ();
    type ResponseBody = ();

    // The underlying transport and its binding future, wrapped by the lift
    // adapters.
    type Transport = LiftTransport<P::Transport, P::Error>;
    type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, P::Error>;

    fn bind_transport(&self, io: T) -> Self::BindTransport {
        // Bind with the wrapped protocol first, then lift the resulting
        // future.
        let inner_proto = self.lower();
        let binding = ServerProto::bind_transport(inner_proto, io).into_future();
        LiftBind::lift(binding)
    }
}

struct LiftService<S>(S);

impl<S: Service> Service for LiftService<S> {
    type Request = streaming::Message<S::Request, streaming::Body<(), S::Error>>;
    type Response = streaming::Message<S::Response, MyStream<S::Error>>;
    type Error = S::Error;
    type Future = LiftFuture<S::Future, stream::Empty<(), S::Error>>;

    fn call(&mut self, req: Self::Request) -> Self::Future {
        match req {
            Message::WithoutBody(msg) => {
                LiftFuture(self.0.call(msg), marker::PhantomData)
            }
            Message::WithBody(..) => panic!("bodies not supported"),
        }
    }
}

/// Future adapter that wraps the item produced by the inner future `F` in
/// `Message::WithoutBody`. `B` is the body type of the resulting message; it
/// is only named through `PhantomData` and never stored.
struct LiftFuture<F, B>(F, marker::PhantomData<fn() -> B>);

impl<F: Future, B> Future for LiftFuture<F, B> {
    type Item = Message<F::Item, B>;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Errors and `NotReady` pass through untouched; a ready item is
        // wrapped as a body-less message.
        self.0.poll().map(|state| state.map(Message::WithoutBody))
    }
}