Technomancy in which things are mapped, but also reduced

A while ago Tim Bray ran a project called the Wide Finder, which collected implementations of a log-parsing problem in different languages to see which ones made it easy to take advantage of massively parallel hardware. The idea is to accept a web server log file and return statistics for which pages have been requested the most. Yesterday he posted a follow-up in Clojure using refs.

While his version is interesting for someone just getting into the language because it uses refs (probably the sexiest piece of the language), I think a map/reduce approach is a little more idiomatic since it can be done with no explicit state change.

I'll step through my implementation piece-by-piece. Note that it is naïve and could be optimized for speed at the expense of straightforwardness, especially with regard to reading from disk; my intent here is simply to explain the functional approach.

Update: Commenters have posted a version that totally smokes my agent-based implementation in terms of performance and simplicity. I'm leaving it up because I think it's a fun romp in the land of higher-order functions; if you think having higher-order functions means "I can pass a block to Array#map in Ruby" then you're in for a treat.

(ns wide-finder
  "A basic map/reduce approach to the wide finder using agents.
  Optimized for being idiomatic and readable rather than speed."
  (:use [clojure.contrib.duck-streams :only [reader]]))

Every Clojure file opens with a call to the ns macro. This defines a namespace (wide-finder) and states any dependencies it may have on other namespaces, in this case the reader function from the duck-streams contrib library. Omitting the :only clause would cause it to make all the duck-streams vars available in this namespace, but it's usually better to be explicit about what you need.
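For anyone following along on a newer Clojure, here is a minimal sketch of the same kind of declaration. It assumes clojure.java.io (bundled with Clojure) as a stand-in for the contrib library, with :refer doing the job that :only does above. The namespace name wide-finder-sketch and the sample text are made up for illustration.

```clojure
(ns wide-finder-sketch
  "Hypothetical namespace, used only for this sketch."
  ;; Assumption: clojure.java.io stands in for clojure.contrib.duck-streams.
  (:require [clojure.java.io :refer [reader]]))

;; Only `reader` is referred, so it can be called without a prefix.
;; A StringReader stands in for a real log file here.
(def sample-lines
  (with-open [r (reader (java.io.StringReader. "first line\nsecond line\n"))]
    ;; doall realizes the lazy line-seq before with-open closes the reader.
    (doall (line-seq r))))
;; sample-lines is now ("first line" "second line")
```

Referring a single name keeps it obvious where reader comes from, which is the same argument made above for :only.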

We'll start with the entry point. The find-widely function below takes a filename and a number of agents to work with. Agents can be thought of as independent asynchronous workers that share a thread pool and keep a single state value. They are initialized with the agent function that takes a starting state. We'll be using each agent to keep a map of pages to hit counts, so we map this function over a list of n empty maps generated by repeat to get our list of initialized agents.

(defn find-widely [filename n]
  (let [agents (map agent (repeat n {}))]
    (dorun (map #(send %1 count-line %2)
                (cycle agents)
                (line-seq (reader filename))))
    (apply await agents)
    (apply merge-with + (map deref agents))))
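To see the agent lifecycle in isolation, here is a small sketch that uses only clojure.core. The names counter, final-count, workers, and initial-states are invented for the example; it is not part of the wide finder itself.

```clojure
;; An agent wraps a single state value; 0 is the starting state.
(def counter (agent 0))

;; send queues a call to (f state & args) and returns immediately.
(send counter + 5)
(send counter inc)

;; await blocks until the actions sent so far from this thread have run.
(await counter)

(def final-count @counter) ; 6

;; The same construction find-widely uses: three agents, each holding {}.
(def workers (map agent (repeat 3 {})))
(def initial-states (map deref workers)) ; ({} {} {})
```

In a standalone script, calling (shutdown-agents) once the work is done lets the JVM exit promptly, since the agent thread pool otherwise stays alive.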

Once the agents are initialized, we send them work. Our line-seq sets up a lazy sequence of lines from the file. We map over this sequence together with an infinite loop of the agents we construct using cycle. The map function can loop over multiple sequences in parallel and will stop when the shorter one runs out, so this is just a way of pairing each line in the file with an agent in a round-robin fashion.
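The round-robin pairing can be sketched without any agents at all. In this illustration, keywords stand in for the agents and short strings stand in for log lines; both are assumptions made for the example.

```clojure
;; Stand-ins for the real agents and the real lines of the log file.
(def agents-stand-in [:agent-a :agent-b])
(def lines-stand-in ["line 1" "line 2" "line 3" "line 4" "line 5"])

;; cycle repeats the agents forever; map stops when the lines run out.
(def pairs (map vector (cycle agents-stand-in) lines-stand-in))
;; => ([:agent-a "line 1"] [:agent-b "line 2"] [:agent-a "line 3"]
;;     [:agent-b "line 4"] [:agent-a "line 5"])
```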

(dorun (map #(send %1 count-line %2)
            (cycle agents)
            (line-seq (reader filename))))

The code that does the mapping is the anonymous function #(send %1 count-line %2), which adds a call to the count-line function to the agent's internal queue. This could be written in more traditional lambda form as (fn [a line] (send a count-line line)); the #() form is simply shorthand. The count-line function will be called with two arguments: the agent's current value and the next line in the seq. That function will return a new value for the agent.
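A quick way to check that the two spellings behave the same, and that send passes the agent's state as the first argument, is to swap count-line for conj. This is only a sketch; log-short, log-long, queue-short, and queue-long are names invented for the example.

```clojure
(def log-short (agent []))
(def log-long (agent []))

;; The #() shorthand and the explicit fn form of the same function.
(def queue-short #(send %1 conj %2))
(def queue-long (fn [a line] (send a conj line)))

(queue-short log-short "line 1")
(queue-short log-short "line 2")
(queue-long log-long "line 1")
(queue-long log-long "line 2")

;; Wait for both agents to work through their queues.
(await log-short log-long)

;; conj was called as (conj state line), so each agent now holds
;; ["line 1" "line 2"].
(def same-result? (= @log-short @log-long)) ; true
```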

(doseq [a agents] (await a))

Once all the work has been queued up, it's necessary to call await on each agent to make sure it has a chance to finish its queue. (The full listing above does the same thing with (apply await agents), since await accepts any number of agents.) If we used map here, it would be a no-op, since map merely creates a lazy sequence; it does not actually perform the function calls until the values are needed. Since the call is not in tail position, its value would simply be discarded without ever being realized. Note that the same problem would occur in the previous call to map, but wrapping it in a dorun call forces the lazy seq to be evaluated.
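The laziness is easy to observe with a function that counts its own calls. The sketch below uses an atom purely as a call counter; noisy, calls, and the other names are invented for the example.

```clojure
(def calls (atom 0))

;; Returns its argument unchanged, but records that it was called.
(defn noisy [x]
  (swap! calls inc)
  x)

;; Building the lazy seq does not call noisy at all.
(def unrealized (map noisy [1 2 3]))
(def calls-before @calls) ; 0

;; dorun walks the whole seq for its side effects and returns nil.
(dorun unrealized)
(def calls-after @calls) ; 3
```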

(apply merge-with + (map deref agents))

Once that's done we can call deref on each agent to get its value. But this gives us n maps rather than a single map of totals, so we need to merge the values. merge-with works like a reduce specialized for maps: it takes any number of maps (hence the apply) and resolves key collisions using the provided function, in this case +. This gives us a map of page names to hit counts.
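Here is the merge step on its own, as a sketch with made-up per-agent counts. It also spells out the equivalent explicit reduce, to show the relationship described above.

```clojure
;; Hypothetical results from three agents.
(def per-agent-counts
  [{"130" 2, "131" 1}
   {"130" 1}
   {"132" 4}])

;; apply spreads the collection into separate arguments to merge-with.
(def totals (apply merge-with + per-agent-counts))
;; => {"130" 3, "131" 1, "132" 4}

;; The same result as an explicit reduce, merging one map at a time.
(def totals-via-reduce
  (reduce (partial merge-with +) per-agent-counts))
```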

(defn count-line [counts line]
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    (update-in counts [hit] inc-or-init)
    counts))

Finally we have the function that actually performs the counting. As mentioned, it takes an agent's state (which is the current map of pages to hits) and a line from the log file. If the line matches the regex #"GET /(\d+) ", then we return an updated version of the counts map that increments the entry corresponding to the hit. The other interesting thing here is that if-let uses destructuring: by binding the return value of re-find to a vector form, it splits the value in two. The first element (the full matched string) is bound to the unused _ local, and the second element (the match group corresponding to the actual page path) is bound to hit.
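Each piece of count-line can be tried separately. The sketch below uses invented request lines rather than real log data, and it also shows update-in, which applies a function to the value stored under a path of keys and returns a new map.

```clojure
(def page-re #"GET /(\d+) ")

;; With a capture group, re-find returns [whole-match group-1].
(def match (re-find page-re "GET /130 HTTP/1.1")) ; ["GET /130 " "130"]

;; Destructuring binds the second element to hit and ignores the first.
(def hit (let [[_ hit] match] hit)) ; "130"

;; No digits after the slash, so re-find returns nil.
(def no-match (re-find page-re "GET /favicon.ico HTTP/1.1")) ; nil

;; if-let tests the value first and only destructures when it is truthy.
(def fallback (if-let [[_ hit] no-match] hit :no-match)) ; :no-match

;; update-in returns a new map; the original map is left untouched.
(def bumped (update-in {"130" 1} ["130"] inc)) ; {"130" 2}
```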

(defn inc-or-init [i]
  (if i (inc i) 1))

The only piece left is the tiny inc-or-init function that increments the counter given it but treats nil as zero. This would be unnecessary if we could construct hash-maps with custom default values, which a perusal of the implementation of PersistentHashMap.java seems to indicate is supported, though it's not exposed anywhere through a Clojure function. This is the piece of the puzzle that I'm the least happy with, but it may be possible to eliminate it.
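One possible way to drop the helper, sketched under the assumption of a newer Clojure: later releases (1.2 onward, as far as I know, which postdates this post) include fnil in clojure.core. It wraps a function so that a nil argument is replaced by a default before the call. The name inc-from-zero is invented for the example.

```clojure
;; The helper from the post, repeated so the sketch is self-contained.
(defn inc-or-init [i]
  (if i (inc i) 1))

;; fnil returns a version of inc that treats a nil argument as 0.
(def inc-from-zero (fnil inc 0))

;; A missing key yields nil, which both versions turn into a count of 1.
(def with-helper (update-in {} ["130"] inc-or-init))   ; {"130" 1}
(def with-fnil   (update-in {} ["130"] inc-from-zero)) ; {"130" 1}

;; An existing count is incremented as usual.
(def existing (update-in {"130" 1} ["130"] inc-from-zero)) ; {"130" 2}
```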

In any case, this is a simple example of how to break up a commonplace problem using a classic map/reduce strategy and immutable data structures. I have no idea how it compares in terms of performance to the other Wide Finder implementations since the logs I have for this site are not exactly the hundred-megabyte blockbuster kind of logs that ongoing enjoys, but the fact that it can be parallelized in twelve lines is a testament to the expressiveness of the language.

2009-11-14T04:29:30Z

Andy Kish – 2009-11-14T09:12:33Z
Awesome post. There's a fair amount of discussion of clojure online these days, but not enough idiomatic code!

One improvement you could make--you don't explain the `update-in`, and I had to look at the clojure docs. I'm glad I know about it now. :-)
Baishampayan Ghose – 2009-11-14T22:03:59Z
Great post. By the way, you could have replaced the (doseq [a agents] (await a)) part with (apply await agents). Much cleaner, IMHO :)
Ivan Toshkov – 2009-11-15T09:46:16Z

What is wrong with the straightforward implementation? Something like this:



(def line-re #"GET /([^ ]*) ")

(defn match-line [line]
  (if-let [[_ hit] (re-find line-re line)]
    {hit 1}))

(defn find-widely-2 [filename]
  (apply merge-with + (pmap match-line (line-seq (reader filename)))))


I think your regexp is broken. It matches only pages, whose names are numbers. The one I'm using above is probably too permissive, but that's not the main point of this, right?

John – 2009-11-16T07:11:04Z

Why not simply use pmap, like so?



(ns my-wide-finder
  "A basic map/reduce approach to the wide finder using agents.
  Optimized for being idiomatic and readable rather than speed.
  NOTE: Originally from:
  http://technomancy.us/130
  but updated to use pmap."
  (:use [clojure.contrib.duck-streams :only [reader]]))

(def re #"GET /(\d+) ")

(defn count-line
  "Increment the relevant entry in the counts map."
  [line]
  (if-let [[_ hit] (re-find re line)]
    {hit 1}
    {}))

(defn my-find-widely
  "Return a map of pages to hit counts in filename."
  [filename]
  (apply merge-with +
         (pmap count-line (line-seq (reader filename)))))


It's less code, easier to understand and I'm guessing more idiomatic. The only downside is you can't control the number of cores.

It's also substantially faster. Running them both with all 4 cores on my machine (the pmap version defaults to using all cores I assume) wrapped in a call to "time" I get this:

"Elapsed time: 32508.140029 msecs"

vs the agent approach which yields:

"Elapsed time: 297367.898428 msecs"

Both timings were taken after running each once before to prime the filesystem cache, etc.

Also both approaches are still heavily I/O bound and I haven't done anything to address the performance vs. straightforwardness tradeoffs so there's not really much to be gained over writing it to run on a single core with regular map, which yields:

"Elapsed time: 33095.086899 msecs"

Phil – 2009-11-16T19:37:15Z

Baishampayan: Nice; should have read the docs for await more closely.

Ivan: the regex was modified to work with URLs from my blog rather than Tim's.

Ivan and John: Wow; I had no idea how drastic the difference would be between pmap and agents. It's significantly simpler, which is a big win. I'm going to leave the agents version up since I think it's an interesting explanation of how functions get passed around and called with a different set of args depending on the context, but that's definitely interesting.

John: it would be interesting to compare CPU time usage between pmap and regular map rather than just wall-clock time to determine the bottleneck.

SI Hawakaya – 2009-11-20T15:06:21Z
Spelling: "naïve", not "naïeve".
Tim Bray – 2009-11-21T17:23:17Z
Hey, commenter John, it'd be nice to know who you are. I ran your elegant ultra-minimal code on the Wide Finder machine and it started out pretty well, using on average 5 or 6 of the cores. But eventually it got stuck in gc hell or some other problem (I wasn't running a lot of diagnostics) so that it had gone single-threaded, so eventually I killed it. It had burned 32 CPU-hours in about 16 hours elapsed and I don't know how far it got.

Anybody who wants to dive deeper on this can have an account on the wide-finder machine.
John – 2009-11-26T08:36:50Z
Hey Tim,

That'd be me. Unfortunately I don't get much time to hack on these types of things but if you want to email me at (my first name @ milo dot com) and let me know how to get at the full data set I'll try to dig deeper next time I find myself with some free time.

-
John
John – 2009-11-26T08:43:24Z
P.S. Just for reference sake, here is the faster version I mentioned to you on #clojure which batches chunks of 20 URLs per unit of work:

http://pastie.org/715782
John – 2009-11-28T07:57:21Z
Oops, provided the wrong version of the chunked code before, I think (can't see the comment yet, but I'm pretty sure...)

Anyway, here's the correct code:

http://pastie.org/pastes/717940
Ivan Toshkov – 2009-12-01T13:21:40Z
@Tim Bray: I'm not "John", but my solution was virtually identical with his, so I'll comment a bit. The implementation was supposed to be idiomatic, rather than optimized for speed or memory usage or anything else.

One of the problems is with the "apply" function, which will try to compute all the arguments before it calls "merge-with". A bit nicer solution would be something like:

(reduce (partial merge-with +) (pmap count-line (line-seq (reader filename))))

It would be interesting to see how this performs, but I don't have the time to play with Wide finder right now. Still I'm working on a similar problem and if I'm able to optimize it I'll post some results.
