# Kafka – Distributed Message Queue

Kafka is a scalable distributed message queue: it allows producers to push a high volume of messages to a queue and consumers to pull messages from it. In this post I will describe Kafka's key abstractions and architecture principles.

A Kafka cluster is composed of nodes called brokers. A producer client connects to a broker and pushes a message. Internally, the broker assigns the message a sequence number and appends it to a log file on the node's local hard disk. For efficient message fetches, the broker maintains an offset table whose key is the sequence number and whose value is the seek location of the message in the log file together with its size.

In Kafka, the consumer, not the server, is responsible for tracking which messages it has already consumed. Other message queues approach this tracking differently: their servers track this data for each consumer. After a message is consumed by a client it is not removed by the server, so other consumers can consume it later, including the consumer that just consumed it. This is helpful in case data needs to be processed again (replayed) by the consumer after some fault in the first processing. Messages are kept for a retention period; this too is handled differently in other message queue systems, which usually delete a message from the queue once its consumer has consumed it. To emphasize: the queue is actually implemented as a continuous log file on the node's local disk.

You might wonder at this point how Kafka manages efficient reads and writes on local disk. Kafka's interaction with the local disk has two characteristics: 1) it appends new writes to an open file (the message log file), and 2) it usually reads contiguous areas of the disk. These characteristics let Kafka rely on the OS disk cache to optimize its reads from and writes to disk. The OS cache is highly efficient, always enabled, and optimized for append calls and for repeated reads from a contiguous area on disk. A further advantage of using the OS-level cache is that Kafka's processes use little JVM heap space, avoiding GC-related issues; also, when a broker restarts it does not need any cache warm-up, since the OS cache is not cleared when a process terminates. Another optimization strategy used by Kafka is reducing unnecessary copying of data, for example copying data from OS network buffers into application buffers. This is achieved with Linux's sendfile API, which reads data directly from disk (hopefully from the OS disk cache) and routes it directly to a network socket without passing through the application, avoiding the unnecessary buffer copies.
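To illustrate the zero-copy path, here is a minimal sketch (not Kafka's code) of the sendfile idea using Python's `os.sendfile`; a Unix socket pair stands in for a consumer's network connection:

```python
import os
import socket
import tempfile

# Write some data to a file; on a warm system these bytes sit in the OS page cache.
with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"message-log-segment-bytes" * 100)
    path = f.name

# A connected socket pair stands in for a consumer's network connection.
server, client = socket.socketpair()

with open(path, "rb") as log_file:
    size = os.path.getsize(path)
    sent = 0
    # os.sendfile copies from the file descriptor straight to the socket,
    # skipping the usual read-into-application-buffer / write-from-buffer round trip.
    while sent < size:
        sent += os.sendfile(server.fileno(), log_file.fileno(), sent, size - sent)

received = b""
while len(received) < size:
    received += client.recv(65536)

assert received == b"message-log-segment-bytes" * 100
os.unlink(path)
```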

Now, consider the use case where several producers and consumers are interested in different types of messages. To support this use case, Kafka uses the abstraction of a 'topic': producers send messages to a topic and consumers read messages from a topic.

To support a large volume of messages and parallel consumption, Kafka introduced topic partitioning: when producing to a topic, the producer also specifies the method for partitioning the messages, e.g., round robin, or a partition function together with a message field that define a partition per message. For example, if a single topic has 5 partitions on a Kafka cluster with 5 brokers, then each broker will manage (lead) one of the partitions. Message order is kept only between messages in the same partition. High availability is achieved by replicating each partition to other brokers, so that if one node fails, another node starts to manage its partitions. Usually there are more topics and partitions than cluster nodes, so a typical broker manages several partitions and holds several replicas of partitions managed by other brokers. Replication is implemented by having a broker act like a regular client that consumes a specific partition. All producers of a partition write to the broker that leads that partition, and likewise all consumers of a partition read from that broker.
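A key-based partition function can be sketched as follows. Kafka's default partitioner hashes the key with murmur2; this illustration uses crc32 just to stay dependency-free:

```python
import zlib

NUM_PARTITIONS = 5

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a message key to a partition index.
    Kafka's default hashes with murmur2; crc32 here is only for illustration."""
    return zlib.crc32(key) % num_partitions

# Messages with the same key always land in the same partition,
# so per-key ordering is preserved.
p1 = partition_for(b"user-42")
p2 = partition_for(b"user-42")
assert p1 == p2
assert 0 <= p1 < NUM_PARTITIONS
```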

Kafka also introduced the concept of the consumer group, which allows consumers to share the consumption of a topic so that each message is delivered to only one consumer in the group. A topic can be consumed by multiple consumer groups, and if a set of consumers would each like to consume the same messages of a topic, they need to belong to different consumer groups.


# Sketchnote for One line of working code is worth 500 of specification

This is a sketchnote for the essay 'One line of working code is worth 500 of specification' by Allison Randal.

It appeared in '97 Things Every Software Architect Should Know'.

# Disaster Recovery for Solr – a Story from the Field

We had Solr deployed in production, and one day my manager asked us to prepare a disaster recovery (DR) plan for it. My company already had a DR data center with exactly the same nodes deployed as production, so the main challenge was keeping the Solr in the DR data center up to date with the data in the production Solr. Oh, and one more thing: the network between the production and DR data centers was slow.

Our first thought was: let's add the Solr nodes in the DR data center to the production Solr (more accurately, SolrCloud) and let Solr handle the replication for us. But we realized that this would hurt indexing performance, as Solr uses two-phase-commit replication for strong consistency. When a document is indexed, the relevant Solr shard leader verifies that all of the shard's replica nodes have committed the document to their transaction logs before acknowledging the request. This means that each index request takes the maximum commit time across all Solr nodes participating in the SolrCloud, and since the network to the DR Solr is slow, every index request would be slow. Our application required fast indexing, so this option was taken off the table.

Next, we thought of using a daily batch process that copies the updates to the production Solr's index files to the DR Solr using a utility like rsync. Thinking this through, we understood that this would not work, as Solr's files might be in an inconsistent state while Solr is up: some of its state is stored in the Solr application's memory and might not have been persisted to disk yet. So we concluded that we needed to get the changes in the production Solr from the Solr application itself.

Finally, we came up with the following scheme:

1. In the production site, we introduced a replicator application that continually reindexed documents that were updated in the production Solr into the DR Solr. It replicated a fixed number of updates (keeping the order of the updates), then slept for some time, releasing resources, repeating this process as long as updates remained.
2. The replicator queried the DR Solr for its latest update timestamp.
3. The replicator searched for all docs in the production Solr with a timestamp newer than the one in the target Solr, then reindexed them into the DR Solr.
4. Special care was needed to handle documents that were deleted. This is a challenge because the scheme above can't track which documents need to be deleted in the DR Solr: the production Solr no longer contains them. For this, we indexed a special document (a tombstone) in the production Solr for each doc that was deleted, and removed the tombstone from the production Solr once we had deleted the associated doc in the DR Solr.
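The scheme can be sketched with in-memory dictionaries standing in for the production and DR Solr indexes; the names, the batch size, and the tombstone representation here are illustrative, not our actual code:

```python
# In-memory stand-ins for the production and DR Solr indexes:
# doc id -> (timestamp, payload). A payload of None marks a tombstone.
prod = {
    "doc1": (100, "alpha"),
    "doc2": (150, "beta"),
    "doc3": (200, None),   # tombstone: doc3 was deleted in production
}
dr = {"doc1": (100, "alpha")}

BATCH_SIZE = 2

def latest_dr_timestamp():
    # Step 2: ask the DR side for its most recent update timestamp.
    return max((ts for ts, _ in dr.values()), default=0)

def replicate_once():
    # Step 3: find production docs newer than the DR high-water mark,
    # oldest first so update order is preserved, and reindex one batch.
    cutoff = latest_dr_timestamp()
    pending = sorted(
        (ts, doc_id, payload)
        for doc_id, (ts, payload) in prod.items() if ts > cutoff
    )
    for ts, doc_id, payload in pending[:BATCH_SIZE]:
        if payload is None:
            # Step 4: a tombstone means "delete this doc on the DR side";
            # the tombstone itself is then removed from production.
            dr.pop(doc_id, None)
            del prod[doc_id]
        else:
            dr[doc_id] = (ts, payload)
    return len(pending)

while replicate_once() > 0:
    pass  # in production this loop slept between batches to release resources
```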

# Privilege Separation for Linux Security Enhancement

## Problem Description

I was working on a monitoring product that used agents installed on customer hosts to report metrics to backend servers. Customers preferred to run the agents as unprivileged processes, i.e., not as the root user, to prevent an agent from harming their host in case of a bug or malicious code. Our agent also supported running third-party plugins for monitoring specific applications; those plugins were not reviewed by our company and so increased the risk of harm by the agent. But our agent was required to run as root in order to execute all of its functionality, e.g., checking a server's reachability by sending raw ICMP packets.

I wanted the agent to run most of its actions in unprivileged mode but still be able to run a limited list of predefined actions in privileged mode. The Linux security model does not support changing a process's security level after it has started; generally speaking, a process's privileges are determined by the set of privileges owned by the user that launched it (Linux also supports file capability configuration). Here is a summary of the requirements:


1. Allow the agent to execute unprivileged code (obvious).
2. Allow the agent to execute privileged actions from a limited, predefined set.
3. Block the agent from executing privileged actions outside that set.
4. Third-party plugin code should not need changes to support the new requirements, i.e., the solution must support legacy plugins, especially custom plugins.

## Solution Sketch

My solution was inspired by an online security course I took: we will apply the privilege separation principle, i.e., separate a set of privileged actions from the rest of the system, limit access to them, and block any other privileged action.

The agent will run in an unprivileged process and will send requests for privileged actions to a second process that will run as a privileged user.

The privileged action requests will be defined by a strict API in the privileged process.

The privileged process will be accessible only by the agent process via a shared secret and privileged action requests will be logged for auditing.

This solution will also enable finer-grained file system access than the OS access control provides: the agent will be able to read any file on the file system but will write only to a sandboxed part of it.

This approach is straightforward for stateless actions like the InetAddress.isReachable action, but more challenging for actions that have state, like reading a file. The second process will need to track that state and handle lifecycle aspects like cleaning up after actions that have finished.

## Solution Details

1. The agent will have two processes:
   1. Process #1 – the agent core process, i.e., a process that runs much like the current agent. This process will run unprivileged.
   2. Process #2 – a helper process that will be privileged, execute requests from the agent core process, and reply with results.
2. Process #1 will instrument plugin code and replace the implementations of a set of privileged actions, e.g., Java/Sigar classes, with our own new implementations; for example, the InetAddress class that sends ICMP will be replaced with a MyInetAddress class. This can be achieved with the JVM's javaagent hook as the instrumentation entry point and the javassist library for the class replacement; it might also be done via AspectJ. Our implementation will simply forward a request to process #2, which will actually run the action and return the response to process #1. A plugin in process #1 is not aware of the new implementation. If a plugin running in process #1 executes a privileged action that was not instrumented, i.e., one not in the set of allowed privileged actions, it will be blocked by the OS, since process #1 is unprivileged.
3. Process #2 will have a single entry point, e.g., a TCP port accessible only by process #1, to prevent other unprivileged processes from executing privileged actions. This can be done by sharing a secret between process #1 and process #2; process #1 will authenticate when opening a connection to process #2.
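Here is a minimal sketch of the request path between the two processes, collapsed into one script for brevity; the secret, action names, and message format are illustrative only:

```python
import hmac
import hashlib
import json

SHARED_SECRET = b"example-secret"          # in reality provisioned at install time
ALLOWED_ACTIONS = {"icmp_ping"}            # the limited set of privileged actions

def sign(payload: bytes) -> str:
    return hmac.new(SHARED_SECRET, payload, hashlib.sha256).hexdigest()

# --- process #2 (privileged helper) side ---------------------------------
def handle_request(raw: bytes, mac: str):
    """Validate the caller's MAC, refuse anything outside the whitelist,
    and log every privileged request for auditing."""
    if not hmac.compare_digest(mac, sign(raw)):
        return {"error": "authentication failed"}
    request = json.loads(raw)
    if request["action"] not in ALLOWED_ACTIONS:
        return {"error": "action not allowed"}
    print("AUDIT:", request)               # audit trail of privileged actions
    # A real helper would perform the privileged action here, e.g. send an
    # ICMP echo request; we just pretend the host was reachable.
    return {"result": "reachable"}

# --- process #1 (unprivileged agent) side --------------------------------
raw = json.dumps({"action": "icmp_ping", "host": "10.0.0.1"}).encode()
reply = handle_request(raw, sign(raw))
assert reply == {"result": "reachable"}

bad = json.dumps({"action": "read_shadow_file"}).encode()
assert handle_request(bad, sign(bad)) == {"error": "action not allowed"}
```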

# Huffman coding – the optimal prefix code

## Short story

Recently, I remembered that when I was a student I read about Huffman coding, a clever compression algorithm, and ever since I had wanted to implement it but never found the chance. Last week I finally did it. To test my implementation, I took a 160 KB file containing the text of the book 'Alice in Wonderland' and compressed it with my Huffman implementation down to a 94 KB binary file. Hurrah!

## What is a prefix code?

There are many ways to map a set of characters to sets of bits and thereby represent text as bits. For example, ANSI is a map that takes the Latin alphabet and maps each character to a byte. When you get a file in ANSI encoding you know that you need to scan the file reading sets of 8 bits (one byte) and translate each byte to a character. Giving each character the same number of bits simplifies decoding, as you only need to read a fixed number of bits and translate them to the relevant character, but it is not optimal from a size perspective, as frequently used characters get the same representation as rarely used ones.

Some encoding techniques map different characters to bit sequences of different lengths. When the more frequently used characters are mapped to shorter bit sequences, you can expect the encoded file to take less space than one encoded in ANSI (more on how to measure space efficiency later in the post). The challenge with this approach is knowing, while decoding a sequence of bits, where one character's representation ends and another begins. One solution is to use a special sequence of bits as a marker between the sequences representing different characters; another is to use binary trees like the Huffman tree (more on this later in the post).

A prefix code is a code that lets you decode the encoded text without any special marker. For example, the map {a=0, b=10, c=11} is a prefix code, as no marker is needed to decode any string: given '000101011' it is clear where each character's bits end and the next begin, and you translate it directly to 'aaabbc'. As a counterexample, the map {a=0, b=1, c=11} is not a prefix code, as the string '111' can be decoded as either 'bbb' or 'bc', i.e., this code needs some separator marker between bit sequences.

## How to measure coding efficiency?

If the coding is represented by a map from a character $c$ to a sequence of bits, then we define $length(c)$ to be the number of bits representing the character $c$, and $freq(c)$ to be the frequency at which the character $c$ appears in the text.

The code efficiency is calculated by $\sum_{c\in Alphabet} length(c)\times freq(c)$, i.e., the weighted average of the encoding lengths according to their frequencies.
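For instance, plugging the prefix code {a=0, b=10, c=11} from the previous section into the formula, with made-up frequencies chosen only for illustration:

```python
# The prefix code {a: '0', b: '10', c: '11'} from earlier, with made-up
# frequencies (as fractions of the text) just for illustration.
code = {"a": "0", "b": "10", "c": "11"}
freq = {"a": 0.5, "b": 0.3, "c": 0.2}

# Weighted average of code lengths: sum over characters of length(c) * freq(c).
efficiency = sum(len(code[ch]) * freq[ch] for ch in code)
assert abs(efficiency - 1.5) < 1e-9  # 0.5*1 + 0.3*2 + 0.2*2 = 1.5 bits/char
```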

Every prefix code can also be represented as a binary tree where each edge is marked '0' or '1' and the leaves are marked with characters, so the list of edges leading to a leaf represents that character's code. Here is a picture that shows this idea:

The depth of a leaf is exactly the length of that character's prefix code, so this representation provides an alternative expression of the code's efficiency in the language of trees, i.e.,

$B(T) = \sum_{c\in Alphabet} depth(c)\times freq(c)$ where $depth(c)$ is the depth of the leaf $c$ in the tree.

## How does Huffman encoding work?

Here is the algorithm to build Huffman tree:

1. Create a node for each character with its frequency and insert it into a list.
2. Remove from the list the two nodes with minimum frequencies. Then create a tree in which these two nodes are sibling leaves and their parent is marked with the sum of their frequencies (this parent has no character associated with it).
3. Add the new parent node to the list. The list is now shorter by one element.
4. Go to step 2 until a single node remains in the list. This node is the root of the Huffman tree.

Example:

Let's look at a text with 20 'a's, 15 'b's, 5 'c's, 15 'd's, and 45 'e's.

Step 1 says we prepare the list {a/20, b/15, c/5, d/15, e/45}.

Iteration 1:

Step 2 says we need to choose the two minimum nodes, e.g., 'c' and 'd' (we could also take 'c' and 'b', as 'b' and 'd' tie for the minimum frequency), and create a tree:

We give this node the alias 'n1' for consistent labeling of nodes, but the label itself is not used. Note that its frequency is 20 = 5 + 15.

Step 3 says that we add 'n1' to the list, ending up with {a/20, n1/20, b/15, e/45}.

Iteration 2:

In step 2, similarly to what we did above, we choose 'a' and 'b' and produce a new node n2 and a tree:

In step 3, the list ends up looking like this: {n2/35, n1/20, e/45}.

Iteration 3

Step 2 combines n2/35 and n1/20 under a new node n3/55:

In step 3, the list ends up looking like this: {n3/55, e/45}.

It should be clear that after the 4th iteration, which combines n3/55 and e/45 under the root, we end up with the Huffman tree shown in the previous section.
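The algorithm above can be sketched as follows; tie-breaking between equal frequencies may differ from the walkthrough, but the resulting code lengths are equally optimal:

```python
import heapq
from itertools import count

def huffman_codes(freqs: dict) -> dict:
    """Build a Huffman tree from {char: frequency} and return {char: bit string}."""
    counter = count()  # tie-breaker so heapq never compares tree nodes
    # Step 1: one leaf node per character, as (char, left, right) triples.
    heap = [(f, next(counter), (ch, None, None)) for ch, f in freqs.items()]
    heapq.heapify(heap)
    # Steps 2-4: repeatedly merge the two minimum-frequency nodes.
    while len(heap) > 1:
        f1, _, left = heapq.heappop(heap)
        f2, _, right = heapq.heappop(heap)
        heapq.heappush(heap, (f1 + f2, next(counter), (None, left, right)))
    codes = {}
    def walk(node, prefix):
        ch, left, right = node
        if ch is not None:
            codes[ch] = prefix or "0"  # single-character alphabet edge case
        else:
            walk(left, prefix + "0")
            walk(right, prefix + "1")
    walk(heap[0][2], "")
    return codes

codes = huffman_codes({"a": 20, "b": 15, "c": 5, "d": 15, "e": 45})
assert len(codes["e"]) == 1  # the most frequent character gets the shortest code
```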

## Why is Huffman encoding optimal? – a mathematical proof

In our context, optimal encoding means that for a fixed alphabet with known frequencies, the Huffman code has the minimum code efficiency value, as calculated above, compared to all possible prefix codes for this alphabet (with the same frequencies).

Lemma 1: Consider the two letters x and y with the smallest frequencies. Then there is an optimal tree in which these two letters are sibling leaves at the lowest level of the tree.

Proof: First, we note that an optimal tree T must be full in the sense that every internal node has two child nodes. Otherwise, if there were an internal node P with a single child Q, we could remove P from the tree and put Q (with its entire subtree) in P's place, which would reduce the depth of every node under Q by one and so reduce the value of B(T), contradicting the fact that B(T) is minimal.

Next, let T be an optimal prefix code tree, and let b and c be two siblings at the maximum depth of the tree (they must exist because T is full). Since x and y have the two smallest frequencies, it follows that $freq(x)\le freq(b)$ and $freq(y)\le freq(c)$ without loss of generality. Because b and c are at the deepest level of the tree, we know that $depth(x)\le depth(b)$ and $depth(y)\le depth(c)$.

Now switch the positions of x and b in the tree, resulting in a different tree T', and see how the cost changes:

$B(T')=B(T)-freq(b)\times depth(b) -freq(x)\times depth(x) + freq(b)\times depth (x) + freq(x)\times depth(b)$

Rearranging the right-hand side of the equation,

$B(T')=B(T)- [(freq(x) - freq(b))\times (depth(x)-depth(b))] \le B(T)$ (both factors in the product are non-positive, so the product is non-negative), but as T is optimal then

$B(T)\le B(T')$ and so $B(T)=B(T')$.

Repeating this argument, swapping y and c, results in the required optimal tree.

Q.E.D

Theorem: The Huffman code's efficiency value is no higher than that of any prefix code for the same alphabet.

Proof: We will prove this by induction on the size of the alphabet.

Starting with an alphabet of size 2, Huffman encoding generates a tree with one root and two leaves. This is obviously the smallest possible tree, so its coding efficiency is minimal.

Assume the theorem is true for all alphabets of size n-1; we will prove that it is also true for all alphabets of size n.

Let A be an alphabet with n characters and let T be an optimal tree for A. By Lemma 1, T has two leaves 'x' and 'y' with minimum frequencies that are siblings. We look at T', constructed from T by removing the leaves 'x' and 'y'. We can view T' as a prefix code tree for the alphabet A', constructed from A by removing the characters 'x' and 'y' and adding a new character 'z', their parent node in T. Every character needs an associated frequency, so we set the frequency of 'z' to $freq(x)+freq(y)$.

We have

(1) $depth(x)=depth(y)=depth(z)+1$, therefore $depth(z)=depth(x)-1=depth(y)-1$.

(2) $freq(z) = freq(x)+freq(y)$.

(3) $B(T') =B(T)-freq(x)\times depth(x) - freq(y)\times depth(y) + freq(z)\times depth(z)$.

substituting $freq(z)$ and $depth(z)$ in (3) we get

$B(T') = B(T) -freq(x) - freq(y)$ and so

(4) $B(T)=B(T')+freq(x) + freq(y)$.

Let H' be the Huffman tree for A'. By the induction hypothesis, H' is optimal, so

(5) $B(H') \le B(T')$.

Adding to H' the two leaves 'x' and 'y' under the parent 'z' leaves us with the Huffman tree H for alphabet A.

This is true because the first step in building H according to the algorithm above is to put 'x' and 'y' under 'z', then remove the characters 'x' and 'y' from A and replace them with 'z'; the rest of the algorithm builds a Huffman tree for A', which is exactly H' (I find this part of the proof the hardest to grasp).

Following the same argument we used to calculate $B(T)$, we get

(6) $B(H)=B(H') + freq(x)+freq(y)$.

From (5) and (6) we get

(7) $B(H)=B(H')+freq(x)+freq(y)\le B(T')+freq(x)+freq(y)$.

from (7) and (4) we get

(8) $B(H)\le B(T)$.

As T is an optimal tree, we also have

(9) $B(T)\le B(H)$.

From (8) and (9) we get

$B(T)= B(H)$ i.e., the Huffman tree H is optimal.

This completes the induction proof.

Q.E.D

# rsync – a method for replicating updates

Background

rsync is an algorithm for efficient remote update of files over a high latency, low bandwidth link.

Definitions

• S (source) – the process at the end that has access to the newer version of the file.
• T (target) – the process at the end that has access to a version of the file that is older than the version S has.

Requirements

• Sync the target file at T, which holds an older version, with the source file at S.
• Minimize the network traffic needed for the synchronization.

Implementation

1. T (the target) divides its file into blocks of a fixed size L.
2. For each block, T calculates a signature that is fast to compute but not unique (the weak signature).
3. For each block, T calculates a signature that is slow to compute but effectively unique (the strong signature).
4. T sends S the list of pairs of fast and slow signatures, one pair per block.
5. S starts checking its first block.
6. T creates a temporary empty file X that will eventually replace its current file with S's version.
7. S calculates the fast signature for the block. If it does not equal any of T's fast signatures, S sends the block's first byte to T, and T appends this byte to file X. If the fast signature matches, S calculates the slow signature of the block and compares it to the matching slow signature. If the two signatures are equal, S sends T the id of the block, and T appends that block from its local copy. If the slow signature is not equal, S again sends the first byte to T.
8. The next block S checks starts at the next byte if the previous block did not match, or at an offset of one block size if the previous block matched. S may need to calculate the fast signature for every possible block (i.e., blocks generated from the previous block by shifting one byte at a time, starting from the first block and ending at the last), so rsync uses a method similar to the Adler checksum that can be calculated efficiently in iterations: from the previous block's signature plus a small number of steps.
9. Steps 7–8 repeat until S reaches the end of its file.
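The rolling signature in step 8 can be sketched as an Adler-style checksum; the form and constants are simplified relative to rsync's real weak checksum, but the key property is the same: sliding the window one byte takes O(1) instead of rescanning the whole block.

```python
M = 1 << 16  # modulus for the weak checksum components

def weak_checksum(block: bytes):
    """Adler-style checksum over a block: a = plain sum, b = position-weighted sum."""
    a = sum(block) % M
    b = sum((len(block) - i) * byte for i, byte in enumerate(block)) % M
    return a, b

def roll(a, b, out_byte, in_byte, block_len):
    """Slide the window one byte: drop out_byte, append in_byte, in O(1)."""
    a = (a - out_byte + in_byte) % M
    b = (b - block_len * out_byte + a) % M
    return a, b

data = b"the quick brown fox jumps over the lazy dog"
L = 8
a, b = weak_checksum(data[0:L])
# Rolling across the whole buffer gives the same result as recomputing each block.
for i in range(len(data) - L):
    a, b = roll(a, b, data[i], data[i + L], L)
    assert (a, b) == weak_checksum(data[i + 1:i + 1 + L])
```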

Reference:

1. Andrew Tridgell,  “Efficient Algorithms for Sorting and Synchronization” , https://rsync.samba.org/~tridge/phd_thesis.pdf

# Architecture of ZAB – ZooKeeper Atomic Broadcast protocol

Background

ZooKeeper supports clients reading and updating key-value pairs with high availability. High availability is achieved by replicating the data to multiple nodes and letting clients read from any node. Critical to the design of ZooKeeper is the observation that each state change is incremental with respect to the previous state, so there is an implicit dependence on the order of state changes. ZooKeeper Atomic Broadcast (ZAB) is the protocol under the hood that provides ZooKeeper's replication order guarantee. It also handles electing a leader and recovering from failing leaders and nodes. This post is about ZAB.

Definitions

• leader and followers – in a ZooKeeper cluster, one node has the leader role and the rest have the follower role. The leader is responsible for accepting all incoming state changes from clients and replicating them, to itself and to the followers. Read requests are load-balanced across all followers and the leader.
• transactions – client state changes that a leader propagates to its followers.
• 'e' – the epoch of a leader. An epoch is an integer generated by a leader when it starts to lead, and it must be larger than the epochs of previous leaders.
• 'c' – a sequence number generated by the leader, starting at 0 and increasing. Together with the epoch, it orders the incoming client state changes.
• 'F.history' – a follower's history queue, used for committing incoming transactions in the order they arrived.
• outstanding transactions – the transactions in F.history with a sequence number smaller than the current COMMIT's sequence number.

ZAB Requirements

1. Replication guarantees
1. Reliable delivery – If a transaction, M, is committed by one server, it will eventually be committed by all servers.
2. Total order – If transaction A is committed before transaction B by one server, A will be committed before B by all servers. If A and B are committed messages, either A will be committed before B or B will be committed before A.
3. Causal order – If a transaction B is sent after a transaction A has been committed by the sender of B, A must be ordered before B. If a sender sends C after sending B, C must be ordered after B.
2. Transactions are replicated as long as majority (quorum) of nodes are up.
3. When a node fails and is later restarted, it should catch up on the transactions that were replicated while it was down.

ZAB Implementation

• Clients read from any of the ZooKeeper nodes.
• Clients write state changes to any of the ZooKeeper nodes, and these state changes are forwarded to the leader node.
• ZooKeeper uses a variation of the two-phase-commit protocol for replicating transactions to followers. When the leader receives a change update from a client, it generates a transaction with a sequence number c and the leader's epoch e (see definitions) and sends the transaction to all followers. A follower adds the transaction to its history queue and sends an ACK to the leader. When the leader receives ACKs from a quorum, it sends a COMMIT for that transaction. A follower that receives a COMMIT commits the transaction, unless there are outstanding earlier transactions in its history queue, in which case it waits to receive COMMITs for all of those before committing.

picture taken from reference [4]
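The quorum-ACK step can be sketched as a toy model (class and method names here are illustrative, not ZooKeeper's code):

```python
class Leader:
    """Toy model of ZAB's proposal phase: a transaction commits once a
    quorum (a strict majority, counting the leader itself) has acknowledged it."""
    def __init__(self, num_nodes):
        self.num_nodes = num_nodes
        self.acks = {}

    def propose(self, zxid):
        self.acks[zxid] = 1          # the leader accepts its own proposal

    def on_ack(self, zxid):
        self.acks[zxid] += 1
        # COMMIT is sent as soon as a strict majority has acknowledged.
        return self.acks[zxid] > self.num_nodes // 2

leader = Leader(num_nodes=5)
leader.propose(zxid=(1, 0))             # epoch e=1, sequence number c=0
assert leader.on_ack((1, 0)) is False   # 2 of 5: no quorum yet
assert leader.on_ack((1, 0)) is True    # 3 of 5: quorum reached, send COMMIT
```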

• Upon a leader crash, nodes execute a recovery protocol, both to agree upon a common consistent state before resuming regular operation and to establish a new leader to broadcast state changes.
• To exercise the leader role, a node must have the support of a quorum of nodes. As nodes can crash and recover, there can be multiple leaders over time, and in fact the same node may exercise the leader role multiple times.
• A node's life cycle: each node executes one iteration of this protocol at a time; at any time, a process may drop the current iteration and start a new one by proceeding to Phase 0.
• Phase 0 –  prospective leader election
• Phase 1 – discovery
• Phase 2 – synchronization
• Phases 1 and 2 are important for bringing the ensemble to a mutually consistent state, especially when recovering from crashes.
• Phase 1 – Discovery
In this phase, followers communicate with their prospective leader, so that the leader gathers information about the most recent transactions that its followers accepted. The purpose of this phase is to discover the most updated sequence of accepted transactions among a quorum, and to establish a new epoch so that previous leaders cannot commit new proposals. Because quorum of the followers have all changes accepted by the previous leader- then it is promised that at least one of the followers in current quorum has in its history queue all the changes accepted by previous leader which means that the new leader will have them as well. Phase 1 exact algorithm available here.
• Phase 2 – Synchronization
The Synchronization phase concludes the recovery part of the protocol, synchronizing the replicas in the cluster using the leader’s updated history from the discovery phase. The leader communicates with the followers, proposing transactions from its history. Followers acknowledge the proposals if their own history is behind the leader’s history. When the leader sees acknowledgements from a quorum, it issues a commit message to them. At that point, the leader is said to be established, and not anymore prospective. Phase 2 exact algorithm available here.
If no crashes occur, peers stay in this phase indefinitely, performing broadcast of transactions as soon as a ZooKeeper client issues a write request.  Phase 3 exact algorithm available here.
• To detect failures, ZAB employs periodic heartbeat messages between followers and their leader. If a leader does not receive heartbeats from a quorum of followers within a given timeout, it abandons its leadership and shifts to leader election (Phase 0). A follower also goes to the leader election phase if it does not receive heartbeats from its leader within a timeout.

References:

1. Flavio P. Junqueira, Benjamin C. Reed, and Marco Serafini, “Zab: High-performance broadcast for primary-backup systems”
2. André Medeiros, "ZooKeeper's atomic broadcast protocol: Theory and practice"
3. ZooKeeper Apache documentation
4. Benjamin Reed, Flavio P. Junqueira, "A simple totally ordered broadcast protocol"

# Algorithm for Zab Phase 2 – Synchronization

This is part of my main post on architecture of ZAB.

There are four variables that constitute the persistent state of a node, which are used during the recovery part of the protocol:

• history: a log of the transaction proposals accepted;
• acceptedEpoch: the epoch number of the last NEWEPOCH message accepted;
• currentEpoch: the epoch number of the last NEWLEADER message accepted;
• lastZxid: the zxid of the last proposal in the history.
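As a sketch, the four variables can be modeled like this (the types, defaults, and zxid representation are assumptions for illustration):

```python
from dataclasses import dataclass, field
from typing import List, Tuple

Zxid = Tuple[int, int]  # (epoch e, sequence number c)

@dataclass
class PersistentState:
    """The four per-node variables used during ZAB recovery."""
    history: List[Tuple[Zxid, bytes]] = field(default_factory=list)  # accepted proposals
    accepted_epoch: int = 0   # epoch of the last NEWEPOCH message accepted
    current_epoch: int = 0    # epoch of the last NEWLEADER message accepted

    @property
    def last_zxid(self) -> Zxid:
        # zxid of the last proposal in the history
        return self.history[-1][0] if self.history else (0, 0)

state = PersistentState()
state.history.append(((1, 0), b"set /x 1"))
assert state.last_zxid == (1, 0)
```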

References: André Medeiros, "ZooKeeper's atomic broadcast protocol: Theory and practice", http://www.tcs.hut.fi/Studies/T-79.5001/reports/2012-deSouzaMedeiros.pdf