(ns wf
  (:use (clojure.contrib duck-streams seq-utils [str-utils2 :only [join]]))
  (:import java.util.concurrent.atomic.AtomicLong))

(set! *warn-on-reflection* true)

;;; Big wins:
;;;  * Use queue instead of pmap.
;;;  * Overlap chunking with actual runtime

;;;; Reading

(defn chunk-file
  "Splits the file named by filename into roughly n byte ranges whose
  boundaries fall just after a newline. Returns a lazy seq of (start end)
  pairs covering the whole file; an n below 1 is treated as 1.

  Offsets are computed lazily so a consumer can start on early chunks while
  later boundaries are still being located. The underlying file handle is
  closed when the sequence has been fully consumed."
  [filename n]
  (println "chunking...")
  (let [file    (java.io.RandomAccessFile. filename "r")
        len     (.length file)
        ;; Guard against n = 0 (small files), which used to divide by zero.
        step    (/ len (max 1 n))
        offsets (for [offset (range 0 len step)]
                  (do
                    (when-not (zero? offset)
                      (.seek file (long offset))
                      ;; Advance to just past the next newline. Stop at EOF
                      ;; too (.read returns -1), otherwise a final line with
                      ;; no terminator would spin here forever.
                      (loop []
                        (let [b (.read file)]
                          (when-not (or (neg? b) (== b (int \newline)))
                            (recur)))))
                    (.getFilePointer file)))
        ;; The tail is realized only after every offset above has been
        ;; computed, so this is the right moment to release the handle.
        offsets (concat offsets (lazy-seq (.close file) [len]))]
    (partition 2 (interleave offsets (rest offsets)))))

(defn dolines
  "Calls f on every line of file that starts inside the byte range
  [start-byte, end-byte). The file is decoded as US-ASCII, so one character
  corresponds to one byte.

  Assumes lines are terminated by a single newline byte, which is also what
  chunk-file scans for when it places chunk boundaries."
  [#^String file start-byte end-byte f]
  (with-open [stream (java.io.FileInputStream. file)]
    (.skip stream start-byte)
    (let [rdr (java.io.BufferedReader.
                (java.io.InputStreamReader.
                  (java.io.BufferedInputStream. stream (* 8 131072))
                  "US-ASCII")
                131072)]
      (loop [#^String line (.readLine rdr)
             remaining (- end-byte start-byte)]
        ;; A line is handled only while budget remains, and each line costs
        ;; its length plus the newline that readLine stripped. Without the
        ;; +1 every chunk ran on into its successor and lines were counted
        ;; twice.
        (when (and line (pos? remaining))
          (f line)
          (recur (.readLine rdr)
                 (- remaining (inc (.length line)))))))))

(defn #^"[Ljava.lang.String;" dumbest-split
  "Splits s on the character c into the preallocated String array tokens and
  returns that array. At most (alength tokens) tokens are produced; the last
  produced token holds the unsplit remainder of s. Slots after the last
  token are set to nil so a reused array never exposes tokens left over from
  a previous call."
  [#^String s c #^"[Ljava.lang.String;" tokens]
  (let [len (dec (int (alength tokens)))]
    (loop [start (int 0)
           i (int 0)]
      (let [idx (int (.indexOf s (int c) (int start)))]
        (if (or (neg? idx) (>= i len))
          (do
            (aset tokens i (.substring s start))
            ;; Clear the unused tail of the reused array.
            (loop [j (inc i)]
              (when (<= j len)
                (aset tokens j nil)
                (recur (inc j))))
            tokens)
          (do
            (aset tokens i (.substring s start idx))
            (recur (inc idx) (inc i))))))))

(defn parse-line
  "Parses one space-separated access-log line, using ary as scratch space for
  the tokens. Returns a map with :client, :url, :status, :bytes and :ref for
  GET requests whose status is 200, 304 or 404, and nil for anything else,
  including lines too short to carry a referrer field."
  [line ary]
  (let [fields (dumbest-split line \space ary)
        status (aget fields 8)
        bytes (aget fields 9)
        #^String ref (aget fields 10)]
    (when (and ref
               ;; The referrer is expected to be wrapped in quotes; anything
               ;; shorter than two characters cannot be unwrapped.
               (> (.length ref) 1)
               (= (aget fields 5) "\"GET")
               ('#{"200" "304" "404"} status))
      {:client (aget fields 0)
       :url (aget fields 6)
       :status status
       :bytes (if (= bytes "-") 0 (Long/parseLong bytes))
       :ref (.substring ref 1 (dec (.length ref)))})))

;;;; Tallying

(defn bump!
  "Adds delta to the counter stored under key k in m, an atom holding a map
  from String to AtomicLong, creating the counter on first use. Returns the
  counter's value after the addition.

  Once a counter has been installed for a key it is never replaced, so
  concurrent callers always add to the same AtomicLong and no increment is
  lost."
  [m #^String k delta]
  (let [delta (long delta)
        #^AtomicLong counter
        (or (get @m k)
            ;; The swap function may be retried and allocate a spare counter,
            ;; but only the one that ends up in the map is ever incremented.
            ;; (String. k) copies the key rather than storing k itself.
            (get (swap! m #(if (contains? % k)
                             %
                             (assoc % (String. k) (AtomicLong. 0))))
                 k))]
    (.addAndGet counter delta)))

(def article-re #"^/ongoing/When/\d\d\dx/\d\d\d\d/\d\d/\d\d/[^ .]+$")

(defn- tally-record!
  "Folds one parsed record into the tally atoms held in the state map
  (:url-hits, :url-bytes, :clients, :refs, :s404s)."
  [{:keys [url-hits url-bytes clients refs s404s]}
   {:keys [#^String url bytes client status #^String ref]}]
  (if (= status "404")
    (bump! s404s url 1)
    (do
      (bump! url-bytes url bytes)
      ;; The cheap prefix test runs before the regex match.
      (when (and (.startsWith url "/ongoing/When/")
                 (re-matches article-re url))
        (bump! url-hits url 1)
        (bump! clients client 1)
        (when-not (or (= ref "-")
                      (.startsWith ref "http://www.tbray.org/ongoing/"))
          (bump! refs ref 1))))))

;;;; Reporting

(defn sort-by-vals-desc
  "Returns the entries of m sorted by value, largest first."
  [m]
  (sort-by #(- (val %)) m))

(defn take-greatest-vals
  "Returns a vector of the n entries of m with the greatest values, largest
  first, or nil when m is empty."
  [n m]
  (when-let [m (seq m)]
    (reduce (fn [best x]
              ;; Accept x while there is still room, or when it is at least
              ;; as large as the smallest entry kept so far. Without the room
              ;; check, small entries were rejected even though fewer than n
              ;; had been collected.
              (if (or (< (count best) n)
                      (>= (val x) (val (peek best))))
                (vec (take n (sort-by-vals-desc (conj best x))))
                best))
            [(first m)]
            (rest m))))

(defn truncate
  "Returns s unchanged when it has at most n characters; otherwise its first
  n characters followed by an ellipsis."
  [#^String s n]
  (if (> (count s) n)
    (str (.substring s 0 n) "...")
    s))

(defn print-top10
  "Prints a heading for label followed by up to ten [key value] entries from
  results. When shrink? is truthy, values are divided by 1024 twice and
  printed with one decimal and an M suffix; otherwise they are printed as
  integers."
  [results label & [shrink?]]
  (println "Top" label)
  (let [fmt (if shrink? " %9.1fM: %s" " %10d: %s")]
    (doseq [[k v] (take 10 results)]
      (let [v (if shrink? (/ v 1024.0 1024.0) (long v))]
        (println (format fmt v (truncate k 60))))))
  (println))

(defn report
  "Prints the number of distinct keys in every tally of state, then a top-ten
  table for each entry of tallies (a seq of [key label & options])."
  [tallies state]
  (println (join ", " (map #(str (count @(val %)) " " (name (key %))) state)))
  (println)
  (doseq [result (pmap #(cons (take-greatest-vals 10 @(state (first %))) (rest %))
                       tallies)]
    (apply print-top10 result)))

;;;; Main

(def tallies [[:url-hits "URIs by hit"]
              [:url-bytes "URIs by bytes" :shrink]
              [:s404s "404s"]
              [:clients "client addresses"]
              [:refs "referrers"]])

(def thread-count 64)

(defn wf-atoms
  "Tallies the access log named by file using thread-count worker threads fed
  from a queue of byte-range chunks (about 32 MB each), then prints and times
  the report."
  [#^String file]
  (let [;; At least one chunk, so files under 32 MB are still processed.
        chunk-count (max 1 (int (/ (.length (java.io.File. file))
                                   (* 32 1024 1024))))
        state (zipmap (map first tallies) (repeatedly #(atom {})))
        chunks (indexed (chunk-file file chunk-count))
        queue (java.util.concurrent.LinkedBlockingQueue.
                (+ chunk-count thread-count))
        still-chunking (atom true)
        chunker (future
                  (try
                    (doseq [chunk chunks]
                      (.put queue chunk))
                    (println "chunking complete.")
                    ;; Always release the workers, even if chunking failed.
                    (finally (reset! still-chunking false))))
        ;; Returns the next chunk, or nil once the chunker has finished and
        ;; the queue is drained. A timed poll is used instead of a blocking
        ;; take: a worker that saw still-chunking as true and then blocked
        ;; in take after the last chunk was claimed would wait forever.
        next-chunk (fn []
                     (loop []
                       (or (.poll queue 100 java.util.concurrent.TimeUnit/MILLISECONDS)
                           (if @still-chunking
                             (recur)
                             ;; The flag is cleared only after the last put,
                             ;; so one final poll cannot miss a chunk.
                             (.poll queue)))))
        ;; doall starts every worker right away; a lazy seq of futures would
        ;; only be launched as it is walked by the deref below.
        threads (doall
                  (for [_ (range thread-count)]
                    (future
                      ;; One scratch token array per worker thread.
                      (let [ary (make-array String 12)]
                        (loop []
                          (when-let [[idx [start end]] (next-chunk)]
                            (when (zero? (mod idx 10))
                              (println (str "Chunk " idx "/" chunk-count
                                            " (" start " -> " end ")"
                                            " Q " (.size queue))))
                            (dolines file start end
                                     #(when-let [record (parse-line % ary)]
                                        (tally-record! state record)))
                            (recur)))))))]
    (dorun (map deref threads))
    (time (report tallies state))))