Announcing Tokio-Compat

December 18, 2019

The release of Tokio 0.2 was the culmination of a great deal of hard work from numerous contributors, and has brought several significant improvements to Tokio. Using std::future and async/await makes writing async code using Tokio much more ergonomic, and a new scheduler implementation makes Tokio 0.2’s thread pool as much as 10x faster. However, updating existing Tokio 0.1 projects to use 0.2 and std::future poses some new challenges. Therefore, we’re very excited to announce the release of the tokio-compat crate to help ease this transition, by providing a runtime compatible with both Tokio 0.1 and Tokio 0.2 futures.

You can find tokio-compat on crates.io and on GitHub.

Motivation

Both the transition from futures 0.1 to std::future and the breaking changes to Tokio’s APIs in 0.2 have made it challenging to update existing projects to benefit from the variety of improvements Tokio 0.2 offers. Additionally, these breaking changes make it difficult for projects to migrate incrementally: instead, they must do so all at once, requiring a lot of effort and sometimes putting other work on hold until the migration is complete. In order to enable incremental migration, we need a compatibility layer that allows us to use code written against both the legacy Tokio 0.1/futures 0.1 APIs and the new Tokio 0.2/std::future APIs in the same project.

The futures crate’s compat module provides interoperability between futures 0.1 and std::future future types, such as implementing std::future::Future for a type that implements the futures 0.1 Future This is a foundational part of the compatibility story, but, on its own, it is insufficient to allow most projects to be incrementally updated. Most code using Tokio relies on the runtime services that Tokio provides. These runtime services include the ability to spawn other tasks; the I/O driver, which allows tasks to be notified by the operating system’s async I/O APIs, and the timer. Futures which rely on Tokio 0.1’s runtime services will panic when they try to access those runtime services (such as by spawning a task or creating a timer) on the Tokio 0.2 runtime, even if they are converted to the std::future::Future trait. This is because the new runtime does not provide these services in a way compatible with Tokio 0.1’s APIs.

The tokio-compat crate helps to bridge this gap by providing a compatibility runtime, which provides runtime services compatible with both Tokio 0.1 and Tokio 0.2. For example, using tokio-compat, we can write code like this:

use futures_01::future::lazy;

tokio_compat::run(lazy(|| {
    // spawn a `futures` 0.1 future using the `spawn` function from the
    // `tokio` 0.1 crate:
    tokio_01::spawn(lazy(|| {
        println!("hello from tokio 0.1!");
        Ok(())
    }));

    // spawn an `async` block future on the same runtime using `tokio`
    // 0.2's `spawn`:
    tokio_02::spawn(async {
        println!("hello from tokio 0.2!");
    });

    Ok(())
}))

Similarly, we can run tasks that rely on both the 0.1 and 0.2 versions of runtime services like the timer and I/O driver:

use std::time::{Duration, Instant};
use tokio_compat::prelude::*;

tokio_compat::run_std(async {
    // Wait for a `tokio` 0.1 `Delay`...
    let when = Instant::now() + Duration::from_millis(10);
    tokio_01::timer::Delay::new(when)
        // convert the delay future into a `std::future` that we can `await`.
        .compat()
        .await
        .expect("tokio 0.1 timer should work!");
    println!("10 ms have elapsed");

    // Wait for a `tokio` 0.2 `Delay`...
    tokio_02::time::delay_for(Duration::from_millis(20)).await;
    println!("20 ms have elapsed");
});

Using tokio-compat

Primary use-cases for tokio-compat include:

  • Incrementally migrating applications: Updating a large project to use new APIs is challenging. This kind of change is often much easier when it can be made gradually module-by-module, or by requiring that new code added to the project use new APIs and slowly rewriting existing code as it’s changed for other reasons. Due to the incompatibility between 0.1 and 0.2 runtimes, however, this isn’t really possible for most projects. Instead, it’s necessary to update everything all at once, requiring a single large change that can often hold up other work. tokio-compat can allow projects to migrate incrementally instead.
  • Using legacy libraries in new code: If a new project that uses Tokio 0.2 needs functionality from a library written against 0.1, its authors are faced with a variety of choices, none of which are particularly good. They can block the progress of their features on the dependency being rewritten to use 0.2, which can take a long time; they can rewrite their project to use 0.1, giving up all the advantages of async/await and likely requiring another rewrite when the dependency is updated; or they can take on the responsibility of updating the dependency themselves. With tokio-compat, however, it is possible to use futures from libraries that expect Tokio 0.1 on the same runtime as Tokio 0.2 futures.

In all these cases, tokio-compat is hopefully a temporary necessity: ideally, most code should transition to using async/await, std::future, and Tokio 0.2. Although we’ve worked hard to make the compatibility layer as lightweight as possible, it is, by definition, an additional source of complexity in the system. Furthermore, async/await offers significant ergonomics advantages, and code using it is easier to understand and modify, so most projects will benefit greatly from moving to it. The role of tokio-compat is to make this transition easier and more incremental.

Getting Started

The APIs provided by tokio-compat are intended as a drop-in replacement for tokio 0.1’s APIs. The runtimes provided by tokio-compat expose functions with the same names and signatures as Tokio 0.1’s runtimes. Therefore, in many cases, getting started with tokio-compat is as simple as adding

tokio-compat = { version = "0.1", features = ["rt-full"] }

to your Cargo.toml, and changing import and paths referencing tokio 0.1’s Runtime module to tokio_compat’s. So, for example,

tokio::runtime::run(future);

becomes

tokio_compat::runtime::run(future);

Similarly,

use tokio::runtime::Runtime;

let mut rt = Runtime::new().unwrap();

rt.spawn(future);

rt.shutdown_on_idle()
    .wait()
    .unwrap();

becomes

use tokio_compat::runtime::Runtime;

let mut rt = Runtime::new().unwrap();

rt.spawn(future);

rt.shutdown_on_idle()
    .wait()
    .unwrap();

Only the tokio::runtime module and tokio::run function need to be replaced with tokio-compat’s versions. Other APIs exposed by Tokio 0.1, such as tokio::net, will work properly on the compatibility runtime (with some exceptions we’ll discuss shortly). When running on the compatibility runtime, code which spawns futures 0.1 tasks via tokio 0.1’s spawn function will work, as will code that spawns std::future tasks via tokio 0.2’s spawn.

Additionally, the tokio-compat runtimes, TaskExecutors, and other types also provide std::future-compatible methods. For example, the Runtime has both a spawn, which spawns 0.1 futures, and a spawn_std, which spawns a std::future future (or an async function/block). See this section in the tokio-compat API docs for details on spawning.

Once a project is running on the compatibility runtime, it’s easy to gradually migrate to std::future and async/await. One option is to simply perform a “one to one” translation of the existing codebase: for example, types implementing futures::future::Future are rewritten to implement std::future::Future, code using future combinators is changed to use the futures 0.3 versions of those combinators, and so on. In many cases, the required changes are fairly mechanical, e.g. changing imports, renaming Async::NotReady to Poll::Pending, et cetera. However, the Pin<&mut Self> receiver type for std::future::Future’s poll method can make migrating manual Future implementations challenging. Crates such as pin-project can be helpful when dealing with the stack pinning requirement.

In most cases, however, it is often significantly easier to rewrite the existing code to use async/await syntax, rather than implementing Future by hand or using future combinators. Although such a change is larger when measured in terms of how many lines of code were modified, the ergonomic benefits of async/await syntax can make the migration much easier — oftentimes, a large amount of boilerplate code can simply be removed. Furthermore, this has the benefit of resulting in more idiomatic, readable, and maintainable code. For most projects, switching to use async/await syntax is the recommended migration path.

Notes

There are a small number of “gotchas” to keep in mind in the current tokio-compat v0.1. In particular, it is important to note that he compatibility thread pool runtime does not currently support the tokio 0.1 tokio_threadpool::blocking API. Calls to the legacy version of blocking made on the compatibility runtime will currently fail. In the future, tokio-compat will allow transparently replacing legacy blocking with the tokio 0.2 blocking APIs, but in the meantime, it will be necessary to convert this code to call into the tokio 0.2 task::block_in_place and task::spawn_blocking APIs instead. Since tokio::fs relies on the blocking APIs, the Tokio 0.1 version of tokio::fs also will not currently work on the compatibility runtime. See here for details.

Additionally, it’s important to keep in mind that Tokio 0.1 and Tokio 0.2 provide subtly different behavior around spawning tasks and shutting down runtimes. In particular, Tokio 0.1 tracks whether a runtime is idle (i.e., it has no futures running on it), and provides a Runtime::shutdown_on_idle method which shuts down the runtime when it becomes idle. On the other hand, Tokio 0.2’s spawn functions return JoinHandles, which can be used to await the completion of spawned tasks, and users are instead expected to await these JoinHandles in order to determine when the runtime should shut down. Therefore, tokio-compat provides both APIs, but it is important to note that only tasks spawned without JoinHandles “count” against the runtime being considered idle. See this section in the documentation for details.

Case Study: Vector

Vector is a high-performance observability data router for collecting and transforming logs, metrics, and events. Vector is a production application that relies on the Tokio runtime, and currently depends on Tokio 0.1.

After switching Vector from Tokio 0.1 to Tokio 0.2, its maintainers observed significant performance improvements in benchmarks:

benchmark group time (0.1) thoughput (0.1) time (0.2) throughput (0.2) speedup
batch 10mb with 2mb batches 5.5±0.14ms 1732.8 MB/s 5.1±0.15ms 1864.0 MB/s x 1.08
buffers/in-memory 199.3±2.35ms 47.8 MB/s 99.6±4.40ms 95.7 MB/s x 2.00
http/http_gzip 191.1±3.17ms 49.9 MB/s 94.3±6.24ms 101.1 MB/s x 2.03
http/http_no_compression 191.8±4.68ms 49.7 MB/s 91.2±2.56ms 104.6 MB/s x 2.10
interconnected 238.7±5.66ms 79.9 MB/s 209.7±5.32ms 91.0 MB/s x 1.14
pipe 198.4±0.60ms 48.1 MB/s 106.0±6.52ms 90.0 MB/s x 1.87
pipe_with_many_writers 131.7±12.56ms 72.4 MB/s 150.7±10.56ms 63.3 MB/s x 0.86
pipe_with_tiny_lines 165.7±0.77ms 589.2 KB/s 84.1±0.66ms 1161.4 KB/s x 1.97
transforms 275.7±3.28ms 38.0 MB/s 167.5±8.07ms 62.6 MB/s x 1.65

Note how in many of these benchmarks, the Tokio 0.2 version (using tokio-compat) was as much as two times faster than the Tokio 0.1 version, simply by changing the Tokio runtime used.

The surface area of the change necessary to update Vector to use the compatibility runtime is also quite minimal. Here’s the full diff:

diff --git a/src/runtime.rs b/src/runtime.rs
index 07005a7d..4d64a106 100644
--- a/src/runtime.rs
+++ b/src/runtime.rs
@@ -1,15 +1,16 @@
 use futures::future::{ExecuteError, Executor, Future};
+use futures_util::future::{FutureExt, TryFutureExt};
 use std::io;
-use tokio::runtime::Builder;
+use tokio_compat::runtime::Builder;

 pub struct Runtime {
-    rt: tokio::runtime::Runtime,
+    rt: tokio_compat::runtime::Runtime,
 }

 impl Runtime {
     pub fn new() -> io::Result<Self> {
         Ok(Runtime {
-            rt: tokio::runtime::Runtime::new()?,
+            rt: tokio_compat::runtime::Runtime::new()?,
         })
     }

@@ -47,17 +48,17 @@ impl Runtime {
     }

     pub fn shutdown_on_idle(self) -> impl Future<Item = (), Error = ()> {
-        self.rt.shutdown_on_idle()
+        self.rt.shutdown_on_idle().unit_error().boxed().compat()
     }

     pub fn shutdown_now(self) -> impl Future<Item = (), Error = ()> {
-        self.rt.shutdown_now()
+        self.rt.shutdown_now().unit_error().boxed().compat()
     }
 }

 #[derive(Clone, Debug)]
 pub struct TaskExecutor {
-    inner: tokio::runtime::TaskExecutor,
+    inner: tokio_compat::runtime::TaskExecutor,
 }

 impl TaskExecutor {
@@ -71,6 +72,7 @@ where
     F: Future<Item = (), Error = ()> + Send + 'static,
 {
     fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
-        self.inner.execute(future)
+        self.inner.spawn(future);
+        Ok(())
     }
 }
diff --git a/src/sinks/kafka.rs b/src/sinks/kafka.rs
index 1b99328b..880374b2 100644
--- a/src/sinks/kafka.rs
+++ b/src/sinks/kafka.rs
@@ -6,7 +6,7 @@ use crate::{
     topology::config::{DataType, SinkConfig, SinkContext, SinkDescription},
 };
 use futures::{
-    future::{self, poll_fn, IntoFuture},
+    future::{self, IntoFuture},
     stream::FuturesUnordered,
     Async, AsyncSink, Future, Poll, Sink, StartSend, Stream,
 };
@@ -213,18 +213,14 @@ impl Sink for KafkaSink {
 fn healthcheck(config: KafkaSinkConfig) -> super::Healthcheck {
     let consumer: BaseConsumer = config.to_rdkafka().unwrap().create().unwrap();

-    let check = poll_fn(move || {
-        tokio_threadpool::blocking(|| {
-            consumer
-                .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
-                .map(|_| ())
-                .map_err(|err| err.into())
-        })
-    })
-    .map_err(|err| err.into())
-    .and_then(|result| result.into_future());
+    let task = tokio02::task::block_in_place(|| {
+        consumer
+            .fetch_metadata(Some(&config.topic), Duration::from_secs(3))
+            .map(|_| ())
+            .map_err(|err| err.into())
+    });

-    Box::new(check)
+    Box::new(task.into_future())
 }

(thanks to @LucioFranco for trying out tokio-compat in Vector!)

Conclusion

Many major open-source projects have been using Tokio since the tokio-core days. Asynchronous programming in Rust has come incredibly far since then, and we hope tokio-compat makes it easy for all Tokio users to start seeing benefits from the new std::future/Tokio 0.2 ecosystem as painlessly as possible. Of course, as with all 0.1 releases, there’s still more to do, including:

  • Seamless support for tokio-threadpool 0.1’s blocking APIs
  • A compatibility layer for tokio-io 0.1’s AsyncRead and AsyncWrite traits
  • Reimplementations of APIs from Tokio 0.1 that were removed in 0.2
  • The obligatory bug fixes and performance improvements

You can find the tokio-compat repository on GitHub. As always, we would love feedback and contributions from the Tokio community. If you need help, or want to discuss tokio-compat details, please join us on Discord — all are welcome!

— Eliza Weisman (@hawkw)