Introduction

Refs are a seldom-used feature of the Clojure concurrency toolkit. But they are vital to understand well, and they are sometimes the only natural fit for a problem.

Transactions, on the other hand, are something backend devs deal with quite a lot, especially when it comes to managing data stores. If you have ever used database transactions, the Clojure Refs API will be easy to understand. Transactions in Clojure provide atomic, consistent and isolated updates to shared mutable storage. Atomic means that either every change to Refs made within a transaction occurs, or none do. Consistent means that each new value can be checked with a validator function before allowing the transaction to commit. Isolated means that no transaction sees the effects of any other transaction while it is running.
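As a small illustration of atomicity and validation with the real Refs API, here is a minimal sketch. The refs checking and savings, the amounts and the helper move! are made up for this example.

```clojure
;; Hypothetical accounts; the validator rejects negative balances.
(def checking (ref 100 :validator #(>= % 0)))
(def savings  (ref 0))

(defn move!
  "Move amount from checking to savings in one transaction."
  [amount]
  (dosync
   (alter savings + amount)     ; first change
   (alter checking - amount)))  ; second change, may fail validation

(move! 30) ; both refs change together

;; Overdrawing fails validation, so the whole transaction is discarded,
;; including the earlier change to savings.
(def overdraw-result
  (try
    (move! 500)
    (catch IllegalStateException _ :rejected)))
```

After the rejected call, both refs still hold the values left by the first, successful transfer.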

Clojure implements transactions using software transactional memory (STM). STM is closely analogous to database transactions, but it controls access to shared memory rather than to a data store.

In this post, we’ll look at how Clojure implements software transactional memory, specifically an MVCC-based STM, and take a deep dive into the implementation.

Software Transactional Memory

Refs implement software transactional memory. Essentially, you start a bunch of coordinated changes involving multiple shared entities, and these changes stay invisible to everyone outside the transaction. Once the changes are calculated, the transaction either commits all of them or fails and starts again. All the while, the rest of the code is oblivious to these updates until the commit succeeds.

STM is a strategy implemented in software, rather than as a hardware component. A transaction in this context occurs when a piece of code executes a series of reads and writes to shared memory. These reads and writes logically occur at a single instant in time; intermediate states are not visible to other (successful) transactions.

STM is optimistic: a thread completes modifications to shared memory without regard for what other threads might be doing, recording every read and write that it performs in a log. After completing an entire transaction, it verifies that other threads have not concurrently made changes to memory that it accessed in the past. This final operation, in which the changes of a transaction are validated and, if validation is successful, made permanent, is called a commit. A transaction may also abort at any time, causing all of its prior changes to be rolled back or undone. If a transaction cannot be committed due to conflicting changes, it is typically aborted and re-executed from the beginning until it succeeds.
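The abort-and-re-execute behaviour can be observed with Clojure's own refs. The following is a minimal sketch: the names counter, attempts, started, changed and other are made up for this example, and the promises exist only to force a conflicting commit to land while the first attempt is still running.

```clojure
(def counter  (ref 0))
(def attempts (atom 0))      ; counts how many times the body runs
(def started  (promise))     ; delivered once the first attempt has read the ref
(def changed  (promise))     ; delivered once the competing commit is done

;; A competing transaction that commits while the first one is in flight.
(def other
  (future
    @started
    (dosync (alter counter inc))
    (deliver changed true)))

(dosync
 (swap! attempts inc)        ; side effect, repeated on every retry
 @counter                    ; read inside the transaction
 (deliver started true)
 @changed                    ; wait until the competing commit has happened
 (alter counter inc))        ; conflicts on the first attempt, so the body reruns

@other                       ; wait for the competing thread to finish
```

Because the body of a transaction may run more than once, side effects such as the swap! above are usually kept out of dosync; here the side effect is the point of the demonstration.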

The benefit of this optimistic approach is increased concurrency: no thread needs to wait for access to a resource, and different threads can safely and simultaneously modify disjoint parts of a data structure that would normally be protected under the same lock.

The Clojure STM uses multiversion concurrency control (MVCC). This means that every update to a value actually creates a new version of it. The version of the value which a transaction sees depends on the time when the transaction started; this is also known as snapshot isolation. This is why the STM is designed to work with persistent collections, which can make cheap copies of themselves.

Motivation

Let’s start by looking at a motivating example of where transactions are useful: the Account Transfer problem.

The problem

You need to write code to transfer money from one account to another. You start with a set of accounts each with some amount of money in them. You can only transfer money from one account to another. Before making the transfer you have to ensure that the source account has enough money for the transfer. And all this needs to happen concurrently with other transactions running in the system using the same set of accounts. Throughout the process, you can never have a situation where money has been taken out of an account but not put into another account. So no readers of the accounts data should ever see partial results. To ensure this, your aim is to always keep the total amount of money in the system the same.

Why is this hard to do?

The naive solution is fine-grained locking: associate a lock with each Account object, acquire it before the operation, perform the transfer and then release the lock. But with this strategy it is very easy to run into deadlocks, where one operation has acquired a lock on an account and is waiting on another operation which has locked a second account and is waiting to lock the first one, or on a longer chain of such dependencies. So the next step might be ordered locking, where you only lock accounts in some globally agreed-upon order. This prevents deadlocks since all operations acquire locks in the same order. Imagine a scenario like the one below, with the lock order A, B, C:

Operation 1 -> Lock A, Lock B
Operation 2 -> Lock B, Lock C
Operation 3 -> Lock C, Lock A

;; Order of operations
Operation 1 -> acquire Lock A, wait for Lock B
Operation 2 -> acquire Lock B, wait for Lock C
Operation 3 -> wait for Lock A

;; Result
Operation 2 is free to acquire Lock C and proceed with the operation.
Then Operation 1 will proceed and finally Operation 3.

This is a completely viable approach, but throughput drops to the floor because operations end up waiting on one another. Also, in this scheme, readers have to acquire locks too, so read throughput suffers as well.

Clojure’s solution

Now let’s look at what the implementation looks like using Clojure’s well-designed transaction system:

(ns demo.transactions)

(defn transfer
  "From and To are accounts.
  Shape of account {:money Int}"
  [from to]
  (dosync
   (let [amount (:money @from)
         new-balance (+ (:money @to) amount)]
     (alter from assoc :money 0)
     (alter to assoc :money new-balance))))

(def running (atom true))

(def accounts (map (fn [_] (ref {:money 0})) (range 97)))

(def seed-accounts (map (fn [_] (ref {:money 100})) (range 3)))

(def all-accounts (vec (concat seed-accounts accounts)))

(defn make-transfers
  "Start thread-count background threads that keep moving money between
  randomly chosen accounts until running is set to false.
  accounts must be a vector of refs."
  [accounts thread-count]
  (dotimes [_ thread-count]
    (future
      (while @running
        (Thread/sleep 200)
        (let [from-rank (rand-int (count accounts))
              to-rank (rand-int (count accounts))
              from (get accounts from-rank)
              to (get accounts to-rank)]
          (when (and (not= from-rank to-rank) (pos? (:money @from)))
            (println "Transferring from" from-rank "to" to-rank)
            (transfer from to)))))))

(defn check-transfers
  "Periodically sum all balances inside a transaction.
  The total must always stay at 300."
  []
  (while @running
    (Thread/sleep 200)
    (dosync
     (let [sum (reduce (fn [sum account]
                         (+ sum (:money @account)))
                       0
                       all-accounts)]
       (when (not= sum 300)
         (println "============== Something went wrong !!! ================")
         (reset! running false))))))

Implementation details

Each thread has at most one running transaction, and each transaction stores the values of the refs which it reads or updates.

;; must be dynamic so that it can be rebound per thread with binding
(def ^:dynamic *current-transaction* nil)

Each transaction has a collection of in-transaction values, i.e. the values of refs which are read or written by operations in the transaction. A transaction also has written-refs, the set of refs which are modified by operations in the transaction.


{:read-point 0
 :in-tx-values (atom {}) ;; map from ref to its in-transaction value
 :written-refs (atom #{})} ;; set of refs modified in this transaction

Each thread has a thread-local binding of such a transaction object. Whenever code runs in a thread via dosync (modeled below by the sync function), a new transaction is created if none is bound, or the already bound one is reused.

(defn new-tx
  []
  {:read-point 0
   :in-tx-values (atom {})
   :written-refs (atom #{})})

(defn run-tx
  [tx f]
  ;; run f with tx bound as the current transaction of this thread
  (let [result (binding [*current-transaction* tx]
                 (f))]
    ;; try committing; on success, return result
    ;; otherwise create a new transaction and recur
    result))

(defn sync
  [f]
  (if (nil? *current-transaction*)
    (run-tx (new-tx) f)
    (f)))

Each ref is actually a log of its past values, each tagged with a revision number that records when the value was written. This log is also called the history chain of the ref. The first value in the chain is the latest, while the last non-nil value is the oldest. In this simplified model the length of the chain is fixed at 10, which matches the default :max-history of a Clojure ref.

[{:write-point 5 :value 10}
 {:write-point 4 :value 9}
 {:write-point 3 :value 8}
 {:write-point 2 :value 7}
 {:write-point 1 :value 6}
 {:write-point 0 :value 5}
 nil nil nil nil]

Transactions modify the outside world only in the commit phase. The point at which this happens is known as the write-point of the transaction. It comes from a global counter whose value is incremented on every commit. Whenever a new ref is created, the current value of the global write-point is stored as the write-point of its initial value, like so:

(def global-write-point (atom 0))
;; 9 empty slots, so that a new chain has 10 entries in total
(def some-default-values (repeat 9 nil))

(defn new-ref
  [val]
  (atom (cons {:value val :write-point @global-write-point}
              some-default-values)))

All changes made to Refs during a transaction will appear to occur at a single point in the global “Ref world” timeline (its write point).

Also, every time a new transaction is created, it stores the current global write-point as its read-point. This indicates that the transaction should only see ref values written at or before that point.

{:read-point @global-write-point
 :in-tx-values (atom {})
 :written-refs (atom #{})}

So with all this knowledge, let’s see what a deref of a Ref looks like. Note that Refs can be read outside a transaction, but not written to.

(defn de-ref [ref]
  (if (nil? *current-transaction*)
    (:value (first @ref))
    (tx-read *current-transaction* ref)))

So what does tx-read actually do?

  1. See if the ref’s value is available in in-tx-values. If it is, some earlier operation in this transaction has already read or written this ref, so use that value.
  2. If not, find the newest value in the Ref’s history chain whose write-point is older than, or equal to, the read-point of this transaction. This ensures that values written to Refs after this transaction started are not visible in it, which is what snapshot isolation means.

(defn tx-read
  "read the value of ref inside transaction tx"
  [tx ref]
  (let [in-tx-values (:in-tx-values tx)]
    (if (contains? @in-tx-values ref)
      (@in-tx-values ref) ;; return the in-tx-value
      ;; else search the history chain for entry with write-point <= tx's read-point
      (let [ref-entry (find-entry-before-or-on @ref (:read-point tx))]
        ;; if no such entry was found, retry (tx-retry is expected to throw)
        (when-not ref-entry
          (tx-retry))
        (let [in-tx-value (:value ref-entry)]
          ;; cache the value
          (swap! in-tx-values assoc ref in-tx-value)
          ;; save and return the ref's value
          in-tx-value)))))
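The tx-read function above relies on two helpers, find-entry-before-or-on and tx-retry, which are not defined in this post. The following is one possible sketch of them, under the assumptions that the history chain is ordered newest first and padded with nil, and that retrying is signalled by throwing an exception which run-tx would catch. The ex-info payload and example-chain are made up for illustration.

```clojure
(defn find-entry-before-or-on
  "Return the newest entry in history-chain whose :write-point is
  <= read-point, or nil if there is none.
  The chain is assumed to be ordered newest first and padded with nil."
  [history-chain read-point]
  (some (fn [entry]
          (when (and entry (<= (:write-point entry) read-point))
            entry))
        history-chain))

(defn tx-retry
  "Abort the current attempt by throwing.
  run-tx is assumed to catch this and start over (hypothetical convention)."
  []
  (throw (ex-info "transaction retry" {:type ::retry})))

(def example-chain
  [{:write-point 5 :value 10}
   {:write-point 3 :value 8}
   {:write-point 0 :value 5}
   nil nil])

;; A transaction with read-point 4 must not see the value written at point 5.
(find-entry-before-or-on example-chain 4)
;; => {:write-point 3, :value 8}
```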

Let’s see how writes happen to Refs inside a transaction. Note that Refs can only be written to inside a transaction.

  1. Ensure that we are indeed inside a transaction.
  2. Record the value of this ref to the in-transaction-values map.
  3. Add this ref to the set of Refs written to by this transaction.
(defn tx-write
  "write val to ref inside transaction tx"
  [tx ref val]
  (swap! (:in-tx-values tx) assoc ref val)
  (swap! (:written-refs tx) conj ref)
  val)

(defn ref-set [ref value]
  (if (nil? *current-transaction*)
    (throw (IllegalStateException. "write outside a transaction not allowed"))
    (tx-write *current-transaction* ref value)))

Committing a transaction consists of two parts, both of which happen while holding a global commit lock:

  1. Validation: for each written ref, check if the ref has since been modified by another committed transaction. If yes, retry this transaction. This indicates a write conflict between transactions.
  2. If not, store the in-transaction-value of all written-to refs in the history chain of the refs under a new write-point. Only then update the global write-point such that new transactions can see the new values.
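The two commit steps above can be sketched as follows. This is a minimal sketch under the simplified model used in this post, not Clojure's actual implementation: commit-lock and commit! are hypothetical names, refs are modeled as atoms holding a newest-first history chain, and the transaction map has the shape produced by new-tx.

```clojure
(def global-write-point (atom 0))
(def commit-lock (Object.))   ; hypothetical global commit lock

(defn commit!
  "Try to commit tx. Returns true on success, false if the caller should retry."
  [tx]
  (locking commit-lock
    (let [written   @(:written-refs tx)
          ;; step 1: was any written ref committed to after tx started?
          conflict? (some #(> (:write-point (first @%)) (:read-point tx))
                          written)]
      (if conflict?
        false
        (let [write-point (inc @global-write-point)]
          ;; step 2: push the new value and drop the oldest slot
          (doseq [r written]
            (swap! r #(cons {:value (get @(:in-tx-values tx) r)
                             :write-point write-point}
                            (butlast %))))
          ;; publish only after every written ref has its new entry
          (reset! global-write-point write-point)
          true)))))

;; Usage: one ref and one transaction that wrote 6 to it.
(def r (atom (list {:value 5 :write-point 0} nil nil)))
(def tx {:read-point 0
         :in-tx-values (atom {r 6})
         :written-refs (atom #{r})})

(def first-commit  (commit! tx))  ; succeeds, r now starts with value 6
(def second-commit (commit! tx))  ; the stale read-point now conflicts
```

In a full run-tx, a false result would lead to a fresh transaction with a new read-point and another run of the body.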

This implementation, though already fairly complex, has a couple of limitations:

  1. Global commit lock is a bottleneck
  2. Write-skew

Global Locks

To overcome the bottleneck of a global lock, Clojure uses fine-grained locking.

In this approach, locks are always acquired in a fixed order to prevent deadlocks. Every ref can now be thought of as having, along with its history chain, a lock and a unique ID taken from an atomically incrementing global counter.

To commit a transaction, we have to acquire locks to all written-refs, but in lock order.
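Acquiring per-ref locks in ID order could look roughly like the sketch below. All names here (next-ref-id, new-locked-ref, with-ref-locks) are hypothetical, and a ref is modeled as a plain map rather than Clojure's real Ref type.

```clojure
(def next-ref-id (atom 0))   ; hypothetical global id counter

(defn new-locked-ref
  "A ref modeled as a map with a unique id, its own lock and a value cell."
  [val]
  {:id    (swap! next-ref-id inc)
   :lock  (Object.)
   :state (atom val)})

(defn with-ref-locks
  "Acquire the locks of refs in ascending :id order, then call f.
  Locks are released in reverse order as the nested locking forms exit."
  [refs f]
  (letfn [(step [remaining]
            (if-let [r (first remaining)]
              (locking (:lock r)
                (step (rest remaining)))
              (f)))]
    (step (sort-by :id refs))))

(def a (new-locked-ref 1))
(def b (new-locked-ref 2))

;; Callers may pass the refs in any order; the lock of a is still taken first.
(with-ref-locks [b a]
  (fn []
    (swap! (:state a) inc)
    (swap! (:state b) inc)))
```

Since every committer sorts by the same ID, two commits that share refs can never each hold a lock the other is waiting for.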

Write skew

To understand the problem of write skew, let’s look at an example:

Consider 2 threads running 2 different transactions.

  1. The first transaction, t1, reads from the car ref and, depending on its value, writes to the bike ref.
  2. The second transaction, t2, reads from the bike ref and, depending on its value, writes to the car ref.

(defn t1
  [bike car]
  (Thread. (fn []
             ;; reads car (and bike), writes only bike
             (dosync (when (= (:color @bike) (:color @car))
                       (alter bike assoc :color :red))))))

(defn t2
  [bike car]
  (Thread. (fn []
             ;; reads bike (and car), writes only car
             (dosync (when (= (:color @bike) (:color @car))
                       (alter car assoc :color :gray))))))

Now when these 2 transactions commit concurrently, neither of them will detect that it depended on a value which the other transaction modified, since that ref was never part of its written refs.

To guard against this, Clojure has the ensure function. Calling it is basically equivalent to saying: I depend on this ref, so add it to the set of refs you track and fail if someone else updates it!
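A minimal sketch of how ensure might be used for the bike and car example follows. The refs and the function name repaint-bike! are made up for illustration; ensure itself is part of clojure.core.

```clojure
(def bike (ref {:color :blue}))
(def car  (ref {:color :blue}))

(defn repaint-bike!
  "Paint the bike red only while it matches the car's color."
  []
  (dosync
   ;; ensure returns the in-transaction value of car and protects car
   ;; from modification by other transactions until this one finishes
   (when (= (:color @bike) (:color (ensure car)))
     (alter bike assoc :color :red))))

(repaint-bike!)
```

The only change from a plain read is replacing @car with (ensure car), which makes the dependency on car visible to the STM.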

Barging

The limitation of the approach discussed above is that transactions only detect conflicts at the very end, in the commit phase. It would be much more efficient to detect a write conflict as early as possible.

And Clojure does! It uses a technique known as barging to halt another transaction and force it to retry when a write conflict is detected.

  • Each transaction now has a state which indicates its current status, one of: running, retry, committing, committed, killed.
  • Each ref also stores the id of the transaction which has acquired it. This works like exclusive access: a transaction can commit a ref only if it was successful in acquiring it.
  • When writing to a ref, a transaction has to acquire it. If the ref is currently held by a transaction which is newer than this one, the newer one is barged and has to retry. Otherwise, if the ref cannot be acquired, this transaction is retried.
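The acquisition rule in the last bullet can be modeled as a small pure function. This is only a simplified sketch of the decision, not Clojure's actual code: the function try-acquire, the :start-point key and the result keywords are hypothetical, a smaller :start-point is assumed to mean an older transaction, and timing details are ignored.

```clojure
(defn try-acquire
  "Decide what happens when tx wants to write a ref currently held by
  holder (nil when the ref is free)."
  [tx holder]
  (cond
    ;; free ref, or the holder is no longer active
    (or (nil? holder)
        (not (#{:running :committing} (:status holder))))
    :acquired

    ;; the holder is still running and newer than tx: barge it
    (and (= :running (:status holder))
         (< (:start-point tx) (:start-point holder)))
    :barge-holder

    ;; the holder is older, or already committing: tx gives up and retries
    :else
    :retry-self))

(try-acquire {:start-point 1} {:start-point 2 :status :running})
;; => :barge-holder
```

Favouring the older transaction means long-running transactions are not starved by a stream of younger ones.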

Watches

Clojure also has an API for being notified of changes to a Ref’s value via watches. The add-watch function takes a ref to watch, a key which identifies the watcher, and a watch function to call when the Ref’s value changes. The watch function must be a fn of 4 args: the key, the reference, its old state and its new state. Let’s look at some code:

(def a (ref 1))
(def b (ref 1))

(let [watch (fn [key
                ref
                old-value
                new-value]
              (println "Ref : " key " updated from " old-value " to " new-value))]
  (add-watch a :a watch)
  (add-watch b :b watch)
  (future
    (dotimes [_ 10]
      (dosync
       (alter a inc)
       (alter b inc))))

  (future
    (dotimes [_ 5]
      (dosync
       (alter a #(* %1 (deref %2)) b)
       (alter b #(* %1 (deref %2)) a)))))

Conclusion

  • Refs are an amazing concurrency construct that turns out to be overkill in most cases. But when a problem does call for them, they are the best choice for synchronous, coordinated changes.

  • Being able to make cheap copies of data is central to the multi-version mechanism in Clojure. So any language which wants to implement such a transactional system will need high-performance persistent data structures.

  • Learning about the details of the implementation of STM has deepened my understanding of the intent and spirit of concurrency control in Clojure!

References