//! A cross-platform library for opening OS pipes, like those from
//! [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) on Linux or
//! [`CreatePipe`](https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe)
//! on Windows. The Rust standard library provides
//! [`Stdio::piped`](https://doc.rust-lang.org/std/process/struct.Stdio.html#method.piped) for
//! simple use cases involving child processes, ~~but it doesn't support creating pipes directly.
//! This crate fills that gap.~~ **Update:** Rust 1.87 added
//! [`std::io::pipe`](https://doc.rust-lang.org/std/io/fn.pipe.html), so this crate is no longer
//! needed except to support older compiler versions.
//!
//! - [Docs](https://docs.rs/os_pipe)
//! - [Crate](https://crates.io/crates/os_pipe)
//! - [Repo](https://github.com/oconnor663/os_pipe.rs)
//!
//! # Common deadlocks related to pipes
//!
//! When you work with pipes, you often end up debugging a deadlock at
//! some point. These can be confusing if you don't know why they
//! happen. Here are two things you need to know:
//!
//! 1. Pipe reads block until some bytes are written or all writers are
//!    closed. **If you forget to close a writer, reads can block
//!    forever.** This includes writers inside a
//!    [`std::process::Command`](https://doc.rust-lang.org/std/process/struct.Command.html)
//!    object or writers given to child processes.
//! 2. Pipes have an internal buffer of some fixed size. On Linux, for
//!    example, pipe buffers are 64 KiB by default. Pipe writes block
//!    until buffer space is available or all readers are closed. **If
//!    you have readers open but not reading, writes can block
//!    forever.**
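//!
//! As a minimal sketch of the first point, the reader below sees EOF only
//! once *every* writer has been closed, including a clone made with
//! `try_clone`. The five-byte write is small enough that it won't fill the
//! pipe buffer, so doing it on the same thread as the read is safe here.
//!
//! ```rust
//! use std::io::prelude::*;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let (mut reader, mut writer) = os_pipe::pipe()?;
//!     // A second handle to the same write end of the pipe.
//!     let writer_clone = writer.try_clone()?;
//!     writer.write_all(b"hello")?;
//!     drop(writer);
//!     // Without this second drop, the pipe would still have an open writer,
//!     // and `read_to_end` below would block forever.
//!     drop(writer_clone);
//!     let mut output = Vec::new();
//!     reader.read_to_end(&mut output)?;
//!     assert_eq!(output, b"hello");
//!     Ok(())
//! }
//! ```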
//!
//! Deadlocked reads caused by a forgotten writer usually show up
//! immediately, which makes them relatively easy to fix once you know
//! what to look for. (See "Avoid a deadlock!" in the example code
//! below.) However, deadlocked writes caused by full pipe buffers are
//! trickier. These might only show up for larger inputs, and they might
//! be timing-dependent or platform-dependent. If you find that writing
//! to a pipe deadlocks sometimes, think about who's supposed to be
//! reading from that pipe and whether that thread or process might be
//! blocked on something else. For more on this, see the [Gotchas
//! Doc](https://github.com/oconnor663/duct.py/blob/master/gotchas.md#using-io-threads-to-avoid-blocking-children)
//! from the [`duct`](https://github.com/oconnor663/duct.rs) crate. (And
//! consider whether [`duct`](https://github.com/oconnor663/duct.rs)
//! might be a good fit for your use case.)
//!
//! # Examples
//!
//! Here we write a single byte into a pipe and read it back out:
//!
//! ```rust
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! use std::io::prelude::*;
//!
//! let (mut reader, mut writer) = os_pipe::pipe()?;
//! // XXX: If this write blocks, we'll never get to the read.
//! writer.write_all(b"x")?;
//! let mut output = [0];
//! reader.read_exact(&mut output)?;
//! assert_eq!(b"x", &output);
//! # Ok(())
//! # }
//! ```
//!
//! This is a minimal working example, but as discussed in the section
//! above, reading and writing on the same thread like this is
//! deadlock-prone. If we wrote 100 KB instead of just one byte, this
//! example would block on `write_all`, it would never make it to
//! `read_exact`, and that would be a deadlock. Doing the read and write
//! from different threads or different processes would fix the
//! deadlock.
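//!
//! Here is a minimal sketch of the threaded version. It writes 100 KB, more
//! than the 64 KiB default pipe buffer on Linux, from a separate thread while
//! the main thread reads, so the writer never waits on a reader that isn't
//! reading.
//!
//! ```rust
//! use std::io::prelude::*;
//! use std::thread;
//!
//! fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let (mut reader, mut writer) = os_pipe::pipe()?;
//!     let data = vec![b'x'; 100_000];
//!     let data_clone = data.clone();
//!     // The writer moves into the thread and is dropped when the closure
//!     // returns, which lets the reader below see EOF.
//!     let writer_thread = thread::spawn(move || writer.write_all(&data_clone));
//!     let mut output = Vec::new();
//!     reader.read_to_end(&mut output)?;
//!     // Panic if the writer thread panicked, and propagate any write error.
//!     writer_thread.join().unwrap()?;
//!     assert_eq!(output, data);
//!     Ok(())
//! }
//! ```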
//!
//! For a more complex example, here we join the stdout and stderr of a
//! child process into a single pipe. To do that we open a pipe, clone
//! its writer, and set that pair of writers as the child's stdout and
//! stderr. (This is possible because `PipeWriter` implements
//! `Into<Stdio>`.) Then we can read interleaved output from the pipe
//! reader. This example is deadlock-free, but note the comment about
//! closing the writers.
//!
//! ```rust
//! # use std::io::prelude::*;
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // We're going to spawn a child process that prints "foo" to stdout
//! // and "bar" to stderr, and we'll combine these into a single pipe.
//! let mut command = std::process::Command::new("python");
//! command.args(&["-c", r#"
//! import sys
//! sys.stdout.write("foo")
//! sys.stdout.flush()
//! sys.stderr.write("bar")
//! sys.stderr.flush()
//! "#]);
//!
//! // Here's the interesting part. Open a pipe, clone its writer, and
//! // set that pair of writers as the child's stdout and stderr.
//! let (mut reader, writer) = os_pipe::pipe()?;
//! let writer_clone = writer.try_clone()?;
//! command.stdout(writer);
//! command.stderr(writer_clone);
//!
//! // Now start the child process running.
//! let mut handle = command.spawn()?;
//!
//! // Avoid a deadlock! This parent process is still holding open pipe
//! // writers inside the Command object, and we have to close those
//! // before we read. Here we do this by dropping the Command object.
//! drop(command);
//!
//! // Finally we can read all the output and clean up the child.
//! let mut output = String::new();
//! reader.read_to_string(&mut output)?;
//! handle.wait()?;
//! assert_eq!(output, "foobar");
//! # Ok(())
//! # }
//! ```
//!
//! Note that the [`duct`](https://github.com/oconnor663/duct.rs) crate
//! can reproduce the example above in a single line of code, with no
//! risk of deadlocks and no risk of leaking [zombie
//! children](https://en.wikipedia.org/wiki/Zombie_process).

use std::fs::File;
use std::io;
use std::process::Stdio;

#[cfg(not(windows))]
#[path = "unix.rs"]
mod sys;
#[cfg(windows)]
#[path = "windows.rs"]
mod sys;

/// The reading end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeReader` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdin` to spawn a child process that reads from the pipe.
#[derive(Debug)]
pub struct PipeReader(
    // We use std::fs::File here for two reasons: OwnedFd and OwnedHandle are platform-specific,
    // and this gives us read/write/flush for free.
    File,
);

impl PipeReader {
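    /// Create a new handle to the same pipe by duplicating the underlying file
    /// descriptor (or handle, on Windows).
    ///
    /// A minimal sketch of what that means in practice: both handles read from
    /// the same pipe, so bytes consumed through one handle are no longer
    /// available to the other.
    ///
    /// ```rust
    /// use std::io::prelude::*;
    ///
    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let (mut reader, mut writer) = os_pipe::pipe()?;
    ///     let mut reader_clone = reader.try_clone()?;
    ///     // A small write won't fill the pipe buffer, so it won't block.
    ///     writer.write_all(b"ab")?;
    ///     let mut first = [0u8; 1];
    ///     reader.read_exact(&mut first)?;
    ///     // The clone picks up where the original handle left off.
    ///     let mut second = [0u8; 1];
    ///     reader_clone.read_exact(&mut second)?;
    ///     assert_eq!(&first, b"a");
    ///     assert_eq!(&second, b"b");
    ///     Ok(())
    /// }
    /// ```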
    pub fn try_clone(&self) -> io::Result<PipeReader> {
        self.0.try_clone().map(PipeReader)
    }
}

impl io::Read for PipeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl<'a> io::Read for &'a PipeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&self.0).read(buf)
    }
}

impl From<PipeReader> for Stdio {
    fn from(p: PipeReader) -> Stdio {
        p.0.into()
    }
}

/// The writing end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeWriter` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdout` or `Command::stderr` to spawn a child process that writes
/// to the pipe.
#[derive(Debug)]
pub struct PipeWriter(File);

impl PipeWriter {
    /// Create a new handle to the same pipe by duplicating the underlying file
    /// descriptor (or handle, on Windows). Readers see EOF only after every
    /// writer handle, including clones, has been closed.
    pub fn try_clone(&self) -> io::Result<PipeWriter> {
        self.0.try_clone().map(PipeWriter)
    }
}

impl io::Write for PipeWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

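/// As with `File`, writes also work through a shared reference. A minimal
/// sketch, assuming Rust 1.63 or later for `std::thread::scope`: two scoped
/// threads write through the same `PipeWriter` by reference, without cloning
/// the underlying handle.
///
/// ```rust
/// use std::io::prelude::*;
/// use std::thread;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let (mut reader, writer) = os_pipe::pipe()?;
///     thread::scope(|s| -> std::io::Result<()> {
///         // Each closure borrows `writer`; `&writer` is a `&PipeWriter`,
///         // which implements `Write`.
///         let first = s.spawn(|| (&writer).write_all(b"x"));
///         let second = s.spawn(|| (&writer).write_all(b"y"));
///         first.join().unwrap()?;
///         second.join().unwrap()?;
///         Ok(())
///     })?;
///     // Close the only writer so the read below can reach EOF.
///     drop(writer);
///     let mut output = Vec::new();
///     reader.read_to_end(&mut output)?;
///     // The two threads may have written in either order.
///     output.sort();
///     assert_eq!(output, b"xy");
///     Ok(())
/// }
/// ```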
impl<'a> io::Write for &'a PipeWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (&self.0).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (&self.0).flush()
    }
}

impl From<PipeWriter> for Stdio {
    fn from(p: PipeWriter) -> Stdio {
        p.0.into()
    }
}

/// Open a new pipe and return a [`PipeReader`] and [`PipeWriter`] pair.
///
/// This corresponds to the `pipe2` library call on POSIX and the
/// `CreatePipe` library call on Windows (though these implementation
/// details might change). These pipes are non-inheritable, so new child
/// processes won't receive a copy of them unless they're explicitly
/// passed as stdin/stdout/stderr.
///
/// [`PipeReader`]: struct.PipeReader.html
/// [`PipeWriter`]: struct.PipeWriter.html
pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> {
    sys::pipe()
}

/// Get a duplicated copy of the current process's standard input, as a
/// [`PipeReader`].
///
/// Reading directly from this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdin`]. [`PipeReader`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdin`]. This is
/// equivalent to [`Stdio::inherit`], though, so it's usually not necessary
/// unless you need a collection of different pipes.
///
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
/// [`PipeReader`]: struct.PipeReader.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdin`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdin
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdin() -> io::Result<PipeReader> {
    sys::dup(io::stdin()).map(PipeReader::from)
}

/// Get a duplicated copy of the current process's standard output, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdout`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stderr to go
/// to the parent's stdout.
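///
/// A minimal sketch of that use case. `some_program` is a hypothetical
/// placeholder, and the example only configures the `Command` without
/// spawning it; in real code you would call `command.spawn()` afterward.
///
/// ```rust
/// use std::process::Command;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // Hypothetical program name; nothing is spawned in this sketch.
///     let mut command = Command::new("some_program");
///     // The child's stderr will go wherever the parent's stdout goes.
///     command.stderr(os_pipe::dup_stdout()?);
///     Ok(())
/// }
/// ```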
///
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdout() -> io::Result<PipeWriter> {
    sys::dup(io::stdout()).map(PipeWriter::from)
}

/// Get a duplicated copy of the current process's standard error, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stderr`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stdout to go
/// to the parent's stderr.
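///
/// A minimal sketch of that use case. `some_program` is a hypothetical
/// placeholder, and the example only configures the `Command` without
/// spawning it; in real code you would call `command.spawn()` afterward.
///
/// ```rust
/// use std::process::Command;
///
/// fn main() -> Result<(), Box<dyn std::error::Error>> {
///     // Hypothetical program name; nothing is spawned in this sketch.
///     let mut command = Command::new("some_program");
///     // The child's stdout will go wherever the parent's stderr goes.
///     command.stdout(os_pipe::dup_stderr()?);
///     Ok(())
/// }
/// ```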
///
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stderr() -> io::Result<PipeWriter> {
    sys::dup(io::stderr()).map(PipeWriter::from)
}

#[cfg(test)]
mod tests {
    use std::env::consts::EXE_EXTENSION;
    use std::io::prelude::*;
    use std::path::{Path, PathBuf};
    use std::process::Command;
    use std::sync::Once;
    use std::thread;

    fn path_to_exe(name: &str) -> PathBuf {
        // This project defines some associated binaries for testing, and we shell out to them in
        // these tests. `cargo test` doesn't automatically build associated binaries, so this
        // function takes care of building them explicitly, with the right debug/release flavor.
        static CARGO_BUILD_ONCE: Once = Once::new();
        CARGO_BUILD_ONCE.call_once(|| {
            let mut build_command = Command::new("cargo");
            build_command.args(&["build", "--quiet"]);
            if !cfg!(debug_assertions) {
                build_command.arg("--release");
            }
            let build_status = build_command.status().unwrap();
            assert!(
                build_status.success(),
                "Cargo failed to build associated binaries."
            );
        });
        let flavor = if cfg!(debug_assertions) {
            "debug"
        } else {
            "release"
        };
        Path::new("target")
            .join(flavor)
            .join(name)
            .with_extension(EXE_EXTENSION)
    }

    #[test]
    fn test_pipe_some_data() {
        let (mut reader, mut writer) = crate::pipe().unwrap();
        // A small write won't fill the pipe buffer, so it won't block this thread.
        writer.write_all(b"some stuff").unwrap();
        drop(writer);
        let mut out = String::new();
        reader.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_pipe_some_data_with_refs() {
        // As with `File`, there's a second set of impls for shared
        // refs. Test those.
        let (reader, writer) = crate::pipe().unwrap();
        let mut reader_ref = &reader;
        {
            let mut writer_ref = &writer;
            // A small write won't fill the pipe buffer, so it won't block this thread.
            writer_ref.write_all(b"some stuff").unwrap();
        }
        drop(writer);
        let mut out = String::new();
        reader_ref.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_pipe_no_data() {
        let (mut reader, writer) = crate::pipe().unwrap();
        drop(writer);
        let mut out = String::new();
        reader.read_to_string(&mut out).unwrap();
        assert_eq!(out, "");
    }

    #[test]
    fn test_pipe_a_megabyte_of_data_from_another_thread() {
        let data = vec![0xff; 1_000_000];
        let data_copy = data.clone();
        let (mut reader, mut writer) = crate::pipe().unwrap();
        let joiner = thread::spawn(move || {
            writer.write_all(&data_copy).unwrap();
            // This drop happens automatically, so writing it out here is mostly
            // just for clarity. For what it's worth, it also guards against
            // accidentally forgetting to drop if we switch to scoped threads or
            // something like that and change this to a non-moving closure. The
            // explicit drop forces `writer` to move.
            drop(writer);
        });
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        joiner.join().unwrap();
        assert_eq!(out, data);
    }

    #[test]
    fn test_pipes_are_not_inheritable() {
        // Create pipes for a child process.
        let (input_reader, mut input_writer) = crate::pipe().unwrap();
        let (mut output_reader, output_writer) = crate::pipe().unwrap();

        // Create a bunch of duplicated copies, which we'll close later. This
        // tests that duplication preserves non-inheritability.
        let ir_dup = input_reader.try_clone().unwrap();
        let iw_dup = input_writer.try_clone().unwrap();
        let or_dup = output_reader.try_clone().unwrap();
        let ow_dup = output_writer.try_clone().unwrap();

        // Spawn the child. Note that this temporary Command object takes
        // ownership of our copies of the child's stdin and stdout, and then
        // closes them immediately when it drops. That stops us from blocking
        // our own read below. We use our own simple implementation of cat for
        // compatibility with Windows.
        let mut child = Command::new(path_to_exe("cat"))
            .stdin(input_reader)
            .stdout(output_writer)
            .spawn()
            .unwrap();

        // Drop all the dups now that the child is spawned.
        drop(ir_dup);
        drop(iw_dup);
        drop(or_dup);
        drop(ow_dup);

        // Write to the child's stdin. This is a small write, so it shouldn't
        // block.
        input_writer.write_all(b"hello").unwrap();
        drop(input_writer);

        // Read from the child's stdout. If this child has accidentally
        // inherited the write end of its own stdin, then it will never exit,
        // and this read will block forever. That's what this test is all
        // about.
        let mut output = Vec::new();
        output_reader.read_to_end(&mut output).unwrap();
        child.wait().unwrap();

        // Confirm that we got the right bytes.
        assert_eq!(b"hello", &*output);
    }

    #[test]
    fn test_parent_handles() {
        // This test invokes the `swap` test program, which uses dup_stdout() and
        // dup_stderr() to swap the outputs for another child that it spawns.

        // Create pipes for a child process.
        let (reader, mut writer) = crate::pipe().unwrap();

        // Write input. This shouldn't block because it's small. Then close the write end, or else
        // the child will hang.
        writer.write_all(b"quack").unwrap();
        drop(writer);

        // Use `swap` to run `cat_both`. `cat_both` will read "quack" from stdin
        // and write it to stdout and stderr with different tags. But because we
        // run it inside `swap`, the tags in the output should be backwards.
        let output = Command::new(path_to_exe("swap"))
            .arg(path_to_exe("cat_both"))
            .stdin(reader)
            .output()
            .unwrap();

        // Check for a clean exit.
        assert!(
            output.status.success(),
            "child process returned {:#?}",
            output
        );

        // Confirm that we got the right bytes.
        assert_eq!(b"stderr: quack", &*output.stdout);
        assert_eq!(b"stdout: quack", &*output.stderr);
    }

    #[test]
    fn test_parent_handles_dont_close() {
        // Open and close each parent pipe multiple times. If this closes the
        // original, subsequent opens should fail.
        let stdin = crate::dup_stdin().unwrap();
        drop(stdin);
        let stdin = crate::dup_stdin().unwrap();
        drop(stdin);

        let stdout = crate::dup_stdout().unwrap();
        drop(stdout);
        let stdout = crate::dup_stdout().unwrap();
        drop(stdout);

        let stderr = crate::dup_stderr().unwrap();
        drop(stderr);
        let stderr = crate::dup_stderr().unwrap();
        drop(stderr);
    }

    #[test]
    fn test_try_clone() {
        let (reader, writer) = crate::pipe().unwrap();
        let mut reader_clone = reader.try_clone().unwrap();
        let mut writer_clone = writer.try_clone().unwrap();
        // A small write won't fill the pipe buffer, so it won't block this thread.
        writer_clone.write_all(b"some stuff").unwrap();
        drop(writer);
        drop(writer_clone);
        let mut out = String::new();
        reader_clone.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_debug() {
        let (reader, writer) = crate::pipe().unwrap();
        _ = format!("{:?} {:?}", reader, writer);
    }
}