Using python to transform and filter data in bash pipes

Posted on Thu 30 April 2020 in Articles

I've long been a fan of bash pipes and the unix philosophy of composability. The text stream interface is so simple to extend and build upon that once you create a command line tool that works over stdin and stdout, you suddenly have interoperability with a tremendous number of tools and workflows.

I'm also a fan of python generators. Ever since watching David Beazley's talks on generators about 8 years ago, I have used them extensively in my python code as a way to keep memory usage low and operations composable, using both the explicit yield syntax and the more compact generator expression syntax. Thinking about operations as a series of transforms feels natural and lends itself to fairly high reusability, especially for data processing workloads (cf. Apache Spark's DataFrame transformations).
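As a minimal sketch of that generator-pipeline style (names invented for illustration): each stage consumes an iterator and lazily yields transformed items, so memory use stays constant no matter how long the input is.

```python
def numbers(n):
    # Source stage: lazily produce 0..n-1
    for i in range(n):
        yield i

def evens(items):
    # Filter stage: only pass through even values
    return (i for i in items if i % 2 == 0)

def squared(items):
    # Transform stage: square each value
    return (i * i for i in items)

# Stages compose like a pipe; nothing runs until the result is consumed
pipeline = squared(evens(numbers(10)))
print(list(pipeline))  # [0, 4, 16, 36, 64]
```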

While working with some complex JSON data recently, I realized that the tools I had available for filtering and transforming that data were awkward. I wanted to stay in bash (vs an ipython shell or a standalone script just for this processing) because of all the other tools available in bash, but I wasn't very excited about parsing data with sed, awk, and xargs.

  • I've already written a tool to replace complex sed expressions, mostly to avoid all the escaping sed requires.
  • I have written awk programs tens of lines long, but these days I jump over to python when I want to do more complex processing.
  • xargs is pretty awesome, but the syntax has a lot of gotchas once you start wanting to compose more complex expressions from a line of input.

Inspired in part by ammonite (scala) and xon.sh (python), I wanted to be able to use a batteries-included programming language alongside bash to get things done. What I put together started out as ~50 lines of python and has since grown a bit to add more features (esp. multi-expression python and multiprocessing for parallel computation), but it is still small enough to live as a single file gist.

The tool is called pype (for python pipe). The name is, unsurprisingly, already used by a few projects, none of which are terribly active:

  • python-pype, similar to this project (bash + python)
  • pype, a pipe-like constructor for python operations
  • PyPE, an editor
  • More on github: https://github.com/search?q=python-pype

The source code and docs are included below. I'll be using this and likely adding to it over time. If it becomes part of my workflow I'll move it from a gist to a normal github repo, and add some tests and some packaging.

Let me know what you think by commenting below or reaching out on Twitter!

Pype

A simple python utility for filtering and transforming lines in bash pipes.

I created this because jq syntax is a bit hard for me to remember and awk can be annoying for JSON and other non-trivial formats.

You just write simple inline python and either return a string (to transform) or a truthy value (to filter). The line being evaluated is available as the variable "line", and the line number is available as "nline".

Example use

# Print out the "id" field of every JSON object in the file where the "enabled" field is set to a truthy value
cat my-file-full-of-json.txt \
  | pype -a filter -i "json" "json.loads(line).get('enabled')" \
  | pype -i "json" "json.loads(line).get('id')"

# Sleep a random amount of time for each line and print out the time slept next to the line number
# Run with 3 processes
cat my-file-full-of-json.txt \
  | pype -i "time;random" "sleep_duration = random.random()*3; time.sleep(sleep_duration); ret=(nline, sleep_duration)" -P 3
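For reference, the filter-then-transform pipeline in the first example computes roughly the following (a plain-python sketch of the semantics, not pype's implementation):

```python
import json

lines = [
    '{"id": "a1", "enabled": true}\n',
    '{"id": "b2", "enabled": false}\n',
    '{"id": "c3", "enabled": 1}\n',
]

# Filter stage: keep lines where 'enabled' is truthy
kept = [l for l in lines if json.loads(l).get("enabled")]

# Transform stage: pull out the 'id' field of each surviving line
ids = [json.loads(l).get("id") for l in kept]
print(ids)  # ['a1', 'c3']
```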

Installation

curl https://gist.githubusercontent.com/turtlemonvh/4558b8bc4377b6758e289316c0141d15/raw/98eb95ecbadd067fff2cc7b18fb6ef84d9e61147/pype.py -o pype
chmod +x pype
mv pype /usr/local/bin/

Confirm setup.

$ pype -h
usage: pype [-h] [-i IMPORTS] [-a {transform,filter}] [-ns] [-P PARALLELISM]
            cmd

Tool for filtering and transforming data in bash pipes using python.

positional arguments:
  cmd                   Python code to run. Should return a string in the case
                        of 'transform' or a truthy object in the case of
                        'filter'. The current line and current line number are
                        available as the variables 'line' and 'nline',
                        respectively. Responses from multi-statement code are
                        supported via setting the variable 'ret'. Single
                        statement code does not need to explicitly set 'ret'.

optional arguments:
  -h, --help            show this help message and exit
  -i IMPORTS, --imports IMPORTS
                        Imports to add, ; separated.
  -a {transform,filter}, --action {transform,filter}
                        Type of action to take on lines. Either filter or
                        transform.
  -ns, --strip-trailing-newlines
                        Set this flag to strip trailing newlines on each line.
                        Only relevant when calling 'transform'.
  -P PARALLELISM, --parallelism PARALLELISM
                        The number of processes to spin up to process results.
                        Similar to 'xargs -P' flag. Transformed or filtered
                        lines are still returned in order.

pype.py

#!/usr/bin/env python
import sys
import argparse
import multiprocessing
import itertools

## PY2 vs PY3

if sys.version_info > (3, 0):
    # Python 3
    zipper = zip
else:
    # Python 2
    zipper = itertools.izip

## Transform and filter

def is_multi_statement(code):
    # Treat the code as multi-statement if it assigns to 'ret'
    return "ret=" in code or "ret =" in code

def transform_line(nline, line, code, strip_trailing_newlines):
    """
    Run the transformer on a single line.
    """
    if is_multi_statement(code):
        # Run in an explicit namespace so the 'ret' assignment is visible
        # after exec; exec cannot rebind function locals in python 3.
        ns = {"nline": nline, "line": line}
        exec(code, globals(), ns)
        ret = ns.get("ret")
    else:
        ret = eval(code)
    ret = str(ret)
    if not strip_trailing_newlines:
        ret = ret + "\n"
    return ret

def transform_line_wrapper(wrapped_args):
    try:
        (nline, line), code, strip_trailing_newlines = wrapped_args
        return transform_line(nline, line, code, strip_trailing_newlines)
    except KeyboardInterrupt:
        # Wait to be killed
        pass

def filter_line(nline, line, code):
    if is_multi_statement(code):
        # See transform_line for why an explicit namespace is needed
        ns = {"nline": nline, "line": line}
        exec(code, globals(), ns)
        ret = ns.get("ret")
    else:
        ret = eval(code)
    # Return the line to keep it, or an empty string to drop it, so the
    # calling process (not a worker) decides what to write to stdout
    return line if ret else ""

def filter_line_wrapper(wrapped_args):
    try:
        (nline, line), code = wrapped_args
        return filter_line(nline, line, code)
    except KeyboardInterrupt:
        # Wait to be killed
        pass

def transform_lines_async(lines, code, strip_trailing_newlines, parallelism=1):
    """
    Transform each line using bounded async evaluation, still printing results in order.
    """
    pool = multiprocessing.Pool(parallelism)
    wrapped_args = zipper(enumerate(lines), itertools.repeat(code), itertools.repeat(strip_trailing_newlines))
    try:
        for r in pool.imap(transform_line_wrapper, wrapped_args, 1):
            sys.stdout.write(r)
    except KeyboardInterrupt:
        pool.terminate()

def transform_lines(lines, code, strip_trailing_newlines):
    """
    Transform each line by the output of eval(code).
    The contents of the line are available as "line".
    """
    for nline, line in enumerate(lines):
        sys.stdout.write(transform_line(nline, line, code, strip_trailing_newlines))

def filter_lines_async(lines, code, parallelism=1):
    """
    Filter each line using bounded async evaluation, still printing results in order.
    """
    pool = multiprocessing.Pool(parallelism)
    wrapped_args = zipper(enumerate(lines), itertools.repeat(code))
    try:
        for r in pool.imap(filter_line_wrapper, wrapped_args, 1):
            sys.stdout.write(r)
    except KeyboardInterrupt:
        pool.terminate()

def filter_lines(lines, code):
    """
    Filter lines by the output of eval(code).
    The contents of the line are available as "line".
    """
    for nline, line in enumerate(lines):
        sys.stdout.write(filter_line(nline, line, code))

## Argument parsing

def get_parser():
    p = argparse.ArgumentParser(description="Tool for filtering and transforming data in bash pipes using python.")
    p.add_argument(
        "-i", "--imports",
        help="Imports to add, ; separated."
    )
    p.add_argument(
        "-a", "--action",
        action="store",
        default="transform",
        choices=["transform", "filter"],
        help="Type of action to take on lines. Either filter or transform."
    )
    p.add_argument(
        "-ns", "--strip-trailing-newlines",
        action="store_true",
        dest="strip_trailing_newlines",
        help="Set this flag to strip trailing newlines on each line. Only relevant when calling 'transform'."
    )
    p.add_argument(
        "-P", "--parallelism",
        action="store",
        type=int,
        default=None,
        dest="parallelism",
        help="The number of processes to spin up to process results. Similar to 'xargs -P' flag. Transformed or filtered lines are still returned in order."
    )
    cmd_help = """
Python code to run.
Should return a string in the case of 'transform' or a truthy object in the case of 'filter'.
The current line and current line number are available as the variables 'line' and 'nline', respectively.
Responses from multi-statement code are supported via setting the variable 'ret'. Single statement code does not need to explicitly set 'ret'.
"""
    p.add_argument("cmd", help=cmd_help)
    return p

if __name__ == "__main__":
    options = get_parser().parse_args()

    # Imports requested on the command line land in this module's globals,
    # so they are visible to the eval'd and exec'd user code
    if options.imports is not None:
        for lib in options.imports.split(";"):
            exec("import {}".format(lib))

    # Lines as a generator
    lines = (line for line in sys.stdin)

    # Run the requested action on each line
    if options.action == "transform":
        if options.parallelism is not None:
            transform_lines_async(lines, options.cmd, options.strip_trailing_newlines, parallelism=options.parallelism)
        else:
            transform_lines(lines, options.cmd, options.strip_trailing_newlines)
    elif options.action == "filter":
        if options.parallelism is not None:
            filter_lines_async(lines, options.cmd, parallelism=options.parallelism)
        else:
            filter_lines(lines, options.cmd)
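To make the eval/exec dispatch at the heart of the script concrete, here is a standalone sketch (run_snippet is a name invented for this illustration, not part of pype): single expressions are evaluated directly, while multi-statement snippets run under exec with an explicit namespace so the 'ret' assignment can be read back afterward.

```python
def run_snippet(nline, line, code):
    # Expose the same variables pype provides to user code
    ns = {"nline": nline, "line": line}
    if "ret=" in code or "ret =" in code:
        # Multi-statement: exec into ns, then read 'ret' back out
        exec(code, {}, ns)
        return ns.get("ret")
    # Single expression: just evaluate it
    return eval(code, {}, ns)

print(run_snippet(0, "hello\n", "line.strip().upper()"))  # HELLO
print(run_snippet(1, "a,b,c\n", "parts = line.strip().split(','); ret = parts[1]"))  # b
```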