The CSP Library core.async with ClojureScript
Posted On November 3, 2016 by Shruthi S filed under Enterprise
We have earlier seen the "actor" concurrency model. An actor program consists of independent, concurrently executing entities (called actors in Akka, or processes in Erlang and Elixir) that communicate by sending each other messages; each actor has a mailbox that stores messages until they are handled. A program using the Communicating Sequential Processes (CSP) model similarly consists of independent, concurrently executing entities that communicate by sending each other messages. The difference is one of emphasis: instead of focusing on the entities sending the messages, CSP focuses on the channels over which they are sent. Channels are first class: instead of each process being tightly coupled to a single mailbox, channels can be independently created, written to, read from, and passed between processes.

CSP is an old idea that is experiencing a renaissance, largely due to the Go language. It was first described by Tony Hoare, of Quicksort fame, in his 1978 paper "Communicating Sequential Processes", and it has since been extended and implemented in several languages, the latest of which is Google's Go. As David Nolen, the primary author and maintainer of ClojureScript, points out, the authors of Go had been dabbling with one concurrent language or another for more than two decades before finally hitting the jackpot with Go.

Rich Hickey, as pointed out in an earlier article, was not in favour of the actor model, which in his opinion couples the producer with the consumer. He adopted Go's implementation of CSP in core.async. He also acknowledged the influence of "async" in C#, which lets the programmer write procedural, synchronous-looking code whose asynchronous execution is effected through a state machine generated by the compiler. In core.async a similar state machine is created by macros.
Core.async: The core.async library provides two primary facilities—channels and go blocks. Go blocks allow multiple concurrent tasks to be efficiently multiplexed across a limited pool of threads. A channel is a thread-safe queue—any task with a reference to a channel can add messages to one end, and any task with a reference to it can remove messages from the other. Lisp macros can seem like magic, enabling dramatic code transformations. The go macro is particularly magical. Code within a go block is transformed into a state machine. Instead of blocking when it reads from or writes to a channel, the state machine parks, relinquishing control of the thread it’s executing on. When it’s next able to run, it performs a state transition and continues execution, potentially on another thread. This represents an inversion of control, allowing the core.async runtime to efficiently multiplex many go blocks over a limited thread pool.
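A minimal sketch of these two facilities working together (ClojureScript with the 2016-era namespaces; the namespace name here is illustrative):

```clojure
(ns example.sketch ;; hypothetical namespace, for illustration only
  (:require [cljs.core.async :refer [chan >! <!]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

;; an unbuffered channel shared by two go blocks
(def c (chan))

;; this go block parks at >! until some other task is ready to take
(go (>! c "hello"))

;; this go block parks at <! until a value arrives, then logs it
(go (js/console.log (<! c)))
```

Neither go block ever holds a thread while parked; the runtime resumes each one only when its channel operation can complete.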
As mentioned in the earlier article on ClojureScript, it is easiest to use Figwheel as the development environment for ClojureScript apps if you have installed the Leiningen build and dependency management tool. Execute at the command prompt:
lein new figwheel example
Necessary files and folders will be generated. Change to the example directory and execute
lein figwheel
Necessary dependencies will be downloaded and the default browser will open, showing the Figwheel landing page.
If you have a look at the generated project.clj, the configuration file of your project, you can see that the downloaded dependencies include core.async. The code in src/example/core.cljs may be modified as follows:
(ns example.core ;; -------1)
- core.async and its macros are required.
- The chan function creates a core.async channel. As mentioned previously, it can be thought of as a concurrent blocking queue, and it is the main abstraction in the library. By default chan creates an unbuffered (synchronous) channel: it will not allow you to put a message into it unless there is a process ready to take it. By passing 1 to the chan constructor we create a buffered channel of size 1. Now the channel becomes asynchronous and it will accept messages up to the size of the buffer without any process being ready to consume them. We will see more about buffered and unbuffered channels later.
- The functions >! and <! are used to put and take values from a channel, respectively. The caveat is that they have to be used inside a go block. We put the string "VENKATESA" on the channel. A put or take inside a go block returns only a channel, so the value has to be taken out of the channel, and we use js/console to display it.
- go is a macro that takes a body of expressions, which form a go block, and creates a lightweight process. This is where the magic happens: inside a go block, calls to >! and <! that are waiting on a channel do not block but are instead "parked". Parking is a special type of blocking used internally by the state machine of core.async.
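Putting the numbered notes together, the whole file looks roughly like this (a sketch reconstructed from the notes above, not the article's exact listing):

```clojure
(ns example.core
  ;; 1) core.async and its macros are required
  (:require [cljs.core.async :refer [chan >! <!]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

;; 2) a buffered channel of size 1
(def channel (chan 1))

;; 3) put the string on the channel inside a go block
(go (>! channel "VENKATESA"))

;; 4) take the value back out and log it to the browser console
(go (js/console.log (<! channel)))
```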
If you press F12 in the browser and open the console, you will see the value logged there.
What is the advantage of using the go macro? core.async also provides a thread macro, to be used on the JVM where a real thread is absolutely essential. But threads can be expensive. On the JVM, their default stack size is 512 kilobytes, configurable via the -Xss JVM start-up option. When developing a highly concurrent system, creating thousands of threads can quickly drain the resources of the machine the application is running on. core.async acknowledges this limitation and gives us lightweight processes. Internally they do share a thread pool, but instead of wastefully creating a thread per go block, threads are recycled and reused when a put/take operation is waiting for a value to become available. The thread pool used by core.async defaults to (number of available processors x 2) + 42, so a machine with eight processors will have a pool of 58 threads. It is therefore common for core.async applications to have tens of thousands of lightweight processes; they are extremely cheap to create.
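On the JVM, for instance, spawning ten thousand go blocks over that small pool is perfectly reasonable (a sketch in JVM Clojure; <!! is the blocking take available only on the JVM):

```clojure
(require '[clojure.core.async :refer [chan go >! <! <!!]])

(let [c (chan)]
  ;; 10,000 lightweight producers, multiplexed over the fixed thread pool
  (dotimes [_ 10000]
    (go (>! c 1)))
  ;; a single consumer takes all 10,000 values and sums them
  (println (<!! (go (loop [total 0, n 0]
                      (if (= n 10000)
                        total
                        (recur (+ total (<! c)) (inc n))))))))
;; prints 10000
```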
One more sample
Create one more Figwheel project, "exp". Create a file async.cljs as follows:
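The file might look like this (a sketch; the function names enque-val, retrieve-val and enqueue-and-retrieve follow the text below):

```clojure
(ns exp.async
  (:require [cljs.core.async :as async :refer [>! <!]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

;; a buffered channel of size 5
(def channel (async/chan 5))

(defn enque-val [v]
  (go (>! channel v)))          ;; returns a channel, not v

(defn retrieve-val []
  (go (js/console.log (<! channel))))

(defn enqueue-and-retrieve [v]
  (enque-val v)
  (retrieve-val))
```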
Modify the core.cljs to require the above file as follows:
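For instance (a sketch; requiring the namespace is enough for its definitions to load):

```clojure
(ns exp.core
  (:require [exp.async]))
```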
When you call the test function at the Figwheel REPL, for example
cljs.user=> (exp.async/enqueue-and-retrieve "Ganesha")
you can see the return value as
#object [cljs.core.async.impl.channels.ManyToManyChannel] ;; only a channel is returned
If you go to the browser and press F12, you can see in the console the value taken from the channel, "Ganesha".
In the next sample we will remove the function retrieve-val and the test function enqueue-and-retrieve from the async.cljs of the earlier sample. Instead we will modify our code to use a listener, so that something is always listening and acts whenever a new value appears.
lein new figwheel listen
The modified async.cljs is given below:
(defn listen []  ;; new function: loops forever, parking at <! until a value arrives
  (go (while true (js/console.log (<! channel)))))
1) This might seem like it would just loop forever, but remember that within core.async's go blocks, functions like <! behave as if they were blocking (they park the go block). So as long as <! doesn't find a new value on the channel, listen won't continue to loop.
When you see the cljs.user=> prompt, execute
cljs.user=> (listen.async/enque-val "Ganesh remover of Vigna")
and you will see the returned channel as the result.
In the browser, press F12 and you will see the value retrieved from the channel logged in the console.
This shows that when we put a new value into our channel, it is immediately read and logged by our background listener.
In the samples we defined a channel as under:
(def channel (async/chan 5))
If we don't pass any number in the above definition, what is returned is a synchronous, unbuffered channel. Such a channel will not allow anything to be published unless there is a waiting consumer. Note that this does not block the thread; it just means that the publisher will be parked, waiting, until a consumer becomes available. The 5 in the above case is an optional argument that specifies how large we want the channel's buffer to be (that is, how many messages it can hold at one time). This is called a fixed-size buffer: it is fixed to a chosen number n, allowing producers to put items in the channel up to that number without having to wait for consumers. In addition to specifying the buffer's size, we can also choose the buffer's strategy. For instance, we could use the dropping-buffer function to create a buffer that drops new values when the buffer is full, or the sliding-buffer function to create a buffer that, when full, drops the oldest value in the queue and retains the latest value being added. If you don't set an explicit buffer strategy, core.async uses a fixed buffer: a producer attempting to put a value in the channel will be parked when the buffer is full.
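The three strategies can be sketched like this (the var names are illustrative):

```clojure
(require '[cljs.core.async :as async])

(def fixed    (async/chan 5))                          ;; fixed buffer: puts park when full
(def dropping (async/chan (async/dropping-buffer 5)))  ;; when full: new values are silently dropped
(def sliding  (async/chan (async/sliding-buffer 5)))   ;; when full: the oldest value is dropped
```

With the dropping and sliding strategies a put never parks; the trade-off is that values can be lost.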
Transducers were introduced in Clojure 1.7. The Clojure documentation describes them as composable algorithmic transformations. Let's see why that is.
See the code below:
(->> (range 10)
(map inc) ;; creates a new sequence
(filter even?) ;; creates a new sequence
(prn "result is "))
;; "result is " (2 4 6 8 10)
The preceding snippet is straightforward and highlights an interesting property of what happens when we apply combinators to Clojure sequences: each combinator creates an intermediate sequence. In the previous example, we ended up with three in total: the one created by range, the one created by map, and finally the one created by filter. Most of the time this won't really be an issue, but for large sequences it means a lot of unnecessary allocation.
Using transducers, the previous example can be written like so:
(def xform ;; create a transducer
(comp (map inc)
(filter even?))) ;; no intermediate sequence created
(->> (range 10)
(sequence xform) ;;apply the transducer created earlier
(prn "result is "))
;; "result is " (2 4 6 8 10)
In the new version, a whole range of the core sequence combinators, such as map and filter, have gained an extra arity: if you don't pass them a collection, they instead return a transducer. In the previous example, (map inc) returns a transducer that knows how to apply the function inc to elements of a sequence. Similarly, (filter even?) returns a transducer that will eventually filter elements of a sequence. Neither of them does anything yet; they simply return functions. This is interesting because transducers are composable: we build larger and more complex transducers using simple function composition, as comp does in the example above.
Principally, transducers don't care about what the function does, the context of what's being built up, or the source of inputs. That last point is their most critical differentiating characteristic: other higher-order functions (map, filter, reduce, and so on) are critically tied to the collection (and hence sequence) abstractions. So one really awesome use of transducers is to apply a transformation to anything that goes through a core.async channel. We will see more about that later.
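As a taste of what that looks like, chan accepts a transducer as its second argument (a sketch; the namespace name is illustrative):

```clojure
(ns exp.xform-demo ;; hypothetical namespace, for illustration only
  (:require [cljs.core.async :refer [chan >! <!]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

(def xform (comp (map inc) (filter even?)))

;; every value put on this channel passes through xform;
;; values that filter rejects simply never appear on the take side
(def c (chan 5 xform))

(go (doseq [n (range 10)]
      (>! c n)))

(go (js/console.log (<! c))   ;; 2
    (js/console.log (<! c)))  ;; 4
```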
While using core.async, the first and foremost advantage is that you never lose control. The code even reads procedurally. The go macro converts your easy code into a bunch of callbacks and coordinates them with a powerful state machine that reassembles them without ever losing control. It's all good old callbacks and mutable state below ground, but above ground you can write code that's easy to reason about. It's about regaining control of your asynchronous calls instead of smearing your logic across your code in little bits contained in callbacks. A go block compares very favourably in speed to an Elixir process, an impressive result given that Clojure runs on the JVM, whereas Elixir runs on the Erlang virtual machine, which was built with efficient concurrency in mind. But in CSP there is no equivalent of Erlang's OTP, with its built-in support for fault tolerance, nor direct support for parallelism. The actor community has concentrated on fault tolerance and distribution, and the CSP community on efficiency and expressiveness. Choosing between them is therefore largely a question of deciding which of these aspects is most important to you.