How to Run Parallel Data Analysis in Python using Dask Dataframes
Luciano Strika
Posted on April 14, 2019
Sometimes you open a big Dataset with Python’s Pandas, try to get a few metrics, and the whole thing just freezes horribly. Dask Dataframes may solve your problem.
If you work on Big Data, you know if you’re using Pandas, you can be waiting for up to a whole minute for a simple average of a Series, and let’s not even get into calling apply. And that’s just for a couple million rows! When you get to the billions, you better start using Spark or something.
I found out about this tool a short while ago: a way to speed up Data Analysis in Python, without having to get better infrastructure or switching languages. It will eventually feel limited if your Dataset is huge, but it scales a lot better than regular Pandas, and may be just the fit for your problem — especially if you’re not doing a lot of reindexing.
What is Dask?
Dask is an Open Source project that gives you abstractions over NumPy Arrays, Pandas Dataframes and regular lists, allowing you to run operations on them in parallel, using multicore processing.
Here’s an excerpt straight from the tutorial:
Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.
It’s as awesome as it sounds! I set out to try the Dask Dataframes out for this Article, and ran a couple benchmarks on them.
For another use of Dask, this time for Machine Learning in parallel, check out this article.
Reading the docs
The first thing I did was read the official documentation, to see exactly which tasks it recommends doing with Dask Dataframes instead of regular ones. Here are the relevant parts from the official docs:
- Manipulating large datasets, even when those datasets don’t fit in memory
- Accelerating long computations by using many cores
- Distributed computing on large datasets with standard Pandas operations like groupby, join, and time series computations
And then below that, it lists some of the things that are really fast if you use Dask Dataframes:
- Arithmetic operations (multiplying or adding to a Series)
- Common aggregations (mean, min, max, sum, etc.)
- Calling apply (as long as it’s along the index, that is, not after a groupby('y') where 'y' is not the index)
- Calling value_counts(), drop_duplicates() or corr()
- Filtering with loc, isin, and row-wise selection
How to use Dask Dataframes
Dask Dataframes have mostly the same API as Pandas Dataframes, except that aggregations and applys are evaluated lazily, and need to be computed by calling the compute method. To generate a Dask Dataframe, you can simply call the read_csv method just as you would in Pandas. Given a Pandas Dataframe df, you can also just call
dd = ddf.from_pandas(df, npartitions=N)
ddf is the name you imported Dask Dataframes with, and npartitions is an argument telling the Dataframe how you want to partition it.
According to StackOverflow, it is advised to partition the Dataframe in about as many partitions as cores your computer has, or a couple times that number.
Each partition will run on a different thread, and communication between them will become too costly if there are too many.
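As a rough illustration of that rule of thumb, the partition count can be derived from the number of cores the machine reports. The helper name `suggest_npartitions` and its `multiplier` parameter are hypothetical, made up for this sketch; they are not part of Dask.

```python
import os


def suggest_npartitions(multiplier=2):
    """Return cores * multiplier as a starting point for npartitions.

    Hypothetical helper: the right value still depends on the data
    and the workload, so treat the result as a first guess to tune.
    """
    # os.cpu_count() can return None on some platforms; fall back to 1.
    cores = os.cpu_count() or 1
    return cores * multiplier


print(suggest_npartitions())
```

The result can then be passed as the `npartitions` argument of `from_pandas`.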
Getting dirty: Let’s benchmark!
I made a Jupyter Notebook to try out the framework, and made it available on Github in case you want to check it out or even run it for yourself.
The benchmarking tests I ran are available in the notebook at Github, but here are the main ones:
Here df3 is a regular Pandas Dataframe with 25 million rows. I generated it using the script from the previous article (columns are name, surname and salary, sampled randomly from a list). I took a 50-row Dataset and concatenated it 500,000 times, since I wasn’t too interested in the analysis per se, but only in the time it took to run it.
dfn is simply the Dask Dataframe based on df3.
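The notebook's own benchmark code is not reproduced here. As a stand-in, this is one way such timings could be collected: a small decorator that prints how long a function took, in the same "seconds for name" shape as the results below. The names `timed` and `get_sum_example` are assumptions made up for this sketch, and the example sums a plain range instead of a Dataframe column so that it runs without any dependencies.

```python
import time
from functools import wraps


def timed(func):
    """Print how long func took to run, then return its result."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f"{elapsed} seconds for {func.__name__}")
        return result
    return wrapper


@timed
def get_sum_example(values):
    # Placeholder workload; a real benchmark would call something
    # like a Dataframe aggregation followed by compute() here.
    return sum(values)


total = get_sum_example(range(1_000_000))
```

When timing Dask code this way, the call to `compute()` should sit inside the timed function, otherwise only the construction of the lazy result is measured.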
First batch of results: not too optimistic
I first tried the test with 3 partitions, as I only have 4 cores and didn’t want to overwork my PC. I had pretty bad results with Dask and had to wait a long time to get them too, but I feared it might have been because I’d made too few partitions:
204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old
131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old
120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old
0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old
You can see most of the operations turned a lot slower when I used Dask. That gave me the hint that I might need to use more partitions. The time it took to generate the lazy evaluations was negligible as well (less than half a second in some cases), so it’s not as if that cost would have been amortized over time if I reused them.
I also tried this test with the apply method:
And had pretty similar results:
369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old
So generally, most operations became at least twice as slow as the original, except filter. I am worried that I should have called compute on that one as well, so take that result with a grain of salt.
More partitions: amazing speed up
After such discouraging results, I decided maybe I was just not using enough partitions. The whole point of this is running things in parallel, after all, so maybe I just needed to parallelize more? So I tried the same tests with 8 partitions, and here’s what I got (I omitted the results from the non-parallel dataframe, since they were basically the same):
3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df
112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test
That’s right! Most operations are running over ten times faster than the regular Dataframe’s, and even the apply got faster! I also ran the value_count_test, which just calls the value_counts method on the salary Series. For context, keep in mind I had to kill the process when I ran this test on a regular Dataframe after ten whole minutes of waiting. This time it only took 50 seconds!
So basically I was just using the tool wrong, and it’s pretty darn fast. A lot faster than regular Dataframes.
Final take-away
Given we just operated with 25 million rows in under a minute on a pretty old 4-core PC, I can see how this would be huge in the industry. So my advice is to try this Framework out next time you have to process a Dataset locally or from a single AWS instance. It’s pretty fast.
I hope you found this article interesting or useful! It took a lot more time to write it than I anticipated, as some of the benchmarks took so long. Please tell me if you’d ever heard of Dask before reading this, and whether you’ve ever used it in your job or for a project. Also tell me if there are any other cool features I didn’t cover, or some things I did plain wrong! Your feedback and comments are the biggest reason I write, as I am also learning from this.
Follow me on Medium or Twitter for more Python tutorials, tips and tricks! If you really liked this article, please tweet it!
The post How to Run Parallel Data Analysis in Python using Dask Dataframes appeared first on Data Stuff.