Alex's log 2009-12-13

Widefinder 2 with Clojure

Like many Clojurists, I’ve been pretty avidly following Tim Bray’s experiences with Clojure. Recently Tim has been trying to implement the Widefinder 2 benchmark and has apparently been struggling to get it to perform well. This was of course an irresistible challenge and I had to have a go at it myself.

I cooked up my own version, which seemed to run pretty well on the only box I had handy to try it on (a quad-core Xeon). Then Tim kindly gave me access to the WF2 test machine, a Sun T2000 with 32 hardware threads. Unsurprisingly, my version performed abysmally there and was barely loading the server. However, Mark Triggs helped out. He babysits Java apps for a living, so he is pretty handy with Solaris’ instrumentation and profiling tools. With his help, a couple of tweaks to how I was doing IO got it running pretty well. How well? Better than Java and Scala. I hope that whets your appetite.

Update: Tim’s early reactions. And just to be clear, I mean “faster” than the Scala and Java versions listed here. Clojure is written in Java, so you can of course achieve the same (or possibly better) performance by doing exactly what I did in another JVM language. This is more an exploration of Clojure’s potential for performance on a common real-world problem than any serious attempt at competition.

The benchmark

The problem is a simple log parsing exercise. Given an Apache log from Tim’s website, generate a nice report of the top ten most requested pages, referrers, client addresses and bytes served. Should be easy, huh? The catch is that the log file is 45 GB and Tim wants the report done by the time he gets back from refilling his coffee mug.

You can split the problem into three parts: parsing the log lines, tallying the various counts and outputting the report. It’s critical to parallelize the parsing and tallying. A fairly common approach is the shared-nothing or map-reduce method. We give each thread its own set of counters, have each one process a different portion of the data, and then merge all the counters together at the end. This way you don’t have to worry about locking or inter-thread communication. It works fine in virtually any programming language, but it’s a pretty boring way of doing things.
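For contrast, here is a minimal sketch of that shared-nothing approach in Clojure. It is not the approach used in the rest of this post. Each portion of the data gets its own private counts via frequencies, and merge-with + sums the per-portion maps at the end. The portions vector is hypothetical stand-in data, with one URL per request:

```clojure
;; Hypothetical stand-in for parsed log records, already split into
;; per-thread portions.
(def portions [["/a" "/b" "/a"] ["/b" "/c"] ["/a" "/c" "/c"]])

;; Each portion is tallied independently, with no shared state. The
;; per-portion maps are then merged by summing the counts for matching keys.
(def totals (apply merge-with + (pmap frequencies portions)))

(prn (into (sorted-map) totals)) ; prints {"/a" 3, "/b" 2, "/c" 3}
(shutdown-agents)                ; pmap uses futures; let the JVM exit promptly
```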

One of the main goals of Clojure is making local shared state concurrency safe, fast and easy, so I decided to go for a shared set of counters. At this point you’re probably thinking something along the lines of “argh… that’ll mean locks which are slow and easy to mess up”, but as we’ll see later Clojure has something better in store for us.

Parsing

My initial naive method of reading the log file was just to use read-lines, group the lines into chunks with partition, and pass each chunk to a different thread. This worked fine on my quad-core. On the T2000, though, only one of its many weak processors was doing the reading, which severely limited throughput. The solution, of course, is to have each thread do its own IO. I wanted to do this in the simplest way possible. However, we can’t just split the file into evenly sized chunks, as we’d end up splitting in the middle of a line. That’s where chunk-file comes in.

(import '(java.io RandomAccessFile))

(defn chunk-file
  "Partitions a file into n line-aligned chunks.  Returns a list of start and
  end byte offset pairs."
  [filename n]
  (with-open [file (RandomAccessFile. filename "r")]
    (let [offsets (for [offset (range 0 (.length file) (/ (.length file) n))]
                    (do (when-not (zero? offset)
                          (.seek file offset)
                          ;; roll forward past the next newline, stopping at EOF (-1)
                          (while (not (#{-1 (int \newline)} (.read file)))))
                        (.getFilePointer file)))
          offsets (concat offsets [(.length file)])]
      (doall (partition 2 (interleave offsets (rest offsets)))))))

We first open the file for random read access and then use range to create a list of n evenly spaced positions in the file. We then seek to each of these in turn and roll forward until we encounter a line break, so that our offsets don’t fall in the middle of lines. Next we tack the length of the file on the end as the last offset. We finish up with a list looking something like (0 267 312 876). We then interleave it with itself minus the first element, which gives us (0 267 267 312 312 876). Partitioning that by 2 yields the desired start and end pairs ((0 267) (267 312) (312 876)). [Update: Matti points out in the comments that doing (partition 2 1 offsets) achieves the same thing without interleave.] The doall is necessary to realize the list before we close the file, as by default it’s lazily evaluated.
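Here is a quick REPL-style check that the interleave version and Matti’s (partition 2 1 offsets) produce the same start and end pairs. It uses the example offsets from the paragraph above:

```clojure
;; Example offsets: positions rolled forward to line breaks, with the
;; file length (876) tacked on the end.
(def offsets '(0 267 312 876))

;; Interleave with the list minus its first element, then group into pairs.
(def pairs-interleave (partition 2 (interleave offsets (rest offsets))))

;; Windows of size 2 that advance by 1. The trailing incomplete
;; window (876) is dropped.
(def pairs-sliding (partition 2 1 offsets))

(prn pairs-interleave) ; prints ((0 267) (267 312) (312 876))
(prn pairs-sliding)    ; prints ((0 267) (267 312) (312 876))
```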

Right, now that we know what we’re reading how do we actually do it? The easiest way to read lines from a file is to well, use read-lines, but unfortunately we need to limit to our chunk. This calls for a new function, read-lines-range.

(import '(java.io FileInputStream BufferedInputStream InputStreamReader
                  BufferedReader))

(defn read-lines-range
  "Returns a lazy sequence of lines from file between start-byte and end-byte."
  [file start-byte end-byte]
  (let [reader (-> (doto (FileInputStream. file)
                     (.skip start-byte))
                   (BufferedInputStream. 131072)
                   (InputStreamReader. "US-ASCII")
                   (BufferedReader.))]
    (letfn [(gobble-lines [remaining]
              (lazy-seq
               (if-let [line (and (pos? remaining) (.readLine reader))]
                 ;; inc counts the newline byte that .readLine strips
                 (cons line (gobble-lines (- remaining (inc (.length line)))))
                 (.close reader))))]
      (gobble-lines (- end-byte start-byte)))))

So the first thing we do is open our file. Normally in Clojure you’d just use the reader function from clojure.contrib.duck-streams to skip all the Java boilerplate, but in this case I want to specify some additional options. The -> and doto macros save us from having to name each intermediate step. They also spare us the unreadable nesting you’d get if you tried to do the same thing in a typical curly-brace language:

BufferedReader reader = new BufferedReader(new InputStreamReader(new BufferedInputStream(
                          new FileInputStream(filename).skip(startByte), 131072), "US-ASCII"));

Oops, FileInputStream.skip() returns a long, so that doesn’t even work. Now who was complaining about Lisp syntax (looking (like (this)))?

We use a BufferedInputStream with a 128 KB buffer to increase the number of bytes we read from the disk at a time. We can’t just give the BufferedReader a large buffer, because InputStreamReader reads in 8192-byte blocks, so we really need that buffer at the byte level. On Solaris you can check whether lots of small reads are happening with sar -c 2 10. You can also use truss -p $pid on the process to look at them individually (the Linux equivalent is strace). Finally, we set the encoding to US-ASCII, which saves us a little CPU time as we skip UTF decoding.

The next couple of lines might look a bit unfamiliar to Clojure beginners, so let’s break them down.

(letfn [(gobble-lines [remaining]
          ...)]
  (gobble-lines (- end-byte start-byte)))

What’s this letfn thing? Has some sick and twisted person created a crossbreed of let and defn? Yes, that’s it exactly. letfn lets you define named local functions that can refer to themselves (and to each other), which is what lets gobble-lines call itself. Our function takes the number of bytes left to read, so we initially call it with all of them. Now onto the body of the local function.

(lazy-seq
 (if-let [line (and (pos? remaining)
                    (.readLine reader))]
   (cons line
         (gobble-lines (- remaining (inc (.length line)))))
   (.close reader)))

If there are bytes remaining in our chunk, we take a line from the reader and cons it onto the front of the sequence of the rest of the lines. If there are no bytes remaining, or we’ve reached the end of the file and .readLine returns nil, we simply close the reader. .close returns nil, which ends the sequence.

But hang on, isn’t this going to slurp the entire 45 GB file into memory in one go? Nope. See that lazy-seq macro wrapping the body? It delays evaluating the code until you actually access that part of the sequence. Neat, huh?

The careful reader will at this point be violently waving their arms trying to get my attention. “You’re calling .length on a string and assuming that equals the number of bytes! Localization! Character encodings!! Gnnyyyghh!!” Normally you’d be quite right, but remember I’m decoding as ASCII, where every character is exactly one byte, so this is a safe assumption to make. The inc adds the one byte of the \n terminator that .readLine strips, assuming Unix line endings. Counting this way also saves the expense of re-encoding the string. The more we can avoid touching every character in the file the better, as doing so is our inner loop.
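Here is a small end-to-end check that the byte ranges tile a file exactly, with no line read twice and none skipped. It is a sketch, not the benchmark code. It writes a hypothetical 200-line temp file and splits it with a self-contained copy of chunk-file, adjusted to use quot for a whole-byte step, partition 2 1 for the pairs, and a guard for end of file. It reads each range with a copy of read-lines-range that counts the one-byte \n terminator that .readLine strips, then compares what comes back with what went in. It assumes ASCII content and Unix line endings.

```clojure
(import '(java.io File RandomAccessFile FileInputStream BufferedInputStream
                  InputStreamReader BufferedReader))

(defn chunk-file
  "Partitions a file into roughly n line-aligned [start end] byte ranges."
  [^String filename n]
  (with-open [file (RandomAccessFile. filename "r")]
    (let [len     (.length file)
          ;; quot keeps the step a whole number of bytes
          offsets (doall
                   (for [offset (range 0 len (max 1 (quot len n)))]
                     (do (when-not (zero? offset)
                           (.seek file (long offset))
                           ;; skip to just past the next newline, or stop at EOF (-1)
                           (while (not (#{-1 10} (.read file)))))
                         (.getFilePointer file))))]
      (partition 2 1 (concat offsets [len])))))

(defn read-lines-range
  "Returns a lazy sequence of the lines between start-byte and end-byte."
  [^String file start-byte end-byte]
  (let [reader (-> (doto (FileInputStream. file) (.skip (long start-byte)))
                   (BufferedInputStream. 131072)
                   (InputStreamReader. "US-ASCII")
                   (BufferedReader.))]
    (letfn [(gobble-lines [remaining]
              (lazy-seq
               (if-let [^String line (and (pos? remaining) (.readLine reader))]
                 ;; +1 for the \newline that .readLine strips
                 (cons line (gobble-lines (- remaining (inc (.length line)))))
                 (.close reader))))]
      (gobble-lines (- end-byte start-byte)))))

;; Hypothetical test data: 200 lines of varying length.
(def demo-lines
  (map #(str "line " % " " (apply str (repeat (mod % 7) "x"))) (range 200)))

(def demo-file (doto (File/createTempFile "wf2-demo" ".log") (.deleteOnExit)))
(spit demo-file (apply str (map #(str % "\n") demo-lines)))

;; Read every chunk and stitch the lines back together in order.
(def read-back
  (mapcat (fn [[start end]] (read-lines-range (.getPath demo-file) start end))
          (chunk-file (.getPath demo-file) 7)))

(println (= demo-lines read-back)) ; prints true
```

If the newline were not counted, each chunk would keep reading past its end. It would overshoot by roughly one byte per line it contains, so lines just after each chunk boundary would be read, and tallied, twice.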

(defn #^"[Ljava.lang.String;" dumbest-split
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [len (dec (int (alength tokens)))]
   (loop [start (int 0)
          i (int 0)]
     (let [idx (int (.indexOf s (int c) (int start)))]
       (if (or (neg? idx) (>= i len))
         (do (aset tokens i (.substring s start))
             tokens)
         (do (aset tokens i (.substring s start idx))
             (recur (inc idx) (inc i))))))))

Ewww! What’s that hideous mess? While profiling, we discovered that just using String.split to break our log lines into columns was a major bottleneck. You can see this in Tim Bray’s profiler output as well:

     Compiled + native   Method
 57.1%   148  +     0    java.util.regex.Matcher.search
 17.0%     1  +    43    java.util.regex.Pattern.split
  8.1%     0  +    21    clojure.lang.APersistentVector$Seq.reduce
  5.8%     3  +    12    java.util.regex.Pattern.matcher

String.split splits a string on a regular expression, which is where those costly search and split calls are coming from. Obviously this needed improvement, so Mark and I had a bit of a competition to speed that function up as much as possible in a micro-benchmark. He first wrote a dumb-split, I then wrote a dumber one, and then he won with the final dumbest-split shown above.

String.split   1852.914773 msecs
dumb-split     1136.352235 msecs
dumber-split    691.229164 msecs
dumbest-split   418.799226 msecs

This function is a good example of Clojure letting you get down and dirty and switch to imperative style to optimize that innermost loop. Never write code like this unless it’s showing up in your profiling. Measure, don’t guess and don’t prematurely optimize or you’ll end up with horribly unreadable code.

Of note are the type hints #^String and #^"[Ljava.lang.String;". The latter means an array of Strings. I have no idea why the JVM uses this crappy syntax for arrays, but it does. Java programmers will recognize it from the annoying output you get if you try to print an array. You might also notice the various int calls all over the place. These are used both as type hints and to tell the Clojure compiler to use unboxed integers where possible.
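Here is a quick REPL check of where that odd array name comes from, and that the same descriptor string works as a type hint. It uses the newer ^ hint syntax, which replaced #^ in later Clojure versions. The first-token function is a hypothetical illustration, not part of the benchmark:

```clojure
;; The JVM's internal name for the String[] class is the same descriptor
;; used in the "[Ljava.lang.String;" type hint.
(def string-array-name (.getName (class (make-array String 0))))
(println string-array-name) ; prints [Ljava.lang.String;

;; Hypothetical helper: with the hint, aget compiles to a direct array
;; access instead of a reflective call.
(defn first-token [^"[Ljava.lang.String;" tokens]
  (aget tokens 0))

(println (first-token (into-array String ["GET" "/ongoing/"]))) ; prints GET
```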

Now onto the actual meat of the parser.

(defn parse-lines [lines]
  (let [ary (make-array String 12)]
   (for [#^String line lines
         :let [fields (dumbest-split line \space ary)
               status (aget fields 8)
               bytes (aget fields 9)
               #^String ref (aget fields 10)]
         :when (= (aget fields 5) "\"GET")
         :when (#{"200" "304" "404"} status)]
     {:client (aget fields 0)
      :url (aget fields 6)
      :status status
      :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
      :ref (.substring ref 1 (dec (count ref)))})))

The parse-lines function takes a lazy sequence of lines, pulls out the fields we care about and returns a lazy sequence of maps. We allocate the array for the line splitting up front. This is probably overkill, but optimizing the line splitting had given us really good performance improvements, so we thought we’d do that as well. Reusing one array is safe here because each chunk’s lines are consumed in order on a single thread, and each map copies out the fields it needs before the next line is split. The for macro makes quick work of the various conditions, saving us from an ugly map and filter nest.
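The following example shows which column lands at which index. The log line is made up, in Apache’s combined log format, and is not from Tim’s logs. It is split the way dumbest-split splits it. The sketch uses clojure.string/split with a limit of 12 as a slower but readable stand-in. Like the 12-slot array, the last field holds whatever is left of the line.

```clojure
(require '[clojure.string :as str])

;; Hypothetical log line in Apache combined format.
(def sample
  (str "192.0.2.7 - - [13/Dec/2009:10:15:42 -0800] "
       "\"GET /ongoing/When/200x/2009/12/13/Example HTTP/1.1\" 200 5120 "
       "\"http://example.com/some/page\" \"Mozilla/5.0 (X11; Linux)\""))

;; At most 12 fields; the last one (index 11) holds the rest of the line.
(def fields (str/split sample #" " 12))

;; The indices that parse-lines reads.
(doseq [[label i] [[:client 0] [:method 5] [:url 6]
                   [:status 8] [:bytes 9] [:ref 10]]]
  (println label (nth fields i)))

;; parse-lines strips the surrounding quotes from the referrer like this:
(def ref-field (nth fields 10))
(println (.substring ^String ref-field 1 (dec (count ref-field))))
```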

Now you’re probably thinking “Wait, what? They went to all that effort using arrays and a dumb string splitting function and then go and stick the results in a hash-map? Isn’t that going to slow things down again?” No, actually it’s not, and this is why you need to measure, not assume. For small maps (up to 8 entries) Clojure actually uses an array-map instead of a hash-map, so no hashing or expensive lookups have to happen. Keywords are also interned, so comparing them for equality is just a pointer comparison. This means that in practice small maps are lightning fast, and you don’t need to sacrifice readability by using an array or vector instead.
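The following snippet shows this at the REPL. The record has the same shape as the ones parse-lines builds, but the values are hypothetical. The 8-entry switch-over point is an implementation detail of Clojure’s maps, so treat it as current behaviour rather than a guarantee.

```clojure
;; A record shaped like the ones parse-lines builds (hypothetical values).
(def rec {:client "192.0.2.7" :url "/ongoing/" :status "200"
          :bytes 5120 :ref "http://example.com/"})

(println (class rec)) ; prints clojure.lang.PersistentArrayMap

;; Past 8 entries, Clojure switches to a hash map.
(def big (into {} (map vector (range 9) (range 9))))
(println (class big)) ; prints clojure.lang.PersistentHashMap

;; Keywords are interned, so the same keyword is the same object.
(println (identical? :url (keyword "url"))) ; prints true
```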

Tallying

Phew! 60 or so lines in and we’re halfway through the program. Fortunately all the boring parsing stuff and ugly low-level optimization is out of the way. Now for the concurrency fun. Let’s start off with the main function.

(def tallies [[:url-hits  "URIs by hit"]
              [:url-bytes "URIs by bytes" :megabytes]
              [:s404s     "404s"]
              [:clients   "client addresses"]
              [:refs      "referrers"]])

(def chunks 1000)

(defn wf-atoms [file]
  (let [state (zipmap (map first tallies) (repeatedly #(atom {})))]
    (dorun
     (pmap (fn [[idx [start end]]]
             (println (str "Chunk " idx "/" chunks " (" start " -> " end ")"))
             (->> (read-lines-range file start end)
                  (parse-lines)
                  (tally! state)))
           (indexed (chunk-file file chunks))))
    (time (report tallies state))
    (shutdown-agents)))

First we define some data: a vector of the various tallies with their reporting options, and the number of chunks we’re going to break the file into. I just picked 1000, as it is large enough that we start a new chunk every few seconds, so we get reasonably frequent status updates. It also makes the work much finer-grained than the number of threads. If one thread runs slower than the others for some reason, the others can pick up the remaining chunks as they finish theirs.

Now the actual code. The first thing we do is create an atom containing an empty map for each tally. zipmap takes a list of keys and a list of values and “zips” them together to create a hash-map. For the keys we give it the first column of tallies (:url-hits, :url-bytes and so on). For the values we give it an infinite sequence of empty maps wrapped in atoms. zipmap stops when either list runs out of elements, so state ends up containing:

{:url-hits  (atom {})
 :url-bytes (atom {})
 :s404s     (atom {})
 :clients   (atom {})
 :refs      (atom {})}
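The following runnable version of that setup shows why repeatedly (rather than repeat) is used. The tallies vector is copied from above:

```clojure
(def tallies [[:url-hits  "URIs by hit"]
              [:url-bytes "URIs by bytes" :megabytes]
              [:s404s     "404s"]
              [:clients   "client addresses"]
              [:refs      "referrers"]])

;; zipmap stops when the finite key list runs out, so the infinite
;; (repeatedly ...) sequence is fine here.
(def state (zipmap (map first tallies) (repeatedly #(atom {}))))

(prn (sort (keys state)))              ; prints (:clients :refs :s404s :url-bytes :url-hits)
(prn (every? #(= {} @%) (vals state))) ; prints true

;; repeatedly calls the fn each time, giving five distinct atoms;
;; (repeat (atom {})) would put one shared atom under every key.
(prn (count (distinct (vals state))))  ; prints 5
```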

For those of you unfamiliar with it, Clojure’s collections are all immutable. That means the only way to “add” something to one is to make a new collection with the extra item added. Clojure uses a lot of tricks under the hood to make this efficient, which we won’t get into today. But since our collections are immutable, we need some way to change them across threads. This is where atoms come in.

Atoms are simply atomic references. To change one you use swap!, which takes an atom and a function. swap! gets the current value of the atom, passes it to the function and replaces the contents of the atom with the return value of the function. To increment a counter inside an atom you simply do (swap! some-atom inc). To add an entry to a map you do (swap! some-atom assoc :key :value). Somebody else might change the atom in the meantime, while your function is running. If that happens, swap! detects it and retries, calling your function again with the new value. Thus atoms let us update Clojure’s collections from multiple threads without locking or even really having to think about thread-safety. Nice, huh?
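The following sketch uses hypothetical counters, not the benchmark’s. Eight threads hammer the same two atoms, and swap!’s retry-on-conflict behaviour means no update is lost without any locks:

```clojure
(def counter (atom 0))
(def counts (atom {}))

;; Eight threads, each performing 10,000 swap!s on each atom.
(def workers
  (doall (repeatedly 8 #(future
                          (dotimes [_ 10000]
                            (swap! counter inc)
                            (swap! counts update "/ongoing/" (fnil inc 0)))))))

(run! deref workers) ; wait for every thread to finish
(shutdown-agents)    ; let the JVM exit promptly

(prn @counter)       ; prints 80000
(prn @counts)        ; prints {"/ongoing/" 80000}
```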

     (pmap (fn [[idx [start end]]]
             (println (str "Chunk " idx "/" chunks " (" start " -> " end ")"))
             (->> (read-lines-range file start end)
                  (parse-lines)
                  (tally! state)))
           (indexed (chunk-file file chunks)))

Okay, so we take the list of byte offset ranges from chunk-file and use indexed to number each chunk. We then map over the chunks. For each chunk we first print out a status message. The reason I’m using str to concatenate the strings is to prevent them from getting mixed up together in the output as the prints happen in different threads. Then we use the nesting-reducing ->> macro to read the lines, parse them and send them off to a tally! function. Cool.

But hold up, didn’t I say something about multiple threads? Yes, and if you look carefully you’ll see I’m using pmap instead of map, and no, that’s not a typo. pmap is the parallel version of map: it farms the work out to a pool of worker threads. So yes, parallelizing your Clojure code can really be as simple as adding a single p character!
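The following comparison shows that pmap is a drop-in replacement for map, with the same results in the same order. The slow-square function is a hypothetical stand-in for an expensive per-chunk job. pmap runs the calls on other threads, keeping a bounded number in flight (roughly two more than the number of cores), so the second timing should come out noticeably shorter on a multi-core box.

```clojure
(defn slow-square
  "Stand-in for an expensive per-chunk job (hypothetical)."
  [x]
  (Thread/sleep 100)
  (* x x))

(def serial   (time (doall (map  slow-square (range 16)))))
(def parallel (time (doall (pmap slow-square (range 16)))))

(prn parallel) ; prints (0 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225)
(shutdown-agents)
```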

Right, we’ve seen chunk-file, read-lines-range and parse-lines already so let’s check out tally!.

(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")

(defn tally! [{:keys [url-hits url-bytes clients refs s404s]} records]
  (doseq [{:keys [url bytes client status #^String ref]} records]
    (if (= status "404")
      (bump! s404s url 1)
      (do (bump! url-bytes url bytes)
          (when (re-matches article-re url)
            (bump! url-hits url 1)
            (bump! clients client 1)
            (when-not (or (= ref "-")
                          (.startsWith ref "http://www.tbray.org/ongoing/"))
              (bump! refs ref 1)))))))

First there’s our article matching regular-expression and then we enter the function and—Woah! There’s a map in the argument list! There’s another one in the doseq. What’s this :keys business?

So you probably know that Clojure has a destructuring let which means you can write something like:

(let [[x y z] [1 2 3]]
  x)
;; => 1

But what you may not know is that you can destructure maps as well.

(let [{x :x, y :y} {:x 1, :y 2, :z 3}]
  y)
;; => 2

Handy. But if you use this a lot you’ll soon get fed up with having to write the binding x and the keyword :x over and over. Wouldn’t it be great if Clojure would just put the map entries for a particular keyword into a local of the same name? That’s exactly what :keys does!

(let [{:keys [x y]} {:x 1, :y 2, :z 3}]
  y)
;; => 2

So the map in the argument list is just destructuring the state map into local bindings, and the one in the doseq is destructuring each record. This saves us from having to write (:url-hits state) and (:url record) all over the place.

Right, so continuing we check the various conditions and bump some counters. Hang on, didn’t I say we’d be using swap! with the atoms? Let’s take a look at the hprof profiler output of the version that uses swap! directly.

rank   self  accum   count  trace method
   1 10.83% 10.83%    3180 300487 java.util.regex.Matcher.<init>
   2  8.92% 19.75%    2620 300482 wf$parse_lines__38$iter__40__44$fn__45.invoke
   3  7.14% 26.89%    2095 300531 clojure.lang.Atom.swap
   4  5.72% 32.61%    1678 300777 java.lang.String.length
   5  4.41% 37.02%    1295 300572 clojure.lang.Util.hash
   6  4.07% 41.09%    1195 300684 clojure.lang.Util.hash
   7  3.83% 44.92%    1125 300562 clojure.lang.Atom.swap
   8  3.69% 48.61%    1084 300627 wf$parse_lines__38$iter__40__44$fn__45.invoke
   9  3.65% 52.26%    1072 300446 java.io.FileInputStream.readBytes

Not bad, but there’s a bit of contention over the atoms causing swap! to show up and we’re also seeing a lot of hashing going on. What does the trace for those hash calls tell us?

TRACE 300572:
    clojure.lang.Util.hash(Util.java:55)
    clojure.lang.PersistentHashMap.assoc(PersistentHashMap.java:125)
    clojure.lang.PersistentHashMap.assoc(PersistentHashMap.java:28)
    clojure.lang.RT.assoc(RT.java:666)
    clojure.core$assoc__4268.invoke(core.clj:146)
    wf$update__57.invoke(wf2.clj:55)
    clojure.lang.Atom.swap(Atom.java:65)

TRACE 300684:
    clojure.lang.Util.hash(Util.java:55)
    clojure.lang.PersistentHashMap.valAt(PersistentHashMap.java:135)
    clojure.lang.RT.get(RT.java:642)
    clojure.core$get__4722.invoke(core.clj:981)
    wf$update__57.invoke(wf2.clj:55)
    clojure.lang.Atom.swap(Atom.java:65)

Hmm… so the retries from the atom contention are causing too many get and assoc calls on the hash-maps. How can we improve on this? Easy: if atoms are slowing you down, you just need more of them! If we wrap each count (each value in the map) in its own atom, we can increment multiple entries simultaneously without causing retries. The only time we need to swap! on the outer atom is when we add a new entry.

In fact we can even do better than this as we don’t need the full functionality of an atom. Java provides a very nice AtomicLong class which has methods for atomically adding and incrementing, which is exactly what we need. So let’s take a look at bump!.

(import '(java.util.concurrent.atomic AtomicLong))

(defn bump! [map-atom #^String key #^Long delta]
  (if-let [#^AtomicLong counter (get @map-atom key)]
    (.addAndGet counter delta)
    (swap! map-atom #(assoc % (String. key)
                            (if-let [#^AtomicLong counter (get % key)]
                              (AtomicLong. (+ (.get counter) delta))
                              (AtomicLong. delta))))))

The function takes an atom holding the map for this particular counter, a key, and an integer amount to change the count by. First we check whether the map already contains our key. If it does, we simply increment the counter by the appropriate amount. If it doesn’t, then we’ll need to add a new entry to the map.

There are a couple of important points about the anonymous function we’re passing to swap!. Notice how we check again whether there’s an entry in the map for this key? We do that because two threads might try to add the same entry at the same time. The atom will cause one of them to retry. If that thread then blindly goes ahead and overwrites the existing entry, we lose the existing count. Not good. This may seem subtle, but there’s a simple rule of thumb which will ensure you always get the correct result. When using an atom, your update function should depend solely on the current value passed to it, not on any previously read version from the surrounding code.

Another thing to note is that even if we find there’s already an AtomicLong there, we add its value to delta and replace it with a new AtomicLong. Why do we do this? Simple: an atom’s update function may be called several times, so it must be a pure function with no side effects. Incrementing the existing AtomicLong is a side effect; creating a new one is not. If the atom retried several times and we kept adding to the counter, we’d end up with a larger number than expected. We’d need to do the same thing if we were using a nested atom instead of an AtomicLong.

The final point to note is that when we create the map entry, we make a copy of the key using the String copy constructor. This is not strictly necessary, but it helps reduce our memory usage. Okay, I know, I know, you think I’m nuts. How can copying something reduce memory usage?! What you may not know about Java’s strings is that when you take a substring of one, nothing actually gets copied. The new string shares character data with the original string. This is normally a good thing, as it can make your code much faster. However, it has an unfortunate side effect. Suppose you create a large string, take a substring of part of it, and then release any references you have to the original, hoping it’ll be garbage collected. You’ll be in for a surprise. The substring internally holds a reference to the large string’s data, so that data can’t be garbage collected! By explicitly copying the key, we only hold onto the part we need, the URL, not the whole line from the input file. (Since Java 7 update 6, substring copies its characters anyway, so on newer JVMs the copy is merely redundant.)
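As far as I can tell, one narrow race remains in bump! as written. A thread on the fast path can fetch an AtomicLong just before another thread’s swap! replaces that counter with a fresh copy. The first thread’s addAndGet then lands on a counter that has already been read and dropped from the map, and its increment is lost. The following sketch is a variant under the same design. The update function only ever adds a missing entry, starting at zero, and never replaces an existing one, so it stays pure and retry-safe. The addition always happens afterwards, on the one counter that lives in the map. The load at the bottom (eight threads, three hypothetical keys) is only there to exercise it.

```clojure
(import '(java.util.concurrent.atomic AtomicLong))

(defn bump!
  "Adds delta to the counter stored under key in the map held by map-atom,
  adding a zeroed counter first if the key is missing. Existing counters
  are never replaced, so no addAndGet can land on a discarded one."
  [map-atom ^String key delta]
  (let [^AtomicLong counter
        (or (get @map-atom key)
            ;; Slow path: the update fn depends only on m, has no side
            ;; effects, and leaves an existing entry untouched.
            (get (swap! map-atom
                        (fn [m]
                          (if (contains? m key)
                            m
                            (assoc m (String. key) (AtomicLong. 0)))))
                 key))]
    (.addAndGet counter (long delta))))

(def hits (atom {}))

;; Hypothetical load: 8 threads, each doing 10,000 bumps spread over three keys.
(def workers
  (doall (repeatedly 8 #(future
                          (dotimes [i 10000]
                            (bump! hits (["a" "b" "c"] (mod i 3)) 1))))))

(run! deref workers)
(shutdown-agents)

(def totals (into (sorted-map)
                  (map (fn [[k ^AtomicLong v]] [k (.get v)]) @hits)))
(prn totals) ; prints {"a" 26672, "b" 26664, "c" 26664}
```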

Reporting

Finally the reporting part. First a couple of utility functions.

(defn truncate [s n]
  (if (> (count s) n)
    (str (.substring s 0 n) "...")
    s))

(defn sort-by-vals-desc [m]
  (sort-by #(- (val %)) m))

truncate takes a string and a number and, if the string is too long, shortens it and adds an ellipsis. sort-by-vals-desc sorts the entries of a map by value in descending order, simply by negating the values. This could have been done inline, but I thought the intent was clearer with it named.
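The following REPL session exercises both helpers on a hypothetical map of counts. It also shows the equivalent ordering written with > as the comparator instead of negation:

```clojure
(defn truncate [^String s n]
  (if (> (count s) n)
    (str (.substring s 0 n) "...")
    s))

(defn sort-by-vals-desc [m]
  (sort-by #(- (val %)) m))

(def hits {"/a" 3, "/b" 42, "/c" 7}) ; hypothetical counts

(prn (sort-by-vals-desc hits)) ; prints (["/b" 42] ["/c" 7] ["/a" 3])

;; The same ordering without negation: pass > as the comparator.
(prn (sort-by val > hits))     ; prints (["/b" 42] ["/c" 7] ["/a" 3])

(prn (truncate "http://www.example.com/some/long/path" 10)) ; prints "http://www..."
```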

(defn print-top [sorted-results label & [megabytes?]]
  (println "Top" label)
  (let [fmt (if megabytes? " %9.1fM: %s" " %10d: %s")]
    (doseq [[k v] sorted-results]
      (let [v (if megabytes? (/ v 1024.0 1024.0) (long v))]
        (println (format fmt v (truncate k 60))))))
  (println))

print-top takes our sorted results, a label for them, and an optional argument specifying that the numbers should be printed in megabytes. This is essentially the same as the Ruby version: we loop over the results and use format to print the numbers so they line up into columns and are easy to read.

(defn report [tallies state]
  (->> state
       (map (fn [[tally rows]] (str (count @rows) " " (name tally))))
       (join ", ")
       (println))
  (println)
  (->> tallies
       (pmap (fn [[tally & options]] 
               (cons (take 10 (sort-by-vals-desc @(state tally)))
                     options)))
       (map #(apply print-top %))
       (dorun)))

Finally, the main reporting function. At this point I was pretty eager to test out how fast my implementation was, so I didn’t spend much time thinking about it. The function takes our list of tallies with their labels, and the current state of their values. Perhaps I should have stored these in the same list. Oh well. This might be a good point to remind you what tallies and state look like.

;; tallies
[[:url-hits  "URIs by hit"]
 [:url-bytes "URIs by bytes" :megabytes]
 [:s404s     "404s"]
 [:clients   "client addresses"]
 [:refs      "referrers"]]

;; state
{:url-hits  (atom {})
 :url-bytes (atom {})
 :s404s     (atom {})
 :clients   (atom {})
 :refs      (atom {})}

First we print out a summary of the total number of rows in each tally map. This is pretty boring. We take the state and map over it, producing strings that look like “73 url-hits”, then join them together separated by commas. What’s that @ doing there? Our tallies are stored in atoms, so we need to deref them to access their contents.

Next we need to print the top ten entries in each of our tallies, so we map over them again. Notice the argument list of the mapping function? [[tally & options]] means we expect a single argument which is a list. The first thing in the list we call tally, and the rest of the list we call options. We look up tally in state, sort the results and cons the top 10 onto the front of options. At this stage options just consists of the tally’s label and the optional :megabytes format option. Since we’re treating it as an opaque list, though, this is one less place we’ll need to change in the code if we decide to add more options later.
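The following trimmed-down sketch of the second half of report shows how [[tally & options]] splits each row, and how apply spreads the consed list back out into arguments. The state holds hypothetical plain numbers rather than AtomicLongs, and describe-top is a hypothetical stub that returns a string where print-top would print:

```clojure
(def tallies [[:url-hits  "URIs by hit"]
              [:url-bytes "URIs by bytes" :megabytes]])

;; Hypothetical state, with plain numbers instead of AtomicLongs for brevity.
(def state {:url-hits  (atom {"/a" 3, "/b" 7})
            :url-bytes (atom {"/a" 2048, "/b" 1024, "/c" 4096})})

(defn describe-top
  "Stub for print-top: returns a one-line summary instead of printing."
  [sorted-results label & [megabytes?]]
  (str label ": " (ffirst sorted-results) (when megabytes? " (MB)")))

(def summaries
  (->> tallies
       ;; tally is the keyword; options is ("URIs by hit")
       ;; or ("URIs by bytes" :megabytes)
       (pmap (fn [[tally & options]]
               (cons (take 10 (sort-by #(- (val %)) @(state tally)))
                     options)))
       (map #(apply describe-top %))
       (doall)))

(run! println summaries) ; prints URIs by hit: /b, then URIs by bytes: /c (MB)
(shutdown-agents)
```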

Finally, we map over the tallies again to print them out, using dorun to force evaluation. Hang on, why don’t I just put the call to print-top in the first map, or change the whole thing into a doseq? If you were paying attention, you’ll have noticed that our friend pmap sneaked in again. Sorting the results can take a while (particularly for the clients and referrers tallies), so we use pmap to do them in parallel. We don’t want to print them in parallel, however, as the output would get all jumbled up together!

Firing her up

So that’s it! We’re done. How did we fare? We weigh in at 110 lines of code, which is not bad. That’s a little heavier than the Ruby version. Most of the extra weight comes from the functions we had to define to make reading fast. Java’s IO library always feels like more of a hassle than it should be, and it has some fundamental limitations. But it is very portable, has some excellent security features and generally deals well with character encoding. You can’t be perfect.

Now I know you’re dying to hear how it ran so let’s type the giant launch command of destiny and set it running on the big data.

$ time java -d64 -Xms4g -Xmx4g -verbose:gc -XX:+PrintGCDetails \
  -XX:+UseParallelGC -XX:+UseParallelOldGC -cp 'jars/*' \
  clojure.main -i wf2.clj -e '(wf/wf-atoms "O.all")'

We give it a 4 GB heap and turn on printing of GC details, so we can see what’s happening with the GC in case we gave it too little or too much memory. We also turn on the parallel old-generation collector. We’re hoping to avoid too many old-gen collections, but let’s turn it on just to be safe.

Chunk 475/1000 (21505830576 -> 21551105967)
Chunk 476/1000 (21551105967 -> 21596381394)
[GC [PSYoungGen: 1376800K->14816K(1382464K)] 3221973K->1870654K(4180032K),
0.0874210 secs] [Times: user=0.58 sys=0.27, real=0.09 secs]  
[GC [PSYoungGen: 1379424K->10848K(1382720K)] 3235262K->1879374K(4180288K),
0.0806451 secs] [Times: user=0.57 sys=0.25, real=0.08 secs]  
[GC [PSYoungGen: 1375456K->11072K(1383104K)] 3243982K->1888694K(4180672K),
0.0684203 secs] [Times: user=0.57 sys=0.22, real=0.07 secs]  

Halfway through and things are looking pretty sweet. We’re cruising along processing those chunks and the GC is working hard but happy.

But a little later… what’s this? Full GC? 20 seconds! Egads!

[Full GC [PSYoungGen: 216928K->0K(946176K)] [ParOldGen:
2413842K->1197654K(2797568K)] 2630770K->1197654K(3743744K) [PSPermGen:
11459K->11459K(28672K)], 20.1389928 secs] [Times: user=68.32 sys=17.38,
real=20.14 secs]  
[GC [PSYoungGen: 479232K->264960K(933888K)] 1676886K->1462614K(3731456K),
0.1677730 secs] [Times: user=5.02 sys=0.05, real=0.17 secs]  
Chunk 744/1000 (33684922076 -> 33730197430)
Chunk 745/1000 (33730197430 -> 33775472944)
[GC [PSYoungGen: 677760K->420096K(931392K)] 2836391K->2578727K(3728960K),
0.2414262 secs] [Times: user=7.41 sys=0.04, real=0.24 secs]  
[GC [PSYoungGen: 887040K->212928K(933888K)] 3045671K->2795799K(3731456K),
0.3684760 secs] [Times: user=11.23 sys=0.05, real=0.37 secs]  
[Full GC [PSYoungGen: 212928K->0K(933888K)] [ParOldGen:
2582871K->1230342K(2797568K)] 2795799K->1230342K(3731456K) [PSPermGen:
11459K->11459K(24576K)], 2.7819309 secs] [Times: user=35.11 sys=7.00,
real=2.78 secs]  

After that the GC settles down a little but the times are still high. Oh well, what’s the final time?

real    13m16.610s
user    258m0.907s
sys     28m33.799s

Wow! Not bad. That puts us ahead of the Scala version and just behind the Java version. Surely we can do better though.

Revisiting reporting

The first time through I skimped on the reporting stage a bit: it was taking over a minute just to calculate the top 10s, even though we parallelized it a little. I was too eager to get things running, so I got lazy and just sorted the whole maps. The reference Ruby version actually does something a little cleverer:

class Hash
  def top(howmany)
    count = 0
    keys = []
    min = 0
    self.each do |key, val|
      if val > min
        keys << key
        keys.sort! do |k1, k2|
          diff = self[k2] - self[k1]
          (diff != 0) ? diff : k1 <=> k2
        end
        keys.pop if keys.length > howmany
        min = self[keys[-1]]
      end
    end
    keys
  end
end

This is a simple selection algorithm. It does one pass over the data and keeps a list of the greatest 10 elements it’s found so far. Much quicker than sorting everything. I had a poke around in Contrib and the Java standard library hoping to find a built-in implementation, but came up short. The closest was greatest-least, but it only finds all of the equally greatest or least values, not the top n. Oh well. I could go and look up a really well-performing one, but since we only want 10 values we may as well follow the Ruby version’s lead and keep things simple.

(defn take-greatest-vals [n m]
  (when-let [entries (seq m)]
    (reduce (fn [best x]
              ;; keep x while best has room, or if x beats the smallest kept entry
              (if (or (< (count best) n)
                      (>= (val x) (val (peek best))))
                (vec (take n (sort-by-vals-desc (conj best x))))
                best))
            [(first entries)] (rest entries))))

Hey look everybody, it’s our friend reduce. First we convert the map to a seq and cover the corner case of the map being empty. Our reduction function takes the best n entries so far (starting with just the first entry) and the next entry x of our map m. If best still has room, or the value of x is greater than or equal to that of the last (smallest) element of best, we conjoin x onto best. We then sort best, take a new best n and put it into a vector, so that peek can grab the last element in constant time. In the case where x is too wimpy to join our elite best list, we simply skip it. Stupid and boring but probably good enough.
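The following self-check compares the single-pass version against a full sort on hypothetical data. This copy of take-greatest-vals keeps x whenever best has fewer than n entries, and uses peek (constant time on a vector) to reach the smallest kept entry. Without the count check, a map whose first entry happens to be large could come back with fewer than n results.

```clojure
(defn sort-by-vals-desc [m]
  (sort-by #(- (val %)) m))

(defn take-greatest-vals
  "Single pass keeping the n entries of m with the greatest values."
  [n m]
  (when-let [entries (seq m)]
    (reduce (fn [best x]
              (if (or (< (count best) n)
                      (>= (val x) (val (peek best))))
                (vec (take n (sort-by-vals-desc (conj best x))))
                best))
            [(first entries)] (rest entries))))

;; Hypothetical data: 1000 keys with distinct pseudo-random values.
(def counts (zipmap (map #(str "k" %) (range 1000))
                    (map #(mod (* % 7919) 1009) (range 1000))))

(def fast (take-greatest-vals 10 counts))
(def slow (take 10 (sort-by-vals-desc counts)))

(println (= (map val fast) (map val slow))) ; prints true

;; A map whose first entry is also its largest still yields n entries.
(prn (take-greatest-vals 3 (array-map "a" 10 "b" 5 "c" 1)))
;; prints [["a" 10] ["b" 5] ["c" 1]]
```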

Second lap

A little fiddling with the buffer sizes (pumping the BufferedInputStream buffer up to 1 MB and reducing the chunk size to 32 MB) and we’re ready to try again. The GC seems happier this time, although I didn’t do anything deliberately to satisfy it. Perhaps the increased buffer size helped.
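For anyone wondering what those two knobs look like in code, here is a sketch of the idea. It is not the actual chunk-file from wf2-good.clj: open-buffered and read-chunk are hypothetical helpers, and read-chunk doesn’t bother ending each chunk on a newline, which a real log chunker has to do so that no line is split between workers.

```clojure
(import '(java.io BufferedInputStream FileInputStream InputStream))

(def buffer-size (* 1024 1024))      ; 1 MB buffer inside the BufferedInputStream
(def chunk-size  (* 32 1024 1024))   ; 32 MB of input per unit of work

(defn open-buffered
  "Opens path for reading through a BufferedInputStream with a 1 MB
  buffer (the default is 8 KB)."
  [path]
  (BufferedInputStream. (FileInputStream. (str path)) (int buffer-size)))

(defn read-chunk
  "Hypothetical helper: reads up to max-bytes (default chunk-size) from in
  into a new byte array, returning nil at end of stream. Unlike a real log
  chunker, it makes no attempt to end the chunk on a newline."
  ([in] (read-chunk in chunk-size))
  ([^InputStream in max-bytes]
   (let [buf (byte-array max-bytes)]
     (loop [off 0]
       (let [n (if (< off max-bytes)
                 (long (.read in buf (int off) (int (- max-bytes off))))
                 -1)]
         (cond
           (pos? n) (recur (+ off n))          ; got some bytes, keep filling
           (zero? off) nil                     ; nothing left at all
           (< off max-bytes)                   ; short final chunk
           (java.util.Arrays/copyOf buf (int off))
           :else buf))))))                     ; chunk filled completely
```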

real    10m8.196s
user    260m17.137s
sys     7m9.457s

Vroooom! We just left Java in the dust and crept past the single-threaded C version.

Pushing it to the limit

While working on this writeup I got a bit distracted, started fiddling with the buffer sizes some more, and noticed that a number of the threads in the pool occasionally showed up as waiting. A handy feature of the JVM is that you can send it a QUIT signal at any time, using kill -QUIT $pid or just pressing Ctrl+\ in its terminal, and it’ll print out a stack trace for each thread along with some memory information.

"pool-2-thread-35" prio=3 tid=0x00000001026dac00 nid=0x4d waiting on condition [0xfffffffe59cfe000..0xfffffffe59cff740]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0xfffffffe754005b8> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

You could also see this in the output of prstat -mvL: they were spending 15-20% of their time asleep. Lazy slobs.

  PID USERNAME USR SYS TRP TFL DFL LCK SLP LAT VCX ICX SCL SIG PROCESS/LWPID 
  ...
26161 ato       77 1.8 0.0 0.0 0.0 4.1  15 2.3  35  53  6K   1 java/71
26161 ato       76 1.7 0.1 0.0 0.0  17 4.0 0.5  27 116  6K   1 java/69
26161 ato       75 1.9 0.0 0.0 0.0 5.4  16 1.5  24  53  6K   1 java/67
26161 ato       75 1.6 0.0 0.0 0.0 5.4  16 1.9  33  94  6K   0 java/43
26161 ato       75 1.6 0.1 0.0 0.0 5.4  16 2.0  40 115  6K   1 java/50
26161 ato       74 1.9 0.0 0.0 0.0  12  10 2.8  22  34  6K   0 java/74
26161 ato       74 1.8 0.0 0.0 0.0  20 4.7 0.1  12  59  6K   1 java/70
26161 ato       73 1.7 0.0 0.0 0.0 4.8  19 1.8  19  96  6K   1 java/56
26161 ato       72 1.6 0.0 0.0 0.0 4.0  20 1.7  19 108  6K   0 java/65
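If you’d rather check from inside the process, say from a REPL attached to the running job, the standard Thread/getAllStackTraces method exposes roughly the same thread-state information as the QUIT dump. A small sketch; thread-states and waiting-threads are hypothetical helpers, not part of the wf2 code:

```clojure
(defn thread-states
  "Returns [name state] pairs, sorted by name, for every live thread in
  this JVM: a rough in-process cousin of the kill -QUIT thread dump."
  []
  (sort-by first
           (for [^Thread t (keys (Thread/getAllStackTraces))]
             [(.getName t) (.getState t)])))

(defn waiting-threads
  "Names of threads that are currently WAITING or TIMED_WAITING, like the
  parked pool threads in the dump above."
  []
  (for [[thread-name state] (thread-states)
        :when (#{java.lang.Thread$State/WAITING
                 java.lang.Thread$State/TIMED_WAITING} state)]
    thread-name))

;; For example:
;; (doseq [n (waiting-threads)] (println n))
```

Thread names aren’t guaranteed to be unique, which is why this returns pairs rather than a map keyed by name.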

I was pretty baffled as to why this was happening until Mark pointed out the documentation for pmap.

Like map, except f is applied in parallel. Semi-lazy in that the parallel computation stays ahead of the consumption, but doesn’t realize the entire result unless required. Only useful for computationally intensive functions where the time of f dominates the coordination overhead.
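To see that semi-laziness for yourself, here is a small REPL-style experiment (calls and slow-inc are made-up names for illustration): ask for only the first result of a 1000-element pmap and count how many calls have been started.

```clojure
(def calls (atom 0))   ; how many times slow-inc has been started

(defn slow-inc
  "Records the call, pretends to do some work, then increments x."
  [x]
  (swap! calls inc)
  (Thread/sleep 10)
  (inc x))

;; Consume only the first element of a 1000-element pmap.
(def first-result (first (pmap slow-inc (range 1000))))

;; pmap has only kicked off a bounded number of calls ahead of us: roughly
;; the core count plus two, though a chunked seq like (range ...) can
;; push that up to a multiple of 32. Either way, nowhere near 1000.
(println "first result:" first-result "calls started so far:" @calls)

;; pmap runs its work on futures; let their pooled threads exit so the
;; JVM can shut down promptly once the remaining calls finish.
(shutdown-agents)
```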

Aha! So pmap tries not to get too far ahead of whatever is consuming its results. Normally you want this behavior, since you don’t want to fill up your memory with the result list, but here we only call the function for its side effects and return nothing. So while pmap is quick and easy, perhaps it’s not the best tool here. Let’s try replacing it with a bunch of threads that take work from a LinkedBlockingQueue, which is filled with the results of chunk-file.
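A sketch of that kind of setup, assuming chunks is any seq of work items (in the real program, the output of chunk-file) and f does its work purely for side effects. run-workers and poison are hypothetical names, and this is not the exact code from wf2-faster.clj; in particular, bounding the queue is my own choice here.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Hypothetical sentinel ("poison pill") marking the end of the input.
(def ^:private poison (Object.))

(defn run-workers
  "Hypothetical replacement for (dorun (pmap f chunks)): starts nthreads
  worker threads that take items from a bounded LinkedBlockingQueue and
  call f on each one for its side effects. The calling thread fills the
  queue and blocks whenever capacity items are already waiting, so it
  can't race ahead and fill up memory."
  [f chunks nthreads capacity]
  (let [q (LinkedBlockingQueue. (int capacity))
        worker (fn []
                 (loop []
                   (let [item (.take q)]
                     (if (identical? item poison)
                       (.put q poison)        ; pass the pill on to the next worker
                       (do (f item) (recur))))))
        threads (doall (for [_ (range nthreads)]
                         (doto (Thread. ^Runnable worker) (.start))))]
    (doseq [c chunks] (.put q c))
    (.put q poison)
    ;; Wait until every worker has seen the pill and exited.
    (doseq [^Thread t threads] (.join t))))

;; Example: sum 1..100 using 4 workers and a queue of at most 8 items.
;; (let [total (atom 0)]
;;   (run-workers #(swap! total + %) (range 1 101) 4 8)
;;   @total)   ;=> 5050
```

Unlike pmap, the workers never wait on a consumer; they only sit idle when the queue is empty. A production version would also want to catch exceptions thrown by f, since a worker that dies stops pulling items off the queue.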

real    8m4.663s
user    197m36.624s
sys     8m2.930s

Woah.

Final thoughts

Clojure can be very, very fast, and its concurrency model definitely does help. It’s not a silver bullet, but it makes shared-state concurrency probably as easy and safe as it can be without sacrificing performance. As long as you stick to refs and the STM, deadlocks are impossible, and since everything is immutable by default, thread safety is much less of a concern. You still have to think about thread safety, just as you still have to think about memory with garbage collection, but Clojure makes it significantly easier to get right.

Building on top of the JVM instead of reinventing the wheel is definitely a benefit, but it has some drawbacks too. You can see that most of the real effort that went into writing this was working around performance limitations of Java’s IO library. I tried other things like NIO as well, but it was slower and made the code much more complex. We can’t compete with a lower-level language like C or C++ on this sort of data crunching, as C code can pull a number of nice tricks that the JVM keeps us from in the name of portability. On the other hand, the Clojure version is much less code and runs anywhere Java does. Hopefully, as Clojure matures, someone who really knows how to use Java’s IO library well will come along and write a high-performance line-oriented IO library for exactly this sort of processing.

It’s often said that Clojure is an opinionated language. Clojure takes a stand between the opposing camps of mutable object-oriented and immutable functional programming and says, “Oi! You’ve each got some things right, but you’ve both also got problems. Here’s what you should do instead…” It combines the two, not in an indecisive “multi-paradigm” way that tries to take every single feature from both, but by very selectively picking and choosing and mixing things up in its own unique way. It’s easy to dismiss Clojure by saying there’s nothing completely new in it, but doing so misses the forest for the trees. Clojure’s strength lies in its simplicity and cohesiveness of vision, not in any single feature. It’s still early days: Clojure hasn’t even had its second release yet, and there are still a few warts showing, but it’s already got a prime place in my toolbox.

The code

The code for the version I walked you through is wf2-good.clj, and the version with those last few tweaks, which I haven’t cleaned up yet, is wf2-faster.clj. Please note that I’ve only been using Clojure on and off for a couple of months. I’m probably still a newbie, so don’t assume this is a shining example of how Clojure should be written.

Comments