in which things are mapped, but also reduced



A while ago Tim Bray had a project called the Wide Finder to collect implementations of a log parsing problem in different languages to see which ones made it easy to take advantage of massively parallel hardware. The idea is to accept a web server log file and return statistics for which pages have been requested the most. Yesterday he posted a follow-up in Clojure using refs.

While his version is interesting for someone just getting into the language because it uses refs (probably the shiniest piece of the language), I think a map/reduce approach is a little more idiomatic since it can be done with no explicit state change.

I'll step through my implementation piece by piece. Note that it is naïve and could be optimized for speed at the expense of straightforwardness, especially with regard to reading from disk; my intent here is simply to explain the functional approach.

Update: Commenters have posted a version that totally smokes my agent-based implementation in terms of performance and simplicity. I'm leaving mine up because I think it's a fun romp in the land of higher-order functions; if you think having higher-order functions means "I can pass a block to Array#map in Ruby" then you're in for a treat. If you are looking for a hard-core walkthrough of high-performance coding, your best bet is Alex Osborne's blog post on Wide Finder.

(ns wide.finder
  "A basic map/reduce approach to the wide finder using agents.
  Optimized for being idiomatic and readable rather than speed."
  (:use [ :only [reader]]))

A Clojure source file usually opens with a call to the ns macro. This defines a namespace (wide.finder) and declares any dependencies it has on other namespaces, in this case the reader function from the namespace. Omitting the :only clause would make all the io vars available in this namespace, but it's usually better to be explicit about what you need.
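On current Clojure versions the same kind of dependency is usually written differently. The sketch below assumes Clojure 1.4 or later, takes reader from clojure.java.io (which has provided it since 1.2), and uses :refer inside :require, the modern counterpart to :use with :only. It is an illustration for recent versions, not the declaration from the original file.

```clojure
;; Assumes Clojure 1.4+: :refer inside :require brings in only the named var,
;; much as :only did for :use.
(ns wide.finder
  (:require [clojure.java.io :refer [reader]]))
```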

We'll start with the entry point. The find-widely function below takes a filename and a number of agents to work with. Agents can be thought of as independent asynchronous workers that share a thread pool and each keep a single state value. They are created with the agent function, which takes a starting state. We'll be using each agent to keep a map of pages to hit counts, so we map this function over a list of n empty maps generated by repeat to get our list of initialized agents.

(declare count-line) ; count-line is defined further down

(defn find-widely [filename n]
  (let [agents (map agent (repeat n {}))]
    (dorun (map #(send %1 count-line %2)
                (cycle agents)
                (line-seq (reader filename))))
    (doseq [a agents] (await a))
    (apply merge-with + (map deref agents))))
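Agent creation can be tried on its own. The sketch below builds a few agents the same way find-widely does and reads their starting states; make-agents is a hypothetical helper, and the doall is an addition that forces the lazy map so every agent exists up front.

```clojure
;; Hypothetical helper: n agents, each starting with its own empty map.
(defn make-agents [n]
  (doall (map agent (repeat n {}))))   ; doall realizes the lazy seq eagerly

(def agents (make-agents 4))

;; deref (or @) reads an agent's current state without blocking.
(def states (mapv deref agents))

;; The states are equal, but these are four distinct agents
;; (agents compare by identity).
(def distinct-agents? (apply distinct? agents))
```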

Once the agents are initialized, we send them work. Our line-seq sets up a lazy sequence of lines from the file. We map over this sequence together with an infinite sequence that repeats the agents over and over, which we construct using cycle. The map function can walk over multiple sequences in lockstep and stops when the shortest one runs out, so this is just a way of pairing each line in the file with an agent in round-robin fashion.

(dorun (map #(send %1 count-line %2)
            (cycle agents)
            (line-seq (reader filename))))
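To see the round-robin pairing without any agents involved, here is a small sketch that uses hypothetical keywords in place of agents and short strings in place of log lines; map vector shows which line each agent would receive.

```clojure
;; Keywords stand in for agents, strings for log lines.
(def pairs
  (map vector
       (cycle [:agent-a :agent-b :agent-c])   ; infinite, repeating
       ["line 1" "line 2" "line 3" "line 4" "line 5"]))
;; pairs => ([:agent-a "line 1"] [:agent-b "line 2"] [:agent-c "line 3"]
;;           [:agent-a "line 4"] [:agent-b "line 5"])
```

The infinite cycle is cut off as soon as the five lines run out, which is what keeps the real version from looping forever.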

The code that does the mapping is the anonymous function #(send %1 count-line %2), which adds a call to the count-line function to the agent's internal queue. This could be written in more traditional lambda form as (fn [a line] (send a count-line line)); the #() form is simply shorthand in which %1 and %2 stand for the first and second arguments. When the agent gets to that action, count-line will be called with two arguments: the agent's current value and the line that was paired with it. Whatever count-line returns becomes the agent's new value.
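A minimal sketch of the send mechanics, using a hypothetical agent whose state is just a vector and conj in place of count-line, shows that the #() shorthand and the fn form behave the same way:

```clojure
;; Toy agent whose state is a vector of the lines it has processed.
(def worker (agent []))

;; Shorthand: %1 is the agent, %2 the line.
(def queue-line #(send %1 conj %2))

;; The equivalent traditional lambda.
(def queue-line-fn (fn [a line] (send a conj line)))

;; Each send queues (conj current-state line); its result becomes the new state.
(queue-line worker "first")
(queue-line-fn worker "second")

;; Actions sent from one thread to one agent run in the order they were sent.
(await worker)
(def processed @worker)   ; ["first" "second"]

;; Lets a standalone script exit promptly instead of waiting on agent threads.
(shutdown-agents)
```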

(doseq [a agents] (await a))

Once all the work has been queued up, we need to call await on each agent; await blocks until that agent has finished processing everything queued on it so far. If we used map here instead of doseq, it would be a no-op: map merely creates a lazy sequence and does not actually perform the function calls until the values are needed. Since this expression isn't the last one in the let body, its value would simply be discarded without ever being realized. The same problem would affect the earlier call to map, which is why it is wrapped in dorun: dorun forces the lazy seq to be evaluated for its side effects.
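The laziness trap can be demonstrated with a counter. In this sketch, touch is a hypothetical side-effecting function; calling map with it inside a function body and discarding the result runs nothing, while dorun forces every call.

```clojure
(def calls (atom 0))

;; Hypothetical side effect: count how many times we're invoked.
(defn touch [x]
  (swap! calls inc)
  x)

(defn discard-lazily []
  (map touch [1 2 3])   ; builds a lazy seq, then throws it away unrealized
  :done)

(discard-lazily)
(def after-lazy @calls)   ; touch never ran

(dorun (map touch [1 2 3]))
(def after-dorun @calls)  ; dorun forced all three calls
```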

(apply merge-with + (map deref agents))

Once that's done we can call deref on each agent to get its value. But this gives us n separate maps rather than a single map of totals, so we need to merge them. merge-with is a specialized kind of reduce over maps: it combines the maps it's given and resolves key collisions using the provided function, in this case +. Since merge-with takes the maps as separate arguments, apply spreads the seq of maps out into that argument list. This gives us a map of page names to hit counts.
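Here is a small sketch of the merge step with made-up per-agent counts, alongside the equivalent explicit reduce:

```clojure
;; Made-up states, as if deref'd from three agents.
(def per-agent-counts
  [{"130" 2, "129" 1}
   {"130" 1}
   {"129" 4, "131" 1}])

;; apply passes each map as a separate argument to merge-with;
;; colliding keys are combined with +.
(def totals (apply merge-with + per-agent-counts))
;; totals is equal to {"130" 3, "129" 5, "131" 1}

;; The same result as an explicit reduce, merging two maps at a time.
(def totals-via-reduce (reduce #(merge-with + %1 %2) per-agent-counts))
```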

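(declare inc-or-init) ; inc-or-init is defined below

(defn count-line [counts line]
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    (update-in counts [hit] inc-or-init)
    counts)) ; no match: leave this agent's counts unchanged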

Finally we have the function that actually performs the counting. As mentioned, it takes an agent's state (the current map of pages to hits) and a line from the log file. If the line matches the regex #"GET /(\d+) ", we return an updated version of the counts map with the entry for that hit incremented; otherwise we return the counts unchanged, so non-matching lines leave the agent's state alone. The other interesting thing here is that if-let uses destructuring: when the regex contains a group, re-find returns a vector of the full match followed by the group, and binding that to the vector form [_ hit] splits it in two. The first element (the full matched string) is bound to the unused _ local, and the second element (the match group corresponding to the page path) is bound to hit. If re-find returns nil, the else branch runs instead.
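The destructuring can be checked in isolation. The log line below is made up in the shape of a common log format entry, and page-of is a hypothetical helper that mirrors count-line's if-let:

```clojure
;; A made-up log line whose request path is a numeric page id.
(def sample-line
  "127.0.0.1 - - [14/Nov/2009:04:29:30 +0000] \"GET /130 HTTP/1.1\" 200 5120")

;; With one capture group, re-find returns [full-match group-1].
(def match (re-find #"GET /(\d+) " sample-line))   ; ["GET /130 " "130"]

;; Hypothetical helper: _ gets the full match, hit gets the group.
(defn page-of [line]
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    hit
    :no-match))   ; re-find returned nil, so the else branch runs
```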

(defn inc-or-init [i]
  (if i (inc i) 1))

The only piece left is the tiny inc-or-init function, which increments the counter it's given but treats nil as zero. That matters because update-in passes nil to the function when the map doesn't contain the page yet, and calling inc on nil would throw. This would be unnecessary if we could construct hash-maps with custom default values, which a perusal of the implementation of seems to indicate is supported, though it's not exposed anywhere through a Clojure function. This is the piece of the puzzle I'm least happy with, but it may be possible to eliminate it.
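For what it's worth, one way the helper can be eliminated arrived later: Clojure 1.2 (released after this post was written) added fnil, which wraps a function so that a nil argument is replaced by a default. A sketch comparing the two with made-up counts:

```clojure
;; The helper from the post: increment, treating nil as zero.
(defn inc-or-init [i]
  (if i (inc i) 1))

;; fnil (Clojure 1.2+) substitutes 0 when inc would receive nil.
(def inc-or-zero (fnil inc 0))

(def counts {"130" 2})

;; Both approaches agree for a page already present and for a new one.
(def existing [(update-in counts ["130"] inc-or-init)
               (update-in counts ["130"] inc-or-zero)])
(def missing  [(update-in counts ["999"] inc-or-init)
               (update-in counts ["999"] inc-or-zero)])
```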

In any case, this is a simple example of how to break up a commonplace problem using a classic map/reduce strategy and immutable data structures. I have no idea how it compares in terms of performance to the other Wide Finder implementations since the logs I have for this site are not exactly the hundred-megabyte blockbuster kind of logs that ongoing enjoys, but the fact that it can be parallelized in twelve lines is a testament to the expressiveness of the language.
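For anyone who wants to run the pieces together on a current Clojure, here is a sketch that assembles them in definition order. It assumes reader comes from clojure.java.io, wraps the file in with-open so it gets closed, and writes a hypothetical four-line log to a temp file; the namespace name is made up, and this is an illustration rather than the original file.

```clojure
(ns wide.finder.sketch
  (:require [clojure.java.io :refer [reader]]
            [clojure.string :as str]))

(defn inc-or-init [i]
  (if i (inc i) 1))

(defn count-line [counts line]
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    (update-in counts [hit] inc-or-init)
    counts))

(defn find-widely [filename n]
  (let [agents (doall (map agent (repeat n {})))]
    ;; dorun queues every line before with-open closes the file.
    (with-open [rdr (reader filename)]
      (dorun (map #(send %1 count-line %2)
                  (cycle agents)
                  (line-seq rdr))))
    (doseq [a agents] (await a))
    (apply merge-with + (map deref agents))))

;; Hypothetical sample log written to a temp file.
(def sample-log
  (doto (java.io.File/createTempFile "wide-finder" ".log")
    (.deleteOnExit)
    (spit (str/join "\n" ["a \"GET /130 HTTP/1.1\" 200"
                          "b \"GET /129 HTTP/1.1\" 200"
                          "c \"GET /130 HTTP/1.1\" 200"
                          "d \"GET /feed.xml HTTP/1.1\" 200"]))))

(def totals (find-widely (.getPath sample-log) 3))
;; totals is equal to {"130" 2, "129" 1}

;; Lets the script exit promptly instead of waiting on agent threads.
(shutdown-agents)
```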

2009-11-14T04:29:30Z