(ns wf
  (:use (clojure.contrib duck-streams seq-utils [str-utils2 :only [join]]))
  (:import java.util.concurrent.atomic.AtomicLong))

(set! *warn-on-reflection* true)

;;;; Reading

(defn chunk-file
  "Partitions a file into n line-aligned chunks. Returns a list of
  (start end) byte-offset pairs covering the file from 0 to its length.

  Every chunk after the first starts just past the first newline found at
  or after byte (i * length / n), so chunks may be empty when several
  nominal offsets fall inside the same line. An n below 1 is treated as 1."
  [filename n]
  (with-open [file (java.io.RandomAccessFile. filename "r")]
    (let [len     (.length file)
          n       (max 1 (int n))
          offsets (for [i (range n)]
                    ;; Integer arithmetic keeps the offset usable by .seek.
                    (let [offset (quot (* i len) n)]
                      (if (zero? offset)
                        0
                        (do
                          (.seek file (long offset))
                          ;; Skip to just past the next newline; also stop at
                          ;; end of file, where .read returns -1 forever.
                          (loop []
                            (let [b (.read file)]
                              (when-not (or (neg? b) (= b (int \newline)))
                                (recur))))
                          (.getFilePointer file)))))
          offsets (concat offsets [len])]
      ;; doall forces the lazy offsets while the file is still open.
      (doall (partition 2 (interleave offsets (rest offsets)))))))

(defn read-lines-range
  "Returns a lazy sequence of lines from file between start-byte and
  end-byte. The underlying reader is closed once the sequence has been
  consumed to its end.

  The byte budget assumes one byte per character (the stream is decoded as
  US-ASCII) and a single-byte line terminator."
  [file start-byte end-byte]
  (let [reader (-> (doto (java.io.FileInputStream. file) (.skip start-byte))
                   (java.io.BufferedInputStream. (* 8 131072))
                   (java.io.InputStreamReader. "US-ASCII")
                   (java.io.BufferedReader. 131072))]
    (letfn [(read-line [remaining]
              (lazy-seq
                (if-let [#^String line (and (pos? remaining) (.readLine reader))]
                  ;; Each line consumed its characters plus the newline that
                  ;; terminated it; counting both keeps the chunk from
                  ;; running into the next chunk's lines.
                  (cons line (read-line (- remaining (inc (.length line)))))
                  ;; .close returns nil, which also terminates the sequence.
                  (.close reader))))]
      (read-line (- end-byte start-byte)))))

(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c, storing the pieces into the caller-supplied
  array tokens, which is returned. At most (alength tokens) pieces are
  written; the last piece written holds the rest of the string. Slots past
  the last piece are left untouched."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [len (dec (int (alength tokens)))]
    (loop [start (int 0)
           i     (int 0)]
      (let [idx (int (.indexOf s (int c) (int start)))]
        (if (or (neg? idx) (>= i len))
          (do (aset tokens i (.substring s start))
              tokens)
          (do (aset tokens i (.substring s start idx))
              (recur (inc idx) (inc i))))))))

(defn parse-lines
  "Lazily turns space-separated log lines into record maps with the keys
  :client, :url, :status, :bytes and :ref. Only lines whose sixth field is
  \"GET (with the leading quote) and whose status is 200, 304 or 404 are
  kept. A bytes field of - is read as 0, and the first and last characters
  of the referrer field are stripped.

  NOTE(review): the token array is reused across lines, so a line with
  fewer than 11 fields would see tokens left over from an earlier line --
  presumably input lines are always well formed; confirm."
  [lines]
  (let [ary (make-array String 12)]
    (for [#^String line lines
          :let [fields (dumbest-split line \space ary)
                status (aget fields 8)
                bytes  (aget fields 9)
                #^String ref (aget fields 10)]
          :when (= (aget fields 5) "\"GET")
          :when ('#{"200" "304" "404"} status)]
      {:client (aget fields 0)
       :url    (aget fields 6)
       :status status
       :bytes  (if (= bytes "-") 0 (Long/parseLong bytes))
       :ref    (.substring ref 1 (dec (count ref)))})))

;;;; Tallying

(defn bump!
  "Adds delta to the AtomicLong counter stored under key in the map held by
  map-atom, creating the counter at zero first if it is missing. Returns
  the counter's new value.

  A counter, once installed, is never replaced: the swap! only inserts when
  the key is absent, so concurrent callers always increment the same
  AtomicLong and no update is lost."
  [map-atom #^String key #^Long delta]
  (let [#^AtomicLong counter
        (or (get @map-atom key)
            (get (swap! map-atom
                        (fn [m]
                          (if (contains? m key)
                            m
                            ;; Copy the key so the map does not hold on to
                            ;; the string it was cut from.
                            (assoc m (String. key) (AtomicLong. 0)))))
                 key))]
    (.addAndGet counter delta)))

(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")

(defn tally!
  "Folds records into the tally atoms. A 404 record bumps s404s for its
  URL. Any other record adds its bytes to url-bytes and, when the URL
  matches article-re, bumps url-hits and clients, plus refs unless the
  referrer is - or starts with http://www.tbray.org/ongoing/."
  [{:keys [url-hits url-bytes clients refs s404s]} records]
  (doseq [{:keys [#^String url bytes client status #^String ref]} records]
    (if (= status "404")
      (bump! s404s url 1)
      (do
        (bump! url-bytes url bytes)
        ;; Cheap prefix test first; the regex runs only on candidates.
        (when (and (.startsWith url "/ongoing/When/")
                   (re-matches article-re url))
          (bump! url-hits url 1)
          (bump! clients client 1)
          (when-not (or (= ref "-")
                        (.startsWith ref "http://www.tbray.org/ongoing/"))
            (bump! refs ref 1)))))))

;;;; Reporting

(defn truncate
  "Returns s cut to its first n characters followed by ... when s is longer
  than n characters; otherwise returns s unchanged."
  [#^String s n]
  (if (> (count s) n)
    (str (.substring s 0 n) "...")
    s))

(defn print-top10
  "Prints a heading for label, then up to ten [key value] entries of
  results (keys truncated to 60 characters) and a blank line. With a truthy
  shrink?, values are divided by 1024 twice and printed with an M suffix."
  [results label & [shrink?]]
  (println "Top" label)
  (let [fmt (if shrink? " %9.1fM: %s" " %10d: %s")]
    (doseq [[k v] (take 10 results)]
      (let [v (if shrink?
                (/ v 1024.0 1024.0)
                (long v))]
        (println (format fmt v (truncate k 60))))))
  (println))

(defn sort-by-vals-desc
  "Returns the entries of m sorted by value, largest first."
  [m]
  (sort-by #(- (val %)) m))

(defn take-greatest-vals
  "Returns a vector of the n entries of m with the greatest values, sorted
  largest first, or nil when m is empty or n is not positive."
  [n m]
  (when (and (pos? n) (seq m))
    (reduce (fn [best x]
              ;; Admit x while there is still room, or when it is at least
              ;; as large as the smallest entry currently kept.
              (if (or (< (count best) n)
                      (>= (val x) (val (peek best))))
                (vec (take n (sort-by-vals-desc (conj best x))))
                best))
            []
            m)))

(defn report
  "Prints a one-line summary with the entry count of every tally in state,
  then a top-ten listing for each [key label & options] spec in tallies."
  [tallies state]
  (->> state
       (map (fn [[tally rows]] (str (count @rows) " " (name tally))))
       (join ", ")
       (println))
  (println)
  (->> tallies
       ;; take-greatest-vals already returns its result sorted descending,
       ;; so the full map does not need sorting beforehand.
       (pmap (fn [[tally & options]]
               (cons (take-greatest-vals 10 @(state tally)) options)))
       (map #(apply print-top10 %))
       (dorun)))

;;;; Main

(def tallies
  [[:url-hits "URIs by hit"]
   [:url-bytes "URIs by bytes" :shrink]
   [:s404s "404s"]
   [:clients "client addresses"]
   [:refs "referrers"]])

(defn wf-atoms
  "Splits file into chunks of roughly 32 MiB (at least one chunk), tallies
  the chunks in parallel into one atom per tally, then prints the report
  along with the time the report took."
  [file]
  (let [chunk-count (max 1 (int (quot (.length (java.io.File. file))
                                      (* 32 1024 1024))))
        state       (zipmap (map first tallies) (repeatedly #(atom {})))]
    (dorun
      (pmap (fn [[idx [start end]]]
              (println (str "Chunk " idx "/" chunk-count
                            " (" start " -> " end ")"))
              (->> (read-lines-range file start end)
                   (parse-lines)
                   (tally! state)))
            (indexed (chunk-file file chunk-count))))
    (time (report tallies state))))