Reactive Swing and Observables Pt 4
tom, 2012-02-14

The first post in the series indicated that a simple path-finding visualization application was the catalyst for some mental stemming over the past week. I started off explaining the type of application I wanted, then focused on a library to get me there (Swing). Shortly thereafter, I saw a deficiency in Swing’s treatment of events, and decided to try my hand at fixing it via Clojure. That’s where we are now. In this post, I will finally start throwing some code at you.

My first step was to crack open the F# source code to get an idea of how they implemented first-class events in .NET. Some functions were nearly direct ports to Clojure, but I had to lay some pipe in other areas before I could get a Clojure implementation working.

This is a library that aims to port the (IMO) nice events-as-data infrastructure from F# into Clojure. I have most of the core library implemented, as well as a few extra convenience methods.

The basic abstraction is that observations are just a sequence of events, likely driven by IO. We model observations with observables and observers:

Observables serve as the notification mechanism for registering interested parties (observers) and notifying them of new observations.

Observers serve as function appliers (usually in the form of side-effects, but not always!).

We use protocols to define observables and observers, both of which can be easily extended to new data types or existing data. This is particularly useful for making “noisy” data, where we want to sync up some stateful structure (a model) with a view.
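As a rough sketch of what "extending to existing data" could look like, the snippet below makes a plain Clojure atom behave as an observable, so that a model held in the atom notifies its observers whenever it changes. The two protocols here are cut-down copies of the ones in the listing further down, so the sketch runs on its own. Using `add-watch` as the subscription mechanism, and the names `model` and `seen`, are assumptions for illustration and not part of the library.

```clojure
;; Cut-down stand-ins for the library's protocols, so this runs by itself.
(defprotocol observer
  (update! [obs arg] "Inform observer obs that arg has happened."))

(defprotocol observable
  (subscribe! [able obs] "Subscribe observer obs to observable able.")
  (notify! [able arg] "Push arg to every subscriber of able."))

;; Assumption: an atom becomes "noisy" by forwarding every new value
;; to its observers through a watch.  The observer doubles as the watch key.
(extend-protocol observable
  clojure.lang.Atom
  (subscribe! [a obs]
    (add-watch a obs (fn [_ _ _ new-val] (update! obs new-val))))
  (notify! [a arg]
    (reset! a arg)))

(def model (atom 0))   ; hypothetical model
(def seen  (atom []))  ; stands in for a view; records what it was told

(subscribe! model (reify observer
                    (update! [_ v] (swap! seen conj v))))

(reset! model 1)   ; an ordinary state change reaches the observer
(notify! model 2)  ; so does an explicit notification
```

After the last two calls, `seen` holds both values in the order they arrived. Nothing about the atom's normal use changes, which is the appeal of extending a protocol to a type that already exists.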

The primary motivation for this was a bad experience with event management in Java interop. It’s asinine, and effective at hiding the data that’s flowing around, i.e. events. I remembered the nice stuff about F#, and its implementation of the Rx framework, which allows trivial composition of events. Why not implement it natively in Clojure? The benefit is that we get a composable event framework for Clojure, and it goes wherever Clojure goes.

Note: this is a stateful library. Observables can be composed and routed using analogues of all of our favorite seq-related functions, like map, filter, etc., but their subscriber lists are held in a ref. This is fine if we’re dealing with IO, specifically nasty GUI stuff (which Java/Swing make even nastier due to their interface-madness event implementation).

TBD: error-handling and stopping/disposal of handlers. There are probably really nasty corner cases I am missing that will come back to bite. F# does this via auto-detaching observers that fail gracefully.
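To make the TBD a bit more concrete, here is a minimal sketch of one way auto-detaching could work: a notification loop that drops any observer whose `update!` throws. This is not the F# implementation and not part of the library. The helper `notify-detaching!` and the names `watchers`, `good`, `bad` and `log` are hypothetical, and the protocol is a cut-down copy so the sketch runs on its own.

```clojure
;; Cut-down stand-in for the library's observer protocol.
(defprotocol observer
  (update! [obs arg] "Inform observer obs that arg has happened."))

(defn notify-detaching!
  "Hypothetical helper: call update! on every observer held in the ref
   subscribers.  An observer that throws is removed from the ref, so it
   will not be called on later notifications."
  [subscribers arg]
  (doseq [o @subscribers]              ; iterate over a snapshot of the ref
    (try
      (update! o arg)
      (catch Exception e
        (dosync
          (alter subscribers
                 (fn [current]
                   (vec (remove #(identical? % o) current)))))))))

(def log  (atom []))
(def good (reify observer (update! [_ v] (swap! log conj v))))
(def bad  (reify observer (update! [_ v] (throw (Exception. "boom")))))

(def watchers (ref [bad good]))

(notify-detaching! watchers 1)  ; bad throws and is detached, good still fires
(notify-detaching! watchers 2)  ; only good is left
```

The failing observer is swallowed silently here, which is probably too quiet for real use; a fuller version would at least report the exception somewhere before detaching.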

Some comments are getting truncated, I apologize for this. The code is still readable/useful!


(defprotocol observer
  (update! [obs arg]
   "Inform the observer obs that arg has happened. This typically
    implies that obs will call a function with arg as its argument."))

(defprotocol observable
  (subscribe! [able obs] "Subscribe observer obs to observable able.")
  (notify! [able arg]
    "For each subscriber of observable able, call update! with arg.")
  (get-subscribers [able]
    "Return all subscribers associated with observable able")
  (clear-subscribers! [able]
    "Remove all subscribers from observable able."))

(defn make-observer
  "Given a function f, return an observer that applies f to the arguments
   passed to it via update! calls."
  [f]
  (reify observer (update! [obs arg] (f arg))))

(defn subscribe
  "Return the result of subscribing an anonymous observer, that
   update!s with f, through notify! arguments from observable origin."
  [f origin]
  (do (subscribe! origin (make-observer f)) origin))

(defn- default-subscribe [subscribers _ obs]
  (if (satisfies? observer obs)
      (dosync (alter subscribers conj obs))
      (throw (Exception. "not a valid observer!"))))

(defn make-observable
  "Primitive function for building new observables.  Used as an entry
   point for making generic observables (which can be notified using
   any type)."
  ([subscribef]
   (let [subscribers (ref [])]
     (reify observable
       (subscribe! [able obs] (subscribef subscribers able obs))
       (notify! [able arg]
         (doseq [o (deref subscribers)] (update! o arg)))
       (get-subscribers [able] subscribers)
       (clear-subscribers! [able] (dosync (ref-set subscribers []))))))
  ([] (make-observable default-subscribe)))

(defn- bind-observable
  "Bind an observable to a base observable.  We create specialized
   behavior via the subscribef function, which allows us to control
   the binding context.  We keep track of the base observable source
   (via reference) so that we can subscribe to observables at any
   level of composition."
  ([subscribef & base]
   (let [b (first base)
         ;; share the base's subscriber ref when a base is given
         subscribers (if b (get-subscribers b) (ref []))]
     (reify observable
       (subscribe! [able obs] (subscribef subscribers able obs))
       (notify! [able arg] (doseq [o (deref subscribers)] (update! o arg)))
       (get-subscribers [able] subscribers)
       (clear-subscribers! [able] (dosync (ref-set subscribers []))))))
  ([] (make-observable default-subscribe)))

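(defn map-obs
  "Map function f to the stream of observations arriving from origin.
   Returns a new observable."
  [f origin]
  ;; NOTE: the bind-observable wrapper is an assumed reconstruction; the
  ;; listing lost it, leaving a bare (and unclosed) subscription function.
  (bind-observable
    (fn [subscribers _ obs]
      (subscribe! origin (make-observer #(update! obs (f %)))))
    origin))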

(defn splitmap-obs
  "Useful in conjunction with split and merge.  Takes either a single
   function, or 2 function args, and applies them to relative args in
   a vector of observables, returning a vector of new observables."
  ([fl fr [origin1 origin2]] [(map-obs fl origin1) (map-obs fr origin2)])
  ([f observers] (splitmap-obs f f observers)))

(defn multimap-obs
  "Return a sequence of observables that are the result of applying f
   via map-obs over each observable in observercoll in turn."
  [f observercoll] (map (partial map-obs f) observercoll))

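(defn choose-obs
  "Imported from the F# lib.  They use Some and None due to static typing, which
   are members of the Option type.  We have nil in clojure, so it's used here.
   Might be able to deprecate this, as it's basically a filter."
  [f origin]
  ;; NOTE: the bind-observable wrapper is an assumed reconstruction; the
  ;; listing lost it, leaving a bare (and unclosed) subscription function.
  (bind-observable
    (fn [subscribers _ obs]
      (subscribe! origin
        ;; only forward when (f arg) is neither nil nor false
        (make-observer (fn [arg] (if-let [v (f arg)] (update! obs v))))))
    origin))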

(defn filter-obs
  "Filter using a single argument function, filterf.  Returns an
   observable that fires for observations where (= (filterf arg) true)"
  [filterf origin]
  (choose-obs (fn [arg] (if (filterf arg) arg nil)) origin))

(defn partition-obs
  "Split the observation into a vector of 2 observables.  Similar to split-with in
   clojure's seq library.  The first or left observable in the vector fires on
   events that passed the filter, while the right fires on everything else."
  [filterf origin]
    [(filter-obs filterf origin) (filter-obs (comp not filterf) origin)])

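;; untested
(defn reductions-obs
  "This is called scan in F#, but is akin to reductions over a
   sequence of events.  We maintain some state, initialized by init.
   f is a function of 2 args, state and the next value.  Each reduction
   replaces the old state with the value of the new reduction."
  [f init origin]
  (let [state (ref init)]
    ;; NOTE: the original listing is cut off inside this function.  The
    ;; bind-observable wrapper, the subscription to origin, and forwarding
    ;; the new state to obs are assumptions based on the docstrings.
    (bind-observable
      (fn [subscribers _ obs]
        (subscribe! origin
          (make-observer
            (fn [v]
              ;; alter applies f to the current state and v
              (let [result (dosync (alter state f v))]
                (update! obs result))))))
      origin)))
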
;; somewhat tested
(defn merge-obs
  "Merge two observables into a single observable."
  ([origin1 origin2]
   ;; NOTE: wrapping in bind-observable, with origin1 as the base, is an
   ;; assumed reconstruction; the listing was cut off here.
   (bind-observable
     (fn [subscribers able obs]
       (subscribe! origin1 (make-observer #(update! obs %)))
       (subscribe! origin2 (make-observer #(update! obs %))))
     origin1))
  ([[origin1 origin2]] (merge-obs origin1 origin2)))

;; untested
(defn multimerge-obs
  "Merge multiple observables into a single observable."
  [observercoll]
  (reduce merge-obs observercoll))

;; somewhat tested
(defn split-obs [f origin]
  [(choose-obs (fn [v] (if (f v) v nil)) origin)
   (choose-obs (fn [v] (if-not (f v) v nil)) origin)])

;; testing.  this is a simple simulation.....
(defn balance [split & source]
  (let [signal (or (first source) (make-observable))]
    (->> signal
         (partition-obs #(<= % split))
         ;; NOTE: the splitmap-obs and merge-obs steps are assumed; the
         ;; listing shows only the two tagging functions.
         (splitmap-obs (fn [l] [:low l])
                       (fn [r] [:high r]))
         (merge-obs)
         (map-obs (fn [[vtype volume]]
                    (let [msg (case vtype
                                :low "Volume is loooooow :"
                                :high "Volume is HIGH!!! :")]
                      (str msg volume)))))))

;; create a volume thingy....if volume goes over 50 it thinks it's loud.
(def b (->> (balance 50)
            (subscribe println)))

;; feed a stream of "events", i.e. notifications to the observable
(doseq [v (range 50)] (notify! b (rand-int 100)))