Sunday, April 17, 2016

Time, Clocks and the Order of Events in a Distributed System

THE PROBLEM

In a distributed system, ordering the events occurring at various points in the system is a non-trivial problem, i.e., it is hard to establish a "happened before" relationship among events. One might ask: why not just use physical clocks and order events according to their physical clock readings? But then the question arises: how do we keep those physical clocks in sync? Independent physical clocks may not show the same reading and may drift apart with time.

This paper first introduces a way to order only a subset of events in a distributed system. This method is then extended to impose an order upon all the events in the system. Further, we see how this total order may conflict with the order of events in the real world, and a method using physical clocks is proposed to counter such anomalous behavior. The paper signs off with an algorithm to keep the physical clocks in sync (within certain limits), which is required to avoid such anomalous behavior.

PARTIAL ORDER OF EVENTS

A simple happened-before relationship ( --> ) between a pair of events in the system can be established as follows:

  1. If the two events occur at the same process P, then the events are ordered according to the order in which they occur on P.
  2. Let A be the event of sending message M from process P1 to process P2. Let B be the event of receiving message M on P2. Then, A --> B.
  3. Transitivity: if A --> B and B --> C, then A --> C.
Note that these 3 rules may not be able to establish a happened-before relation between an arbitrary pair of events in the system. That's why the happened-before relationship ( --> ) is said to impose only a partial order on the events in the system. If !(A --> B) && !(B --> A), then events A and B are said to be concurrent.

In the space-time diagram from the paper, p3 and q4 are concurrent. But, q1 --> p3.
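The three rules above can be sketched in code. Below is a toy model (my own, not from the paper) in which direct edges encode same-process succession and message delivery, and happened-before is simply reachability over those edges. The specific events and the q1-to-p2 message are made-up stand-ins mirroring the paper's p/q diagram:

```python
# Direct edges: same-process successor events, plus send -> receive edges.
# q1 sends a message that is received at p2 (a hypothetical edge for illustration).
direct = {
    "p1": {"p2"}, "p2": {"p3"},
    "q1": {"q2", "p2"},
    "q2": {"q3"}, "q3": {"q4"},
}

def happened_before(direct, a, b):
    """a --> b iff b is reachable from a via direct edges (rule 3: transitivity)."""
    frontier, seen = set(direct.get(a, ())), set()
    while frontier:
        e = frontier.pop()
        if e == b:
            return True
        seen.add(e)
        frontier |= set(direct.get(e, ())) - seen
    return False

def concurrent(direct, a, b):
    """a and b are concurrent iff neither happened before the other."""
    return not happened_before(direct, a, b) and not happened_before(direct, b, a)
```

With these edges, `happened_before(direct, "q1", "p3")` holds (via the message to p2 and then p2 --> p3 on process p), while p3 and q4 come out concurrent.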

LOGICAL CLOCKS

Above, we established a theoretical underpinning of the happened-before relationship. To move towards a more real-world system, we introduce logical clocks (unrelated to physical time). In the absence of any relation to physical time, the logical clocks are deemed correct if
                                                          a --> b ==> C(a) < C(b)
where, C(a) and C(b) are the logical clock readings of events a & b.

Note that C(a) < C(b) does not imply a --> b. Because, if it did, then any pair of concurrent events would need to have C(a) == C(b) - a condition that need not hold for concurrent events such as p3 and q4 in the diagram.

To ensure that logical clocks are correct as defined above, the clocks need to follow just 2 rules:
  1. Between any 2 events on a process P, the logical clock ticks, i.e., each event on P gets a strictly higher clock reading than the previous one.
  2. When a process P sends a message M to process Q, it stamps it with the local clock reading Tm. When Q receives M, it sets its local logical clock to a value greater than both Tm and its current reading Cq, e.g., max(Tm, Cq) + 1.
The above 2 rules ensure that if a --> b, then C(a) < C(b).
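A minimal sketch of such a logical clock in Python (the class name and method shapes are my own, not the paper's pseudocode):

```python
class LamportProcess:
    """A process carrying a Lamport logical clock."""

    def __init__(self):
        self.clock = 0

    def local_event(self):
        self.clock += 1            # rule 1: tick between events
        return self.clock

    def send(self):
        self.clock += 1            # sending is itself an event
        return self.clock          # Tm, stamped on the outgoing message

    def receive(self, tm):
        # rule 2: the receive event's timestamp exceeds both Tm and Cq
        self.clock = max(self.clock, tm) + 1
        return self.clock
```

For a send event a on P and the matching receive event b on Q, these rules give C(a) < C(b), as required.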

ORDER ORDER! TOTAL ORDER!

With our logical clocks in place, we can now go from the partial order ( --> ) to a total order ( ==> ). We simply order the events by their logical clock readings. If C(a) == C(b), then we use an arbitrary ordering O of the processes to break the tie. The total ordering thus arrived at is not unique, however: it depends upon the logical clocks as well as the ordering O.

Note that since our logical clocks are correct, any two events with a causal (happened-before) relationship, i.e. a --> b, satisfy C(a) < C(b) and are thus ordered consistently. Events that were concurrent in our previous model get an arbitrary order based on the logical clock readings and O.

Being able to have a total order in a distributed system is a very useful capability. For example, a total ordering of requests for a lock can be used to grant those requests in order. Total order in the system is the end, to which the logical clocks are the means.
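The tie-break scheme above amounts to sorting events on the pair (clock reading, process rank in O). A sketch, where the process names and the ordering O are made up for illustration:

```python
# The arbitrary total order O on processes (an assumption for this example).
O = {"P": 0, "Q": 1, "R": 2}

# Events as (process, logical clock reading) pairs.
events = [("Q", 2), ("P", 1), ("R", 2), ("P", 3)]

# Total order ==> : primary key is the clock reading, ties broken by O.
total_order = sorted(events, key=lambda e: (e[1], O[e[0]]))
# -> [('P', 1), ('Q', 2), ('R', 2), ('P', 3)]; Q's event beats R's on the tie at 2
```

Any other choice of O (or of the logical clocks themselves) yields a different, but equally valid, total order.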

WHEN ORDER IS CHAOS!

Note that our total order in the previous section is a bit arbitrary. In fact, so arbitrary that it may well conflict with the order of events observed in the real world. For example, event A may occur at process P at logical time C(a), and an external observer may then initiate an event B on process Q at logical time C(b). Although the observer initiated event B after A, the system (and the associated logical clocks) knows nothing about it, and may very well have C(a) > C(b), thus putting them in the exact opposite order of what was observed in the real (Newtonian) world.

This is chaos! And probably not acceptable to observers in the Newtonian world!

BRINGING ORDER TO CHAOS!

One of the very interesting results in this paper is that the total order of events in a system can be made consistent with the order of events in the real world by constructing our system with physical clocks. Provided that each individual clock runs at roughly the correct rate (dC/dt ~ 1), and distinct clocks are synced within a bound (|Ci - Cj| < E), it can be guaranteed that for any a ==> b (total order), b occurs after a in the real world as well.

Note that for a ==> b to be consistent with real-world observations, the individual physical clocks must remain in sync within limits. Real physical clocks in general drift apart and go out of sync. The paper outlines a simple algorithm to sync physical clocks to within an error bound.
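The adjustment the paper makes on message receipt can be sketched as a single rule, where mu is the assumed minimum transmission delay of a message (a known lower bound in the paper's model):

```python
def on_receive(local_clock, tm, mu):
    """Physical-clock adjustment on message receipt (a sketch of the idea):
    a message stamped with the sender's physical time tm must have left at
    least mu seconds ago, so the receiver's clock must read past tm + mu.
    Clocks are only ever nudged forward, never set backwards."""
    return max(local_clock, tm + mu)
```

Because every message exchange pulls lagging clocks forward, regularly communicating processes stay synced to within a bound that depends on the drift rate and message delays.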

This is an incredibly useful result, as it means that we now have a total order that is also consistent with real-world observations. This is what we set out to achieve in the first place: to order events even though our physical clocks are not perfectly synced. The paper not only provides a method to sync physical clocks within a bound, but also a method to use them to arrive at a total order of events that is consistent with real-world observations.

KEY TAKEAWAYS

  1. Total order of events in a distributed system, though a great enabler, is a non-trivial problem to solve.
  2. A system of physical clocks synced within a bound can be used to arrive at a total order of events in the distributed system that is consistent with the order observed in the real world.
  3. If the real-world order of events is unimportant, we can very well make do with a system of logical clocks that provides a total ordering of events in the system.

REFERENCE

[1] http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf

Sunday, April 10, 2016

Zookeeper: A Wait-Free Coordination Kernel

WHAT IS ZOOKEEPER ANYWAY?

A service for coordinating processes of a distributed application.

Some common forms of coordination required in a distributed application include leader election, name registry, configuration management, group membership etc. Zookeeper provides a set of primitives which can be used to implement solutions to such common coordination problems. 

A common way of using Zookeeper is through the Apache Curator framework, which provides a high-level API over Zookeeper along with recipes - ready-to-use, out-of-the-box solutions for things like leader election, shared locks etc. See http://curator.apache.org/curator-recipes/index.html

TERMINOLOGY

Client: A user of the Zookeeper service

Server: A process providing Zookeeper service. Usually, we have an ensemble of such processes for replication and high availability.

Znode: An in-memory node to hold Zookeeper data.

Data Tree: The Znodes are arranged in a hierarchical name space much like the traditional file system. The complete set of Znodes is referred to as the Data Tree. 

SERVICE OVERVIEW

Zookeeper provides an abstraction of data nodes (called Znodes) arranged in a hierarchical namespace. A Znode can be referred to (for a read/write operation) using its full path (like in a UNIX file system). For example, a Znode p_3 under a Znode app1 can be referred to using the path /app1/p_3, where / is the path of the root Znode and /app1 is the path of the node app1.


Clients connect to zookeeper via sessions and perform read/write operations on the Znodes using their full path. 

There are 2 types of Znodes: 
  1. Regular: Clients manipulate regular znodes by creating and deleting them explicitly. 
  2. Ephemeral: Clients delete such Znodes explicitly or let the system remove them when the session that creates them terminates (deliberately or due to failure).

WATCHES

Zookeeper implements watches on Znodes so that clients can be notified of the changes in Znode value without requiring polling. When a client retrieves a value of a node, it can optionally set a watch on it, so that whenever the returned value changes, the client is notified. Note that watches are one-shot affairs and have to be set again once a notification for a changed value is received. Also, watches do NOT persist across sessions.
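The one-shot watch semantics can be illustrated with a toy node class (a sketch of the behavior only, not the real ZooKeeper client API):

```python
class WatchedNode:
    """Toy znode demonstrating one-shot watches."""

    def __init__(self, value=b""):
        self.value = value
        self._watchers = []

    def get(self, watcher=None):
        """Read the value; optionally register a one-shot watch callback."""
        if watcher is not None:
            self._watchers.append(watcher)
        return self.value

    def set(self, value):
        """Write a new value and fire all pending watches exactly once."""
        self.value = value
        watchers, self._watchers = self._watchers, []   # one-shot: clear first
        for w in watchers:
            w(self)   # notifies *that* it changed; client must re-read the value
```

A watcher registered via `get` fires on the next `set` only; a second `set` without re-registering produces no notification, mirroring the "set the watch again after each notification" behavior described above.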

DATA MODEL

Although similar in spirit to a traditional file system, an important difference in Zookeeper is that Znodes can be read or written only in full. One cannot read or write them partially.
Also, Znodes are not designed for general-purpose data storage. By default, a Znode can store only 1 MB of data.

Znodes are also associated with timestamps (to record when a change was made) and versions (to allow for conditional updates).

CLIENT API

Following is the API exposed by Zookeeper service (Apache Curator builds on top of this vanilla API):
  1. create(path, data, flags): Create a Znode with the ‘data’. Flags indicate whether the node is to be regular/ephemeral etc.
  2. delete(path, version): Delete a Znode if its version matches the specified version.
  3. exists(path, watch): Check for the existence of a Znode and set a watch if it does (to notify of its deletion).
  4. getData(path, watch): The read operation on the Znode at the specified path. Optionally sets a watch on the Znode to provide a notification if its value changes. Note that the notification does not contain the changed value; it only says that the value has changed. The client needs to do another read to get the updated value.
  5. setData(path, data, version): The proverbial write on a Znode. Conditional upon version matching.
  6. getChildren(path, watch): Get the children of the Znode at the specified path
  7. sync(path): Covered later.
An important point to note here is that there are no open/close calls in this API, since all methods use the full path name of the Znode. This not only simplifies the API, but also obviates storing unnecessary state on the servers. Win win all around!

All the above calls have an async as well as a sync version. The async version allows a client to have multiple calls outstanding on Zookeeper whilst having a low latency for each of them. 
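The version parameter in setData/delete makes writes behave like a compare-and-swap. A toy sketch of that semantics (an in-memory stand-in, not the real client library; in the real API, passing version -1 matches any version):

```python
class Znode:
    """Toy znode with versioned, conditional writes."""

    def __init__(self, data=b""):
        self.data = data
        self.version = 0

    def set_data(self, data, version):
        """Write succeeds only if the caller's version matches (or is -1)."""
        if version != -1 and version != self.version:
            raise ValueError("bad version")   # conditional update rejected
        self.data = data
        self.version += 1
        return self.version
```

Two clients that both read version 0 and then both attempt a conditional write will see only one of the writes succeed; the loser learns its copy was stale and must re-read.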

ZOOKEEPER GUARANTEES

  1. Linearizable writes: All writes appear to have been done in some order. And that perceived order of the writes is the same across all the clients. That is, if a client observes (by doing reads) that a write W1 was performed followed by W2 , every other client in the system observes either W1 followed by W2, or just W2, but never does any client observe a W2 followed by W1.

    Watch notifications are also processed along with writes. What this means is that if a client receives notifications in the order: N1 (for a write W1) followed by N2 (for a write W2), all other clients in the system receive notifications in the same order (or just the N1 notification, as it is sufficient to suggest that the value with the client has gone stale).
  2. FIFO order of client operations: Operations executed by a client are performed in FIFO order. Note that the FIFO order is imposed upon the ops performed by a client, not upon the ops performed across clients.

PRIMITIVES USING THE ZOOKEEPER

1. NAME REGISTRY

A name registry is a registry (at a well-known location) that publicizes the availability of services at particular IP addresses/ports. Implementing such a registry is very simple with Zookeeper. All one needs is a well-known Znode under which each service creates a new Znode with its own IP address/port when it becomes available. Any client that has done a getChildren on the well-known Znode with a watch gets to know about newly added/deleted Znodes as and when services become available/go away. Clients can also set watches on the service Znodes themselves to be notified of any changes in the address/port of a service.

2. GROUP MEMBERSHIP

In this situation, a process wants to know which other processes are members of the same group as itself. This is again as simple as each process creating an ephemeral node under a well-known Znode with, say, its own PID as the name of the node. Any process that wants to know the group members just reads all the children of the well-known Znode. Since each process creates an ephemeral node, the node disappears from the tree (and from the group as well) if its associated session dies due to a timeout/failure etc. Thus, only the active members of the group are present as Znodes under the well-known Znode.
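This recipe can be sketched with an in-memory stand-in (names like join/members/session_closed are made up; real code would issue create/getChildren calls against a ZooKeeper ensemble):

```python
class Group:
    """Toy group membership: each process registers an ephemeral child znode
    under a well-known parent; closing a session removes that session's nodes."""

    def __init__(self):
        self.children = {}                   # child name -> owning session id

    def join(self, session_id, name):
        # ~ create("/group/" + name, ..., ephemeral=True) in session session_id
        self.children[name] = session_id

    def members(self):
        # ~ getChildren("/group"), possibly with a watch for changes
        return sorted(self.children)

    def session_closed(self, session_id):
        # the system removes ephemeral nodes when their session terminates
        self.children = {n: s for n, s in self.children.items()
                         if s != session_id}
```

The key point the sketch captures: membership cleanup is not the member's job; it falls out of the ephemeral-node lifetime being tied to the session.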

3. CONFIGURATION MANAGEMENT

Again, this is also as simple as writing the config information in a well-defined Znode, say Zc. Any clients reading the Znode Zc get notified of any configuration changes through watches. 

4. DISTRIBUTED LOCKS

Omitted here for the sake of brevity (although this post is not "brief" by any yardstick). See the full paper linked in the Reference section below.

5. LEADER ELECTION

Hint: Similar to Group Membership 

ZOOKEEPER IMPLEMENTATION

1. WRITES

In Zookeeper, a write involves first transforming the write operation into a transaction, then atomically broadcasting the transaction to all zookeeper servers, followed by committing the transaction to the in-memory database at each replica. Atomic broadcast of the transaction ensures that each non-faulty server eventually gets the broadcast message (which it can subsequently commit). Note that the atomic broadcast succeeds only if the broadcasted transaction reaches at least a quorum (majority) of servers. This ensures that a write operation persists across any number of failures as long as there eventually exists a quorum of non-faulty servers. 

For durability, any committed transaction is recorded in a replay log, which is appended to before any changes are made to the in-memory database.

To be more specific, a write operation is forwarded to the Zookeeper cluster leader, which then performs the atomic broadcast of the transaction. A write operation for a client completes once the local server (to which the client is connected) reflects the write changes in its in-memory database. This means that when a write returns, other replicas in the Zookeeper cluster may not reflect the same value for the affected Znode. Although Zookeeper replicas never diverge, they may lag each other in terms of transactions applied. 

Since writes involve disk writes and coordination through atomic broadcast, they are relatively slow and costly.

2. READS

Reads in Zookeeper are served from the in-memory database of the locally connected server. These do not involve any disk reads or complex coordination, and hence are very fast (as compared to writes). 

Note that since reads refer only to the local copy of the in-memory database, a read may return a stale value even though more recent writes have been committed elsewhere. As such, Zookeeper is an eventually consistent system in which reads may return stale values for some time.

3. SLOW READS

Some applications may not be fine with getting stale values on a read. For such applications, a sync op is provided that ensures that all writes pending at the start of the sync operation are applied before the result of a following read is returned. This (sync --> read) combination is referred to as a slow read. However, note that even this may not return the most recent value, in that it may miss any writes issued after the sync and before the read. So, that's a gotcha.

KEY TAKEAWAYS

  1. Don't reinvent the wheel. Use Zookeeper as a template for solutions to things like leader election, config management, distributed locks, group membership etc. See the Curator recipes page (linked above) for a more detailed list.
  2. Zookeeper reads are fast. Writes not so much.
  3. Zookeeper is an eventually consistent system. Reads may return stale values.
  4. Zookeeper is sort of a key-value store, where values are no bigger than 1 MB.
  5. Watches are good. Really good. Much better than polling. 

REFERENCE

The original paper: http://static.cs.brown.edu/courses/cs227/archives/2012/papers/replication/hunt.pdf