Sunday, April 17, 2016

Time, Clocks and the Order of Events in a Distributed System

THE PROBLEM

In a distributed system, ordering the events that occur at various points in the system is a non-trivial problem, i.e. it is hard to establish a "happened before" relationship among events. One might ask: why not just use physical clocks and order events according to their readings? But then the question arises of how to keep those physical clocks in sync. Independent physical clocks may not show the same reading and may drift apart with time.

This paper first introduces a way to order only a subset of events in a distributed system. This method is then extended to impose an order upon all the events in the system. Further, we see how this total order may conflict with the order of events in the real world - a method using physical clocks is proposed to counter such anomalous behavior. The paper signs off with a method to keep the physical clocks in sync (within certain limits), which is required to avoid that anomalous behavior.

PARTIAL ORDER OF EVENTS

A simple happened-before relationship ( --> ) between a pair of events in the system can be established as follows:

  1. If the two events occur at the same process P, then the events can be ordered according to their order on the process P.
  2. Let A be the event of sending message M from process P1 to process P2. Let B be the event of receiving message M on P2. Then, A --> B.
  3. If A --> B and B --> C, then A --> C.
Note that these 3 rules may not be able to establish a happened-before relation between every pair of events in the system. That is why the happened-before relationship ( --> ) is said to impose only a partial order on the events in the system. If !(A --> B) && !(B --> A), then events A and B are said to be concurrent.

In this diagram, p3 and q4 are concurrent. But, q1 --> p3.

LOGICAL CLOCKS

Above, we established a theoretical underpinning of the happened-before relationship. To move towards a more real-world system, we introduce logical clocks (unrelated to physical time). In the absence of any relation to physical time, the logical clocks are deemed correct if
                                                          a --> b ==> C(a) < C(b)
where, C(a) and C(b) are the logical clock readings of events a & b.

Note that C(a) < C(b) does not imply a --> b. If it did, any two concurrent events would need to have C(a) == C(b), a condition which does not hold for p3 and q4 in the above diagram, for example.

To ensure that logical clocks are correct as defined above, the clocks need to follow just 2 rules:
  1. Between any 2 events on a process P, the logical clock ticks.
  2. When a process P sends a message M to process Q, it stamps it with the local clock reading Tm. When Q receives M, it sets the local logical clock to max(Tm + 1, Local Clock Reading Cq).
The above 2 conditions make sure that if a --> b, then C(a) < C(b). 
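
The two rules above can be sketched in a few lines of Python. This is only a minimal illustration, not code from the paper: the class and method names are made up, and the receive step assumes that receiving a message is itself an event, so the local clock also ticks (hence max(Tm + 1, Cq + 1) rather than max(Tm + 1, Cq)).

```python
class LamportClock:
    """Logical clock for a single process (illustrative names)."""

    def __init__(self):
        self.time = 0

    def tick(self):
        # Rule 1: the clock advances between any two local events.
        self.time += 1
        return self.time

    def send(self):
        # Sending is itself an event; the message carries the timestamp Tm.
        return self.tick()

    def receive(self, tm):
        # Rule 2: move past the sender's timestamp if we are behind it.
        # Assumption: the receive also counts as a local event, so we tick.
        self.time = max(tm + 1, self.time + 1)
        return self.time


p, q = LamportClock(), LamportClock()
a = p.tick()       # local event on P
tm = p.send()      # P sends M stamped with Tm
b = q.receive(tm)  # Q receives M
```

Here a --> (send of M) --> b, and the readings come out strictly increasing along that chain, which is all the correctness condition asks for.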

ORDER ORDER! TOTAL ORDER!

With our logical clocks in place, we can now go from Partial order ( --> ) to Total order ( ==> ). We simply order the events by the logical clock reading. If C(a) == C(b), then we use an arbitrary ordering O of the processes to order the corresponding events. The total ordering thus arrived at is not unique, however. It depends upon the logical clocks as well as the ordering O. 

Note that since our logical clocks are correct, any two events that have a causal (happened-before) relationship, i.e. a --> b, satisfy C(a) < C(b), so the total order never contradicts the partial order. For events that were concurrent in our previous model, we impose an arbitrary order based on logical clock readings and O. 
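
As a rough sketch of the tie-breaking idea, each event can be represented as a (clock reading, process) pair and sorted on that pair. The process names and the PROCESS_RANK table (standing in for the arbitrary ordering O) are assumptions made up for this example.

```python
# Arbitrary but fixed ordering O of the processes (hypothetical names).
PROCESS_RANK = {"P": 0, "Q": 1, "R": 2}


def total_order_key(event):
    # Compare by logical clock first; break ties using the ordering O.
    clock, process = event
    return (clock, PROCESS_RANK[process])


events = [(2, "Q"), (1, "R"), (2, "P"), (1, "P")]
ordered = sorted(events, key=total_order_key)
```

Choosing a different PROCESS_RANK gives a different, equally valid total order, which is the non-uniqueness mentioned above.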

Being able to have a total order in a distributed system is a very useful capability. A total ordering of requests for a lock can, for example, be used to grant those requests in order. Total order in the system is the end, to which the logical clocks are the means.

WHEN ORDER IS CHAOS!

Note that our total order in the previous section is a bit arbitrary. In fact, so arbitrary, that it may well conflict with the order of events observed in the real world. For example, event A may occur at process P at logical time C(a), and the observer may then initiate an event B on process Q at time C(b). Although, the observer initiated event B after A, the system (and the associated logical clocks) knows nothing about it, and may very well have C(a) > C(b), thus putting them in the exact opposite order of what was observed in the real (Newtonian) world. 

This is Chaos! And probably unacceptable to observers in the Newtonian world!

BRINGING ORDER TO CHAOS!

One of the very interesting things in this paper is that the total order of events in a system can be made consistent with the order of things in the real world by constructing our system with physical clocks. Provided that each individual clock runs at approximately the correct rate (dC/dt ~ 1), and the distinct clocks are synced within a bound (|Ci - Cj| < E), it can be ensured that whenever a causally precedes b in the real world (for example, through an observer as in the previous section), a ==> b holds in the total order as well. 

Note that for a ==> b to be consistent with real world observations, individual physical clocks should be in sync within limits. Real physical clocks would drift apart in general and become out of sync. The paper outlines a simple algorithm to sync physical clocks within an error bound. 
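
As I read the paper, the core of the sync algorithm is a receive rule: each message carries the sender's clock reading Tm, and the receiver advances its clock to at least Tm plus a known minimum transmission delay. A minimal sketch of just that rule follows; the function and parameter names are my own and the numbers are made up.

```python
def on_receive(local_clock, tm, min_delay):
    """Return the receiver's clock reading after handling a message.

    local_clock: receiver's physical clock reading on arrival
    tm:          sender's clock reading stamped on the message
    min_delay:   assumed known lower bound on the message's transit time
    """
    # The clock is only ever pushed forward, never set back.
    return max(local_clock, tm + min_delay)


behind = on_receive(local_clock=100.0, tm=105.0, min_delay=0.5)  # receiver lags
ahead = on_receive(local_clock=110.0, tm=105.0, min_delay=0.5)   # receiver leads
```

A lagging clock gets pulled forward by incoming messages, while a leading clock is left alone, so frequent message exchange keeps the clocks within a bound of each other.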

This is an incredibly useful result as this means that we now have a total order that is also consistent with real world observations. This is what we set out to achieve in the first place: To be able to order events (possibly using physical clocks that are not synced). This paper not only provides a method to sync physical clocks within a bound, but also a method to use it to arrive at a total order of events that is consistent with real world observations as well.

KEY TAKEAWAYS

  1. Total order of events in a distributed system, though a great enabler, is a non-trivial problem to solve.
  2. A system of synced physical clocks (within a bound) can be used to arrive at a total order of events in the distributed system that is consistent with order observed in the real world. 
  3. If real world order of events is unimportant, we can very well make do with a system of logical clocks that provides total ordering of events in the system. 

REFERENCE

[1] http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf

Sunday, April 10, 2016

Zookeeper: A Wait-Free Coordination Kernel

WHAT IS ZOOKEEPER ANYWAY?

A service for coordinating processes of a distributed application.

Some common forms of coordination required in a distributed application include leader election, name registry, configuration management, group membership etc. Zookeeper provides a set of primitives which can be used to implement solutions to such common coordination problems. 

A common way of using Zookeeper happens to be through the Apache Curator Framework, which is a high-level API over Zookeeper and also provides recipes - ready to use, out of the box solutions for things like leader election, shared locks etc. See http://curator.apache.org/curator-recipes/index.html

TERMINOLOGY

Client: A user of the Zookeeper service

Server: A process providing Zookeeper service. Usually, we have an ensemble of such processes for replication and high availability.

Znode: An in-memory node to hold Zookeeper data.

Data Tree: The Znodes are arranged in a hierarchical name space much like the traditional file system. The complete set of Znodes is referred to as the Data Tree. 

SERVICE OVERVIEW

Zookeeper provides an abstraction of data nodes (called Znodes) in a hierarchical namespace. A Znode can be referred to (for a read/write operation) using its full path (like in a UNIX file system). For example, the node p_3 in the following diagram can be referred to using the path /app1/p_3, where / is the path of the root znode and /app1 is the path of the node app1.


Clients connect to zookeeper via sessions and perform read/write operations on the Znodes using their full path. 

There are 2 types of Znodes: 
  1. Regular: Clients manipulate regular znodes by creating and deleting them explicitly. 
  2. Ephemeral: Clients delete such Znodes explicitly or let the system remove them when the session that creates them terminates (deliberately or due to failure).

WATCHES

Zookeeper implements watches on Znodes so that clients can be notified of the changes in Znode value without requiring polling. When a client retrieves a value of a node, it can optionally set a watch on it, so that whenever the returned value changes, the client is notified. Note that watches are one-shot affairs and have to be set again once a notification for a changed value is received. Also, watches do NOT persist across sessions.
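
The one-shot behaviour is easy to get wrong, so here is a toy model of it in plain Python. This is not the Zookeeper client API; WatchedValue and its methods are invented purely for illustration.

```python
class WatchedValue:
    """Toy znode value with one-shot watches (not the real Zookeeper API)."""

    def __init__(self, value):
        self.value = value
        self.watchers = []

    def get(self, watcher=None):
        # Reading can optionally register a watch on the value.
        if watcher is not None:
            self.watchers.append(watcher)
        return self.value

    def set(self, value):
        self.value = value
        # Watches are cleared once fired; clients must re-register them.
        fired, self.watchers = self.watchers, []
        for notify in fired:
            notify()  # carries no data, only "something changed"


events = []
node = WatchedValue("v1")
node.get(watcher=lambda: events.append("changed"))
node.set("v2")  # fires the watch
node.set("v3")  # no watch registered any more, so nothing fires
```

After the two writes the client has been notified exactly once, and it would have to read again (and set a fresh watch) to learn about "v3".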

DATA MODEL

Although similar in spirit to the traditional file system, an important difference in Zookeeper is that Znodes can be read or written only in full. One cannot read or write them partially. 
Also, Znodes are not designed for general purpose data storage. By default, a Znode can store at most 1 MB of data.

Znodes are also associated with timestamps (to record when a change was made) and versions (to allow for conditional updates)

CLIENT API

Following is the API exposed by Zookeeper service (Apache Curator builds on top of this vanilla API):
  1. create(path, data, flags): Create a Znode with the ‘data’. Flags indicate whether the node is to be regular/ephemeral etc.
  2. delete(path, version): Delete a Znode if its version matches the specified version.
  3. exists(path, watch): Check for the existence of a Znode, optionally setting a watch on it (to be notified of a later change such as its deletion).
  4. getData(path, watch): The read operation on a Znode with the specified path. Optionally sets a watch on the Znode so that the client is notified if its value changes. Note that the notification does not contain the changed value, only the fact that the value has changed. The client needs to read again to get the updated value.
  5. setData(path, data, version): The proverbial write on a Znode. Conditional upon version matching.
  6. getChildren(path, watch): Get the children of the Znode at the specified path
  7. sync(path): Covered later.
An important point to note here is that there are no open/close calls in this API, since all methods use the full path name of the Znode. This not only simplifies the API, but also obviates storing unnecessary state on the servers. Win win all around!
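
To make the version-conditional calls concrete, here is a small in-memory stand-in for the data tree. It is a sketch under my own assumptions, not the real client: ToyZnodeStore and its snake_case methods are hypothetical, and versions simply start at 0 and grow by one per write.

```python
class ToyZnodeStore:
    """In-memory stand-in for a data tree; NOT the real Zookeeper client."""

    def __init__(self):
        self.nodes = {}  # full path -> (data, version)

    def create(self, path, data):
        if path in self.nodes:
            raise KeyError(f"{path} already exists")
        self.nodes[path] = (data, 0)

    def get_data(self, path):
        return self.nodes[path]  # (data, version)

    def set_data(self, path, data, version):
        _, current = self.nodes[path]
        if version != current:
            return False  # someone else wrote first; caller should re-read
        self.nodes[path] = (data, current + 1)
        return True


store = ToyZnodeStore()
store.create("/app1/config", "v=1")
_, seen = store.get_data("/app1/config")
first = store.set_data("/app1/config", "v=2", seen)   # version matches
second = store.set_data("/app1/config", "v=3", seen)  # stale version, rejected
```

The second write fails because it was based on a version that is no longer current, which is how conditional updates prevent lost updates without any locking.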

All the above calls have an async as well as a sync version. The async version allows a client to have multiple calls outstanding on Zookeeper whilst having a low latency for each of them. 

ZOOKEEPER GUARANTEES

  1. Linearizable writes: All writes appear to have been done in some order. And that perceived order of the writes is the same across all the clients. That is, if a client observes (by doing reads) that a write W1 was performed followed by W2 , every other client in the system observes either W1 followed by W2, or just W2, but never does any client observe a W2 followed by W1.

    Watch notifications are also processed along with writes. What this means is that if a client receives notifications in the order: N1 (for a write W1) followed by N2 (for a write W2), all other clients in the system receive notifications in the same order (or just the N1 notification, as it is sufficient to suggest that the value with the client has gone stale).
  2. FIFO order of client operations: Operations executed by a client are performed in FIFO order. Note that the FIFO order is imposed upon the ops performed by a client, not upon the ops performed across clients.

PRIMITIVES USING THE ZOOKEEPER

1. NAME REGISTRY

A name registry is a registry (at a well-defined location) that publicizes the availability of services at a particular IP address/port. Implementing such a registry is very simple with Zookeeper. All one needs is a well-defined znode under which each service creates a new znode with its own IP address/port when it becomes available. Any client that has done a getChildren on the well-defined Znode with a watch gets to know about newly added/deleted Znodes (under the well-defined znode) as and when services become available/go away. Clients can also set watches on the service znodes themselves to be notified of any changes in the address/port of the service.

2. GROUP MEMBERSHIP

In this situation, a process wants to know which other processes are members of the same group as itself. This is again as simple as each process creating an ephemeral node under a well-defined Znode with, let's say, its own pid as the name of the znode. Any process that wants to know the group members just reads all the children of the well-defined znode. Since each process creates an ephemeral node, the node disappears from the tree (and from the group as well) if its associated session dies due to a timeout/failure etc., and thus only active members of the group are available as znodes under the well-defined Znode.
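
The recipe can be mimicked with a toy model in which every child remembers the session that created it. Again, this is only an illustration with invented names (ToyGroup, session ids, pid-style child names), not the Zookeeper API.

```python
class ToyGroup:
    """Toy model of ephemeral children under one well-defined znode."""

    def __init__(self):
        self.children = {}  # child name -> owning session id

    def join(self, session_id, name):
        self.children[name] = session_id

    def session_closed(self, session_id):
        # Ephemeral nodes vanish together with the session that created them.
        self.children = {
            name: owner
            for name, owner in self.children.items()
            if owner != session_id
        }

    def members(self):
        return sorted(self.children)


group = ToyGroup()
group.join(session_id=1, name="pid-101")
group.join(session_id=2, name="pid-202")
before = group.members()
group.session_closed(1)  # e.g. the first process crashed or timed out
after = group.members()
```

No process ever has to deregister explicitly; membership is simply whatever children are left.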

3. CONFIGURATION MANAGEMENT

Again, this is also as simple as writing the config information in a well-defined Znode, say Zc. Any clients reading the Znode Zc get notified of any configuration changes through watches. 

4. DISTRIBUTED LOCKS

Omitted here for the sake of brevity (although this post is not "brief" by any yardstick). See the full paper here.

5. LEADER ELECTION

Hint: Similar to Group Membership 

ZOOKEEPER IMPLEMENTATION

1. WRITES

In Zookeeper, a write involves first transforming the write operation into a transaction, then atomically broadcasting the transaction to all zookeeper servers, followed by committing the transaction to the in-memory database at each replica. Atomic broadcast of the transaction ensures that each non-faulty server eventually gets the broadcast message (which it can subsequently commit). Note that the atomic broadcast succeeds only if the broadcasted transaction reaches at least a quorum (majority) of servers. This ensures that a write operation persists across any number of failures as long as there eventually exists a quorum of non-faulty servers. 
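
The quorum arithmetic behind this is small enough to write down. The helper names below are my own; the only assumption is that a quorum means a strict majority of the ensemble.

```python
def quorum_size(ensemble_size):
    # Smallest group that is a strict majority of the servers.
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size):
    # Servers that can be down while a majority is still reachable.
    return ensemble_size - quorum_size(ensemble_size)


sizes = {n: (quorum_size(n), tolerated_failures(n)) for n in (3, 4, 5)}
```

Note that going from 3 to 4 servers raises the quorum size without tolerating any extra failure, which is why odd ensemble sizes are the usual choice.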

For durability purpose, any committed transaction is maintained in a replay log which is appended to before any changes are made to the in-memory database. 

To be more specific, a write operation is forwarded to the Zookeeper cluster leader, which then performs the atomic broadcast of the transaction. A write operation for a client completes once the local server (to which the client is connected) reflects the write changes in its in-memory database. This means that when a write returns, other replicas in the Zookeeper cluster may not reflect the same value for the affected Znode. Although Zookeeper replicas never diverge, they may lag each other in terms of transactions applied. 

Since writes involve disk writes and coordination through atomic broadcast, they are relatively slow and costly.

2. READS

Reads in Zookeeper are served from the in-memory database of the locally connected server. These do not involve any disk reads or complex coordination, and hence are very fast (as compared to writes). 

Note that since reads refer only to the local copy of the in-memory database, a read may return an older value even though more recent writes have been committed elsewhere. As such, Zookeeper is only an eventually consistent system in which reads may return stale values for some time. 

3. SLOW READS

Some applications may not be fine with getting stale values on a read. For such applications, a sync op is provided that ensures that all pending writes at the start of sync operation are applied before returning the result of a following read. This (sync --> read) is referred to as the slow read. However, note that this may still not return the most recent value in that it may miss any writes that happened after sync was issued and before the read happened. So, that's a gotcha.

KEY TAKEAWAYS

  1. Don't reinvent the wheel. Use Zookeeper as a template for solutions to things like Leader election, Config management, Distributed locks, Group membership etc. See here for a more detailed list.
  2. Zookeeper reads are fast. Writes not so much.
  3. Zookeeper is an eventually consistent system. Reads may return stale values.
  4. Zookeeper is sort of a key-value store, where values are not bigger than a MB.
  5. Watches are good. Really good. Much better than polling. 

REFERENCE

The original paper: http://static.cs.brown.edu/courses/cs227/archives/2012/papers/replication/hunt.pdf

Wednesday, March 30, 2016

Dynamo: Amazon's Highly Available Key-value Store

Before we start, note that this paper is about Dynamo and not DynamoDB. DynamoDB builds upon the principles of Dynamo, but just to be clear, these are not the same.

With over 3000 citations, this paper can be seen as hugely responsible for the barrage of NoSQL databases (Riak, Voldemort, Cassandra, and the like) that followed. 

From the title itself, the point is driven home that Dynamo's focus is availability. More specifically, Dynamo is always available for writes (assuming a bound on failing servers, of course). To achieve high availability, Dynamo sacrifices consistency. That is, different nodes may get different values for the same key on performing a read.

Why a key-value store (NoSQL) and not your next-door traditional SQL database?

  1. Many applications can make do with just a primary-key interface. An RDBMS with its full scale SQL capability is an overkill for such applications.
  2. The added capabilities of an RDBMS require expensive hardware and have an operational cost in terms of trained RDBMS admins.
  3. Many RDBMS favor consistency over availability - not appropriate for applications that want to be always on and can tolerate some laxity in consistency.
  4. It's difficult to scale out relational databases. (Paper's claim not mine!)
Dynamo favors availability over consistency and can be scaled out easily by adding more dynamo instances to handle bigger data sets or increased read/write request load. 

It should also be noted that Dynamo targets applications that need to store only small values (typically < 1 MB).

DESIGN DECISIONS:


  1. Since availability and consistency cannot be achieved simultaneously (see the CAP theorem), sacrifice consistency in favor of availability. Dynamo is designed to be an eventually consistent store, in which updates reach all the replicas eventually.
  2. Since the data store is not always consistent, there may exist different versions of the data. Such differing (possibly conflicting) versions need to be reconciled either during reading or writing. Because Dynamo's target is being always available for writes, such conflicts are resolved during reads.
  3. Dynamo has no master-slave relationships. No special responsibilities for any node. It's all peer-to-peer.

DYNAMO INTERFACE:


  1. get(key): Returns a list of object replicas (with conflicting versions) or a single object if no conflicts exist in the system. In addition, a context object is returned that includes metadata, such as version of the object.
  2. put(key, context, value): A hash on the key is used to decide the dynamo instances where the object replicas would be stored. Note that a single key-value pair is stored at multiple dynamo instances for high durability.


SYSTEM ARCHITECTURE:



1. PARTITIONING

Dynamo uses consistent hashing to distribute the keys among the dynamo instances. In particular, each node corresponds to multiple virtual nodes on the consistent hash ring to allow uniform distribution of keys (uniform redistribution as well, when nodes die/resurrect). Assigning different number of virtual nodes to a node on the hash ring based on different capacities handles heterogeneity among the servers.
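
Here is a minimal sketch of a hash ring with virtual nodes, to show why removing a node only moves that node's keys. It is an illustration rather than Dynamo's actual implementation: the HashRing class, the "node#i" naming of virtual nodes and the virtual node counts are all assumptions.

```python
import bisect
import hashlib


def ring_hash(value):
    # Any stable hash works for a sketch; MD5 is used only to spread keys.
    return int(hashlib.md5(value.encode()).hexdigest(), 16)


class HashRing:
    def __init__(self):
        self.ring = []  # sorted list of (position, node)

    def add_node(self, node, vnodes):
        # More virtual nodes means a larger share of the key space.
        for i in range(vnodes):
            bisect.insort(self.ring, (ring_hash(f"{node}#{i}"), node))

    def remove_node(self, node):
        self.ring = [(pos, n) for pos, n in self.ring if n != node]

    def owner(self, key):
        # First virtual node clockwise from the key, wrapping around the ring.
        positions = [pos for pos, _ in self.ring]
        idx = bisect.bisect_right(positions, ring_hash(key))
        return self.ring[idx % len(self.ring)][1]


ring = HashRing()
ring.add_node("A", vnodes=8)
ring.add_node("B", vnodes=8)
ring.add_node("C", vnodes=16)  # a bigger machine gets more virtual nodes
keys = [f"key-{i}" for i in range(100)]
before = {k: ring.owner(k) for k in keys}
ring.remove_node("B")
after = {k: ring.owner(k) for k in keys}
moved = [k for k in keys if before[k] != after[k]]
```

Every key in moved was owned by B before the removal; keys owned by A or C stay where they were.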

2. REPLICATION

High durability is achieved in Dynamo by ensuring that each key-value pair is replicated on N dynamo servers. The hash of the key is used to select a coordinator node which ensures N-way replication.

3. DATA VERSIONING

Since Dynamo is an eventually consistent system, a put may return to the caller before the update has been applied to all the replicas. Therefore, multiple versions of an object (key-value pair) can co-exist in the system. Such conflicting versions are reconciled on a read, so that the system is always write-available.

4. EXECUTING GET & PUT

A get/put operation in Dynamo happens via a so-called coordinator node. There are 4 parameters involved here:

  • T: Total number of nodes in the system
  • N: Total number of object replicas (number of distinct copies of a key-value pair)
  • R: Number of nodes that must participate in a read/get op for it to be successful.
  • W: Number of nodes that must participate in a write/put op for it to be successful.
Note that typically, T > N.

Each node in the Dynamo system maintains a preference list - a list of nodes to decide where each key-value pair should be stored (the top node being the coordinator node). On receiving a get request, the coordinator node (usually the first node in the preference list) sends the request to N top healthy nodes from the preference list (the preference list has more than N nodes to tolerate nodes dying away), and waits for at least R - 1 responses before returning to the caller. Some of the (R - 1) responses (or even the coordinator's local copy) may have stale values or may be in conflict, which are either resolved by Dynamo itself (using simple policies like latest-write-wins) or left to the client to reconcile by returning all the conflicting versions.

Similarly, for a put operation, the coordinator updates the version of the key-value pair and sends it to top N healthy nodes from the preference list. The write succeeds when the coordinator node receives response from W - 1 nodes (-1 because the coordinator node itself saves the new version as well). 

Note that because the write succeeds after responses from only W nodes, a following read may return a stale value (assuming W < N, of course).

5. SLOPPY QUORUM (?)

For the case where N = T, R + W > N can guarantee full consistency ie. any read reflects the latest write. This is because with R + W > N, any successful read includes at least one node that committed a previous write successfully. For a detailed explanation of this, refer here.
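
The overlap claim for a fixed replica set can be checked by brute force. The sketch below (function name is my own) enumerates every possible read set of size R and write set of size W over the same N replicas and tests whether they always share a node.

```python
from itertools import combinations


def always_overlap(n, r, w):
    """True if every read set of size r meets every write set of size w."""
    replicas = range(n)
    return all(
        set(read) & set(write)
        for read in combinations(replicas, r)
        for write in combinations(replicas, w)
    )


strict = always_overlap(n=3, r=2, w=2)     # R + W > N
loose = always_overlap(n=3, r=1, w=2)      # R + W == N
read_one = always_overlap(n=3, r=1, w=3)   # W == N, R == 1
```

With R + W > N there is no way to pick a read set that misses the latest write; with R + W <= N a read can land entirely on replicas the write never reached. This only holds as long as reads and writes draw from the same N nodes.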

But for the Dynamo case, N < T (to be able to handle failures), and even if we have R + W > N, this does not really provide any consistency guarantees because top N healthy nodes (and not the same N) out of T nodes are selected for the read/write operation. Since the subset of N nodes itself is not guaranteed to be the same for any write followed by a read, there is no guarantee that R nodes of the read intersect with even one of the W nodes of a previous write. 

At the very basic level, it's the transient nature of the N nodes (top N healthy nodes out of T total nodes) that is responsible for the sloppiness in the consistency guarantees of Dynamo. That's why, Dynamo is only eventually consistent, and not fully consistent (even with R + W > N).

6. REPLICA SYNCHRONIZATION

Dynamo uses Merkle trees for faster synchronization of the object replicas and to also reduce the amount of data transferred. The leaves of the Merkle tree correspond to hashes of values of individual keys. The parents higher up contain the hash value of their children. Each Dynamo server maintains a separate Merkle tree for the key-range corresponding to a virtual node (Each dynamo node corresponds to multiple virtual nodes on the consistent hash ring, so each node has multiple Merkle trees). To detect differences, two nodes first compare the root of the Merkle trees. If they are same, then the key-ranges are in sync. If not, one moves progressively down the Merkle trees until all the differing keys are identified. Merkle trees thus help reduce the data transferred as well as faster detection of differences. 
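
A rough sketch of the comparison follows. It is not Dynamo's implementation: the node layout (plain dicts), the SHA-256 hash and the assumption that both replicas hold the same non-empty set of keys in the same order are simplifications chosen for this example.

```python
import hashlib


def h(data):
    return hashlib.sha256(data.encode()).hexdigest()


def build(items):
    """Build a Merkle tree over sorted (key, value) pairs; returns the root."""
    nodes = [{"hash": h(f"{k}={v}"), "keys": [k], "children": []} for k, v in items]
    while len(nodes) > 1:
        paired = []
        for i in range(0, len(nodes), 2):
            group = nodes[i:i + 2]
            paired.append({
                "hash": h("".join(n["hash"] for n in group)),
                "keys": [k for n in group for k in n["keys"]],
                "children": group,
            })
        nodes = paired
    return nodes[0]


def diff(a, b):
    """Keys whose values differ, assuming both trees cover the same keys."""
    if a["hash"] == b["hash"]:
        return []         # identical subtree: nothing below needs comparing
    if not a["children"]:
        return a["keys"]  # a differing leaf
    return [k for x, y in zip(a["children"], b["children"]) for k in diff(x, y)]


left = build([("k1", "a"), ("k2", "b"), ("k3", "c"), ("k4", "d")])
right = build([("k1", "a"), ("k2", "b"), ("k3", "X"), ("k4", "d")])
changed = diff(left, right)
```

Only the branch containing k3 is descended into; the k1/k2 subtree is skipped after a single hash comparison, which is where the savings in transferred data come from.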

It's not clear though from the paper how frequently Merkle trees are used for such synchronization.

7. LOCAL PERSISTENCE ENGINE

One of the interesting things about Dynamo is the ability to plug in different persistence engines. For example, to store values of few tens of KBs, one could use the Berkeley Database, while for objects of larger sizes one can go for MySQL. 

8. TUNING DYNAMO

By varying N, R and W, one can affect the durability, availability and consistency characteristics of Dynamo. For example, by setting N = T (total nodes), and W = N, one can achieve full consistency as the write won't complete until it has been committed on all the nodes. 

By setting R = 1, along with W = N = T on the other hand, one obtains a High-performance read engine (with full consistency).

By setting W = 1, one gets high availability (writes don't fail as long as there is a single node in the system) at the cost of durability (if that one node fails, the write is lost forever).

Typical values of (R = 2, W = 2, N = 3) are used in Amazon's applications.

9. OPTIMIZING WRITES/READS:

An interesting optimization in Dynamo is the use of so-called buffered writes. To improve latency of reads/writes (at the cost of durability), Dynamo allows in-memory writes (instead of writing to the persistent store). Similarly, for reads, the key is first checked for in the in-memory buffer. A writer thread in the background regularly flushes updates to the persistent store. Not having to deal with persistent store in every read/write was found to improve the 99.9th percentile write latency by a factor of 5. 

To reduce the risk to durability, the coordinator ensures that at least one of the W nodes selected by the coordinator performs the write on the persistent store (and not just in the buffer). Of course, this still doesn't guarantee absolute durability, because an update may still be lost forever if, god forbid, the node performing the durable write goes down somehow. The loss of a single node in the system of N nodes can result in the loss of an update (that's pretty dicey I think!).

10. DIVERGENT VERSIONS

With its sloppy quorum, one expects the existence of multiple versions of an object in the system. However, in practice (at Amazon), 99.94% of requests saw exactly one version of an object, 0.00057% saw 2 versions, 0.00047% saw 3, and 0.00009% saw 4 versions. Which is a bit surprising ;)

11. LOAD BALANCING

I would prefer having a separate (albeit short) post for this, covering the various variants of consistent hashing. Watch this space.

KEY TAKEAWAYS


  1. A key takeaway here is that designing a highly available system requires sacrifices in terms of consistency. To achieve a key-value store that is always available for writes, we have to tolerate the existence of multiple versions of an object in the system (which have to be reconciled somehow). 
  2. Being sloppy (quorum-wise) can help achieve high availability.
  3. R, W, N, T can be varied as per one's needs. One can get a fully consistent system in Dynamo as well by setting W = N = T. 
  4. Consistent hashing is good. Use it to partition load.
  5. Merkle trees are good as well. Use them to detect differences between a list of key-values and to ensure minimum data transfer.
  6. One can construct highly available and scalable systems on top of things like MySQL (Dynamo uses it as a local persistence engine).

REFERENCE

[1] http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf