I wrote the following for a coding assignment (Python 2.7). The task is to aggregate the loans CSV file from the accounts department by (Network, Product, Month), producing the total currency amount and the record count for each group, without using standard Python aggregate or group functions. My code has already been reviewed, but I would like advice and suggestions on my current solution so that I can learn from it going forward.
I've used OrderedDict instead of a normal Python dictionary to keep the current order of the records. I do realise that using OrderedDict comes at a cost; how else could I keep the order of the records that will be written to the output summary CSV?
Input Loans CSV
Output Loans Summary CSV
"""
Created on 20 August 2017
Loans Summary
Validation
@author: Peter Wilson
"""
import argparse
import collections
import csv
import os
import re
import sys
from datetime import datetime
def csv_to_dict(loans_csv):
    """Read the loans CSV and return its rows as a list of dictionaries.

    The first line of the file is used as the header; each following line
    becomes one dictionary keyed by those header names, in file order.

    Args:
        loans_csv: path to the input loans CSV file.

    Returns:
        A list with one dictionary per data row (empty if the file only
        contains a header).

    Raises:
        IOError / OSError: if the file cannot be opened.
    """
    # Read everything inside the ``with`` block so the file handle is closed
    # deterministically instead of being left open for the life of a lazy
    # DictReader.
    with open(loans_csv) as csv_file:
        return list(csv.DictReader(csv_file))
def dict_to_agg_dict(loans_dict):
    """Group loan amounts by (Network, Product, Month-Year).

    Args:
        loans_dict: iterable of row mappings with the keys 'Network',
            'Product', 'Date' and 'Amount'.

    Returns:
        collections.OrderedDict mapping ``(network, product, month)`` tuples
        to the list of amounts (floats) seen for that key. Keys appear in the
        order in which they were first encountered, so the summary keeps the
        order of the input records.

    Raises:
        KeyError: if a row lacks one of the required columns.
        ValueError: if an 'Amount' value is not a valid number.
        IndexError: if a 'Date' value contains no ``<letters>-<4 digits>``
            component (type kept for backward compatibility).
    """
    # Explicit A-Za-z: the range ``A-z`` would also accept the punctuation
    # characters that sit between 'Z' and 'a' in ASCII ([ \ ] ^ _ `).
    # Compiled once, outside the per-row loop.
    month_year = re.compile(r'[A-Za-z]+-\d{4}')

    agg_dict = collections.OrderedDict()
    for row in loans_dict:
        # Values arrive wrapped in single quotes; a literal replace is enough.
        network = row['Network'].replace("'", '')
        product = row['Product'].replace("'", '')

        # Aggregation is per month *and* year, e.g. 'Mar-2016'.
        found = month_year.search(row['Date'])
        if found is None:
            raise IndexError(
                "no month-year component in Date {0!r}".format(row['Date']))
        month = found.group(0)

        # Amounts are parsed as floats as-is; no rounding is applied here.
        currency = float(row['Amount'])

        agg_dict.setdefault((network, product, month), []).append(currency)
    return agg_dict
def agg_dict_to_lists(agg_dict):
    """Flatten the aggregated dictionary into summary rows.

    Args:
        agg_dict: mapping of ``(network, product, month)`` tuples to the list
            of amounts collected for that key.

    Returns:
        A list of rows, one per key in the mapping's iteration order, each of
        the form ``[network, product, month, total_amount, count]``.
    """
    # ``items()`` works on both Python 2 and 3 (``iteritems`` is 2-only).
    return [
        list(key) + [sum(amounts), len(amounts)]
        for key, amounts in agg_dict.items()
    ]
def _open_csv_for_writing(path):
    """Open *path* in the mode the csv module expects on the running Python."""
    if sys.version_info[0] < 3:
        # Python 2: csv.writer emits byte strings, so the file must be binary.
        return open(path, 'wb')
    # Python 3: csv.writer emits text; newline='' leaves its line endings alone.
    return open(path, 'w', newline='')


def output_csv_summary(summary_list, output_csv_folder):
    """Write the aggregated rows to ``Output_<yymmdd>.csv``.

    The file name carries today's date and the file is created inside
    ``output_csv_folder``. A header row is written first, followed by one
    line per entry of ``summary_list``.

    Args:
        summary_list: iterable of rows, each
            ``[network, product, month, currency_total, count]``.
        output_csv_folder: path of an existing folder to write into.

    Raises:
        IOError / OSError: if the output file cannot be created.
    """
    current_date = datetime.today().strftime("%y%m%d")
    csv_name = "{0}_{1}.csv".format("Output", current_date)
    # os.path.join picks the right separator instead of a hard-coded '\\'.
    output_csv = os.path.join(output_csv_folder, csv_name)
    print("Writing {0} to {1}".format(csv_name, output_csv_folder))

    # Backslash escaped explicitly; the emitted header text is unchanged.
    csv_header = ["Network", "Product", "Month\\Year", "Currency", "Count"]
    with _open_csv_for_writing(output_csv) as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow(csv_header)
        for row in summary_list:
            try:
                csvwriter.writerow(row)
            except UnicodeError as e:
                # Best effort: report the problem and carry on with the
                # remaining rows. The offending row is NOT in the output.
                print(e)
def validate_loans(loans_csv, output_csv_folder):
    """Produce the loans summary CSV from the accounts loans CSV.

    Runs the whole pipeline in order: read the input file, aggregate the
    rows, flatten the aggregate into summary rows, and write those rows to
    the output folder.

    Args:
        loans_csv: path to the input loans CSV.
        output_csv_folder: folder that receives the summary CSV.
    """
    rows = csv_to_dict(loans_csv)
    grouped = dict_to_agg_dict(rows)
    output_csv_summary(agg_dict_to_lists(grouped), output_csv_folder)
if __name__ == '__main__':
    # Command-line entry point: both options are mandatory paths.
    cli = argparse.ArgumentParser(
        description='Validate accounting loans summary against system')
    for flag, help_text in (
            ('--loans_csv', 'path to input loans csv'),
            ('--output_csv_folder', 'path to output csv folder')):
        cli.add_argument(flag, metavar='path', required=True, help=help_text)
    options = cli.parse_args()
    validate_loans(loans_csv=options.loans_csv,
                   output_csv_folder=options.output_csv_folder)

