The CSP Library Core.Async with ClojureScript

P. Madanasekaran

We have earlier seen the “actor” concurrency model. An actor program consists of independent, concurrently executing entities (called actors in Akka, or processes in Erlang and Elixir) that communicate by sending each other messages. Each actor has a mailbox that stores messages until they’re handled. A program using the Communicating Sequential Processes (CSP) model similarly consists of independent, concurrently executing entities that communicate by sending each other messages. The difference is one of emphasis—instead of focusing on the entities sending the messages, CSP focuses on the channels over which they are sent. Channels are first class—instead of each process being tightly coupled to a single mailbox, channels can be independently created, written to, read from, and passed between processes. CSP is an old idea that’s experiencing a renaissance, and its recent popularity is largely due to the Go language. The idea was first described by Tony Hoare—of Quicksort fame—in his 1978 paper Communicating Sequential Processes, and it has since been extended and implemented in several languages, most recently in Google’s Go. As David Nolen, the primary maintainer of ClojureScript, points out, the authors of Go had been experimenting with one concurrent language or another for more than two decades before finally hitting the jackpot with Go. Rich Hickey, as pointed out in an earlier article, was not in favour of the actor model, which in his opinion couples the producer with the consumer. He adopted Go’s implementation of CSP in core.async. He also acknowledged the influence of “async” in C#, which lets the programmer write procedural/synchronous code whose asynchronous execution is effected through a state machine generated by the compiler. In core.async a similar state machine is created by macros.

Core.async:  The core.async library provides two primary facilities—channels and go blocks. Go blocks allow multiple concurrent tasks to be efficiently multiplexed across a limited pool of threads. A channel is a thread-safe queue—any task with a reference to a channel can add messages to one end, and any task with a reference to it can remove messages from the other. Lisp macros can seem like magic, enabling dramatic code transformations. The go macro is particularly magical. Code within a go block is transformed into a state machine. Instead of blocking when it reads from or writes to a channel, the state machine parks, relinquishing control of the thread it’s executing on. When it’s next able to run, it performs a state transition and continues execution, potentially on another thread. This represents an inversion of control, allowing the core.async runtime to efficiently multiplex many go blocks over a limited thread pool.

More on abstractions in core.async: It is useful to think of channels as blocking/parking queues. Rather than designing programs around a single message queue (the JavaScript runtime event loop), we create and maintain an arbitrary number of message queues. These message queues, typically referred to as channels, can have messages en-queued by any part of a program and de-queued anywhere else, and they allow programs and systems to be designed with a stronger separation of concerns between the producers of data and their consumers and processors. In JVM Clojure, core.async provides a handy API for managing message passing between threads. Although ClojureScript shares JavaScript's single-threaded runtime, in practice writing programs with core.async gives the feel of writing a multithreaded program, without either the headache or the actual implementation concerns. For those coming from a JavaScript background, there's also no call-back hell! Another advantage of core.async is that it's fast. David Nolen published a blog post, “Make No Promises”, not long after the initial release of core.async, comparing the speed of vanilla JavaScript promises to messages passed over a core.async channel. While individual tests may vary widely, on average the messages passed using core.async were evaluated two to three times as quickly, and in some cases five times faster. The post itself is available at http://swannodette.github.io/2013/08/23/make-no-promises

Sample

As mentioned in the earlier article on ClojureScript, it is easier to use “figwheel” as the development environment for ClojureScript apps if you have installed the Leiningen build and dependency management tool. Execute at the command prompt
   lein new figwheel example

Necessary files and folders will be generated. Change to example directory and execute

lein figwheel

Necessary dependencies will be downloaded and the default browser will open and show as under:

If you have a look at the generated “project.clj”—the configuration file of your project—you can see that the downloaded dependencies include core.async. The executable code in /src/example/core.cljs may be modified as under:

/src/example/core.cljs

(ns example.core                                      ;; -------1)
  (:require [cljs.core.async :as async])              ;; -------2)
  (:require-macros [cljs.core.async.macros :as async-macros]))

(defn sample []
  (let [c (async/chan 2)]                             ;; -------3)
    (async-macros/go (async/>! c "VENKATESA"))        ;; -------4) & 5)
    (async-macros/go (js/console.log (async/<! c)))))

  1. The namespace declaration distinguishes ClojureScript from JavaScript, where every variable is global by default.
  2. Core.async and its macros are required.
  3. The “chan” function creates a core.async channel. As mentioned previously, it can be thought of as a concurrent blocking queue and it is the main abstraction in the library. By default chan creates an un-buffered/synchronous channel: it will not allow you to put a message into it unless there is a process ready to take it. By specifying 2 in the chan constructor we are creating a buffered channel of size 2. Now the channel becomes asynchronous and it will permit messages to be written into it up to the size of the buffer without any process being ready to consume them. We will see more about buffered and un-buffered channels later.
  4. The functions >! and <! are used to put and take values from a channel, respectively. The caveat is that they have to be used inside a go block. We are putting the string "VENKATESA" in the channel. A go block itself only returns a channel; the value put on the channel has to be taken out explicitly, and we use js/console.log to print it.
  5. go is a macro that takes a body of expressions—which form a go block—and creates lightweight processes. This is where the magic happens. Inside a go block, any calls to >! and <! while waiting for values to be available in channels are not blocked but instead “parked”. Parking is a special type of blocking used internally in the state machine of core.async.

If you press F12 in the browser you can see the output in the console as under.

What is the advantage of using the go macro? Core.async also provides a “thread” macro to be used on the JVM where a real thread is absolutely essential. But threads can be expensive. On the JVM, their default stack size is 512 kilobytes—configurable via the -Xss JVM start-up option. When developing a highly concurrent system, creating thousands of threads can quickly drain the resources of the machine the application is running on. core.async acknowledges this limitation and gives us lightweight processes. Internally, they do share a thread pool, but instead of wastefully creating a thread per go block, threads are recycled and reused when a put/take operation is waiting for a value to become available. The thread pool used by core.async defaults to (2 × the number of available processors) + 42, so a machine with eight processors will have a pool with 58 threads. It is therefore common for core.async applications to have tens of thousands of lightweight processes. They are extremely cheap to create.
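How cheap go blocks are can be seen in a minimal sketch (the namespace name cheap.procs is ours, and it assumes the same figwheel setup as the samples in this article). Each go block below parks on its own channel; none of them consumes a thread, only a small state-machine object:

```clojure
(ns cheap.procs
  (:require [cljs.core.async :as async])
  (:require-macros [cljs.core.async.macros :as async-macros]))

;; start 10,000 go blocks, each parked on a take from its own channel;
;; in the browser this all runs on one thread, yet memory use stays small
(dotimes [_ 10000]
  (async-macros/go (async/<! (async/chan))))

(js/console.log "10,000 lightweight processes created")
```

Each channel is given its own taker here because core.async limits a single channel to 1024 pending takes; spreading the takers over separate channels keeps the sketch within that limit.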

One more sample

Create one more figwheel project “exp”. Create a file “async.cljs” as under:

/src/exp/async.cljs

(ns exp.async
  (:require [cljs.core.async :as async])
  (:require-macros
   [cljs.core.async.macros :as async-macros]))

(def channel (async/chan 5)) ;; define a buffered channel of size 5

(defn enqueue-val ;; for putting a value in the channel we define a function
  "Enqueue a new value into our channel."
  [v]
  (async-macros/go
    (async/>! channel v)))

(defn retrieve-val ;; for retrieving a value from the channel we define another function
  "Retrieve a new value from our channel and log it."
  []
  (async-macros/go
    (js/console.log (async/<! channel))))

(defn enqueue-and-retrieve ;; test function
  "Enqueue a value into a channel, and then test that we can retrieve it."
  [v]
  (enqueue-val v)
  (retrieve-val))

Modify the core.cljs to require the above file as under:

/src/exp/core.cljs

(ns exp.core
(:require [exp.async]))

Execute
lein figwheel

When you get the
cljs.user=> prompt
execute
(exp.async/enqueue-and-retrieve "Ganesha")
You can see the return value as
#object [cljs.core.async.impl.channels.ManyToManyChannel] ;; only a channel is returned
If you go to the browser and press F12
you can see in the console the value extracted from the channel, "Ganesha".

Another sample
In this sample we will remove the function “retrieve-val” and the test function “enqueue-and-retrieve” from the async.cljs of the earlier sample. Instead we will modify our code to use a listener that is always listening and does something whenever a new value arrives.

Execute
 lein new figwheel listen

The modified async.cljs is given below

/src/listen/async.cljs

(ns listen.async
  (:require [cljs.core.async :as async])
  (:require-macros
   [cljs.core.async.macros :as async-macros]))

(def channel (async/chan 5))

(defn enqueue-val
  "Enqueue a new value into our channel."
  [v]
  (async-macros/go
    (async/>! channel v)))

(defn listen ;; new function
  "Listen to our channel for any events and log them to the console."
  []
  (async-macros/go
    (while true ;;----1)
      (js/console.log (async/<! channel)))))

(listen)

1) This might seem like it would just loop forever, but remember that within core.async go blocks, calls like <! park the go block, which behaves like blocking from the code's point of view.

So as long as <! doesn't find a new value on the channel, listen won't continue to loop.

Execute
lein figwheel
When you see the cljs.user=> prompt, execute
 cljs.user=> (listen.async/enqueue-val "Ganesh remover of Vigna")
you will see the result
 #object[cljs.core.async.impl.channels.ManyToManyChannel]
In the browser press F12 and you will see what was retrieved from the channel as shown below:

This shows that if we put a new value into our channel, it would be immediately read and logged by our background listener.

Buffered channels

In the samples we defined a channel as under:
 (def channel (async/chan 5))

If we don’t pass any number in the above definition, what is returned is a synchronous, unbuffered channel. Then the channel will not allow anything to be published unless there is a waiting consumer. Note that this does not block the thread; it just means that the publisher will be parked, waiting, until a consumer becomes available. The 5 in the above case is an optional argument that specifies how large we want the channel's buffer to be (that is, how many messages it can hold at one time). This is called a fixed-size buffer: it is fixed to a chosen number n, allowing producers to put items in the channel up to that number without having to wait for consumers. In addition to specifying the buffer's size, we can also choose what we want the buffer's strategy to be. For instance, we could use the dropping-buffer function to generate a buffer that drops new values when the buffer is full, or we could use the sliding-buffer function to generate a buffer that drops the oldest value in the queue and retains the latest value being added when the buffer is full. If you don't set an explicit buffer strategy, core.async will use a fixed buffer, and a producer attempting to put a value in the channel will be parked when the buffer is full.
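The two buffer strategies can be sketched side by side (a minimal illustration; the namespace and channel names here are ours):

```clojure
(ns buffers.demo
  (:require [cljs.core.async :as async])
  (:require-macros [cljs.core.async.macros :as async-macros]))

;; dropping-buffer: when full, new puts are silently dropped
(def drop-ch (async/chan (async/dropping-buffer 2)))

;; sliding-buffer: when full, the oldest value is dropped to make room
(def slide-ch (async/chan (async/sliding-buffer 2)))

(async-macros/go
  (doseq [v [1 2 3]]
    (async/>! drop-ch v)    ;; 3 is dropped; buffer holds 1, 2
    (async/>! slide-ch v))  ;; 1 is dropped; buffer holds 2, 3
  (js/console.log (async/<! drop-ch))    ;; logs 1
  (js/console.log (async/<! slide-ch)))  ;; logs 2
```

Note that with either strategy a put on a full channel never parks; the buffer simply decides which value to sacrifice.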

Transducer 

Transducers were introduced in Clojure 1.7. The Clojure documentation describes transducers as composable algorithmic transformations. Let's see why that is.

See the code below:
(->> (range 10)
(map inc) ;; creates a new sequence
(filter even?) ;; creates a new sequence
(prn "result is "))
;; "result is " (2 4 6 8 10)

The preceding snippet is straightforward and highlights an interesting property of what happens when we apply combinators to Clojure sequences: each combinator creates an intermediate sequence. In the previous example, we ended up with three in total: the one created by range, the one created by map, and finally the one created by filter. Most of the time this won't really be an issue, but for large sequences it means a lot of unnecessary allocation.

Using transducers, the previous example can be written like so:

(def xform ;; create a transducer
  (comp (map inc)
        (filter even?))) ;; no intermediate sequence created

(->> (range 10)
     (sequence xform) ;; apply the transducer created earlier
     (prn "result is "))
;; "result is " (2 4 6 8 10)

In the new version, a whole range of the core sequence combinators, such as map and filter, have gained an extra arity: if you don't pass them a collection, they instead return a transducer. In the previous example, (map inc) returns a transducer that knows how to apply the function inc to elements of a sequence. Similarly, (filter even?) returns a transducer that will eventually filter elements of a sequence. Neither of them does anything yet; they simply return functions. This is interesting because transducers are composable: we build larger and more complex transducers by using simple function composition, as the definition of xform above shows.

Principally, transducers don't care about what the function does, the context of what's being built up, or the source of inputs. The most critical differentiating characteristic of transducers is that they don't care about the source of inputs. Other higher-order functions (map, filter, reduce, and so on) are critically tied to the collection (and hence sequence) abstractions. So one really awesome use of transducers is to apply a transformation to anything that goes through a core.async channel. We will see more about it later.
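As a minimal sketch of that idea (the namespace name xform.chan is ours), a transducer can be passed as the second argument to chan; every value put on the channel is then transformed before it is buffered:

```clojure
(ns xform.chan
  (:require [cljs.core.async :as async])
  (:require-macros [cljs.core.async.macros :as async-macros]))

;; the same transducer used in the sequence example above,
;; now attached to a buffered channel
(def xform (comp (map inc) (filter even?)))
(def c (async/chan 5 xform))

(async-macros/go
  (doseq [v (range 10)]
    (async/>! c v))) ;; only even results of (inc v) reach the buffer

(async-macros/go
  (js/console.log (async/<! c))) ;; logs 2
```

Attaching a transducer requires a buffered channel, since the transformation is applied as values enter the buffer.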

Conclusion

CSP represents a very different model from the default model in JavaScript: call-backs and promises. The default style can easily get out of hand—instead of writing natural, sequential steps to achieve a task, the logic is scattered across multiple call-backs, increasing the developer's cognitive load. Using promises can flatten your call-back pyramid, but they don't eliminate call-backs. In sequential/procedural code the control is always with your code and it is easy to follow. The identifying characteristic of call-back hell is that you hand over control from your code to call-backs whose order of execution is not pre-determined. Core.async enables you to write sequential code and takes over the responsibility of generating and managing the mess of call-backs. You write synchronous/procedural code and the library gives you the benefit of asynchronous execution and faster results. The CSP model in core.async is influenced by “async” in C#, but Rich Hickey adapted it from the Go language. A go block is a sub-thread-level construct: one thread can manage many such constructs. Inside a go block, any calls to >! and <! waiting for values to be available in channels are parked. Parking is a special type of blocking used internally in the state machine of core.async’s lightweight processes. Internally, they share a thread pool, but instead of wastefully creating a thread per go block, threads are recycled and reused when a put/take operation is waiting for a value to become available. It is therefore common for core.async applications to have tens of thousands of lightweight processes; they are extremely cheap to create. Hence, on average, messages passed using core.async were evaluated two to three times as quickly, and in some cases five times faster, than messages delivered with vanilla JavaScript promises.
Hence the core.async library is one of the attractions of ClojureScript over JavaScript. This model is highly flexible and allows you to write code that separates the behaviour of data consumers and processors from their producers.

Summing up

While using core.async, the first and foremost advantage is that you never lose control. The code even reads as procedural. The go macro converts your straightforward code into a bunch of call-backs and coordinates them with a powerful state machine that re-assembles them without ever losing control. It's all good old call-backs and mutable state underground; but above ground, you can write code that's easy to reason about. It's about regaining control of your asynchronous calls and not smearing your logic across your code in little bits contained in call-backs. The go block compares very favourably in speed to an Elixir process—a very impressive result given that Clojure runs on the JVM, whereas Elixir runs on the Erlang virtual machine, which was built with efficient concurrency in mind. But in CSP there is no equivalent of Erlang OTP, with its built-in support for fault tolerance, or direct support for parallelism. The actor community has concentrated on fault tolerance and distribution, and the CSP community on efficiency and expressiveness. Choosing between them, therefore, is largely a question of deciding which of these aspects is most important to you.







