Partitioning Cassandra for Fun and Timeouts
Timeout errors in Apache Cassandra occur when fewer than the Consistency Level number of replicas respond to the coordinator. It’s the distributed systems way of shrugging and saying “not my problem mate”. From the Coordinator’s perspective the request may have been lost, the replica may have failed while doing the work, or the response may have been lost. Recently we wanted to test how write timeouts were handled as part of backporting CASSANDRA-8819 for a client. To do so we created a network partition that dropped response messages from a node.
The Cluster
We used CCM to create a local cluster and iptables to add the network partition. OSX does not ship with iptables, so the work was done on Ubuntu Trusty. To start with I created a 3-node cluster using CCM:
ccm create -n 3 -v 2.1.8 timeout
ccm start
We will be using node2 for all the operations (no time to explain now) so let’s check that the cluster looks normal from its perspective:
$ ccm node2 status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 127.0.0.1 46.57 KB 1 66.7% 25f8e2df-5ff1-46e7-aaef-adffbfeb1879 rack1
UN 127.0.0.2 46.58 KB 1 66.7% 62377aae-5af5-41b2-b9ec-a4ecd83188a7 rack1
UN 127.0.0.3 46.58 KB 1 66.7% 42460c04-f753-4766-87dc-0cd649bd045a rack1
Looks good, time to check we can insert some data. I want to ensure node2 can talk to all the other nodes, so I’ll use the ALL Consistency Level. Connect to the cqlsh shell on node2 using:
$ ccm node2 cqlsh
Then give it a little work to do:
create keyspace timeout WITH replication = {'class':'NetworkTopologyStrategy', 'datacenter1':3};
use timeout;
create table foo (
    foo text primary key,
    bar text
);
consistency all;
insert into foo (foo, bar) values ('foo', 'bar');
These commands should complete successfully, proving that an insert sent to node2 can be replicated to all nodes.
The Ports
To add our network partition we will need to understand how the nodes are talking to each other. Begin by getting the pid for node3; this is the node we will partition from node2. Using these two nodes makes it a little easier to understand, as they are listening on 127.0.0.3 and 127.0.0.2 respectively.
CCM stores the pid in a local file:
$ cat ~/.ccm/timeout/node3/cassandra.pid
15603
And we can get the list of open network connections using:
$ sudo lsof -i -P | grep 15603
15603 vagrant 49u IPv4 230602 0t0 TCP localhost:7300 (LISTEN)
15603 vagrant 50u IPv4 230603 0t0 TCP localhost:46648 (LISTEN)
15603 vagrant 59u IPv4 230740 0t0 TCP 127.0.0.3:7000->localhost:55864 (ESTABLISHED)
15603 vagrant 60u IPv4 230632 0t0 TCP 127.0.0.3:7000 (LISTEN)
15603 vagrant 61u IPv4 230633 0t0 TCP 127.0.0.3:7000->localhost:55856 (ESTABLISHED)
15603 vagrant 62u IPv4 230741 0t0 TCP localhost:44855->127.0.0.2:7000 (ESTABLISHED)
15603 vagrant 66u IPv6 231370 0t0 TCP 127.0.0.3:9042 (LISTEN)
15603 vagrant 67u IPv4 231372 0t0 TCP 127.0.0.3:9160 (LISTEN)
15603 vagrant 68u IPv4 230637 0t0 TCP localhost:57876->localhost:7000 (ESTABLISHED)
15603 vagrant 81u IPv4 230732 0t0 TCP 127.0.0.3:7000->localhost:55857 (ESTABLISHED)
This shows the node is listening on five ports, has three incoming connections, and has two outgoing connections to other nodes. The tricky part here is that all outgoing connections from all nodes in our cluster appear to come from localhost. This is because the 127.0.0.2 and 127.0.0.3 interfaces are aliases for 127.0.0.1. It makes things a little more complex, but we can handle it.
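On Linux you can watch the kernel make this choice. Asking the routing table how it would reach one of these addresses shows the source address an outgoing connection will be given, unless the socket is explicitly bound to something else; expect to see src 127.0.0.1 in the reply:
$ ip route get 127.0.0.3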
The ports we are LISTENing on are:
- localhost:7300 (LISTEN) for JMX, this non-default value is set by the CCM tool.
- localhost:46648 (LISTEN), I’m not sure what this is, maybe JMX related?
- 127.0.0.3:7000 (LISTEN) for unsecured internode traffic, this is known as the storage port.
- 127.0.0.3:9042 (LISTEN) for the groovy funky CQL over Native Binary client connections.
- 127.0.0.3:9160 (LISTEN) for our old friend Thrift client connections.
The three ESTABLISHED incoming connections we have are connections to port 7000 (the storage port) on 127.0.0.3:
- 127.0.0.3:7000->localhost:55864 (ESTABLISHED)
- 127.0.0.3:7000->localhost:55856 (ESTABLISHED)
- 127.0.0.3:7000->localhost:55857 (ESTABLISHED)
With two other nodes to talk to you may expect to have only two incoming connections. In fact each node makes two outgoing connections to every other node, one called the ackCon (Acknowledgement Connection) and one called the cmdCon (Command Connection). If a node wants another node to perform a task, such as a mutation, it sends a message on the cmdCon. When the remote node has completed the work it responds on the ackCon it established.
This code sample from OutboundTcpConnectionPool shows the process:
OutboundTcpConnection getConnection(MessageOut msg)
{
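    // Responses (acks) and gossip go back over the ackCon; everything else, such as mutations, goes over the cmdCon.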
    Stage stage = msg.getStage();
    return stage == Stage.REQUEST_RESPONSE || stage == Stage.INTERNAL_RESPONSE || stage == Stage.GOSSIP
           ? ackCon
           : cmdCon;
}
We can now reason about why we have the three connections, given what actions the cluster has performed. From the perspective of node3, both nodes 1 and 2 connected to port 7000 to establish an ackCon for sending the Gossip messages used for cluster health. Additionally node2 connected a cmdCon when the client sent an INSERT statement that needed to be sent on to node3.
The two ESTABLISHED outgoing connections we have are connections to port 7000 (the storage port) on nodes 1 and 2:
- localhost:44855->127.0.0.2:7000 (ESTABLISHED)
- localhost:57876->localhost:7000 (ESTABLISHED)
The reason that we only have two connections should now be clear: node3 has only sent Gossip messages, and as such has not opened a cmdCon to any node. Later we will want to know which connection is the ackCon to node2, running on 127.0.0.2, so let’s make a note of it now:
15603 vagrant 62u IPv4 230741 0t0 TCP localhost:44855->127.0.0.2:7000 (ESTABLISHED)
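If you would rather not pick the port out of the lsof output by eye, a small shell sketch along these lines can extract it. It assumes the pid file path from above, and that at this point the only established connection from node3 to 127.0.0.2:7000 is the ackCon:
# Pull out the source port of node3's connection to node2's storage port.
# Field 2 is the port because the first ':' on a matching line sits between
# the source address and the source port.
PID=$(cat ~/.ccm/timeout/node3/cassandra.pid)
sudo lsof -a -p "$PID" -i -P | awk -F'[:>-]' '/->127.0.0.2:7000/ {print $2}'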
Now is a good time to test our view of the world, and the best way to do that is to make a prediction: if we connect to node3 and run an INSERT that hits all the nodes, node3 will open a second connection to 127.0.0.2 to serve as the cmdCon.
We can run the command using:
$ ccm node3 cqlsh
Connected to timeout at 127.0.0.3:9042.
[cqlsh 5.0.1 | Cassandra 2.1.8-SNAPSHOT | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh> use timeout;
cqlsh:timeout> consistency all;
Consistency level set to ALL.
cqlsh:timeout> insert into foo (foo, bar) values ('foo', 'bar');
And then check the connections using:
$ sudo lsof -i -P | grep 15603
15603 vagrant 49u IPv4 230602 0t0 TCP localhost:7300 (LISTEN)
15603 vagrant 50u IPv4 230603 0t0 TCP localhost:46648 (LISTEN)
15603 vagrant 59u IPv4 230740 0t0 TCP 127.0.0.3:7000->localhost:55864 (ESTABLISHED)
15603 vagrant 60u IPv4 230632 0t0 TCP 127.0.0.3:7000 (LISTEN)
15603 vagrant 61u IPv4 230633 0t0 TCP 127.0.0.3:7000->localhost:55856 (ESTABLISHED)
15603 vagrant 62u IPv4 230741 0t0 TCP localhost:44855->127.0.0.2:7000 (ESTABLISHED)
15603 vagrant 66u IPv6 231370 0t0 TCP 127.0.0.3:9042 (LISTEN)
15603 vagrant 67u IPv4 231372 0t0 TCP 127.0.0.3:9160 (LISTEN)
15603 vagrant 68u IPv4 230637 0t0 TCP localhost:57876->localhost:7000 (ESTABLISHED)
15603 vagrant 74u IPv6 232267 0t0 TCP 127.0.0.3:9042->localhost:47639 (ESTABLISHED)
15603 vagrant 75u IPv6 232270 0t0 TCP 127.0.0.3:9042->localhost:47640 (ESTABLISHED)
15603 vagrant 76u IPv4 232276 0t0 TCP localhost:57892->localhost:7000 (ESTABLISHED)
15603 vagrant 77u IPv4 232278 0t0 TCP localhost:44868->127.0.0.2:7000 (ESTABLISHED)
15603 vagrant 81u IPv4 230732 0t0 TCP 127.0.0.3:7000->localhost:55857 (ESTABLISHED)
We now have two connections to 127.0.0.2: the one from port 44855 that we had previously, and a new one from 44868 for the cmdCon.
The Partition
Everything is working, so it’s time to break it. We will block incoming traffic on the ackCon from node3 to node2. With this in place node3 should do the work but fail to tell node2.
We can block the traffic using the following:
sudo iptables -A INPUT -p tcp --source 127.0.0.1 --source-port 44855 --destination 127.0.0.2 --destination-port 7000 -j DROP
There are a few things to note here.
- --source 127.0.0.1 filters traffic coming from localhost, because all connections originate from localhost.
- --source-port 44855 because this is the port we identified as originating the ackCon from node3 to node2.
- --destination-port 7000 because we are blocking traffic that arrives at the storage port.
To remove this rule later, run the same command with a -D:
sudo iptables -D INPUT -p tcp --source 127.0.0.1 --source-port 44855 --destination 127.0.0.2 --destination-port 7000 -j DROP
After running the command we can check the configuration using:
$ sudo iptables --list -n
Chain INPUT (policy ACCEPT)
target prot opt source destination
DROP tcp -- 127.0.0.1 127.0.0.2 tcp spt:44855 dpt:7000
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
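If you end up adding and removing the rule repeatedly, a throwaway pair of shell functions keeps the two commands in sync. This is just a convenience sketch, not part of the original process; pass it the ackCon source port identified above:
# Add and remove the DROP rule for a given source port.
block_ack()   { sudo iptables -A INPUT -p tcp --source 127.0.0.1 --source-port "$1" --destination 127.0.0.2 --destination-port 7000 -j DROP; }
unblock_ack() { sudo iptables -D INPUT -p tcp --source 127.0.0.1 --source-port "$1" --destination 127.0.0.2 --destination-port 7000 -j DROP; }

block_ack 44855     # partition: drop node3's acks to node2
# ... run the INSERT and observe the timeout ...
unblock_ack 44855   # heal the partition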
Strangely enough this change does not alter the view of the cluster for either node2 or node3:
$ ccm node2 status timeout
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 127.0.0.1 66.46 KB 1 100.0% 25f8e2df-5ff1-46e7-aaef-adffbfeb1879 rack1
UN 127.0.0.2 66.47 KB 1 100.0% 62377aae-5af5-41b2-b9ec-a4ecd83188a7 rack1
UN 127.0.0.3 66.46 KB 1 100.0% 42460c04-f753-4766-87dc-0cd649bd045a rack1
$ ccm node3 status timeout
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 127.0.0.1 66.46 KB 1 100.0% 25f8e2df-5ff1-46e7-aaef-adffbfeb1879 rack1
UN 127.0.0.2 66.47 KB 1 100.0% 62377aae-5af5-41b2-b9ec-a4ecd83188a7 rack1
UN 127.0.0.3 66.46 KB 1 100.0% 42460c04-f753-4766-87dc-0cd649bd045a rack1
This is because the other nodes are continuing to gossip about node3 to node2, so node2 still hears that node3 is up. Finally we can run the INSERT from node2:
$ ccm node2 cqlsh
Connected to timeout at 127.0.0.2:9042.
[cqlsh 5.0.1 | Cassandra 2.1.8-SNAPSHOT | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh> use timeout;
cqlsh:timeout> consistency all;
Consistency level set to ALL.
cqlsh:timeout> insert into foo (foo, bar) values ('foo', 'bar');
WriteTimeout: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 2 responses." info={'received_responses': 2, 'required_responses': 3, 'consistency': 'ALL'}
cqlsh:timeout>
This is what we wanted to see. From the Coordinator’s point of view there were enough nodes to start the request, so it sent commands to all the available replicas. But after that? Shrug. Our theory is that node3 did the work and sent a response to node2 that was blocked. We can confirm that by enabling tracing and running it again:
cqlsh:timeout> tracing on;
Now Tracing is enabled
cqlsh:timeout> insert into foo (foo, bar) values ('foo', 'bar');
WriteTimeout: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 2 responses." info={'received_responses': 2, 'required_responses': 3, 'consistency': 'ALL'}
Statement trace did not complete within 10 seconds
We also need to remove the iptables rule to ensure we can SELECT from the tracing tables in Cassandra:
$ sudo iptables -D INPUT -p tcp -s 127.0.0.1 --source-port 44855 -d 127.0.0.2 --destination-port 7000 -j DROP
$ sudo iptables --list -n
Chain INPUT (policy ACCEPT)
target prot opt source destination
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
Back in cqlsh on node2, there should be a single trace:
cqlsh:timeout> select session_id from system_traces.sessions;
session_id
--------------------------------------
53eb91c0-702b-11e5-b0eb-1ba3f54909df
And we can see the events in the trace by running:
cqlsh:timeout> select activity, source, source_elapsed from system_traces.events where session_id = 53eb91c0-702b-11e5-b0eb-1ba3f54909df order by event_id;
activity | source | source_elapsed
-----------------------------------------------------------+-----------+----------------
MUTATION message received from /127.0.0.2 | 127.0.0.1 | 54
Parsing insert into foo (foo, bar) values ('foo', 'bar'); | 127.0.0.2 | 736
Preparing statement | 127.0.0.2 | 1202
Determining replicas for mutation | 127.0.0.2 | 1372
Sending MUTATION message to /127.0.0.3 | 127.0.0.2 | 1692
MUTATION message received from /127.0.0.2 | 127.0.0.3 | 52
Sending MUTATION message to /127.0.0.1 | 127.0.0.2 | 1891
Appending to commitlog | 127.0.0.2 | 4411
Adding to foo memtable | 127.0.0.2 | 4607
REQUEST_RESPONSE message received from /127.0.0.1 | 127.0.0.2 | 6924
Processing response from /127.0.0.1 | 127.0.0.2 | 11471
Appending to commitlog | 127.0.0.1 | 1887
Adding to foo memtable | 127.0.0.1 | 2040
Enqueuing response to /127.0.0.2 | 127.0.0.1 | 2259
Sending REQUEST_RESPONSE message to /127.0.0.2 | 127.0.0.1 | 2794
Appending to commitlog | 127.0.0.3 | 12936
Adding to foo memtable | 127.0.0.3 | 13081
Enqueuing response to /127.0.0.2 | 127.0.0.3 | 15809
Sending REQUEST_RESPONSE message to /127.0.0.2 | 127.0.0.3 | 16613
Write timeout; received 2 of 3 required replies | 127.0.0.2 | 2001743
Inspecting the events from 127.0.0.3 we can see it did the INSERT: the “Appending to commitlog” and “Adding to foo memtable” events show the local write. It then sent the acknowledgement, shown by the “Enqueuing response to /127.0.0.2” and “Sending REQUEST_RESPONSE message to /127.0.0.2” events. Our Coordinator then timed out after approximately 2 seconds, as the iptables rule prevented the response from arriving.
Failing At QUORUM
A more common scenario is to fail when using the QUORUM Consistency Level. To do that we simply need to take one node down and run the process again. When I went through this I had restarted the cluster, so I needed to set up the rules again. Here are all the steps, assuming you have the cluster and the schema from above:
Shut down node1:
ccm node1 stop
Confirm it is down:
$ ccm node2 status timeout
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
DN 127.0.0.1 66.46 KB 1 100.0% 25f8e2df-5ff1-46e7-aaef-adffbfeb1879 rack1
UN 127.0.0.2 140.67 KB 1 100.0% 62377aae-5af5-41b2-b9ec-a4ecd83188a7 rack1
UN 127.0.0.3 122.28 KB 1 100.0% 42460c04-f753-4766-87dc-0cd649bd045a rack1
Get the pid for node3:
$ cat ~/.ccm/timeout/node3/cassandra.pid
15603
Get the source port for the ackCon from node3 to node2:
$ lsof -i -P | grep 15603 | grep 127.0.0.2
java 15603 vagrant 59u IPv4 421995 0t0 TCP localhost:60472->127.0.0.2:7000 (ESTABLISHED)
Add a rule to block traffic:
$ sudo iptables -A INPUT -p tcp --source 127.0.0.1 --source-port 60472 --destination 127.0.0.2 --destination-port 7000 -j DROP
Using cqlsh on node2:
$ ccm node2 cqlsh
Connected to timeout at 127.0.0.2:9042.
[cqlsh 5.0.1 | Cassandra 2.1.8-SNAPSHOT | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh> use timeout;
cqlsh:timeout> consistency quorum;
Consistency level set to QUORUM.
cqlsh:timeout> insert into foo (foo, bar) values ('foo', 'bar');
WriteTimeout: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 1 responses." info={'received_responses': 1, 'required_responses': 2, 'consistency': 'QUORUM'}
Caveat Emptor
Nodes can and do reestablish connections. I’ve not dived into the code to understand the hows, the whys, and the do-you-mind-if-I-don’ts. The thing to remember is that the source port is not guaranteed to be stable.
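So if you script any of this, look the port up immediately before adding the rule rather than hard coding it. Here is a sketch under the same assumptions as the earlier lsof one-liner; in these runs the ackCon was established first, so taking the first match works:
# Re-derive the current source port on each run, since reconnects pick a new one.
PORT=$(sudo lsof -a -p "$(cat ~/.ccm/timeout/node3/cassandra.pid)" -i -P |
       awk -F'[:>-]' '/->127.0.0.2:7000/ {print $2; exit}')
sudo iptables -A INPUT -p tcp --source 127.0.0.1 --source-port "$PORT" --destination 127.0.0.2 --destination-port 7000 -j DROP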