Fastsandra: Faster Python Cassandra Driver

0x26res


Posted on January 5, 2020


Make the most of the DataStax Cassandra driver for Python by tuning it correctly.

Quick example

Throughout this tutorial we'll use a Cassandra table called time_series to store a simple time series. Here's the code to set up the table:


CREATE KEYSPACE fastsandra
  WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 1 
  };


CREATE TABLE fastsandra.time_series ( 
  event_date date,
  instrument_id int,
  event_timestamp timestamp,
  value double,
  PRIMARY KEY (event_date, instrument_id, event_timestamp)
);

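The article doesn't show how the table was populated. The following is a hedged sketch that assumes one reading every 15 minutes per instrument starting at 08:00, which matches the sample rows shown later. The helper `make_sample_rows` is hypothetical. It builds tuples in the column order of `fastsandra.time_series`, using pure Python, so no cluster is needed to run it:

```python
import datetime
import random


def make_sample_rows(day, instrument_ids, readings_per_instrument, seed=0):
    """Hypothetical helper: build (event_date, instrument_id, event_timestamp, value)
    tuples in the column order of fastsandra.time_series."""
    rng = random.Random(seed)  # seeded so the generated values are reproducible
    start = datetime.datetime.combine(day, datetime.time(8, 0))
    step = datetime.timedelta(minutes=15)
    rows = []
    for instrument_id in instrument_ids:
        for i in range(readings_per_instrument):
            rows.append((day, instrument_id, start + i * step, rng.random()))
    return rows


rows = make_sample_rows(datetime.date(2019, 10, 1), instrument_ids=[1, 2], readings_per_instrument=4)
print(len(rows))   # 8
print(rows[1][2])  # 2019-10-01 08:15:00
```

With a live session, each tuple could then be bound to a prepared statement. For example, `prepared = session.prepare("INSERT INTO fastsandra.time_series (event_date, instrument_id, event_timestamp, value) VALUES (?, ?, ?, ?)")` followed by `session.execute(prepared, row)`.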

And here's how to query the data for a given date, using Session.execute:

import cassandra.cluster

cluster = cassandra.cluster.Cluster() # default cluster, pointing to localhost

with cluster.connect('fastsandra') as session:    
    results = session.execute(
        "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
    )
    rows = [r for r in results]
rows[0]
# Row(event_date=Date(18170), instrument_id=1, event_timestamp=datetime.datetime(2019, 10, 1, 8, 0), value=0.2210726153252428)
rows[0].value
# 0.2210726153252428

The driver returns a sequence of rows, each of which is a named tuple.
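The following sketch shows this row-oriented layout, and what turning it into columns involves. It uses `collections.namedtuple` as a stand-in, so the `Row` type here is hypothetical and not the driver's own class. In this layout every value is a separate Python object, and each one has to be visited again to build columns:

```python
import collections
import datetime

# Hypothetical stand-in for the Row named tuple the driver builds for each query
Row = collections.namedtuple(
    "Row", ["event_date", "instrument_id", "event_timestamp", "value"]
)

rows = [
    Row(datetime.date(2019, 10, 1), 1, datetime.datetime(2019, 10, 1, 8, 0), 0.22),
    Row(datetime.date(2019, 10, 1), 1, datetime.datetime(2019, 10, 1, 8, 15), 0.66),
]

print(rows[0].value)  # 0.22, attribute access as with the driver's rows

# Transpose rows into columns, roughly what pd.DataFrame(rows) has to do
columns = dict(zip(Row._fields, map(list, zip(*rows))))
print(columns["value"])  # [0.22, 0.66]
```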

It's very easy to convert the results into a pandas DataFrame:

import pandas as pd
pd.DataFrame(rows)
   event_date  instrument_id     event_timestamp     value
0  2019-10-01              1 2019-10-01 08:00:00  0.221073
1  2019-10-01              1 2019-10-01 08:15:00  0.661251
2  2019-10-01              1 2019-10-01 08:30:00  0.927390
3  2019-10-01              1 2019-10-01 08:45:00  0.083483
4  2019-10-01              1 2019-10-01 09:00:00  0.934817


Speed up the driver with NumpyProtocolHandler

If you're dealing with a lot of data, named tuples and plain Python objects (int, float, etc.) carry a big performance and memory overhead. Numpy arrays filled by Cython code are much more efficient.

The good news is that you can set up the Cassandra driver so that it reads the data directly into numpy arrays using Cython, saving both CPU and memory.
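This sketch puts a rough number on the overhead, assuming CPython on a 64-bit platform. It compares a list of Python floats with a numpy float64 array holding the same values. Each list element is a pointer plus a separate float object, while the array stores 8 raw bytes per value:

```python
import sys

import numpy as np

n = 1_000_000
values = [float(i) for i in range(n)]
array = np.array(values, dtype=np.float64)

# List: the list's own pointer storage plus one float object per element
list_bytes = sys.getsizeof(values) + sum(sys.getsizeof(v) for v in values)
# Array: one contiguous buffer of 8-byte doubles
array_bytes = array.nbytes

print(f"list:  {list_bytes / 1e6:.1f} MB")
print(f"array: {array_bytes / 1e6:.1f} MB")
```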

Installing the cython version of the Cassandra driver with NumpyProtocolHandler

First, you need to install the driver correctly. Here's how to do it on Ubuntu for Python 3.7:

# First make sure the required system libraries are installed:
sudo apt install python3.7-dev libev4 libev-dev
# Activate your virtual environment, then install Cython and numpy first
pip install Cython==0.29.14
pip install numpy==1.17.1
# Then install cassandra-driver (this should take a few minutes)
pip install cassandra-driver
# Check that it worked:
python -c "from cassandra.protocol import NumpyProtocolHandler; print(NumpyProtocolHandler)"
# Should print:
# <class 'cassandra.protocol.cython_protocol_handler.<locals>.CythonProtocolHandler'>
# If it prints None, the Cython extensions were not built and it didn't work

This can be a bit tricky, so here are some troubleshooting tips:

  • You must have the Python development headers installed (python3.7-dev above)
  • You must install Cython before you install the driver
  • If you installed cassandra-driver previously, pip may reuse a cached build without the extensions, so you may have to clear your pip cache
  • Alternatively, use pip --no-cache-dir
  • If it still does not work, run the installation with -v and check the logs:
pip -v --no-cache-dir install cassandra-driver
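The same check can be scripted. The function `numpy_handler_status` below is a hypothetical helper. It assumes, as the check above implies, that `cassandra.protocol.NumpyProtocolHandler` is `None` when the driver was built without its Cython/numpy extensions. It also reports when the import fails altogether:

```python
def numpy_handler_status():
    """Hypothetical helper: describe whether NumpyProtocolHandler is usable."""
    try:
        from cassandra.protocol import NumpyProtocolHandler
    except ImportError:
        return "cassandra-driver (or NumpyProtocolHandler) could not be imported"
    if NumpyProtocolHandler is None:
        # Installed, but without the compiled Cython/numpy extensions
        return "NumpyProtocolHandler unavailable: reinstall with Cython and numpy present"
    return f"OK: {NumpyProtocolHandler!r}"


if __name__ == "__main__":
    print(numpy_handler_status())
```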

Using NumpyProtocolHandler

import cassandra.cluster
import pandas as pd
from cassandra.protocol import NumpyProtocolHandler
from cassandra.query import tuple_factory

cluster = cassandra.cluster.Cluster()  # default cluster, pointing to localhost

with cluster.connect('fastsandra') as session:
    session.row_factory = tuple_factory  # required for numpy results
    session.client_protocol_handler = NumpyProtocolHandler  # results become dicts of numpy arrays
    results = session.execute(
        "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
    )
    np_rows = [r for r in results]
df = pd.concat([pd.DataFrame(r) for r in np_rows])


With the session's client_protocol_handler set to NumpyProtocolHandler, the content of the result set changes.
Instead of a sequence of named tuples, it is now a sequence of dicts mapping each column name to a numpy array (one dict per page of results, which is why the code above concatenates them).
For example, np_rows[0]['value'] is a masked array of doubles:

masked_array(data=[0.2210726153252428, 0.6612507337531311,
                   0.9273900637252853, ..., 0.1700777337201711,
                   0.6348330019120819, 0.23939705731588268],
             mask=[False, False, False, ..., False, False, False],
       fill_value=1e+20)

And np_rows[0]['instrument_id'] is a masked array of int32:

masked_array(data=[1, 1, 1, ..., 136, 136, 136],
             mask=[False, False, False, ..., False, False, False],
       fill_value=999999,
            dtype=int32)
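A Cassandra cell can be null, and we assume that is what the mask records. All entries are unmasked in the output above. The standalone numpy sketch below uses a hand-made array with one masked value. It shows how masked entries behave, and how to get plain arrays before handing data to code that doesn't understand masks:

```python
import numpy as np

# Hand-made stand-in for a column as returned by NumpyProtocolHandler,
# with the second value treated as null (masked)
value = np.ma.masked_array(
    data=[0.22, 0.66, 0.93],
    mask=[False, True, False],
)

print(value.mean())          # masked entries are ignored: (0.22 + 0.93) / 2
print(value.filled(np.nan))  # plain ndarray, nulls become NaN
print(value.compressed())    # plain ndarray with only the non-null values
```

`filled(np.nan)` only makes sense for float columns. For an int32 column like instrument_id, you would fill with a sentinel value or cast to float first.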

This means that we save a lot of memory and CPU time by never creating the intermediate Python objects (ints, floats, named tuples).
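Each element of the result set is a dict of arrays, and we assume there is one per page of results. Getting full columns therefore means stitching the pages together, which `pd.concat` takes care of in the code above. The sketch below does the same in plain numpy. Its two hand-made pages stand in for real results, and the helper `concat_pages` is hypothetical. It uses `np.ma.concatenate` so that masks, if any, survive:

```python
import numpy as np

# Two hand-made "pages" shaped like NumpyProtocolHandler results:
# each maps a column name to a (masked) array
pages = [
    {
        "instrument_id": np.ma.masked_array([1, 1], dtype=">i4"),
        "value": np.ma.masked_array([0.22, 0.66], dtype=">f8"),
    },
    {
        "instrument_id": np.ma.masked_array([2], dtype=">i4"),
        "value": np.ma.masked_array([0.93], dtype=">f8"),
    },
]


def concat_pages(pages):
    """Hypothetical helper: concatenate each column across pages."""
    names = pages[0].keys()
    return {name: np.ma.concatenate([page[name] for page in pages]) for name in names}


columns = concat_pages(pages)
print(columns["instrument_id"].tolist())  # [1, 1, 2]
```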

Tuning the driver further

Numpy is much faster when the data type of the array is a native type such as double, integer or timestamp, and very slow when the data type is object.
Unfortunately, the Cassandra driver doesn't map date or timestamp columns to native numpy types.
As a result, our date and timestamp columns come back as object arrays:

np_rows[0]['event_date'].dtype
# dtype('O')
np_rows[0]['event_timestamp'].dtype
# dtype('O')
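To see the difference, note that an object array is only an array of pointers to Python objects, so every operation goes back through the interpreter. A datetime64[ms] array instead stores each value as a 64-bit integer that numpy can process in bulk. This sketch builds both from the same few hand-made timestamps:

```python
import datetime

import numpy as np

stamps = [datetime.datetime(2019, 10, 1, 8, 0) + datetime.timedelta(minutes=15 * i) for i in range(4)]

as_objects = np.array(stamps, dtype=object)             # pointers to datetime objects
as_native = np.array(stamps, dtype="datetime64[ms]")    # 8 bytes per value, no Python objects

print(as_objects.dtype)  # object
print(as_native.dtype)   # datetime64[ms]

# Vectorised arithmetic runs directly on the native array
print(np.diff(as_native))
```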

But there is an easy way to monkey-patch the driver so that it supports timestamps (and dates).

The cassandra.numpy_parser module defines the mapping between Cassandra and numpy data types in _cqltype_to_numpy:

_cqltype_to_numpy = {
    cqltypes.LongType:          np.dtype('>i8'),
    cqltypes.CounterColumnType: np.dtype('>i8'),
    cqltypes.Int32Type:         np.dtype('>i4'),
    cqltypes.ShortType:         np.dtype('>i2'),
    cqltypes.FloatType:         np.dtype('>f4'),
    cqltypes.DoubleType:        np.dtype('>f8'),
}
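The `>` prefix in these dtypes means big-endian. Cassandra's native protocol encodes numbers in big-endian (network) byte order. Presumably the parser copies those bytes into the arrays as they are, so the dtype has to describe them exactly as they arrive. This standalone sketch shows what happens when the same bytes are read with the machine's native byte order:

```python
import struct
import sys

import numpy as np

# Three int32 values encoded big-endian, as they would appear on the wire
raw = struct.pack(">3i", 1, 136, -5)

print(np.frombuffer(raw, dtype=">i4").tolist())  # [1, 136, -5]

# Native byte order ("=") only gives the right answer on big-endian hosts
native = np.frombuffer(raw, dtype="=i4")
print(sys.byteorder, native.tolist())
```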

All we need to do is add support for date and timestamp.

Add support for timestamps

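import numpy as np
import cassandra.cqltypes
import cassandra.numpy_parser as numpy_parser

# DateType and TimestampType are the driver's classes for CQL timestamp columns;
# values arrive as big-endian 64-bit millisecond counts, hence '>M8[ms]'
numpy_parser._cqltype_to_numpy.update({
    cassandra.cqltypes.DateType: np.dtype('>M8[ms]'),
    cassandra.cqltypes.TimestampType: np.dtype('>M8[ms]'),
})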

If we reload the data from Cassandra, we can see:

np_rows[0]['event_timestamp'].dtype
# dtype('>M8[ms]')

This is easy because, behind the scenes, Cassandra represents a timestamp as a 64-bit count of milliseconds since the epoch, which is exactly how numpy stores datetime64[ms].
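This sketch checks that claim with the first event_timestamp from the sample output (2019-10-01 08:00, taken as UTC). It encodes the timestamp as a big-endian signed 64-bit count of milliseconds, then reads the bytes back with the `>M8[ms]` dtype shown above:

```python
import datetime
import struct

import numpy as np

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
ts = datetime.datetime(2019, 10, 1, 8, 0, tzinfo=datetime.timezone.utc)
millis = (ts - EPOCH) // datetime.timedelta(milliseconds=1)  # integer milliseconds since the epoch

raw = struct.pack(">q", millis)                   # 8 bytes, big-endian
decoded = np.frombuffer(raw, dtype=">M8[ms]")     # reinterpret the bytes as datetime64[ms]

print(millis)      # 1569916800000
print(decoded[0])  # 2019-10-01T08:00:00.000
```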

Add support for dates

Dates require more work, because of the way Cassandra represents them:

# Values of the 'date' type are encoded as 32-bit unsigned integers
# representing a number of days with epoch (January 1st, 1970) at the center of the
# range (2^31).
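Concretely, the date in our sample rows, 2019-10-01, is 18170 days after the epoch (the Date(18170) printed earlier). On the wire it is therefore the unsigned value 2^31 + 18170. The minimal round trip below uses only the standard library and assumes nothing beyond the encoding described in that comment. The helpers `encode_date` and `decode_date` are hypothetical:

```python
import datetime
import struct

EPOCH_OFFSET_DAYS = 2 ** 31  # same value as cassandra.cqltypes.SimpleDateType.EPOCH_OFFSET_DAYS
EPOCH = datetime.date(1970, 1, 1)


def encode_date(d):
    """Hypothetical helper: encode a date as an unsigned 32-bit big-endian day count."""
    return struct.pack(">I", (d - EPOCH).days + EPOCH_OFFSET_DAYS)


def decode_date(raw):
    """Hypothetical helper: unsigned int, minus the offset, plus the epoch."""
    (days,) = struct.unpack(">I", raw)
    return EPOCH + datetime.timedelta(days=days - EPOCH_OFFSET_DAYS)


raw = encode_date(datetime.date(2019, 10, 1))
print(struct.unpack(">I", raw)[0])  # 2147501818
print(decode_date(raw))             # 2019-10-01
```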

When loading date columns, we need to do some massaging:

  • load them as unsigned 32-bit integers
  • subtract the epoch offset (EPOCH_OFFSET_DAYS, 2^31)
  • convert the result to datetime64[D]
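# Read 'date' columns as raw, big-endian unsigned 32-bit integers
numpy_parser._cqltype_to_numpy.update({
    cassandra.cqltypes.SimpleDateType: np.dtype('>u4'),
})


def result_set_to_df(results: cassandra.cluster.ResultSet) -> pd.DataFrame:
    # One DataFrame per dict of arrays, stacked into a single frame
    df = pd.concat((pd.DataFrame(r) for r in results), ignore_index=True)
    offset = cassandra.cqltypes.SimpleDateType.EPOCH_OFFSET_DAYS  # 2**31
    for name, dtype in zip(results.column_names, results.column_types):
        if dtype is cassandra.cqltypes.SimpleDateType:
            # Cast to signed int64 before subtracting so dates before 1970 don't wrap around
            days = df[name].to_numpy().astype('int64') - offset
            df[name] = days.astype('datetime64[D]')
    return df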
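The three steps above can also be done directly on a numpy array. This standalone sketch builds the raw `>u4` values by hand, including a date before 1970 as a hypothetical edge case. It shows why casting to a signed type before subtracting matters: in unsigned 32-bit arithmetic, a pre-epoch date wraps around instead of going negative:

```python
import numpy as np

EPOCH_OFFSET_DAYS = 2 ** 31

# Raw 'date' values as they would be read with dtype '>u4': 2019-10-01 and 1969-12-31
raw_days = np.array([EPOCH_OFFSET_DAYS + 18170, EPOCH_OFFSET_DAYS - 1], dtype=">u4")

# 1. cast to signed int64, 2. subtract the offset, 3. interpret as days since the epoch
dates = (raw_days.astype(np.int64) - EPOCH_OFFSET_DAYS).astype("datetime64[D]")
print(dates)  # ['2019-10-01' '1969-12-31']

# Without the cast, unsigned arithmetic wraps the pre-1970 date around
wrapped = raw_days - np.uint32(EPOCH_OFFSET_DAYS)
print(wrapped.tolist())  # [18170, 4294967295]
```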

Now you can run the full example:

with cluster.connect('fastsandra') as session:
    session.row_factory = tuple_factory
    session.client_protocol_handler = NumpyProtocolHandler

    results = session.execute(
        "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
    )
    df = result_set_to_df(results)

The resulting DataFrame has the following dtypes:

event_date         datetime64[ns]
instrument_id               int32
event_timestamp    datetime64[ns]
value                     float64

There is no Python object overhead: everything goes straight from the wire format into numpy arrays.
