Disaster Recovery for Solr – a Story from the Field

We had Solr deployed in our production and one day my manager asked that we will prepare a disaster recovery (DR) plan for it. My company already had a DR data center that was deployed exactly the same nodes as our production so the main challenge was to keep the Solr on the DR data center up to date with the data on the production Solr. Oh, and one more thing – the network between the production and DR data centers was slow.

Our first thought was: lets add the Solr nodes in the DR data center to the production Solr cluster (more accurate SolrCloud) and let Solr handle the replication for us.  But we realized that this will cause bad performance when indexing to the Solr as Solr uses two phase commit replication for strong consistency: when a document is indexed – the relevant Solr shard leader verifies that all shard’s replica nodes have committed the document to their transaction log before acknowledge the request. This means that each index request takes the max commit time of all Solr nodes that participate in the SolrCloud and as there is slow network to the DR Solr then every index will be slow. Thats bad. Our application required fast indexing and so this option was removed from the table.

Next, we thought using a scheduler process running on the production Solr that copy the updates of the Solr index files to the DR Solr using a utility like Rsync. Thinking this through we understood that this will not work as Solr files might be in inconsistent state while Solr is up, as some of its state is stored in Solr application memory that might not be persisted to disk at he time of replication. So, we concluded that we need to get the changes in the production Solr from the application that uses Solr.

Finally, we came up with the following scheme:

    1. In the production site, we introduced a replicator thread that continually indexed documents that were updated from production Solr to DR Solr. It replicated a fix number of updates  (keeping the order of updated), then sleep for sometime – releasing resources, repeating this process as long as updates required.  
    2. The replicator queried DR Solr for its latest update timestamp.
    3. The replicator search for all docs in the production Solr that have timestamp older than the one in target Solr – then it reindex them in DR Solr.
    4. Special care was needed to handle documents that were deleted: this is a challenge as the above scheme can’t track which documents need to be deleted in the DR Solr as the production Solr does not contain them anymore. For this we indexed a special document (tombstone) in the production Solr for each doc that is deleted. we removed the tombstones in production Solr once we delete the associated doc in DR Solr.

Architecture of ZAB – ZooKeeper Atomic Broadcast protocol


ZooKeeper support clients reading and updating key values pairs with high availability. High availability is achieved by replicating the data to multiple nodes and let clients read from any node. Critical to the design of Zookeeper is the observation that each state change is incremental with respect to the previous state, so there is an implicit dependence on the order of the state changes. Zookeeper Atomic Broadcast (ZAB) is the protocol under the hood that drives ZooKeeper replication order guarantee. It also handles electing a leader and recovery of failing leaders and nodes. This post is about ZAB.


  • leader and followers-  in ZooKeeper cluster, one of the nodes has a  leader role and the rest have followers roles. The leader is responsible for accepting all incoming state changes from the clients and replicate them to itself and to the followers. read requests are load balanced between all followers and leader.
  • transactions –  client state changes that a leader propagates to its followers.
  • ‘e’ – the epoch of a leader. epoch is an integer that is generated by a leader when he start to lead and should be larger than epoch’s of previous leaders.
  • ‘c’ – a sequence number that is generated by the leader, starting at 0 and increasing. it is used together with an epoch to order the incoming clients state changes.
  • ‘F.history’ – follower’s history queue. used for committing incoming transactions in the order they arrived.
  • outstanding transactions – the set of transactions in the F.History that have sequence number smaller than current COMMIT sequence number.

ZAB Requirements

  1. Replication guarantees
    1. Reliable delivery – If a transaction, M, is committed by one server, it will be eventually committed by all servers.
    2. Total order – If transaction A is committed before transaction B by one server, A will be committed before B by all servers. If A and B are committed messages, either A will be committed before B or B will be committed before A.
    3. Causal order – If a transaction B is sent after a transaction A has been committed by the sender of B, A must be ordered before B. If a sender sends C after sending B, C must be ordered after B.
  2. Transactions are replicated as long as majority (quorum) of nodes are up.
  3. When nodes fail and later restarted – it should catch up the transactions that were replicated during the time it was down.

ZAB Implementation

  • clients read from any of the ZooKeeper nodes.
  • clients write state changes to any of the ZooKeeper nodes and this state changes are forward to the leader node.
  • ZooKeeper uses a variation of  two-phase-commit protocol for replicating transactions to followers. When a leader receive a change update from a client it generate a transaction with sequel number c and the leader’s epoch e (see definitions) and send the transaction to all followers. a follower adds the transaction to its history queue and send ACK to the leader. When a leader receives ACK’s from a quorum it send the the quorum COMMIT for that transaction. a follower that accept COMMIT will commit this transaction unless c is higher than any sequence number in its history queue. It will wait for receiving COMMIT’s for all its earlier transactions (outstanding transactions) before commiting.

Screen Shot 2015-11-20 at 1.56.51 PM

picture taken from reference [4]

  • Upon leader crashes, nodes execute a recovery protocol both to agree upon a common consistent state before resuming regular operation and to establish a new leader to broadcast state changes.
  • To exercise the leader role, a node must have the support of a quorum of nodes. As nodes can crash and recover, there can be over time multiple leaders and in fact the same nodes may exercise the node role multiple times.
  • node’s life cycle: each node executes one iteration of this protocol at a time, and at any time, a process may drop the current iteration and start a new one by proceeding to Phase 0.
    • Phase 0 –  prospective leader election
    • Phase 1 – discovery
    • Phase 2 – synchronization
    • Phase 3 – broadcast
  • Phases 1 and 2 are important for bringing the ensemble to a mutually consistent state, specially when recovering from crashes.
  • Phase 1 – Discovery
    In this phase, followers communicate with their prospective leader, so that the leader gathers information about the most recent transactions that its followers accepted. The purpose of this phase is to discover the most updated sequence of accepted transactions among a quorum, and to establish a new epoch so that previous leaders cannot commit new proposals. Because quorum of the followers have all changes accepted by the previous leader- then it is promised that at least one of the followers in current quorum has in its history queue all the changes accepted by previous leader which means that the new leader will have them as well. Phase 1 exact algorithm available here.
  • Phase 2 – Synchronization
    The Synchronization phase concludes the recovery part of the protocol, synchronizing the replicas in the cluster using the leader’s updated history from the discovery phase. The leader communicates with the followers, proposing transactions from its history. Followers acknowledge the proposals if their own history is behind the leader’s history. When the leader sees acknowledgements from a quorum, it issues a commit message to them. At that point, the leader is said to be established, and not anymore prospective. Phase 2 exact algorithm available here.
  • Phase 3 – Broadcast
    If no crashes occur, peers stay in this phase indefinitely, performing broadcast of transactions as soon as a ZooKeeper client issues a write request.  Phase 3 exact algorithm available here.
  • To detect failures, Zab employs periodic heartbeat messages between followers and their leaders. If a leader does not receive heartbeats from a quorum of followers within a given timeout, it abandons its leadership and shifts to state election and Phase 0. A follower also goes to Leader Election Phase if it does not receive heartbeats from its leader within a timeout.


  1. Flavio P. Junqueira, Benjamin C. Reed, and Marco Serafini, “Zab: High-performance broadcast for primary-backup systems” 
  2. Andr´e Medeiros, “ZooKeeper’s atomic broadcast protocol: Theory and practice”
  3. ZooKeeper Apache documentation
  4. Benjamin Reed,Flavio P. Junqueira, A simple totally ordered broadcast protocol”


What are we talking about here?

We want to commit a transaction in several nodes so it will be committed in all of the nodes or in none.


  1. One nodes is designed as the coordinator.
  2. Each node has persistent storage.
  3. Each node can eventually communicate with any other node.
  4. Each node will recover from a crash eventually i.e., fail-recovery model.


  1. Coordinator sends COMMIT_REQUEST to all other nodes.
  2. A node that receive COMMIT_REQUEST write it to its a transaction log, hold the required commit resources and send AGREED message to the coordinator. If node can not commit, then it will send ABORT message to the coordinator
  3. If the coordinator did not received response from a node, it will re-send the node another COMMIT_REQUEST message or after sometime it will sent ABORT message to all the nodes.


  1. if the coordinator received AGREED from all nodes, it will send a COMMIT message to all nodes.
  2. If the coordinator received ABORT from any of the nodes, it will send an ABORT message to all nodes.
  3. If the coordinator did not received response from a node, it will re-send the node another COMMIT message.
  4. A node the receive COMMIT message will commit and sent a response COMMITTED.
  5. The coordinator will complete a commit after it received COMMITTED message from all the nodes.
  6. A node that receive an ABORT message will roll back the transaction.


  1. This protocol is blocking i.e., a node will hold commit related resources until coordinator will receive COMMITTED message from all nodes.
  2. We assume that all nodes will recover after a crash – but this is not always the case and in case the coordinator will crash after nodes approved a commit and before he sent them COMMIT message – then nodes will wait for manager and hold resources forever.


  1. http://courses.cs.vt.edu/~cs5204/fall00/distributedDBMS/duckett/tpcp.html
  2. http://the-paper-trail.org/blog/consensus-protocols-two-phase-commit/