7.8. Working with Large Files¶
Lazy sequences are useful for processing large files. In this section, we
explore the process of working with very large files, whether that means one
large file or many of them. First, we illustrate processing a single large file
as a lazy sequence. Then we adapt this approach to process any number of files
as a lazy stream. Finally, we look at more complicated lazy operations like
reduceby and join.
7.8.1. Processing a single large file¶
When processing a large file, it is important to keep the entire streaming process lazy. We will do this by sticking to the following rules (a minimal sketch of the full pattern follows the list).
How to stream a large file

1. Start the process with with_iter, which gives a lazy sequence of lines from the file.
2. Pipe this sequence through a series of lazy constructs like map, filter, drop, take, etc.
3. Use side_effect with chunks and print to keep track of your progress.
4. If we need to write the results to a file, we do the following:
   - Embed the whole pipe in a with statement.
   - Use side_effect to print the sequence to the file while maintaining the laziness of the process.
5. Consume the sequence in the proper way. Make sure that you don’t keep too much information in memory. For example, consuming the process with list could be a bad idea for a very large stream. Here are a couple of possible alternatives.
   - Use consume to eat the whole sequence, one item at a time. This is the typical end of the pipe when writing the results to disk.
   - Use reduce or reduceby to aggregate the data to a manageable size. Many times a collection of statistics will be much smaller than the original file.
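Here is a minimal sketch of that pattern. The file names input.txt and output.txt are hypothetical placeholders, and the particular transformations are only for illustration; the shape of the pipe is what matters.

from toolz import pipe, curry
from toolz.curried import map, filter
from more_itertools import with_iter, side_effect, consume

side_effect = curry(side_effect)   # curried so it can sit in the middle of a pipe

with open('output.txt', 'w') as outfile:                # hypothetical output file
    write_line = lambda line: print(line, file=outfile)
    pipe(with_iter(open('input.txt')),                  # lazy sequence of lines (hypothetical input)
         map(str.strip),                                # lazy transformations ...
         filter(lambda line: len(line) > 0),            # ... and lazy filters
         side_effect(write_line),                       # lazily write each surviving line
         consume)                                       # pull items through, one at a time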
7.8.1.1. Example - Processing United States Census data¶
As an example of processing a large file, suppose that we want to compute statistics based on the 2011-2015 ACS 5-year PUMS. This file is fairly large, weighing in at just under 5GB. Let’s start by using a lazy pipe to count the number of rows. First, we import all of the tools that we will need in this and the next example.
In [1]: from more_itertools import with_iter, consume, side_effect
In [2]: from csv import reader
In [3]: from toolz import compose, pipe, first
In [4]: from toolz.curried import get, take, filter, map, pluck, drop
In [5]: from toolz.curried import reduceby, valmap, curry
In [6]: from functools import reduce
In [7]: read_csv = compose(reader, with_iter, open)
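Note that compose applies its functions from right to left, so read_csv(path) is equivalent to reader(with_iter(open(path))): a lazy iterator over the parsed rows of the CSV file. As a quick check (the file name is a hypothetical placeholder):

rows = read_csv('some_file.csv')   # hypothetical path; no rows have been parsed yet
next(rows)                         # pulls and parses exactly one row from the file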
Counting is a reduction, so we will write a helper function for the reducing step, wrap it in a reduction, and then stream the file through that reduction.
In [7]: count = lambda cur_count, next_row: cur_count + 1
In [8]: count_rows = lambda seq: reduce(count, seq, 0)
In [9]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
...: read_csv,
...: count_rows)
...:
Out[9]: 4639126
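To see how this reduction counts items one at a time, we can run it on a small in-memory sequence (toy data, not the census file):

toy_rows = iter([('a', 1), ('b', 2), ('c', 3)])
count_rows(toy_rows)   # -> 3, without ever holding more than one row in memory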
This file represents a sample of the entire census, so we need to use the person weights (PWGTP) if we want to make a good estimate for the whole United States population. For example, suppose that we want to compute the average interest, dividends, and net rental income in the past 12 months (INTP) grouped by place of birth (POBP05). Since we want to focus on people who were born in the United States or its territories, we must apply a filter to keep values of POBP05 between 1 and 56.
The formula for computing the average using the weights is given by

$$\overline{\text{INTP}} = \frac{\sum_i \text{PWGTP}_i \cdot \text{INTP}_i}{\sum_i \text{PWGTP}_i}$$

For example, two people with weights 2 and 3 and INTP values 100 and 0 would have a weighted average of (2·100 + 3·0)/(2 + 3) = 40.
We start by inspecting the table header to determine the index of each of these columns.
In [8]: rows_to_keep = ['PWGTP', 'INTP', 'POBP05']
In [9]: keep_row = lambda tup: get(1, tup) in rows_to_keep
In [10]: header = pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: take(1),
....: list,
....: first,
....: enumerate,
....: filter(keep_row),
....: list)
....:
In [11]: header
Out[11]: [(7, 'PWGTP'), (33, 'INTP'), (112, 'POBP05')]
When processing a very large file, one valuable technique is prototyping our code on a small portion of the file. Let’s use take to read in 20 rows, keeping only the columns of interest with the pattern map(get([7, 33, 112])).
In [12]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: take(20), # Temporary - remove after prototyping
....: map(get([7, 33, 112])), # Keep only the columns of interest
....: list) # Temporary - remove after prototyping
....:
Out[12]:
[('PWGTP', 'INTP', 'POBP05'),
('00012', '000000', '001'),
('00004', '000000', '001'),
('00006', '020000', '001'),
('00006', '002000', '013'),
('00007', '000000', '001'),
('00004', '000000', '001'),
('00020', '000000', '001'),
('00014', '000000', '001'),
('00031', '000000', '028'),
('00069', '000000', '001'),
('00043', '000000', '001'),
('00054', '000000', '001'),
('00110', '000000', '001'),
('00029', '000000', '001'),
('00017', '000000', '001'),
('00035', '000000', '001'),
('00020', '000000', '001'),
('00015', '000000', '034'),
('00015', '000000', '001')]
So far, we have been using the pattern map(get([7, 33, 112])) to pull out
specific columns. There is a better lazy abstraction of this pattern called
pluck, available in toolz, which we can use to refactor that last batch
of code as follows.
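As a quick illustration, here is pluck on a toy sequence (not census data):

toy_rows = [('a', 'b', 'c'), ('d', 'e', 'f')]
list(pluck([0, 2], toy_rows))   # -> [('a', 'c'), ('d', 'f')], computed lazily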
In [13]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: take(20), # Temporary - remove after prototyping
....: drop(1), # Drop the header
....: pluck([7, 33, 112]), # Lazy abstraction for map(get([7, 33, 112]))
....: list) # Temporary - remove after prototyping
....:
Out[13]:
[('00012', '000000', '001'),
('00004', '000000', '001'),
('00006', '020000', '001'),
('00006', '002000', '013'),
('00007', '000000', '001'),
('00004', '000000', '001'),
('00020', '000000', '001'),
('00014', '000000', '001'),
('00031', '000000', '028'),
('00069', '000000', '001'),
('00043', '000000', '001'),
('00054', '000000', '001'),
('00110', '000000', '001'),
('00029', '000000', '001'),
('00017', '000000', '001'),
('00035', '000000', '001'),
('00020', '000000', '001'),
('00015', '000000', '034'),
('00015', '000000', '001')]
Now we will filter the rows, keeping only the POBP05 codes for the United States and its territories (1-56).
In [14]: is_US = lambda row: int(get(-1, row)) <= 56
In [15]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: take(20), # Temporary - remove after prototyping
....: drop(1), # Drop the header
....: pluck([7, 33, 112]),
....: filter(is_US),
....: list) # Temporary - remove after prototyping
....:
Out[15]:
[('00012', '000000', '001'),
('00004', '000000', '001'),
('00006', '020000', '001'),
('00006', '002000', '013'),
('00007', '000000', '001'),
('00004', '000000', '001'),
('00020', '000000', '001'),
('00014', '000000', '001'),
('00031', '000000', '028'),
('00069', '000000', '001'),
('00043', '000000', '001'),
('00054', '000000', '001'),
('00110', '000000', '001'),
('00029', '000000', '001'),
('00017', '000000', '001'),
('00035', '000000', '001'),
('00020', '000000', '001'),
('00015', '000000', '034'),
('00015', '000000', '001')]
Before we group the data by the state identifier, let’s convert all three columns to integers.
In [16]: convert_row = compose(tuple, map(int))
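Since compose applies its functions from right to left, convert_row lazily maps int over the entries of a row and then packs the results into a tuple. For example, using one of the rows above:

convert_row(('00006', '020000', '001'))   # -> (6, 20000, 1)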
In [17]: pipe('/Users/bn8210wy/Desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: take(20), # Temporary - remove after prototyping
....: drop(1), # Drop the header
....: pluck([7, 33, 112]),
....: filter(is_US),
....: map(convert_row),
....: list) # Temporary - remove after prototyping
....:
Out[17]:
[(12, 0, 1),
(4, 0, 1),
(6, 20000, 1),
(6, 2000, 13),
(7, 0, 1),
(4, 0, 1),
(20, 0, 1),
(14, 0, 1),
(31, 0, 28),
(69, 0, 1),
(43, 0, 1),
(54, 0, 1),
(110, 0, 1),
(29, 0, 1),
(17, 0, 1),
(35, 0, 1),
(20, 0, 1),
(15, 0, 34),
(15, 0, 1)]
Our task, computing the mean value per state, is an example of reduceby. To
use this pattern, we need to write a key function that classifies each row and
an update function that reduces each group to a statistic. Because the weighted
mean needs both a numerator and a denominator, we will use a reduction that
keeps track of both sums simultaneously.
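Before applying this to the census data, here is reduceby on a toy sequence laid out like our rows, (PWGTP, INTP, POBP05), so that the key and update functions are easy to follow. The rows are made up for illustration.

toy_rows = [(2, 100, 1), (3, 0, 1), (5, 40, 13)]         # (weight, value, state)
toy_key = get(2)                                         # classify each row by its state code
toy_update = lambda acc, row: (acc[0] + row[0]*row[1],   # accumulate the weighted sum ...
                               acc[1] + row[0])          # ... and the total weight
reduceby(toy_key, toy_update, toy_rows, init=(0, 0))     # -> {1: (200, 5), 13: (200, 5)}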
In [18]: convert_row = compose(tuple, map(int))
In [19]: group_by_state = get(2)
In [20]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
....: acc[1] + get(0, row))
....:
In [21]: pipe('/users/bn8210wy/desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: take(20), # temporary - remove after prototyping
....: drop(1), # drop the header
....: pluck([7, 33, 112]),
....: filter(is_US),
....: map(convert_row),
....: reduceby(group_by_state, weighted_sum_update, init=(0,0)))
....:
Out[21]: {1: (120000, 459), 13: (12000, 6), 28: (0, 31), 34: (0, 15)}
Now we need to divide the numerator by the denominator for each state, which is
accomplished with valmap.
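valmap applies a function to every value of a dictionary while leaving the keys alone. Using the pairs from the previous output:

divide = lambda tup: tup[0]/tup[1]
valmap(divide, {1: (120000, 459), 13: (12000, 6)})   # -> {1: 261.437908496732, 13: 2000.0}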
In [22]: convert_row = compose(tuple, map(int))
In [23]: group_by_state = get(2)
In [24]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
....: acc[1] + get(0, row))
....:
In [25]: divide = lambda tup: tup[0]/tup[1]
In [26]: pipe('/users/bn8210wy/desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: take(20), # temporary - remove after prototyping
....: drop(1), # drop the header
....: pluck([7, 33, 112]),
....: filter(is_US),
....: map(convert_row),
....: reduceby(group_by_state, weighted_sum_update, init=(0,0)),
....: valmap(divide))
....:
Out[26]: {1: 261.437908496732, 13: 2000.0, 28: 0.0, 34: 0.0}
Now that we have prototyped the process on a small number of rows, it is time
to remove the guard (take(20)) and run the process on the whole file.
We save the result of this computation (a dictionary with one pair per state)
for later processing.
In [27]: convert_row = compose(tuple, map(int))
In [28]: group_by_state = get(2)
In [29]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
....: acc[1] + get(0, row))
....:
In [30]: divide = lambda tup: tup[0]/tup[1]
In [31]: state_means = pipe('/users/bn8210wy/desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: drop(1), # drop the header
....: pluck([7, 33, 112]),
....: filter(is_US),
....: map(convert_row),
....: reduceby(group_by_state, weighted_sum_update, init=(0,0)),
....: valmap(divide))
....:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-31-b8c818f2c7e2> in <module>()
6 map(convert_row),
7 reduceby(group_by_state, weighted_sum_update, init=(0,0)),
----> 8 valmap(divide))
/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/functoolz.py in pipe(data, *funcs)
550 """
551 for func in funcs:
--> 552 data = func(data)
553 return data
554
/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/functoolz.py in __call__(self, *args, **kwargs)
281 def __call__(self, *args, **kwargs):
282 try:
--> 283 return self._partial(*args, **kwargs)
284 except TypeError as exc:
285 if self._should_curry(args, kwargs, exc):
/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/itertoolz.py in reduceby(key, binop, seq, init)
609 key = getter(key)
610 d = {}
--> 611 for item in seq:
612 k = key(item)
613 if k not in d:
/Users/bn8210wy/.pyenv/versions/3.5.2/envs/runestone/lib/python3.5/site-packages/toolz/functoolz.py in __call__(self, *args, **kwargs)
466 ret = self.first(*args, **kwargs)
467 for f in self.funcs:
--> 468 ret = f(ret)
469 return ret
470
ValueError: invalid literal for int() with base 10: ''
What happened above is a common occurrence. I thought the code was working, but there is a case in the larger file that I didn’t see or consider: missing data. We have to decide how to deal with these cases. Our options are (1) use a default value of 0 when data is missing or (2) filter out these cases. Both solutions have their drawbacks. In this case, I will simply filter out all rows that are missing one of the three entries.
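For completeness, here is a sketch of option (1), which we will not use below: substitute '0' for empty strings before the integer conversion instead of dropping the row.

default_if_empty = lambda s: s if s != '' else '0'     # hypothetical helper, not used later
convert_row_with_default = compose(tuple, map(int), map(default_if_empty))
convert_row_with_default(('00012', '', '001'))         # -> (12, 0, 1)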
Since we are back to prototyping a solution, this time to fix a bug, we
reintroduce take
to make the testing fast.
In [32]: convert_row = compose(tuple, map(int))
In [33]: group_by_state = get(2)
In [34]: weighted_sum_update = lambda acc, row: (acc[0] + get(0, row)*get(1, row),
....: acc[1] + get(0, row))
....:
In [35]: divide = lambda tup: tup[0]/tup[1]
In [36]: is_not_empty = lambda s: s != ''
In [37]: non_empty_row = compose(all, map(is_not_empty))
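The composition non_empty_row lazily maps is_not_empty over a row and then checks that all entries pass. For example:

non_empty_row(('00012', '000000', '001'))   # -> True
non_empty_row(('00012', '', '001'))         # -> False, so this row would be dropped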
In [38]: state_means = pipe('/users/bn8210wy/desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: drop(1), # drop the header
....: take(10000), # temporary for prototyping
....: pluck([7, 33, 112]),
....: filter(is_US),
....: filter(non_empty_row),
....: map(convert_row),
....: reduceby(group_by_state, weighted_sum_update, init=(0,0)),
....: valmap(divide))
....:
In [39]: list(state_means.items())[:10]
Out[39]:
[(1, 802.0135422729918),
(2, 77.96610169491525),
(4, 0.0),
(5, 2055.7834101382487),
(6, 1985.8930276981853),
(8, 3.696098562628337),
(9, 494.94163424124514),
(10, 0.0),
(11, 527.4725274725274),
(12, 489.65094943460633)]
A process like this can take a long time to complete. To help track progress,
we will use side_effect
from the more_itertools
library to print a
message every 1,000,000 rows.
In [40]: from more_itertools import side_effect
In [41]: side_effect = curry(side_effect)
In [42]: print_progress = lambda x: print("Processing another 1,000,000 rows")
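When chunk_size is given, side_effect calls the function once per chunk of items rather than once per item, which is what makes it a cheap progress tracker. A toy illustration (not the census file):

notice = lambda chunk: print("processed another chunk")
consume(side_effect(notice, range(5), chunk_size=2))
# prints the message three times, for the chunks (0, 1), (2, 3), and (4,)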
#
## Main processing stream
#
In [43]: state_means = pipe('/users/bn8210wy/desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: drop(1), # drop the header
....: take(10000), # temporary for prototyping
....: side_effect(print_progress, chunk_size=1000000),
....: pluck([7, 33, 112]),
....: filter(is_US),
....: filter(non_empty_row),
....: map(convert_row),
....: reduceby(group_by_state, weighted_sum_update, init=(0,0)),
....: valmap(divide))
....:
Processing another 1,000,000 rows
Before processing the whole file, we should reexamine the process and make sure
that all of the parts that stream the file are lazy. In this case, every stage
before reduceby produces a lazy sequence, so reduceby is the first step that
accumulates data in memory. Luckily, we are only keeping a couple of numbers
per state, so this shouldn’t be a problem for memory. If we were accumulating
more information than memory would allow, we would need to look for another
solution. Since we have confirmed that this process is safe, let’s remove the
guard and process the whole file.
In [44]: state_means = pipe('/users/bn8210wy/desktop/census/2011_2015/ss15pusa.csv',
....: read_csv,
....: drop(1), # drop the header
....: side_effect(print_progress, chunk_size=1000000),
....: pluck([7, 33, 112]),
....: filter(is_US),
....: filter(non_empty_row),
....: map(convert_row),
....: reduceby(group_by_state, weighted_sum_update, init=(0,0)),
....: valmap(divide))
....:
Processing another 1,000,000 rows
Processing another 1,000,000 rows
Processing another 1,000,000 rows
Processing another 1,000,000 rows
Processing another 1,000,000 rows
In [45]: state_means
Out[45]:
{-9: 2046.2699365722697,
1: 999.9947105822915,
2: 1005.0048362739664,
4: 726.756242052686,
5: 1009.466231775314,
6: 1558.4499375623134,
8: 1552.1935989799554,
9: 1789.1496099356484,
10: 899.4040456732163,
11: 2079.9369871958665,
12: 838.0531860436864,
13: 951.6220401800159,
15: 1625.5767279905683,
16: 2832.0063575622594,
17: 3858.14536902721,
18: 2790.0015546250092,
19: 3448.747840879351,
20: 2956.399048848033,
21: 3252.290016740979,
22: 1535.0876356557521,
23: 1757.8102677368959,
24: 2239.297897364191,
25: 3124.465451212315,
26: 2960.5169941729014,
27: 3628.6608676898745,
28: 1870.5233594943845,
29: 3238.267152158706,
30: 2588.821377840909,
31: 3129.632818078311,
32: 1472.4052393272962,
33: 3214.308993515647,
34: 3439.9406829790187,
35: 1304.4119480317108,
36: 4411.493735561985,
37: 1673.9786311982366,
38: 3999.247241640697,
39: 3203.140221656785,
40: 2484.5488916707764,
41: 2206.3035717426096,
42: 3680.1409606887096,
44: 1798.405566658829,
45: 1492.2188974478797,
46: 2788.7323254961084,
47: 1963.930496697591,
48: 2079.411455332534,
49: 2899.093137254902,
50: 1741.443880688807,
51: 2296.1152456323725,
53: 3046.7845175626767,
54: 2883.3576292748635,
55: 3694.9032502831255,
56: 1816.7817351598173}
Finally, we will print this information out to a file. To do this, we first
convert the dictionary into a table of rows using the items dictionary method,
wrapped in the helper function dict_to_table. Then we convert each row to a
comma-separated string and use side_effect to print each string to the output
file, with consume at the end to “eat” the strings and keep memory clean. To
make this work, we move the pipe inside a with statement that opens and closes
the output file. Note that the helper function that prints to the output file
needs to be constructed inside the with statement so that it has access to the
file handle.
In [44]: dict_to_table = lambda d: d.items()
In [45]: comma_sep_row = lambda row: ','.join(map(str, row))
In [46]: with open('interest_by_state.csv', 'w') as outfile:
....: print_to_outfile = lambda row_str: print(row_str, file=outfile)
....: pipe(state_means,
....: dict_to_table,
....: map(comma_sep_row),
....: side_effect(print_to_outfile),
....: consume)
....: