Cassandra Time Series Data Modeling For Massive Scale
One of the big challenges people face when starting out with Cassandra and time series data is understanding how your write workload will affect your cluster. Writing too quickly to a single partition can create hot spots that limit your ability to scale out. Partitions that get too large can lead to issues with repair, streaming, and read performance. Reading from the middle of a large partition carries a lot of overhead and results in increased GC pressure. Cassandra 4.0 should improve the performance of large partitions, but it won’t fully solve the other issues I’ve already mentioned. For the foreseeable future, we will need to consider their performance impact and plan for them accordingly.
In this post, I’ll discuss a common Cassandra data modeling technique called bucketing. Bucketing is a strategy that lets us control how much data is stored in each partition as well as spread writes out to the entire cluster. This post will discuss two forms of bucketing. These techniques can be combined when a data model requires further scaling. Readers should already be familiar with the anatomy of a partition and basic CQL commands.
When we first learn about data modeling with Cassandra, we might see something like the following:
CREATE TABLE raw_data (
sensor text,
ts timeuuid,
reading int,
primary key(sensor, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_size': 1,
'compaction_window_unit': 'DAYS'};
This is a great first data model for storing some very simple sensor data. Normally the data we collect is more complex than an integer, but in this post we’re going to focus on the keys. We’re leveraging TWCS (TimeWindowCompactionStrategy) as our compaction strategy. TWCS will help us deal with the overhead of compacting large partitions, which should keep our CPU and I/O under control. Unfortunately, this data model still has some significant limitations. If we aren’t using a TTL, our partitions will grow without bound as we take in more data. As mentioned above, large partitions carry significant overhead when repairing, streaming, or reading from arbitrary time slices.
To break up this big partition, we’ll leverage our first form of bucketing. We’ll break our partitions into smaller ones based on a time window. Ideally, the window keeps each partition under 100MB. For example, one partition per sensor per day would be a good choice if we’re storing 50-75MB of data per day. We could just as easily use week (starting from some epoch), month, or year, as long as the partitions stay under 100MB. Whatever the choice, leaving a little headroom for growth is a good idea.
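The sizing rule above comes down to simple arithmetic. Here is a minimal sketch, assuming you already have a rough estimate of bytes written per sensor per day. The function names, the candidate windows, and the 20% headroom are illustrative choices, not anything Cassandra provides:

```python
TARGET_PARTITION_BYTES = 100 * 1024 * 1024  # the ~100MB ceiling discussed above


def partition_bytes(bytes_per_day, window_days):
    """Estimated size of one partition that covers window_days of data."""
    return bytes_per_day * window_days


def largest_safe_window(bytes_per_day, candidates=(1, 7, 30), headroom=0.8):
    """Return the largest candidate window (in days) whose estimated partition
    size fits in the target after reserving headroom, or None if none fit."""
    budget = TARGET_PARTITION_BYTES * headroom
    safe = [days for days in sorted(candidates)
            if partition_bytes(bytes_per_day, days) <= budget]
    return safe[-1] if safe else None
```

If this returns None, time windows alone won't keep partitions small enough at that ingest rate, at least not with the candidates given.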
To accomplish this, we’ll add another component to our partition key. Modifying our earlier data model, we’ll add a day field:
CREATE TABLE raw_data_by_day (
sensor text,
day text,
ts timeuuid,
reading int,
primary key((sensor, day), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
Inserting into the table requires using the date as well as the now() value (you could also generate a TimeUUID in your application code):
INSERT INTO raw_data_by_day (sensor, day, ts, reading)
VALUES ('mysensor', '2017-01-01', now(), 10);
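In application code the day string has to be derived from a timestamp rather than typed in. Here is a minimal sketch, assuming buckets are defined on UTC calendar days (day_bucket is a hypothetical helper name). If the day value and the timeuuid come from different clocks or time zones, rows written near midnight may land in a bucket that doesn't match their timestamp, so it's worth deriving both from the same instant where possible:

```python
from datetime import datetime, timedelta, timezone


def day_bucket(moment):
    """Return the 'YYYY-MM-DD' bucket for a timezone-aware datetime, in UTC."""
    if moment.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return moment.astimezone(timezone.utc).date().isoformat()


# Example: 23:30 on Jan 1 at UTC-5 is already Jan 2 in UTC.
local = datetime(2017, 1, 1, 23, 30, tzinfo=timezone(timedelta(hours=-5)))
print(day_bucket(local))  # 2017-01-02
```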
This is one way of limiting the amount of data per partition. For fetching large amounts of data across multiple days, you’ll need to issue one query per day. The nice part about querying like this is we can spread the work over the entire cluster rather than asking a single node to perform a lot of work. We can also issue these queries in parallel by relying on the async calls in the driver. The Python driver even has a convenient helper function for this sort of use case:
from itertools import product
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
days = ["2017-07-01", "2017-07-12", "2017-07-03"] # collecting three days worth of data
session = Cluster(["127.0.0.1"]).connect("blog")
prepared = session.prepare("SELECT day, ts, reading FROM raw_data_by_day WHERE sensor = ? and day = ?")
args = product(["mysensor"], days)
# args: ('mysensor', '2017-07-01'), ('mysensor', '2017-07-12'), ('mysensor', '2017-07-03')
# driver handles concurrency for you
results = execute_concurrent_with_args(session, prepared, args)
# Results:
#[ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36750>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36a90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d36550>)]
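The list of days in that example is written out by hand. Here is a minimal sketch of generating it from a date range, assuming buckets use the same 'YYYY-MM-DD' strings as the inserts (day_buckets is a hypothetical helper, not part of the driver):

```python
from datetime import date, timedelta


def day_buckets(start, end):
    """Return the 'YYYY-MM-DD' bucket strings for every day from start to end,
    inclusive. An end date before the start date yields an empty list."""
    count = (end - start).days + 1
    return [(start + timedelta(days=offset)).isoformat() for offset in range(count)]


days = day_buckets(date(2017, 7, 1), date(2017, 7, 3))
```

The resulting list can be passed to product() in place of the hard-coded days.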
A variation on this technique is to use a different table per time window. For instance, using a table per month means you’d have twelve tables per year:
CREATE TABLE raw_data_may_2017 (
sensor text,
ts timeuuid,
reading int,
primary key(sensor, ts)
) WITH COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
The primary benefit of this strategy is that it makes archiving and quickly dropping old data easy. For instance, at the beginning of each month, we could archive last month’s data to HDFS or S3 in Parquet format, taking advantage of cheap storage for analytics purposes. When we don’t need the data in Cassandra anymore, we can simply drop the table. You can probably see there’s a bit of extra maintenance around creating and removing tables, so this method is really only useful if archiving is a requirement. There are other methods to archive data as well, so this style of bucketing may be unnecessary.
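With a table per month, the application has to work out which table a given write or read belongs to. Here is a minimal sketch, assuming the naming pattern seen in raw_data_may_2017 is prefix, lowercase full English month name, then year. That pattern is a guess from a single example, and monthly_table_name is a hypothetical helper:

```python
from datetime import date

# Spelled out explicitly so the result does not depend on the system locale.
MONTH_NAMES = ("january", "february", "march", "april", "may", "june", "july",
               "august", "september", "october", "november", "december")


def monthly_table_name(day, prefix="raw_data"):
    """Build the per-month table name for the month containing the given date."""
    return "{}_{}_{}".format(prefix, MONTH_NAMES[day.month - 1], day.year)
```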
The above strategies focus on keeping partitions from getting too big over a long period of time. This is fine if we have a predictable workload and partition sizes that have very little variance. However, it’s possible to be ingesting so much information that we overwhelm a single node’s ability to write data out, or for the ingest rate to be significantly higher for a small percentage of objects. Twitter is a great example, where certain people have tens of millions of followers but it’s not the common case. It’s common to have a separate code path for these types of accounts where we need massive scale.
The second technique uses multiple partitions at any given time to fan out inserts to the entire cluster. The nice part about this strategy is we can use a single partition for low volume, and many partitions for high volume.
The tradeoff we make with this design is that reads need to use a scatter-gather, which has significantly higher overhead. This can make pagination more difficult, amongst other things. We need to be able to track how much data we’re ingesting for each gizmo we have. This is to ensure we can pick the right number of partitions to use. If we use too many buckets, we end up doing a lot of really small reads across a lot of partitions. With too few buckets, we end up with really large partitions that don’t compact, repair, or stream well, and that have poor read performance.
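Picking the number of buckets is again mostly arithmetic once the ingest volume is tracked. Here is a minimal sketch, assuming a per-day volume estimate in bytes, writes spread evenly across buckets, and the roughly 100MB per-partition ceiling used earlier in this post. The bucket_count name and its default are illustrative:

```python
TARGET_PARTITION_BYTES = 100 * 1024 * 1024  # assumed per-partition ceiling


def bucket_count(bytes_per_day, target=TARGET_PARTITION_BYTES):
    """Smallest number of buckets that keeps each daily partition at or under
    target, assuming an even spread of writes. Never returns less than 1."""
    # -(-a // b) is integer ceiling division; it avoids float rounding.
    return max(1, -(-bytes_per_day // target))
```

A low volume account gets a single bucket, so reads for the common case stay a single partition query.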
For this example, we’ll look at a theoretical model for someone who’s following a lot of users on a social network like Twitter. Most accounts would be fine to have a single partition for incoming messages, but some people / bots might follow millions of accounts.
Disclaimer: I have no knowledge of how Twitter is actually storing their data; it’s just an easy example to discuss.
CREATE TABLE tweet_stream (
account text,
day text,
bucket int,
ts timeuuid,
message text,
primary key((account, day, bucket), ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND COMPACTION = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
This data model extends our previous data model by adding bucket into the partition key. Each day can now have multiple buckets to fetch from. When it’s time to read, we need to fetch from all the partitions, and take the results we need. To demonstrate, we’ll insert some data into our partitions:
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 0, now(), 'hi');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 1, now(), 'hi2');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 2, now(), 'hi3');
cqlsh:blog> insert into tweet_stream (account, day, bucket, ts, message) VALUES ('jon_haddad', '2017-07-01', 3, now(), 'hi4');
If we want the ten most recent messages, we can do something like this:
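from itertools import chain, product
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.util import unix_time_from_uuid1
# reuses the session created in the earlier example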
prepared = session.prepare("SELECT ts, message FROM tweet_stream WHERE account = ? and day = ? and bucket = ? LIMIT 10")
# let's get 10 buckets
partitions = range(10)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
args = product(["jon_haddad"], ["2017-07-01"], partitions)
result = execute_concurrent_with_args(session, prepared, args)
# [ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e6d0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d710>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d4d0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d950>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1db10>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dfd0>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1dd90>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1d290>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e250>),
# ExecutionResult(success=True, result_or_exc=<cassandra.cluster.ResultSet object at 0x106d1e490>)]
results = [x.result_or_exc for x in result]
# append all the results together
data = chain(*results)
sorted_results = sorted(data, key=lambda x: unix_time_from_uuid1(x.ts), reverse=True)
# newest stuff first
# [Row(ts=UUID('e1c59e60-7406-11e7-9458-897782c5d96c'), message=u'hi4'),
# Row(ts=UUID('dd6ddd00-7406-11e7-9458-897782c5d96c'), message=u'hi3'),
# Row(ts=UUID('d4422560-7406-11e7-9458-897782c5d96c'), message=u'hi2'),
# Row(ts=UUID('d17dae30-7406-11e7-9458-897782c5d96c'), message=u'hi')]
This example is only using a LIMIT of 10 items, so we can be lazy programmers, merge the lists, and then sort them. If we wanted to grab a lot more elements we’d want to use a k-way merge algorithm. We’ll come back to that in a future blog post when we expand on this topic.
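In the meantime, here is a minimal sketch of the k-way merge idea using Python's heapq.merge (the key and reverse arguments need Python 3.5 or later). It assumes each bucket's rows already arrive newest first, as the table's clustering order provides. The Row type here is a stand-in with a plain numeric ts; with real driver rows the key would convert the timeuuid to a timestamp, as in the sort above:

```python
import heapq
from collections import namedtuple
from itertools import islice

Row = namedtuple("Row", ["ts", "message"])  # stand-in for driver rows


def newest_first(per_bucket_rows, limit):
    """Lazily merge per-bucket row sequences, each sorted newest first, and
    return the newest `limit` rows without sorting everything."""
    merged = heapq.merge(*per_bucket_rows, key=lambda row: row.ts, reverse=True)
    return list(islice(merged, limit))


bucket_0 = [Row(9, "a"), Row(4, "b")]
bucket_1 = [Row(8, "c"), Row(1, "d")]
top = newest_first([bucket_0, bucket_1, []], 3)
```

Because the merge is lazy, only as many rows as needed are pulled from each bucket's results, which matters more as the page size grows.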
At this point you should have a better understanding of how you can distribute your data and requests around the cluster, allowing it to scale much further than if a single partition were used. Keep in mind each problem is different, and there’s no one-size-fits-all solution.