Skip to main content
2 of 6
edited tags
200_success
  • 145.7k
  • 22
  • 191
  • 481

Subprocess CSV validator and multiprocessing on Python

This Python script takes a directory of CSV files and calls a Scala script which tests whether a file's contents match a given regular expression. The link to this Scala script can be found in the third line of the code below.

That script takes a single file as argument, hence my creation of this Python script to feed it an entire directory's worth of files, while also utilizing the maximum possible amount of CPU power.

Do you see any issues or potential improvements in my code?

"""
Command line API to CSV validator using Scala implementation from:
http://digital-preservation.github.io/csv-validator/#toc7
"""

# Default locations of the validator launcher, the folder of CSV files to
# check, and the CSV schema. When this file is run as a script, these names
# are reassigned from the required command-line arguments.
PATH_TO_VALIDATOR = r"C:\prog\csv\csv-validator-cmd-1.2-RC2\bin\validate.bat"
PATH_TO_CSV_FOLDER = r"C:\prog\csv\CSVFiles"
PATH_TO_CSV_SCHEMA = r"C:\prog\csv\ocr-schema.csvs"

# Default encodings, handed to the validator through its --csv-encoding and
# --csv-schema-encoding options; optional command-line arguments override them.
CSV_ENCODING = "windows-1252"
CSV_SCHEMA_ENCODING = "UTF-8"


def open_csv(CSV_LIST, validator_path=None, csv_folder=None, csv_schema=None,
             csv_encoding=None, csv_schema_encoding=None,
             output_path="output.txt"):
    """Run the validator on every file in CSV_LIST and record its output.

    For each file name the validator is started as a subprocess, and a line
    of the form ``<filename>: <validator output>`` is written to
    ``output_path``. A simple progress indicator is printed to stdout.

    Args:
        CSV_LIST: File names (relative to ``csv_folder``) to validate.
        validator_path: Path to the validator launcher. Defaults to the
            module-level ``PATH_TO_VALIDATOR``.
        csv_folder: Folder containing the files. Defaults to the module-level
            ``PATH_TO_CSV_FOLDER``.
        csv_schema: Path to the CSV schema. Defaults to the module-level
            ``PATH_TO_CSV_SCHEMA``.
        csv_encoding: Encoding of the CSV files. Defaults to the module-level
            ``CSV_ENCODING``.
        csv_schema_encoding: Encoding of the schema. Defaults to the
            module-level ``CSV_SCHEMA_ENCODING``.
        output_path: File that receives the results; it is truncated first.
            Concurrent callers must each use a different path, otherwise
            they overwrite one another's results.

    The settings are parameters rather than only module globals because a
    worker started with multiprocessing's "spawn" method (the only method on
    Windows) re-imports this module without running the ``__main__`` block,
    so values assigned there never reach the worker.
    """
    import os
    import subprocess

    # Anything not passed explicitly falls back to the module-level setting.
    if validator_path is None:
        validator_path = PATH_TO_VALIDATOR
    if csv_folder is None:
        csv_folder = PATH_TO_CSV_FOLDER
    if csv_schema is None:
        csv_schema = PATH_TO_CSV_SCHEMA
    if csv_encoding is None:
        csv_encoding = CSV_ENCODING
    if csv_schema_encoding is None:
        csv_schema_encoding = CSV_SCHEMA_ENCODING

    total_file_count = len(CSV_LIST)

    with open(output_path, 'w') as output:
        for current_file_count, filename in enumerate(CSV_LIST, start=1):
            print("Processing file {}/{}".format(current_file_count,
                                                 total_file_count))

            command = [
                validator_path, os.path.join(csv_folder, filename), csv_schema,
                "--csv-encoding", csv_encoding,
                "--csv-schema-encoding", csv_schema_encoding,
                '--fail-fast', 'true',
            ]
            # The context manager closes the pipe and waits for the child, so
            # no process handles are leaked while looping over many files.
            with subprocess.Popen(command, stdout=subprocess.PIPE) as validator:
                result, _ = validator.communicate()

            output.write(filename + ': ' + result.decode('windows-1252'))


def split_list(alist, wanted_parts=1):
    """Split ``alist`` into ``wanted_parts`` consecutive sublists of roughly
    equal size.

    The sublists keep the original order and together contain every element
    exactly once. Some sublists are empty when ``alist`` has fewer elements
    than ``wanted_parts``.
    """
    length = len(alist)
    return [alist[i * length // wanted_parts: (i + 1) * length // wanted_parts]
            for i in range(wanted_parts)]


if __name__ == '__main__':
    import argparse
    import multiprocessing
    import os
    import shutil
    import sys

    parser = argparse.ArgumentParser(description="Command line API to Scala CSV validator")
    parser.add_argument('-pv', '--PATH_TO_VALIDATOR', help="Specify the path to csv-validator-cmd/bin/validate.bat",
                        required=True)
    parser.add_argument('-pf', '--PATH_TO_CSV_FOLDER', help="Specify the path to the folder containing the csv files "
                                                            "you want to validate", required=True)
    parser.add_argument('-ps', '--PATH_TO_CSV_SCHEMA', help="Specify the path to CSV schema you want to use to "
                                                            "validate the given files", required=True)

    parser.add_argument('-cenc', '--CSV_ENCODING', default=CSV_ENCODING,
                        help="Optional parameter to specify the encoding used by the CSV "
                             "files. Choose UTF-8 or windows-1252. Default windows-1252")
    parser.add_argument('-csenc', '--CSV_SCHEMA_ENCODING', default=CSV_SCHEMA_ENCODING,
                        help="Optional parameter to specify the encoding used by "
                             "the CSV Schema. Choose UTF-8 or windows-1252. "
                             "Default UTF-8")

    args = vars(parser.parse_args())

    CSV_ENCODING = args['CSV_ENCODING']
    CSV_SCHEMA_ENCODING = args['CSV_SCHEMA_ENCODING']
    PATH_TO_VALIDATOR = args["PATH_TO_VALIDATOR"]
    PATH_TO_CSV_SCHEMA = args["PATH_TO_CSV_SCHEMA"]
    PATH_TO_CSV_FOLDER = args["PATH_TO_CSV_FOLDER"]

    # Passed to every worker explicitly: with the "spawn" start method the
    # assignments above exist only in this parent process.
    worker_settings = {
        'validator_path': PATH_TO_VALIDATOR,
        'csv_folder': PATH_TO_CSV_FOLDER,
        'csv_schema': PATH_TO_CSV_SCHEMA,
        'csv_encoding': CSV_ENCODING,
        'csv_schema_encoding': CSV_SCHEMA_ENCODING,
    }

    CPU_COUNT = multiprocessing.cpu_count()

    # Sorted so that the order of results does not depend on the arbitrary
    # order in which the operating system lists the directory.
    split_csv_directory = split_list(sorted(os.listdir(PATH_TO_CSV_FOLDER)), wanted_parts=CPU_COUNT)

    # Start one worker per non-empty chunk (at most one per CPU). Each worker
    # writes to its own part file so that workers never overwrite each other.
    workers = []
    for index, csv_list in enumerate(split_csv_directory):
        if not csv_list:
            continue
        part_path = "output.txt.part{}".format(index)
        p = multiprocessing.Process(target=open_csv, args=(csv_list,),
                                    kwargs=dict(worker_settings, output_path=part_path))
        p.start()
        workers.append((p, part_path))

    # Wait for the workers in order and stitch their part files together.
    # Binary mode copies the bytes unchanged (no newline translation).
    with open("output.txt", 'wb') as combined:
        for p, part_path in workers:
            p.join()
            if p.exitcode != 0:
                print("Worker writing {} exited with code {}".format(part_path, p.exitcode),
                      file=sys.stderr)
            if os.path.exists(part_path):
                with open(part_path, 'rb') as part:
                    shutil.copyfileobj(part, combined)
                os.remove(part_path)