Parallel Processing with POSIX Shell
How named pipes enable a general-purpose forking worker model
February 12, 2023
N.B. The main example in this post has been updated to use POSIX shell instead of bash.
Last week at work I had to whip up a script to process several thousand ids for product analysis on a new feature we’re launching. The processing included making HTTP requests to a service, and I was on a deadline, so the script had to be concurrent. Here’s what I came up with:
#!/bin/bash

startline=2
setsize=1200

for i in {1..5}; do
    tail -n+$startline ids.tsv \
        | cut -f4 \
        | head -n $setsize \
        | ./get-service.bash \
        >output/$startline.tsv &
    startline=$((startline + setsize))
done
wait
wait
It combines tail and head to split the source file into five streams that are piped to the get-service script, which reads from STDIN and makes an HTTP GET request for each line of input.
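The get-service script itself isn't the interesting part, but for completeness, here's a hypothetical sketch of what it might look like (the service URL is made up):

#!/bin/bash
# Hypothetical sketch of get-service.bash: read one id per line from
# STDIN and emit the service's response, one line per request.
while read -r id; do
    curl -s "https://service.example.com/products/$id"
    echo
done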
It worked, but a couple of things nagged at me. The first is that open and seek are called on the input file five times, even though each line of input is only used once. What if the input file were really large, or an infinite stream? The second is that the script is too specific: I wrote one just like it for another urgent data-processing request last month! So I wanted to generalize the pattern of distributing an input stream among a pool of workers.
I call the program forklift:
#!/bin/sh

num_workers=1
num_fifos=0
next_fifo=1
prefix_inp="${TMPDIR-/tmp}/forklift-$$"
prefix_sig="$prefix_inp-x"

cleanup() {
    for sigchan in "$prefix_sig"*; do
        echo 1 >"$sigchan"
    done
    rm -f "$prefix_inp"*
    wait
}
trap cleanup ABRT EXIT INT TERM

# tails until told to stop
tailsig() {
    inp="$1"
    sig="$2"
    tail -f "$inp" &
    childpid=$!
    read line <"$sig"
    kill "$childpid" >/dev/null 2>&1
}

# process args
while getopts "w:" opt; do
    case "$opt" in
    'w') num_workers="$OPTARG" ;;
    *)
        printf "Usage:\n\tforklift [-w#] command\n"
        exit 1
        ;;
    esac
done
shift $((OPTIND - 1))

if [ $# -ne 1 ]; then
    printf "Must provide 1 command.\nUsage:\n\tforklift [-w#] command\n"
    exit 1
fi

# create worker pool
while [ "$num_fifos" -lt "$num_workers" ]; do
    num_fifos=$((num_fifos + 1))
    mkfifo "$prefix_inp-$num_fifos"
    mkfifo "$prefix_sig-$num_fifos"
    tailsig "$prefix_inp-$num_fifos" "$prefix_sig-$num_fifos" | eval "$1" &
done

# distribute input
while read -r line; do
    echo "$line" >"$prefix_inp-$next_fifo"
    if [ "$next_fifo" -eq "$num_workers" ]; then
        next_fifo=1
    else
        next_fifo=$((next_fifo + 1))
    fi
done
It starts by declaring a bunch of globals and a cleanup function, which is called when the program receives a signal or exits normally.
The tailsig function pairs a call to tail an input stream with a read from another stream, which signals when to terminate. The read utility will block until any data is received on the signal stream. This avoids watching a file or polling a process for a termination signal.
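To see the blocking behavior in isolation, here's a throwaway sketch (the path and messages are made up):

sig="${TMPDIR-/tmp}/sig-demo-$$"
mkfifo "$sig"
( read line <"$sig" && echo "told to stop" ) &  # blocks until data arrives on the fifo
sleep 1                                         # meanwhile, the rest of the program works
echo 1 >"$sig"                                  # wakes the blocked read; no polling involved
wait
rm -f "$sig"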
The next stanza processes arguments; it accepts a -w option for the number of workers to use, and a command to run. The odd-looking command shift $((OPTIND - 1)) removes processed options from the argument stack so that all that's left (should be) is the command to run.
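The interplay between getopts and OPTIND is easier to see in a standalone sketch that simulates an invocation:

set -- -w 4 './get-service.bash'    # simulate: forklift -w 4 './get-service.bash'
while getopts "w:" opt; do
    case "$opt" in
    'w') echo "workers: $OPTARG" ;;  # prints: workers: 4
    esac
done
shift $((OPTIND - 1))   # OPTIND is now 3, so this drops "-w" and "4"
echo "command: $1"      # prints: command: ./get-service.bash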
Next it creates the worker pool. Every worker in the pool gets a named pipe (fifo) through which it receives its share of the input. Named pipes are like regular pipes, except they can be passed around by filepath. A second named pipe is created for sending a shutdown signal. The named pipes synchronize communication between the parent and worker processes, similar to channels in Go.
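You can try the channel-like rendezvous by hand from two separate terminals (the path here is made up):

# terminal 1: create the pipe and block on a receive
mkfifo /tmp/chan
cat /tmp/chan            # blocks, like a channel receive

# terminal 2: send to it by filepath alone
echo "hello" >/tmp/chan  # blocks until a reader is attached, then both sides proceed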
The final loop distributes input among the workers, round-robin.
Once the parent process has distributed all of its input it “exits”, triggering the cleanup routine, which tells the workers to shut down by sending a message to their signal pipes (each write blocks until the message is read by its worker). When all shutdown messages have been received, the parent deletes the named pipes.
Now, using shell as a parallel stream processor is never going to break any speed records. Pipes and fifos are generally fast because they avoid filesystem IO, but they are not the fastest option for IPC:
Data has to be copied from user space in one process to a kernel buffer and then back to user space which is expensive (message queues and sockets have the same disadvantage).
— Rochkind, Advanced Unix Programming 2nd edition, chapter 7
However, assuming the command is doing something slow like making a network request (i.e. worth parallelizing), this limitation won’t matter much.
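With forklift in hand, last week's one-off becomes a single pipeline (assuming, as before, that get-service.bash emits one line of output per line of input):

tail -n+2 ids.tsv \
    | cut -f4 \
    | ./forklift -w 5 './get-service.bash' \
    >output.tsv

Note that the workers share stdout, so output ordering isn't preserved across workers; if order matters, have the command tag each output line with its input id.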