Diving into Clojure, parts two and three

Previously, I set up a Clojure environment and wrote a basic functioning HTTP mass downloader. Once I had something that worked, my next goals were to make it more idiomatic and to make it work in parallel. After finishing Programming Clojure, I was still uncertain how to proceed, and the online documentation for Clojure is mostly sparse reference material. I knew that the heavily callback-based code I’d written was neither good Clojure nor easy to follow and read, but how should I proceed from there? My gut feeling was that I needed to make better use of seqs.

To get some help, I went to the #clojure channel on Freenode. The dynamic there should be familiar to people who’ve spent much time in open source IRC channels – a lot of lurkers but only occasional bursts of discussion. There are a few helpful bots which do things like looking up docs and running code snippets, and a lot of the discussion seems to be newbie-oriented. I asked for advice on my little program and I got plenty of feedback.

  • Paren style. The initial version of my program spread closing parens across multiple lines as if they were closing braces or ‘end’ keywords in Ruby. The Lisp way of doing this is to close all the parens at once and let your editor keep track of them for you.
  • Callback structure. Like I thought, the structure I was using could be cleaned up by composing functions and using seqs.
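
To make the paren advice concrete, here is a small sketch of the same function laid out both ways. The function names and body are made up for illustration and are not taken from my downloader; both forms compile to the same thing, only the layout differs.

```clojure
;; Brace-style layout: each closing paren gets its own line.
(defn total-length-spread
  [strings]
  (reduce +
    (map count strings
    )
  )
)

;; Conventional Lisp layout: close everything at once.
(defn total-length
  [strings]
  (reduce + (map count strings)))
```

The second form also hints at the seq-oriented style: the work is a composition of map and reduce over a sequence rather than a chain of callbacks.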

You can see the diff for my second revision here. I tore out the existing implementation of mass-download and urls-from-file. I rolled the file-reading functionality into mass-download as a temporary measure to make the code a bit more straightforward.

    (defn mass-download
      [url-file fetch-fn name-fn output-fn]
      (let [url-file "urls.txt"
            write-out (fn [url data] (output-fn (name-fn url) data))]
        (with-open [rdr (clojure.java.io/reader url-file)]
          (dorun (map (partial fetch-fn write-out) (line-seq rdr))))))

mass-download takes a file to read URLs from (url-file), a function which fetches a URL and passes on the data (fetch-fn), a function which converts URLs into filenames (name-fn), and a function which writes out data to a given filename (output-fn). It creates a helper function write-out which combines the naming and output functions. (This may be possible in a more terse form.) After opening the file, I build a sequence of URLs and apply the various functions to it. dorun then forces the seq to run, invoking the side effects of fetching and writing out the files.
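
To trace how the pieces fit together, here is a minimal sketch that runs the same pipeline against stand-in functions. Everything named here (mass-download-urls, fake-fetch, written) is hypothetical and exists only for the illustration; it also takes the URLs as a seq and uses doseq in place of dorun over map, which is a swap on my part rather than what the code above does.

```clojure
;; Collects what the output function receives, so the flow can be inspected.
(def written (atom []))

(defn fake-fetch
  "Stand-in fetcher: pretends to download url, then hands the result to callback."
  [callback url]
  (callback url (str "contents of " url)))

(defn mass-download-urls
  "Same shape as mass-download, but takes a seq of URLs instead of a file."
  [urls fetch-fn name-fn output-fn]
  (let [write-out (fn [url data] (output-fn (name-fn url) data))]
    ;; doseq is meant for side effects, so nothing needs to be forced afterwards.
    (doseq [url urls]
      (fetch-fn write-out url))))

(mass-download-urls ["a" "b"]
                    fake-fetch
                    (fn [url] (str url ".out"))
                    (fn [filename data] (swap! written conj [filename data])))
;; @written is now [["a.out" "contents of a"] ["b.out" "contents of b"]]
```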

This approach lets me clean up some of the other functions. For example, in-dir previously took another function, to which it yielded (falling back on Ruby parlance) a filename contained in a directory. The new version simply returns the new string.
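
I haven’t shown the naming helpers here, so the following is only a guess at their shape, based on how in-dir and only-basename get combined with comp later in this post. The bodies are assumptions, not the code from the diff.

```clojure
(require '[clojure.string :as string])

(defn only-basename
  "Returns the text after the last slash in url (assumed behaviour)."
  [url]
  (last (string/split url #"/")))

(defn in-dir
  "Returns a function that places a filename inside dir (assumed behaviour)."
  [dir]
  (fn [filename] (string/join "/" [dir filename])))

;; Plain functions that return values compose directly, no callbacks needed.
(def name-fn (comp (in-dir "downloaded") only-basename))

(name-fn "http://example.com/files/report.pdf")
;; => "downloaded/report.pdf"
```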

Once I’d cleaned up the code, I went in to make it dirty again. This time, I wanted to parallelize the downloads. To do this, I need to spin up multiple threads which perform downloads, but only one thread should be responsible for writing out files. This is because the eventual version of this tool will need to write out to a ZIP archive, and there’s not really any such thing as concurrent ZIP output. I made a few attempts at doing this with Clojure concurrency primitives (atoms, refs, vars, and agents) but wasn’t able to make any headway. Instead, I’ve implemented it using Java concurrency utilities. It’s not pretty, but it does work. First, I need to include a few Java classes:

    (ns mass-download.core
      (:use [clojure.string :only [join split blank?]])
      (:import [java.util.concurrent LinkedBlockingQueue Executors]))

I solved the easier problem first: serializing access to the output function. I suspect this is a place where I could make use of refs, but instead I just used an old-fashioned Queue and a worker thread:

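    (defn package-with-worker [action]
      (let [queue (LinkedBlockingQueue.)
            ;; Pull [filename data] pairs off the queue until the poison pill
            ;; (a blank filename) arrives.
            worker (fn []
                     (loop [entry (.take queue)]
                       (when-not (blank? (first entry))
                         (apply action entry)
                         (recur (.take queue)))))
            worker-thread (Thread. worker)]
        (.start worker-thread)
        ;; The returned function only enqueues; the worker thread does the writing.
        (fn
          ([kw] (.put queue ["" ""]))
          ([filename data] (.put queue [filename data])))))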

This somewhat byzantine function takes an output function and returns a version with serialized access. It creates a queue and a thread which plucks work (filename + data pairs) off until it receives a blank filename (which is the ‘poison pill’ signalling the thread to stop). This is pretty much standard, stateful, imperative programming like I’d do in Java or Ruby – I can’t help but think there must be a better way. At this point, mass-download is invoked thus:

    (defn -main []
      (let [packager (package-with-worker store-to-file)]
        (mass-download "urls.txt"
                       (comp (in-dir "downloaded") only-basename)

Imperative approach aside, it’s really nice to be able to serialize access to a resource this transparently. None of the rest of the code had to be changed.
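
For comparison, here is a rough sketch of how the same serialization might look with an agent, which runs the actions sent to it one at a time. This is not what my code does, and package-with-agent and log are hypothetical names; it also assumes the action never throws, since a failed agent would reject further sends.

```clojure
(defn package-with-agent
  "Returns a function that queues calls to action on a single agent."
  [action]
  (let [writer (agent nil)]
    (fn
      ;; One-argument call (e.g. :done): block until the queued writes finish.
      ([_] (await writer))
      ;; Two-argument call: queue a write and return immediately.
      ([filename data]
       (send-off writer (fn [_] (action filename data) nil))))))

;; Stand-in for store-to-file that just records the filenames it sees.
(def log (atom []))
(def packager (package-with-agent (fn [filename data] (swap! log conj filename))))

(packager "a.txt" "A")
(packager "b.txt" "B")
(packager :done)
;; @log is now ["a.txt" "b.txt"]
```

No poison pill is needed in this version, but a standalone program would have to call shutdown-agents before exiting so the agent threads don’t keep the JVM alive.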

The next part is more difficult: distributing the download tasks out into threads. I tried some stuff with agents at first, but I ended up using Java executors just as shown on the Clojure website. The mass-download function starts off the same way:

    (defn mass-download
      [url-file fetch-fn name-fn output-fn]
      (let [url-file "urls.txt"
            write-out (fn [url data] (output-fn (name-fn url) data))]
        (with-open [rdr (clojure.java.io/reader url-file)]

Using some unfortunately fairly ugly nested lambdas, I convert the seq of URLs into a seq of tasks, one for each URL:

    (let [tasks (map
                  (fn [t]
                    (fn []
                      ((partial fetch-fn write-out) t)))
                  (line-seq rdr))
          pool (Executors/newFixedThreadPool 10)]

This took a lot of doing – previous versions of this approach were quietly performing the downloads sequentially before the tasks were even dispatched. Now that I have all those URLs transformed into a sequence of quanta of work, I can dispatch them to workers using that thread pool.

      (.invokeAll pool tasks)
      (output-fn :done)
      (.shutdown pool)))))

.invokeAll conveniently dispatches a list of tasks to a pool and waits until they are all done. I use a special argument to output-fn to signal that all the URLs are downloaded and it should wrap up.
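
Stripped of the download details, the pattern can be sketched as a small standalone helper. run-in-pool is a hypothetical name, not part of my code. The important bit is that each item is wrapped in a zero-argument function, so nothing runs until a pool thread calls it; Clojure functions implement Callable, so the executor accepts them directly.

```clojure
(import '[java.util.concurrent Executors ExecutorService Future])

(defn run-in-pool
  "Applies work-fn to each item on a fixed-size thread pool and returns the
  results in the same order as items."
  [n-threads work-fn items]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n-threads)
        ;; Wrap each item in a thunk; building the thunk does no work yet.
        tasks (map (fn [item] (fn [] (work-fn item))) items)]
    (try
      ;; invokeAll blocks until every task has completed, then returns Futures.
      (mapv (fn [^Future f] (.get f)) (.invokeAll pool tasks))
      (finally
        (.shutdown pool)))))

(run-in-pool 3 (fn [n] (* n n)) [1 2 3])
;; => [1 4 9]
```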

At this point I have a fully functional tool – I pass it a file full of URLs, it downloads them all concurrently, and it packages them all safely in a directory. The downloader, namer, and packager functions are all pluggable. There’s still a lot of room for improvement, though:

  • I want to implement ZIP packaging for the output. This will require more Java interop than I’ve done so far.
  • Bad URLs bomb the process out entirely – it doesn’t properly report the error or keep track of unretrievable URLs. I would want to be able to pass a callback for that eventuality.
  • The threading is entirely using Java libraries and imperative style. This works, but there’s probably a more Clojure-y (and functional) way.
  • Instead of passing a filename, I’d like to pass any seq. That way, I could extract URLs out of an XML document, or receive them from another part of the code, or anything else. Seqs appear to be the basic Clojure concept for expressing a series of units of work.
  • I’d like to package it up as a JAR and try talking to it from JRuby code.
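
As a first stab at the error-handling item, one option that fits the pluggable design is to wrap the fetch function rather than change mass-download. This is only a sketch of the idea: with-error-callback, flaky-fetch, and failures are hypothetical names, and catching Exception is a deliberately broad choice for the example.

```clojure
(defn with-error-callback
  "Wraps fetch-fn so that an exception is reported to on-error instead of
  propagating to the caller."
  [fetch-fn on-error]
  (fn [write-out url]
    (try
      (fetch-fn write-out url)
      (catch Exception e
        (on-error url e)))))

;; Stand-in fetcher that fails for one particular URL.
(defn flaky-fetch [write-out url]
  (if (= url "bad")
    (throw (java.io.IOException. (str "cannot fetch " url)))
    (write-out url "data")))

(def failures (atom []))
(def safe-fetch
  (with-error-callback flaky-fetch (fn [url e] (swap! failures conj url))))

(safe-fetch (fn [url data] nil) "good")
(safe-fetch (fn [url data] nil) "bad")
;; @failures is now ["bad"]
```

The wrapped function has the same two-argument shape as the original fetch function, so it could be passed to mass-download unchanged.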

I’ll be tackling at least one of these in the next update.