in which things are mapped, but also reduced
A while ago Tim Bray had a project called the Wide Finder to collect implementations of a log parsing problem in different languages to see which ones made it easy to take advantage of massively-parallel hardware. The idea is to accept a web server log file and return statistics for which pages have been requested the most. Yesterday he posted a follow-up in Clojure using refs.
While his version is interesting for someone just getting into the language because it uses refs (probably the sexiest piece of the language), I think a map/reduce approach is a little more idiomatic since it can be done with no explicit state change.
I'll step through my implementation piece-by-piece. Note that it is naïve and could be optimized for speed at the expense of straightforwardness, especially with regard to reading from disk; my intent here is simply to explain the functional approach.
Update: Commenters have posted a version that totally
smokes my agent-based implementation in terms of performance and
simplicity. I'm leaving it up because I think it's a fun romp in
the land of higher-order functions; if you think having
higher-order functions means "I can pass a block
to Array#map in Ruby" then you're in for a treat.
(ns wide-finder
  "A basic map/reduce approach to the wide finder using agents.
  Optimized for being idiomatic and readable rather than speed."
  (:use [clojure.contrib.duck-streams :only [reader]]))
Every Clojure file opens with a call to the ns
macro. This defines a namespace (wide-finder) and
states any dependencies it may have on other namespaces, in this
case the reader function from
the duck-streams contrib library. Omitting
the :only clause would cause it to make all
the duck-streams vars available in this namespace,
but it's usually better to be explicit about what you need.
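For readers on a newer Clojure, here is a minimal sketch of the same idea, assuming clojure.java.io/reader fills the role of the duck-streams reader. The namespace name wide-finder-sketch and the in-memory sample text are made up for illustration; they are not part of the original code.

```clojure
(ns wide-finder-sketch
  ;; Assumption: clojure.java.io stands in for clojure.contrib.duck-streams.
  ;; :refer plays the role of :only, naming exactly the vars we want.
  (:require [clojure.java.io :refer [reader]]))

;; A StringReader stands in for a log file so the sketch is self-contained.
(def sample-lines
  (with-open [r (reader (java.io.StringReader. "GET /1 \nGET /2 \n"))]
    ;; doall realizes the lazy line-seq before with-open closes the reader.
    (doall (line-seq r))))
;; sample-lines is ("GET /1 " "GET /2 ")
```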
We'll start with the entry point. The find-widely
function below takes a filename and a number of agents to work
with. Agents can be thought of as independent asynchronous workers
that share a thread pool and keep a single state value. They are
initialized with the agent function that takes a
starting state. We'll be using each agent to keep a map of pages
to hit counts, so we map this function over a list
of n empty maps generated by repeat to
get our list of initialized agents.
(defn find-widely [filename n]
  (let [agents (map agent (repeat n {}))]
    (dorun (map #(send %1 count-line %2)
                (cycle agents)
                (line-seq (reader filename))))
    (apply await agents)
    (apply merge-with + (map deref agents))))
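To see agents in isolation, here is a small sketch of the create, send, await, deref cycle that find-widely relies on. The key "1" and the choice of three agents are arbitrary illustration values.

```clojure
;; Three independent agents, each starting from its own empty map.
(def agents (map agent (repeat 3 {})))

;; send queues (assoc state "1" 1) on the first agent and returns immediately.
(send (first agents) assoc "1" 1)

;; Block until the queued action has run, then read every agent's state.
(await (first agents))
(def states (map deref agents))
;; states is ({"1" 1} {} {})
```

When run as a standalone script, a call to (shutdown-agents) at the end lets the JVM exit promptly, since the agent thread pool otherwise stays alive.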
Once the agents are initialized, we send them
work. Our line-seq sets up a lazy sequence of lines
from the file. We map over this sequence together with an infinite,
repeating sequence of the agents that we construct
using cycle. The map function can walk
multiple sequences in parallel and will stop when the
shortest one runs out, so this is just a way of pairing each line
in the file with an agent in a round-robin fashion.
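The round-robin pairing can be tried without any agents or files. In this sketch the keywords and strings are stand-ins for agents and log lines.

```clojure
;; Stand-ins: keywords play the agents, strings play the log lines.
(def workers [:a :b :c])
(def lines ["l1" "l2" "l3" "l4" "l5"])

;; cycle repeats the workers forever; map stops when the lines run out.
(def pairs (map vector (cycle workers) lines))
;; pairs is ([:a "l1"] [:b "l2"] [:c "l3"] [:a "l4"] [:b "l5"])
```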
(dorun (map #(send %1 count-line %2) (cycle agents) (line-seq (reader filename))))
The code that does the mapping is the anonymous
function #(send %1 count-line %2), which adds a call
to the count-line function to the agent's internal
queue. This could be written in more traditional lambda form
as (fn [a line] (send a count-line line));
the #() form is simply
shorthand. The count-line function will be called
with two arguments: the agent's current value and the next line in
the seq. That function will return a new value for the agent.
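A quick sketch of both points: the #() shorthand is interchangeable with fn, and send calls the given function with the agent's state followed by any extra arguments. The names shorthand, longhand, and counter are illustrative only.

```clojure
;; The reader shorthand and the explicit fn form behave identically.
(def shorthand #(str %1 "-" %2))
(def longhand (fn [a b] (str a "-" b)))
;; (shorthand "x" "y") and (longhand "x" "y") both give "x-y"

;; send passes the agent's current state as the first argument,
;; so this queues (+ 10 5) and the result becomes the new state.
(def counter (agent 10))
(send counter + 5)
(await counter)
(def result @counter)
;; result is 15
```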
(doseq [a agents] (await a))
Once all the work has been queued up, it's necessary to
call await on the agents to make sure each one has a chance
to finish its queue. (await accepts any number of agents, so this
doseq is equivalent to the (apply await agents) call in the full
function above.) If we used map here, it would
be a no-op, since map merely creates a lazy sequence;
it does not actually perform the function calls until the value is
needed. Since the call is not the last expression in the let body, its value would
simply be discarded. Note that the same problem
would occur in the previous call to map, but wrapping it in
a dorun call forces the lazy seq to be evaluated.
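The laziness pitfall is easy to observe with a side-effecting function. This sketch counts calls in an atom (the names calls and lazy-result are illustrative) and shows that nothing runs until dorun walks the sequence.

```clojure
;; Count how many times the mapped function has actually been called.
(def calls (atom 0))

;; Building the lazy seq does not call the function yet.
(def lazy-result (map (fn [x] (swap! calls inc) x) [1 2 3]))
(def calls-before @calls)
;; calls-before is 0

;; dorun walks the seq for its side effects and discards the values.
(dorun lazy-result)
(def calls-after @calls)
;; calls-after is 3
```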
(apply merge-with + (map deref agents))
Once that's done we can call deref on each agent to
get its value. But this gives us n maps rather than a
list of totals, so we need to merge the values.
merge-with is a special case of reduce
that assumes it works on a sequence of maps and merges key
collisions using the provided function, in this
case +. This gives us a map of page names to hit
counts.
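Here is a small sketch of the merge step on hand-written data. The per-agent maps are invented; the second definition spells out the same merge as an explicit reduce.

```clojure
;; Invented per-agent results: page name to hit count.
(def per-agent [{"1" 2, "2" 1} {"1" 1, "3" 4} {}])

;; Keys that collide are combined with +; the rest are copied over.
(def totals (apply merge-with + per-agent))
;; totals is {"1" 3, "2" 1, "3" 4}

;; The same merge written as an explicit reduce over the maps.
(def totals-via-reduce (reduce (partial merge-with +) per-agent))
```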
(defn count-line [counts line]
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    (update-in counts [hit] inc-or-init)
    counts))
Finally we have the function that actually performs the
counting. As mentioned, it takes an agent's state (which is the current
map of pages to hits) and a line from the log file. If the line
matches the regex #"GET /(\d+) ", then we return an
updated version of the counts map that increments the
entry corresponding to the hit. The other interesting thing here
is that if-let uses destructuring: by
binding the return value of re-find to a vector form,
it splits the value in two. The first element (the full matched
string) is bound to the unused _ local, and the
second element (the match group corresponding to the actual page
path) is bound to hit.
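The if-let destructuring can be tried on its own. In this sketch page-of is a hypothetical helper and the log line is invented; it returns the captured page, or :no-match when re-find returns nil.

```clojure
;; Hypothetical helper: extract the page from a log line, if any.
(defn page-of [line]
  ;; re-find returns [whole-match group-1] on success and nil otherwise;
  ;; if-let tests the whole result first, then destructures it.
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    hit
    :no-match))

(def matched (page-of "1.2.3.4 - - \"GET /42 HTTP/1.1\" 200"))
;; matched is "42"
(def unmatched (page-of "1.2.3.4 - - \"POST /42 HTTP/1.1\" 200"))
;; unmatched is :no-match
```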
(defn inc-or-init [i] (if i (inc i) 1))
The only piece left is the tiny inc-or-init function
that increments the counter given it but treats nil as zero. This
would be unnecessary if we could construct hash-maps with custom default
values, which a perusal of the implementation of PersistentHashMap.java
seems to indicate is supported, though it's not exposed anywhere
through a Clojure function. This is the piece of the puzzle that
I'm the least happy with, but it may be possible to eliminate it.
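For reference, update-in applies a function to the value found at a key path and returns the updated map. Below is a sketch of two ways to drop inc-or-init, assuming Clojure 1.2 or later for fnil; the sample counts map is invented.

```clojure
(def counts {"1" 2})

;; update-in calls inc on the value stored under the key path ["1"].
(def bumped (update-in counts ["1"] inc))
;; bumped is {"1" 3}

;; Assumes Clojure 1.2+: (fnil inc 0) substitutes 0 for a nil argument,
;; so a page seen for the first time starts at 1.
(def with-fnil (update-in counts ["2"] (fnil inc 0)))
;; with-fnil is {"1" 2, "2" 1}

;; Works on older versions too: look the count up with a default of 0.
(def with-get (assoc counts "2" (inc (get counts "2" 0))))
```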
In any case, this is a simple example of how to break up a commonplace problem using a classic map/reduce strategy and immutable data structures. I have no idea how it compares in terms of performance to the other Wide Finder implementations since the logs I have for this site are not exactly the hundred-megabyte blockbuster kind of logs that ongoing enjoys, but the fact that it can be parallelized in twelve lines is a testament to the expressiveness of the language.
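Assembled in definition order (Clojure needs a var defined before it is used), the pieces might look like the sketch below. It makes three assumptions not in the original: clojure.java.io replaces duck-streams, with-open is added to close the file, and a tiny invented log is written to a temp file so the example runs on its own.

```clojure
(ns wide-finder-assembled
  ;; Assumption: clojure.java.io stands in for clojure.contrib.duck-streams.
  (:require [clojure.java.io :as io]))

(defn inc-or-init [i]
  (if i (inc i) 1))

(defn count-line [counts line]
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    (update-in counts [hit] inc-or-init)
    counts))

(defn find-widely [filename n]
  (let [agents (map agent (repeat n {}))]
    ;; with-open is an addition here; dorun finishes reading before it closes.
    (with-open [r (io/reader filename)]
      (dorun (map #(send %1 count-line %2) (cycle agents) (line-seq r))))
    (apply await agents)
    (apply merge-with + (map deref agents))))

;; Invented sample log, written to a temp file for the demonstration.
(def sample-log (java.io.File/createTempFile "wide-finder" ".log"))
(spit sample-log
      (str "GET /1 HTTP/1.1\n" "GET /2 HTTP/1.1\n"
           "GET /1 HTTP/1.1\n" "POST /9 HTTP/1.1\n"))

(def totals (find-widely (.getPath sample-log) 2))
;; totals is {"1" 2, "2" 1}
```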
One improvement you could make--you don't explain the `update-in`, and I had to look at the clojure docs. I'm glad I know about it now. :-)
What is wrong with the straightforward implementation? Something like this:
I think your regexp is broken. It only matches pages whose names are numbers. The one I'm using above is probably too permissive, but that's not the main point of this, right?
Why not simply use pmap, like so?
It's less code, easier to understand and I'm guessing more idiomatic. The only downside is you can't control the number of cores.
It's also substantially faster. Running them both with all 4 cores on my machine (the pmap version defaults to using all cores I assume) wrapped in a call to "time" I get this:
"Elapsed time: 32508.140029 msecs"
vs the agent approach which yields:
"Elapsed time: 297367.898428 msecs"
Both timings were taken after running each once before to prime the filesystem cache, etc.
Also, both approaches are still heavily I/O bound, and I haven't done anything to address the performance vs. straightforwardness tradeoffs, so there's not really much to be gained over writing it to run on a single core with regular map, which yields:
"Elapsed time: 33095.086899 msecs"
Baishampayan: Nice; should have read the docs for await more closely.
Ivan: the regex was modified to work with URLs from my blog rather than Tim's.
Ivan and John: Wow; I had no idea how drastic the difference would be between pmap and agents. It's significantly simpler, which is a big win. I'm going to leave the agents version up since I think it's an interesting explanation of how functions get passed around and called with a different set of args depending on the context, but that's definitely interesting.
John: it would be interesting to compare CPU time usage between pmap and regular map rather than just wall-clock time to determine the bottleneck.
Anybody who wants to dive deeper on this can have an account on the wide-finder machine.
That'd be me. Unfortunately I don't get much time to hack on these types of things but if you want to email me at (my first name @ milo dot com) and let me know how to get at the full data set I'll try to dig deeper next time I find myself with some free time.
- John
Anyway, here's the correct code:
http://pastie.org/pastes/717940
One of the problems is with the "apply" function, which will try to compute all the arguments before it calls "merge-with". A bit nicer solution would be something like:
(reduce (partial merge-with +) (pmap count-line (line-seq (reader filename))))
It would be interesting to see how this performs, but I don't have the time to play with Wide finder right now. Still I'm working on a similar problem and if I'm able to optimize it I'll post some results.
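A rough sketch of the reduce-over-pmap shape suggested above, under the assumption that each line is first turned into its own small map. The helper line->counts and the sample lines are hypothetical and are not taken from the commenters' code.

```clojure
;; Hypothetical helper: a one-entry map for a matching line, else {}.
(defn line->counts [line]
  (if-let [[_ hit] (re-find #"GET /(\d+) " line)]
    {hit 1}
    {}))

;; Invented sample lines standing in for (line-seq (reader filename)).
(def sample-lines ["GET /1 " "GET /2 " "GET /1 " "HEAD /3 "])

;; pmap builds the per-line maps in parallel; reduce folds them
;; together one at a time instead of passing them all to apply.
(def totals
  (reduce (partial merge-with +) (pmap line->counts sample-lines)))
;; totals is {"1" 2, "2" 1}
```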