7.8. Working with Large Files

Lazy sequences are useful when processing large files. In this section, we explore the process of working with very large files, whether that means one large file or a number of them. First, we illustrate processing a single large file as a lazy sequence. Then we adapt this approach to process any number of files as a lazy stream. Finally, we look at more complicated lazy operations like reduceby and join.

7.8.1. Processing a single large file

When processing a large file, it is important to keep the entire streaming process lazy. We will do this by sticking to the following rules; a minimal skeleton of the pattern is sketched after the list.

How to stream a large file

  1. Start the process with with_iter(open(file_path)), which yields a lazy sequence of lines from the file.

  2. Pipe this sequence through a series of lazy constructs like map, filter, drop, take, etc.

  3. Use side_effect with chunk_size and print to keep track of your progress.

  4. If we need to write the results to a file, we do the following:

    1. Embed the whole pipe in a with statement.
    2. Use side_effect to print each item to the file while maintaining the laziness of the process.
  5. Consume the sequence in the proper way. Make sure that you don’t keep too much information in memory. For example, consuming the process with list could be a bad idea for a very large stream. Here are a couple of possible alternatives.

    1. Use consume to eat the whole sequence, one item at a time. This is the typical end of the pipe when writing the results to disk.
    2. Use reduce or reduceby to aggregate the data to a manageable size. Many times a collection of statistics will be much smaller than the original file.
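
To tie these rules together, here is a minimal sketch of the pattern, assuming hypothetical file names big_input.csv and results.csv; the transformation (process) and predicate (keep) are placeholders standing in for whatever work your pipeline needs to do.

from more_itertools import with_iter, side_effect, consume
from toolz import pipe, curry
from toolz.curried import map, filter

side_effect = curry(side_effect)            # allow partial application inside pipe

process = lambda line: line.strip()         # placeholder transformation
keep = lambda line: line != ''              # placeholder predicate
report = lambda chunk: print("Processed another 100,000 lines")

with open('results.csv', 'w') as outfile:
    write_line = lambda line: print(line, file=outfile)
    pipe(with_iter(open('big_input.csv')),           # 1. lazy sequence of lines
         map(process),                               # 2. lazy transformations
         filter(keep),
         side_effect(report, chunk_size=100000),     # 3. progress reporting
         side_effect(write_line),                    # 4. lazily write each item to the file
         consume)                                    # 5. consume one item at a time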

7.8.1.1. Example - Processing United States Census data

As an example of processing a large file, suppose that we want to compute statistics based on the 2011-2015 ACS 5-year PUMS (Public Use Microdata Sample). This file is fairly large, weighing in at just under 5GB. Let’s start by using a lazy pipe to count the number of rows. First, we import all of the tools that we will need in this and the next example.

In [1]: from more_itertools import with_iter, consume, side_effect

In [2]: from csv import reader

In [3]: from toolz import compose, pipe, first

In [4]: from toolz.curried import get, take, filter, map, pluck, drop

In [5]: from toolz.curried import reduceby, valmap, curry

In [6]: from functools import reduce

In [7]: read_csv = compose(reader, with_iter, open)

Reading the composition from right to left, read_csv opens the file, wraps the file handle in a lazy sequence of lines with with_iter, and parses each line with csv.reader. Counting is a reduction, so we create a counting helper, turn it into a reduction with reduce, and then stream the file through that reduction.

In [7]: count = lambda cur_count, next_row: cur_count + 1

In [8]: count_rows = lambda seq: reduce(count, seq, 0)

In [9]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ...:      read_csv,
   ...:      count_rows)
   ...:
Out[9]: 4639126

This file represents a sample of the entire census; consequently, we need to use the person weights (PWGTP) if we want to make a good estimate for the whole United States population. For example, suppose that we want to compute the average interest, dividends, and net rental income in the past 12 months (INTP) grouped by place of birth (POBP05). Since we want to focus on people who were born in the United States or its territories, we must apply a filter to keep values of POBP05 between 1 and 56.

The formula for computing the average using the weights is given by

\[average = \frac{\sum_{i=1}^{N} PWGTP_i \cdot INTP_i}{\sum_{i=1}^{N} PWGTP_i}\]
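
To make the formula concrete with a small hypothetical check, two people with person weights 2 and 3 and incomes 10 and 20 give

\[average = \frac{2 \cdot 10 + 3 \cdot 20}{2 + 3} = \frac{80}{5} = 16,\]

which lands closer to the income of the more heavily weighted person than the unweighted mean of 15.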

We start by inspecting the table header to determine the index of each of these columns.

In [8]: cols_to_keep = ['PWGTP', 'INTP', 'POBP05']

In [9]: keep_col = lambda tup: get(1, tup) in cols_to_keep

In [10]: header = pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:                read_csv,
   ....:                take(1),
   ....:                list,
   ....:                first,
   ....:                enumerate,
   ....:                filter(keep_col),
   ....:                list)
   ....: 

In [11]: header
Out[11]: [(7, 'PWGTP'), (33, 'INTP'), (112, 'POBP05')]

When processing a very large file, one valuable technique is prototyping our code on a small portion of the file. Let’s use take to read in the first 20 rows, keeping only those columns of interest.

In [12]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:       read_csv,
   ....:       take(20), # Temporary - remove after prototyping
   ....:       map(get([7, 33, 112])), # Keep only the columns of interest
   ....:       list) # Temporary - remove after prototyping
   ....: 
Out[12]: 
[('PWGTP', 'INTP', 'POBP05'),
 ('00012', '000000', '001'),
 ('00004', '000000', '001'),
 ('00006', '020000', '001'),
 ('00006', '002000', '013'),
 ('00007', '000000', '001'),
 ('00004', '000000', '001'),
 ('00020', '000000', '001'),
 ('00014', '000000', '001'),
 ('00031', '000000', '028'),
 ('00069', '000000', '001'),
 ('00043', '000000', '001'),
 ('00054', '000000', '001'),
 ('00110', '000000', '001'),
 ('00029', '000000', '001'),
 ('00017', '000000', '001'),
 ('00035', '000000', '001'),
 ('00020', '000000', '001'),
 ('00015', '000000', '034'),
 ('00015', '000000', '001')]

So far, we have been using the pattern map(get([7, 33, 112])) to pull out specific columns. There is a better lazy abstraction of this pattern, called pluck, available in toolz.
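
Here is a small in-memory illustration of the equivalence; the two-row dataset is hypothetical.

from toolz.curried import get, map, pluck

rows = [('a', 1, 'x'), ('b', 2, 'y')]
list(map(get([0, 2]), rows))   # [('a', 'x'), ('b', 'y')]
list(pluck([0, 2], rows))      # same result, as a single lazy abstraction

We can now use pluck to refactor that last batch of code as follows.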

In [13]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:       read_csv,
   ....:       take(20), # Temporary - remove after prototyping
   ....:       drop(1), # Drop the header
   ....:       pluck([7, 33, 112]), # Lazy abstraction for map(get([7, 33, 112]))
   ....:       list) # Temporary - remove after prototyping
   ....: 
Out[13]: 
[('00012', '000000', '001'),
 ('00004', '000000', '001'),
 ('00006', '020000', '001'),
 ('00006', '002000', '013'),
 ('00007', '000000', '001'),
 ('00004', '000000', '001'),
 ('00020', '000000', '001'),
 ('00014', '000000', '001'),
 ('00031', '000000', '028'),
 ('00069', '000000', '001'),
 ('00043', '000000', '001'),
 ('00054', '000000', '001'),
 ('00110', '000000', '001'),
 ('00029', '000000', '001'),
 ('00017', '000000', '001'),
 ('00035', '000000', '001'),
 ('00020', '000000', '001'),
 ('00015', '000000', '034'),
 ('00015', '000000', '001')]

Now we will filter down to the rows whose POBP05 codes correspond to the United States and its territories (1-56).

In [14]: is_US = lambda row: int(get(-1, row)) <= 56

In [15]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:       read_csv,
   ....:       take(20), # Temporary - remove after prototyping
   ....:       drop(1), # Drop the header
   ....:       pluck([7, 33, 112]),
   ....:       filter(is_US),
   ....:       list) # Temporary - remove after prototyping
   ....: 
Out[15]: 
[('00012', '000000', '001'),
 ('00004', '000000', '001'),
 ('00006', '020000', '001'),
 ('00006', '002000', '013'),
 ('00007', '000000', '001'),
 ('00004', '000000', '001'),
 ('00020', '000000', '001'),
 ('00014', '000000', '001'),
 ('00031', '000000', '028'),
 ('00069', '000000', '001'),
 ('00043', '000000', '001'),
 ('00054', '000000', '001'),
 ('00110', '000000', '001'),
 ('00029', '000000', '001'),
 ('00017', '000000', '001'),
 ('00035', '000000', '001'),
 ('00020', '000000', '001'),
 ('00015', '000000', '034'),
 ('00015', '000000', '001')]

Before we group the data by the state identifier, let’s convert all three columns to integers.

In [16]: convert_row = compose(tuple, map(int))

In [17]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:       read_csv,
   ....:       take(20), # Temporary - remove after prototyping
   ....:       drop(1), # Drop the header
   ....:       pluck([7, 33, 112]),
   ....:       filter(is_US),
   ....:       map(convert_row),
   ....:       list) # Temporary - remove after prototyping
   ....: 
Out[17]: 
[(12, 0, 1),
 (4, 0, 1),
 (6, 20000, 1),
 (6, 2000, 13),
 (7, 0, 1),
 (4, 0, 1),
 (20, 0, 1),
 (14, 0, 1),
 (31, 0, 28),
 (69, 0, 1),
 (43, 0, 1),
 (54, 0, 1),
 (110, 0, 1),
 (29, 0, 1),
 (17, 0, 1),
 (35, 0, 1),
 (20, 0, 1),
 (15, 0, 34),
 (15, 0, 1)]

Our task, computing the mean value per state, is an example of reduceby. To use this pattern, we need to write a key function that classifies each row and an update function that reduces each group to a statistic. Because we need both the numerator and the denominator of the weighted average, our reduction keeps track of both quantities simultaneously.
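
Before applying it to the census rows, here is a tiny, hypothetical illustration of reduceby: group a handful of integers by parity and sum each group. The key function classifies each item and the update function folds it into that group’s accumulator, starting from the initial value.

from toolz import reduceby

nums = [1, 2, 3, 4, 5]
is_odd = lambda n: n % 2          # key: 1 for odd, 0 for even
add = lambda acc, n: acc + n      # update: fold an item into the accumulator
reduceby(is_odd, add, nums, 0)    # {1: 9, 0: 6}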

In [18]: convert_row = compose(tuple, map(int))

In [19]: group_by_state = get(2)

In [20]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
   ....:                                         acc[1] + get(0, row))
   ....: 

In [21]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:       read_csv,
   ....:       take(20), # Temporary - remove after prototyping
   ....:       drop(1), # Drop the header
   ....:       pluck([7, 33, 112]),
   ....:       filter(is_US),
   ....:       map(convert_row),
   ....:       reduceby(group_by_state, weighted_sum_update, init=(0,0)))
   ....: 
Out[21]: {1: (120000, 459), 13: (12000, 6), 28: (0, 31), 34: (0, 15)}

Now we need to divide the numerator by the denominator for each state, which is accomplished with valmap.
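
valmap applies a function to every value of a dictionary while leaving the keys alone. Here is a tiny hypothetical example with the same (numerator, denominator) shape.

from toolz import valmap

totals = {1: (120, 4), 13: (12, 6)}            # state -> (numerator, denominator)
valmap(lambda tup: tup[0] / tup[1], totals)    # {1: 30.0, 13: 2.0}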

In [22]: convert_row = compose(tuple, map(int))

In [23]: group_by_state = get(2)

In [24]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
   ....:                                         acc[1] + get(0, row))
   ....: 

In [25]: divide = lambda tup: tup[0]/tup[1]

In [26]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:       read_csv,
   ....:       take(20), # Temporary - remove after prototyping
   ....:       drop(1), # Drop the header
   ....:       pluck([7, 33, 112]),
   ....:       filter(is_US),
   ....:       map(convert_row),
   ....:       reduceby(group_by_state, weighted_sum_update, init=(0,0)),
   ....:       valmap(divide))
   ....: 
Out[26]: {1: 261.437908496732, 13: 2000.0, 28: 0.0, 34: 0.0}

Now that we have prototyped the process on a small number of rows, it is time to remove the guard (take(20)) and run the process on the whole file. We save the result of this computation (a dictionary with one pair per state) for later processing.

In [27]: convert_row = compose(tuple, map(int))

In [28]: group_by_state = get(2)

In [29]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
   ....:                                         acc[1] + get(0, row))
   ....: 

In [30]: divide = lambda tup: tup[0]/tup[1]

In [31]: state_means = pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:                    read_csv,
   ....:                    drop(1), # Drop the header
   ....:                    pluck([7, 33, 112]),
   ....:                    filter(is_US),
   ....:                    map(convert_row),
   ....:                    reduceby(group_by_state, weighted_sum_update, init=(0,0)),
   ....:                    valmap(divide))
   ....: 
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-31-b8c818f2c7e2> in <module>()
      6                    map(convert_row),
      7                    reduceby(group_by_state, weighted_sum_update, init=(0,0)),
----> 8                    valmap(divide))

/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/functoolz.py in pipe(data, *funcs)
    550     """
    551     for func in funcs:
--> 552         data = func(data)
    553     return data
    554 

/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/functoolz.py in __call__(self, *args, **kwargs)
    281     def __call__(self, *args, **kwargs):
    282         try:
--> 283             return self._partial(*args, **kwargs)
    284         except TypeError as exc:
    285             if self._should_curry(args, kwargs, exc):

/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/itertoolz.py in reduceby(key, binop, seq, init)
    609         key = getter(key)
    610     d = {}
--> 611     for item in seq:
    612         k = key(item)
    613         if k not in d:

/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/functoolz.py in __call__(self, *args, **kwargs)
    466         ret = self.first(*args, **kwargs)
    467         for f in self.funcs:
--> 468             ret = f(ret)
    469         return ret
    470 

ValueError: invalid literal for int() with base 10: ''

What happened above is a common occurrence. I thought the code was working, but there is a case in the larger file that I didn’t see or consider: missing data. We have to decide how to deal with these cases. Our options are to (1) use a default value of 0 when data is missing or (2) filter out these cases. Both solutions have their drawbacks. In this case, I will simply filter out all rows that are missing one of the three entries.
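
For completeness, here is a sketch of what option (1) might look like; int_or_zero and convert_row_with_default are hypothetical helpers that we will not use in what follows.

from toolz import compose
from toolz.curried import map

int_or_zero = lambda s: int(s) if s != '' else 0        # treat a missing entry as 0
convert_row_with_default = compose(tuple, map(int_or_zero))

convert_row_with_default(('00012', '', '001'))          # (12, 0, 1)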

Since we are back to prototyping a solution, this time to fix a bug, we reintroduce take to make the testing fast.

In [32]: convert_row = compose(tuple, map(int))

In [33]: group_by_state = get(2)

In [34]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
   ....:                                         acc[1] + get(0, row))
   ....: 

In [35]: divide = lambda tup: tup[0]/tup[1]

In [36]: is_not_empty = lambda s: s != ''

In [37]: non_empty_row = compose(all, map(is_not_empty))

In [38]: state_means = pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:                    read_csv,
   ....:                    drop(1), # Drop the header
   ....:                    take(10000), # Temporary guard for prototyping
   ....:                    pluck([7, 33, 112]),
   ....:                    filter(is_US),
   ....:                    filter(non_empty_row),
   ....:                    map(convert_row),
   ....:                    reduceby(group_by_state, weighted_sum_update, init=(0,0)),
   ....:                    valmap(divide))
   ....: 

In [39]: list(state_means.items())[:10]
Out[39]: 
[(1, 802.0135422729918),
 (2, 77.96610169491525),
 (4, 0.0),
 (5, 2055.7834101382487),
 (6, 1985.8930276981853),
 (8, 3.696098562628337),
 (9, 494.94163424124514),
 (10, 0.0),
 (11, 527.4725274725274),
 (12, 489.65094943460633)]

A process like this can take a long time to complete. To help track progress, we will use side_effect from the more_itertools library to print a message every 1,000,000 rows.

In [40]: from more_itertools import side_effect

In [41]: side_effect = curry(side_effect)

In [42]: print_progress = lambda x: print("Processing another 1,000,000 rows")

#
## Main processing stream
#
In [43]: state_means = pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:                    read_csv,
   ....:                    drop(1), # Drop the header
   ....:                    take(10000), # Temporary guard for prototyping
   ....:                    side_effect(print_progress, chunk_size=1000000),
   ....:                    pluck([7, 33, 112]),
   ....:                    filter(is_US),
   ....:                    filter(non_empty_row),
   ....:                    map(convert_row),
   ....:                    reduceby(group_by_state, weighted_sum_update, init=(0,0)),
   ....:                    valmap(divide))
   ....: 
Processing another 1,000,000 rows

Before processing the whole file, we should reexamine the process and make sure that all of the parts that stream the file are lazy. In this case, every stage before reduceby (read_csv, drop, take, side_effect, pluck, the two filters, and map) produces a lazy sequence, and the first construct that accumulates data in memory is reduceby. Luckily, we are only keeping a couple of numbers per state, so memory shouldn’t be a problem. If we were accumulating more information than memory would allow, we would need to look for another solution. Since we have confirmed that this process is safe, let’s remove the guard and process the whole file.

In [44]: state_means = pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
   ....:                    read_csv,
   ....:                    drop(1), # Drop the header
   ....:                    side_effect(print_progress, chunk_size=1000000),
   ....:                    pluck([7, 33, 112]),
   ....:                    filter(is_US),
   ....:                    filter(non_empty_row),
   ....:                    map(convert_row),
   ....:                    reduceby(group_by_state, weighted_sum_update, init=(0,0)),
   ....:                    valmap(divide))
   ....:
Processing another 1,000,000 rows
Processing another 1,000,000 rows
Processing another 1,000,000 rows
Processing another 1,000,000 rows
Processing another 1,000,000 rows

In [45]: state_means
Out[45]:
{-9: 2046.2699365722697,
 1: 999.9947105822915,
 2: 1005.0048362739664,
 4: 726.756242052686,
 5: 1009.466231775314,
 6: 1558.4499375623134,
 8: 1552.1935989799554,
 9: 1789.1496099356484,
 10: 899.4040456732163,
 11: 2079.9369871958665,
 12: 838.0531860436864,
 13: 951.6220401800159,
 15: 1625.5767279905683,
 16: 2832.0063575622594,
 17: 3858.14536902721,
 18: 2790.0015546250092,
 19: 3448.747840879351,
 20: 2956.399048848033,
 21: 3252.290016740979,
 22: 1535.0876356557521,
 23: 1757.8102677368959,
 24: 2239.297897364191,
 25: 3124.465451212315,
 26: 2960.5169941729014,
 27: 3628.6608676898745,
 28: 1870.5233594943845,
 29: 3238.267152158706,
 30: 2588.821377840909,
 31: 3129.632818078311,
 32: 1472.4052393272962,
 33: 3214.308993515647,
 34: 3439.9406829790187,
 35: 1304.4119480317108,
 36: 4411.493735561985,
 37: 1673.9786311982366,
 38: 3999.247241640697,
 39: 3203.140221656785,
 40: 2484.5488916707764,
 41: 2206.3035717426096,
 42: 3680.1409606887096,
 44: 1798.405566658829,
 45: 1492.2188974478797,
 46: 2788.7323254961084,
 47: 1963.930496697591,
 48: 2079.411455332534,
 49: 2899.093137254902,
 50: 1741.443880688807,
 51: 2296.1152456323725,
 53: 3046.7845175626767,
 54: 2883.3576292748635,
 55: 3694.9032502831255,
 56: 1816.7817351598173}

Finally, we will print this information out to a file. To do this, we first need to convert the current dictionary into a table using the items dictionary method, which is accomplished with the helper function dict_to_table. Then we convert each row to a comma-separated string and use side_effect to print each row to a file, with consume at the end to “eat” the strings, keeping memory clean. To make this work, we move the pipe inside a with statement that opens and closes the output file. Note that the helper function for printing to the output file needs to be constructed inside the with statement so that it has access to the file handle.

In [44]: dict_to_table = lambda d: d.items()

In [45]: comma_sep_row = lambda row: ','.join(map(str, row))

In [46]: with open('interest_by_state.csv', 'w') as outfile:
   ....:     print_to_outfile = lambda row_str: print(row_str, file=outfile)
   ....:     pipe(state_means,
   ....:          dict_to_table,
   ....:          map(comma_sep_row),
   ....:          side_effect(print_to_outfile),
   ....:          consume)
   ....: 

7.8.2. Processing a number of large files

7.8.3. Reducing and joining large files
