Parallel Processing with Bash
How named pipes enable a general-purpose forking worker model
February 12, 2023
Last week at work I had to whip up a script to process several thousand ids for product analysis on a new feature we’re launching. The processing included making HTTP requests to a service, and I was on a deadline, so the script had to be concurrent. Here’s what I came up with:
#!/bin/bash
startline=2
setsize=1200
for i in {1..5};do
  tail -n+$startline ids.tsv \
    | cut -f4 \
    | head -n $setsize \
    | ./get-service.bash \
    > output/$startline.tsv&
  startline=$((startline+setsize))
done
wait
It combines tail and head to split the source file into five streams that are piped to the get-service script, which reads from STDIN and makes an HTTP GET request for each line of input.
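The get-service script itself isn’t the interesting part; it’s roughly a loop like the following sketch (the endpoint URL and output format here are placeholders, not the real service):
#!/bin/bash
# sketch of get-service.bash: read one id per line, GET it, emit a TSV row
# (https://service.example/items is a placeholder endpoint)
while read -r id; do
  printf '%s\t%s\n' "$id" "$(curl -s "https://service.example/items/$id")"
done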
It worked, but a couple of things nagged at me. The first is that open and seek are called on the input file five times, but each line of input is only used once. What if the input file was really large, or an infinite stream? The second is that the script is too specific: I wrote one just like this for another urgent data-processing request last month! So I wanted to generalize the pattern of distributing an input stream among a pool of workers.
I call the program forklift:
#!/bin/bash
set -euo pipefail
declare -i num_workers=1
declare -i num_fifos=0
declare -i next_fifo=1
declare -a fifos=("") # zeroth unused
new_fifo() {
  local fn="${TMPDIR-/tmp}/forklift-$1"
  fifos[$1]="$fn"
  num_fifos=$((num_fifos+1))
  mkfifo "$fn"
  eval "tail -f '$fn' --pid $$ | ${cmd/\{\{w\}\}/$1} &"
}
cleanup() {
  for i in $(seq 1 ${num_fifos});do
    rm -f "${fifos[$i]}"
  done
}
trap cleanup EXIT
# process args
while getopts "w:" opt;do
  case "$opt" in
    'w') num_workers="$OPTARG";;
  esac
done
shift $((OPTIND-1))
if [ $# -ne 1 ]; then
  echo -e "Must provide 1 command. Usage:\n\tforklift [-w #] command"
  exit 1
fi
declare cmd="$1"
# process input
while read -r;do
  ((next_fifo > num_fifos)) && new_fifo $next_fifo
  echo "$REPLY" > "${fifos[$next_fifo]}"
  if ((next_fifo==num_workers)); then
    next_fifo=1
  else
    next_fifo=$((next_fifo+1))
  fi
done
It starts by declaring a bunch of globals and two functions: new_fifo creates a named pipe (fifo) and forks a worker to read from it. Named pipes are like regular pipes except they can be passed around by filepath, not just inherited. That enables the script to fork a worker and have tail -f block on read until the script starts writing to the fifo. One downside of fifos is that they need to be explicitly removed, and that’s where the cleanup routine comes in. It deletes any fifos that were created and is triggered by the EXIT signal handler. The reason for this indirection is that multiple trap calls for the same signal name replace one another, rather than stack.
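Two tiny demos of the behavior this relies on, meant to be dropped into a scratch script rather than forklift itself (the fifo path is made up, and GNU tail is assumed for --pid):
# trap handlers for the same signal replace, rather than stack:
trap 'echo first' EXIT
trap 'echo second' EXIT        # only "second" runs when the shell exits

# a pre-forked reader blocks on a fifo until something is written to it:
mkfifo /tmp/fifo-demo
tail -f /tmp/fifo-demo --pid $$ | tr a-z A-Z &
echo hello > /tmp/fifo-demo    # the backgrounded worker prints HELLO
rm /tmp/fifo-demo              # fifos must be removed explicitly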
The next stanza processes arguments; it accepts a -w option for the number of workers to use, and a command to run. The odd-looking command shift $((OPTIND-1)) removes the processed options from the argument list, so all that’s left (should be) is the command to run.
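To see what that shift is doing, here’s a small trace that can go in a scratch script (the invocation being simulated is made up):
set -- -w 5 'foo bar'          # simulate being invoked as: forklift -w 5 'foo bar'
while getopts "w:" opt; do :; done
echo "$OPTIND"                 # 3: getopts consumed "-w" and its argument
shift $((OPTIND-1))
echo "$1"                      # foo bar: only the command string remains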
The final stanza loops over each line of input, creating a fifo worker on demand. This laziness avoids the edge case where fewer lines of input are received than workers were forked, which would leave zombies behind. The loop distributes input among the fifos round-robin.
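With that in place, the one-off script at the top of the post could be replaced with something like this one-liner (leaving aside where the output goes, which Capturing Output below deals with):
tail -n+2 ids.tsv | cut -f4 | ./forklift -w 5 ./get-service.bash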
Speed
Using bash as a forking server is never going to break any speed records. Pipes and fifos are generally fast because they avoid filesystem IO, but are not the fastest option for IPC:¹
Data has to be copied from user space in one process to a kernel buffer and then back to user space which is expensive (message queues and sockets have the same disadvantage).
However, assuming the forked command is doing something slow like making an internet request per line of input, these limitations won’t matter much.
Capturing Output
What if the forked workers produce output that you need to capture? Imagine:
cat input.csv | forklift -w 5 'foo > output.tsv'
This will fork five workers, but they will all write to output.tsv, clobbering each other’s results. My original script used the line number offset in the filename: output/$startline.tsv. This works, but the filenames end up with odd-looking numbers in them (2, 1202, 2402 …).
The fix for this is to include the template variable {{w}}, which forklift replaces with the worker id:
cat input.csv | forklift -w 5 'foo > output/{{w}}.tsv'
This will create output/1.tsv, output/2.tsv, and so on.
Notes
1. Rochkind, Advanced UNIX Programming, 2nd edition, chapter 7, p. 414.