16310

How do I convert an iterator into a stream on success or an empty stream on failure?

<h3>Question</h3>

I'd like to take a regular iterator and turn it into a stream so that I can do further stream processing. The trouble is that I may have an iterator or an error to deal with. I think I'm pretty close with this:

#[macro_use] extern crate log; extern crate futures; // 0.1.21 extern crate tokio; use futures::prelude::*; use futures::{future, stream}; use std::fmt::Debug; use std::net::{SocketAddr, ToSocketAddrs}; fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> { match addrs.to_socket_addrs() { Ok(iter) => stream::unfold(iter, |iter| match iter.next() { Some(a) => Some(future::ok((a, iter))), None => None, }), Err(e) => { error!("could not resolve socket addresses {:?}: {:?}", addrs, e); stream::empty() } } } fn main() { let task = resolve("1.2.3.4:12345") .map_err(|e| error!("{:?}", e)) .for_each(|addr| info!("{:?}", addr)) .fold(); tokio::run(task); }

playground

<pre class="lang-none prettyprint-override">error[E0308]: match arms have incompatible types --> src/main.rs:12:5 | 12 | / match addrs.to_socket_addrs() { 13 | | Ok(iter) => stream::unfold(iter, |iter| match iter.next() { 14 | | Some(a) => Some(future::ok((a, iter))), 15 | | None => None, ... | 20 | | } 21 | | } | |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty` | = note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>` found type `futures::stream::Empty<_, _>` note: match arm with an incompatible type --> src/main.rs:17:19 | 17 | Err(e) => { | ___________________^ 18 | | error!("could not resolve socket addresses {:?}: {:?}", addrs, e); 19 | | stream::empty() 20 | | } | |_________^ error[E0277]: the trait bound `(): futures::Future` is not satisfied --> src/main.rs:27:10 | 27 | .for_each(|addr| info!("{:?}", addr)) | ^^^^^^^^ the trait `futures::Future` is not implemented for `()` | = note: required because of the requirements on the impl of `futures::IntoFuture` for `()` error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope --> src/main.rs:28:10 | 28 | .fold(); | ^^^^ | = note: the method `fold` exists but the following trait bounds were not satisfied: `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream` `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`

The hint is pretty obvious. The two Results I'm returning from the match differ and should be the same. Now, how can I do that so that I return a stream?


<h3>Answer1:</h3>

Rust is a statically typed language which means that the return type of a function has to be a single type, known at compile time. You are attempting to return multiple types, decided at runtime.

The closest solution to your original is to always return the Unfold stream:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> { stream::unfold(addrs.to_socket_addrs(), |r| { match r { Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))), Err(_) => None, } }) } <hr />

But why reinvent the wheel?

futures::stream::iter_ok

<blockquote>

Converts an Iterator into a Stream which is always ready to yield the next value.

</blockquote>

Subsequent versions of the futures crate implement Stream for Either, which makes this very elegant:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> { match addrs.to_socket_addrs() { Ok(iter) => stream::iter_ok(iter).left_stream(), Err(_) => stream::empty().right_stream(), } }

It's straightforward to backport this functionality to futures 0.1 (maybe someone should submit it as a PR for those who are stuck on 0.1...):

enum MyEither<L, R> { Left(L), Right(R), } impl<L, R> Stream for MyEither<L, R> where L: Stream, R: Stream<Item = L::Item, Error = L::Error>, { type Item = L::Item; type Error = L::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { match self { MyEither::Left(l) => l.poll(), MyEither::Right(r) => r.poll(), } } } trait EitherStreamExt { fn left_stream<R>(self) -> MyEither<Self, R> where Self: Sized; fn right_stream<L>(self) -> MyEither<L, Self> where Self: Sized; } impl<S: Stream> EitherStreamExt for S { fn left_stream<R>(self) -> MyEither<Self, R> { MyEither::Left(self) } fn right_stream<L>(self) -> MyEither<L, Self> { MyEither::Right(self) } } <hr />

Even better, use the fact that Result is an iterator and Stream::flatten exists:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> { stream::iter_ok(addrs.to_socket_addrs()) .map(stream::iter_ok) .flatten() }

Or if you really want to print errors:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> { stream::once(addrs.to_socket_addrs()) .map(stream::iter_ok) .map_err(|e| eprintln!("err: {}", e)) .flatten() }

See also:

<ul><li>Conditionally return empty iterator from flat_map</li> <li>Conditionally iterate over one of several possible iterators</li> <li>What is the correct way to return an Iterator (or any other trait)?</li> </ul>

来源:https://stackoverflow.com/questions/51114794/how-do-i-convert-an-iterator-into-a-stream-on-success-or-an-empty-stream-on-fail

Recommend

  • Limiting Fps in python
  • Divide and conquer - why does it work?
  • hibernate 类型转换
  • React/Redux + super agent, first call gets terminated
  • standalone java webservice client
  • Do warp vote functions synchronize threads in the warp?
  • Create .so file in android studio and used it in another application in Android
  • RestSharp PUT XML, RestSharp is sending it as GET?
  • iPhone communication using Sockets [closed]
  • select query in wordpress
  • Saved Core Data does not persist after app closes 80% of the time
  • JSON data - Group by days of the week(Sun,Mon,Tue, Wed,Thu,Fri,Sat) using Javascript/Jquery
  • Android MediaCodec appears to buffer H264 frames
  • CRM Dynamics How to set short list - long list relationship
  • locationManager avoid (null) string in a Label
  • get value using jquery
  • Unknown type name with typedef struct in C
  • Joining across databases with dbplyr
  • Firebase suddenly reports invalid signature
  • Can someone explain how Yii minimizing assets is supposed to work on Heroku?
  • Unable to connect to Azure MySQL Database through Azure Function - C#
  • How to get the Owner of the ContextMenu (from Silverlight 4 toolkit)?
  • cSPADE data mining in R using arulesSequences - Error while converting to “transactions” format
  • 'Edit' function for forum posts and such
  • How to redirect into different page by user type in php and mysql
  • Separating definition/instantiation of template classes without 'extern'
  • Add font awesome icon to custom add to cart button in Woocommerce 3
  • how to read to huge file into buffer
  • How to use Streams api peek() function and make it work?
  • How to handle div that is created dynamically in a table
  • Google App Engine backend servlet not responding