Skip to content

Commit

Permalink
chore(Project): Add test data stream for development environment
Browse files Browse the repository at this point in the history
  • Loading branch information
gremid committed Dec 23, 2024
1 parent f42623d commit 513c7c6
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 292 deletions.
30 changes: 25 additions & 5 deletions dev/user.clj
Original file line number Diff line number Diff line change
@@ -1,16 +1,36 @@
(ns user
(:require
[clojure.tools.namespace.repl :as repl :refer [set-refresh-dirs]]
[dwds.livestream.server :as server]))
[dwds.livestream.http :as http]
[clojure.core.async :as a]
[clojure.java.io :as io]
[taoensso.timbre :as log]
[jsonista.core :as json])
(:import
(java.util.zip GZIPInputStream)))

;; Limit the directories considered by clojure.tools.namespace.repl when
;; refreshing code to "dev" and "src".
(set-refresh-dirs "dev" "src")

(defn stream-sample-wb-page-requests
  "Replays the sample of WB page requests onto `ch` from a background thread.

  Every line of the gzipped resource `wb-page-requests.edn.gz` is read as a
  Clojure form, serialized to a JSON string and put onto `ch`, pausing one
  second after each successful put. The lines are cycled endlessly; the loop
  ends when the put is refused (closed channel) or there are no lines at all.
  Any failure is logged and terminates the thread.

  Returns the channel created by `a/thread`."
  [ch]
  (a/thread
    (try
      (with-open [rdr (-> "wb-page-requests.edn.gz"
                          io/resource
                          io/input-stream
                          GZIPInputStream.
                          io/reader)]
        (loop [remaining (cycle (line-seq rdr))]
          (let [line (first remaining)]
            (when (and line
                       (a/>!! ch (json/write-value-as-string (read-string line))))
              (Thread/sleep 1000)
              (recur (rest remaining))))))
      (catch Throwable t
        (log/fatal t "Error while streaming sample page requests")))))

;; Starts the development system and returns a zero-argument function that
;; shuts down everything that was started.
(defn start!
[]
(let [stop-metrics-reporter! (server/start-metrics-reporter!)
stop-tailer! (server/start-tailer!)
stop-server! (server/start-server!)]
(fn [] (stop-server!) (stop-tailer!) (stop-metrics-reporter!))))
(let [ch (a/chan)
stop-server! (http/start-server! (a/mult ch))]
(stream-sample-wb-page-requests ch)
(fn [] (a/close! ch) (stop-server!))))

;; Var that is nil at load time.
;; NOTE(review): presumably rebound at the REPL to the function returned by
;; `start!` while a system is running — confirm against the dev workflow.
(def stop!
nil)
Expand Down
Binary file added dev/wb-page-requests.edn.gz
Binary file not shown.
126 changes: 126 additions & 0 deletions src/dwds/livestream/access_log.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
(ns dwds.livestream.access-log
(:require
[clojure.core.async :as a]
[clojure.java.io :as io]
[clojure.string :as str]
[dwds.livestream.env :as env]
[dwds.livestream.metrics :as metrics]
[lambdaisland.uri :as uri]
[lambdaisland.uri.normalize :refer [percent-decode]]
[taoensso.timbre :as log])
(:import
(java.time Instant)
(java.time.format DateTimeFormatter)
(org.apache.commons.io.input Tailer TailerListenerAdapter)))

;; Path prefix that identifies WB requests in the access log.
(def uri-prefix
"/wb/")

;; Number of characters to strip from a request URI to drop `uri-prefix`.
(def uri-prefix-length
(count uri-prefix))

;; Substring of an access log line that marks a GET request below `uri-prefix`.
(def wb-http-request-prefix
(str "GET " uri-prefix))

;; Substring marking typeahead requests, which `wb-page-request?` excludes.
(def wb-typeahead-http-request-prefix
(str wb-http-request-prefix "typeahead"))

(defn wb-page-request?
  "Cheap substring pre-check on a raw access log line: true when the line
  contains a GET request below the WB prefix that is not a typeahead request,
  false otherwise."
  [log-line]
  (if (str/includes? log-line wb-http-request-prefix)
    (not (str/includes? log-line wb-typeahead-http-request-prefix))
    false))

;; Keys assigned, in order, to the result of matching `log-line-pattern`:
;; `:line` is the whole match, the remaining eight keys name the pattern's
;; capture groups 1-8.
(def log-line-parts
[:line :ip :timestamp :method :uri :status :size :referrer :user-agent])

;; Regular expression for one access log line. It is written in verbose mode
;; ((?x)), so whitespace and `#` comments inside the pattern are ignored by
;; the regex engine. The remote IP group is optional and may therefore be nil
;; in a match.
(def log-line-pattern
#"(?x)
(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})?\s # Remote-IP
-\s
-\s
\[(.*)\]\s # timestamp
\"(\w+)\s([^\s]+)[^\"]*\"\s # method and request URI
(\d{3})\s # status code
(\d+)\s # response size
\"([^\"]*)\"\s # referrer
\"([^\"]*)\".* # user-agent
")

(def timestamp-formatter
  "Formatter for access log timestamps such as `23/Dec/2024:11:15:30 +0100`.

  The locale is pinned to English: the `MMM` month abbreviation is
  locale-sensitive, and a formatter created without an explicit locale uses
  the JVM default locale, under which e.g. `Dec` does not parse when the
  default is German."
  (DateTimeFormatter/ofPattern "dd/MMM/yyyy:HH:mm:ss Z" java.util.Locale/ENGLISH))

(defn parse-timestamp
  "Parses the access log timestamp `s` and returns the corresponding instant
  as an ISO-8601 string in UTC, e.g. `2024-12-23T10:15:30Z`.

  Throws `java.time.format.DateTimeParseException` when `s` does not match
  the format of `timestamp-formatter`."
  [^String s]
  (str (Instant/from (.parse ^DateTimeFormatter timestamp-formatter s))))

(defn parse-log-line
  "Parses the access log `line` into a map keyed by `log-line-parts`, with
  `:timestamp` converted by `parse-timestamp`.

  Throws an `ex-info` carrying the offending `:line` when it does not match
  `log-line-pattern`. Without this check a non-matching line produced an
  empty map and failed with an opaque `NullPointerException` while parsing
  the missing timestamp."
  [line]
  (if-let [log-line-match (re-find log-line-pattern line)]
    (-> (zipmap log-line-parts log-line-match)
        (update :timestamp parse-timestamp))
    (throw (ex-info "Access log line does not match the expected format"
                    {:line line}))))

(def bot-patterns
  "Regular expression matching user agents of known bots, built as the
  alternation of the lines in the resource
  `dwds/livestream/bot-patterns.txt`.

  Blank lines are skipped: an empty alternative matches every string and
  would classify all user agents as bots. If no pattern remains, a pattern
  that never matches is used for the same reason."
  (with-open [r (io/reader (io/resource "dwds/livestream/bot-patterns.txt"))]
    (let [alternatives (into [] (remove str/blank?) (line-seq r))]
      (re-pattern (if (seq alternatives)
                    (str/join "|" alternatives)
                    ;; empty negative lookahead: matches nothing
                    "(?!)")))))

(defn valid-lemma?
  "Checks a decoded path segment: yields nil for a nil or empty `entry`,
  false when it starts with `[`, and true otherwise."
  [entry]
  (when (seq entry)
    (not (str/starts-with? entry "["))))

(defn sub-wb?
  "Truthy when the decoded path does not address a plain WB entry: either it
  has more than one segment (the remaining segments are returned), or its
  single segment is one of the reserved names (that name is returned).
  Returns nil otherwise."
  [[entry & tail :as _path]]
  (if-let [more-segments (seq tail)]
    more-segments
    (get #{"dwb" "dwb2" "etymwb" "wdg" "index" "Wörterbuch"} entry)))

(defn bot?
  "True when `user-agent` contains a match for `bot-patterns`, false
  otherwise."
  [user-agent]
  (-> (re-find bot-patterns user-agent)
      nil?
      not))

(defn log-line->wb-page-requests
  "Extracts the WB page requests contained in one access log line.

  Returns a one-element list `({:timestamp ... :lemma ...})` when the line is
  a WB page request answered with status 200 whose first decoded path segment
  is a valid lemma, whose path is not a sub-dictionary path and whose user
  agent is not a bot. Returns nil in every other case. Errors are logged at
  debug level and not propagated."
  [log-line]
  (try
    (when (wb-page-request? log-line)
      (let [request (parse-log-line log-line)]
        (when (= "200" (:status request))
          (let [relative (uri/uri (subs (:uri request) uri-prefix-length))
                raw-path (or (:path relative) "")
                segments (mapv percent-decode (str/split raw-path #"/"))
                lemma    (first segments)]
            (when (and (valid-lemma? lemma)
                       (not (or (sub-wb? segments)
                                (bot? (:user-agent request)))))
              (list {:timestamp (:timestamp request)
                     :lemma     lemma}))))))
    (catch Throwable t
      (log/debugf t "Error parsing access log line '%s'" log-line))))

;; Meter marked once for every line the tailer reads from the access log.
(def access-log-meter
(metrics/meter "access-log"))

;; Meter marked for every WB page request that was successfully put onto the
;; tailer's output channel.
(def wb-page-request-meter
(metrics/meter "wb-page-requests"))

(defn start-tailer!
  "Tails the access log configured as `env/access-log` and puts every WB page
  request extracted from newly read lines onto `ch`.

  Each line read marks `access-log-meter`; each request accepted by `ch`
  marks `wb-page-request-meter`. The tailer polls with a delay of 1000 ms and
  starts at the end of the file. Errors while dispatching a line are logged
  at debug level and do not stop the tailer.

  Returns a function that stops the tailer."
  [ch]
  (let [access-log (str env/access-log)
        listener   (proxy [TailerListenerAdapter] []
                     (handle [event]
                       ;; `handle` is overloaded for log lines and for
                       ;; exceptions; the proxy receives both here.
                       (if (instance? Throwable event)
                         (log/warnf ^Throwable event "Error while tailing %s" access-log)
                         (let [log-line event]
                           (try
                             (metrics/mark! access-log-meter)
                             (when (wb-page-request? log-line)
                               (doseq [wpr (log-line->wb-page-requests log-line)]
                                 (when (a/>!! ch wpr)
                                   (metrics/mark! wb-page-request-meter))))
                             (catch Throwable t
                               (log/debugf t "Error dispatching log line '%s'" log-line))))))
                     (fileNotFound []
                       (log/debugf "File not found: %s" access-log))
                     (fileRotated []
                       (log/debugf "Rotated: %s" access-log)))
        ;; `Tailer/create` takes a `java.io.File`; handing it the path string
        ;; fails at runtime, so the path is converted first.
        tailer     (Tailer/create (io/file access-log) listener 1000 true)]
    (fn [] (.stop ^Tailer tailer))))

148 changes: 148 additions & 0 deletions src/dwds/livestream/http.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
(ns dwds.livestream.http
(:require
[clojure.core.async :as a]
[clojure.java.io :as io]
[clojure.string :as str]
[dwds.livestream.env :as env]
[dwds.livestream.html :as html]
[dwds.livestream.metrics :as metrics]
[muuntaja.core :as m]
[reitit.coercion.malli]
[reitit.ring]
[reitit.ring.coercion]
[reitit.ring.middleware.exception]
[reitit.ring.middleware.muuntaja]
[reitit.ring.middleware.parameters]
[ring.core.protocols :refer [StreamableResponseBody]]
[ring.util.response :as resp]
[taoensso.timbre :as log]
[throttler.core :refer [throttle-chan]])
(:import
(org.eclipse.jetty.io EofException)
(org.eclipse.jetty.server Server)))

(defn proxy-headers->request
  "Applies reverse-proxy headers to a Ring `request`.

  `:scheme` is set from `x-forwarded-proto` (falling back to `x-scheme`) when
  its lower-cased value is `http` or `https`. `:remote-addr` is set to the
  trimmed first comma-separated entry of `x-forwarded-for` when that entry is
  not empty. Without usable headers the request is returned unchanged."
  [{:keys [headers] :as request}]
  (let [scheme      (when-some [proto (or (headers "x-forwarded-proto")
                                          (headers "x-scheme"))]
                      (#{:http :https} (keyword (str/lower-case proto))))
        remote-addr (when-some [forwarded-for (headers "x-forwarded-for")]
                      (not-empty (str/trim (re-find #"^[^,]*" forwarded-for))))]
    (cond-> request
      scheme      (assoc :scheme scheme)
      remote-addr (assoc :remote-addr remote-addr))))

(def proxy-headers-middleware
  "Middleware that rewrites each request with `proxy-headers->request` before
  passing it to the wrapped handler. Both the synchronous (1-arity) and the
  asynchronous (3-arity) handler signatures are supported."
  (letfn [(wrap [handler]
            (fn
              ([request]
               (-> request proxy-headers->request handler))
              ([request respond raise]
               (-> request proxy-headers->request (handler respond raise)))))]
    {:name ::proxy-headers
     :wrap wrap}))

(defn log-exceptions
  "Exception wrapper: logs `e` as a warning unless its `ex-data` has the
  `:type` `:reitit.ring/response`, then delegates to `handler` with the
  exception and the request and returns its result."
  [handler ^Throwable e request]
  (let [error-type (some-> e ex-data :type)]
    (when-not (= :reitit.ring/response error-type)
      (log/warn e (.getMessage e))))
  (handler e request))

(def exception-middleware
  "Exception middleware built from reitit's default handlers, with
  `log-exceptions` installed as the wrapper around them."
  (let [handlers (assoc reitit.ring.middleware.exception/default-handlers
                        :reitit.ring.middleware.exception/wrap
                        log-exceptions)]
    (reitit.ring.middleware.exception/create-exception-middleware handlers)))

;; Route data attached to the root route in `ring-handler`: the Muuntaja
;; instance for content negotiation, Malli-based coercion, and the middleware
;; chain in the order listed.
(def handler-options
{:muuntaja m/instance
:coercion reitit.coercion.malli/coercion
:middleware [proxy-headers-middleware
reitit.ring.middleware.parameters/parameters-middleware
reitit.ring.middleware.muuntaja/format-middleware
exception-middleware
reitit.ring.coercion/coerce-exceptions-middleware
reitit.ring.coercion/coerce-request-middleware
reitit.ring.coercion/coerce-response-middleware]})

;; Meter marked once for every message written to a streaming client.
(def wb-page-broadcast-meter
(metrics/meter "wb-page-broadcasts"))

(defn response-stream
  "Returns a streaming response body that forwards the messages broadcast on
  `broadcast-ch` (a core.async mult) to the client.

  `jsonl->chunk` converts each message into the string written to, and
  flushed on, the response. When the coerced query parameter `epm` is
  present, delivery is throttled to that many messages per minute. The
  per-request channel uses a sliding buffer of size 1, so a slow client only
  sees the most recent message.

  A client closing the connection is logged at debug level, any other error
  as a warning. In every case the per-request channel is untapped and
  closed when streaming ends."
  [jsonl->chunk broadcast-ch req]
  (let [epm (get-in req [:parameters :query :epm])]
    (reify StreamableResponseBody
      (write-body-to-stream [_body _response output-stream]
        (let [ch        (a/chan (a/sliding-buffer 1))
              throttled (cond-> ch epm (throttle-chan epm :minute))]
          (try
            (a/tap broadcast-ch ch)
            (with-open [writer (io/writer output-stream)]
              (loop []
                (when-let [msg (a/<!! throttled)]
                  (doto writer (.write ^String (jsonl->chunk msg)) (.flush))
                  (metrics/mark! wb-page-broadcast-meter)
                  (recur))))
            (catch EofException e
              (->> "Client closed connection while streaming WB page requests"
                   (log/debug e)))
            (catch Throwable t
              (log/warn t "Error while streaming WB page requests"))
            (finally
              (a/untap broadcast-ch ch)
              ;; Closing the tap channel signals everything consuming it —
              ;; notably the throttler piping from it — to terminate instead
              ;; of lingering after the client is gone.
              (a/close! ch))))))))

(def text-event-stream
  "Stream generator that frames every message as a `data: ` line followed by
  an empty line."
  (let [->event (fn [msg] (str "data: " msg "\n\n"))]
    (fn [broadcast-ch req]
      (response-stream ->event broadcast-ch req))))

(def jsonl-stream
  "Stream generator that writes every message on a line of its own."
  (let [->line (fn [msg] (str msg "\n"))]
    (fn [broadcast-ch req]
      (response-stream ->line broadcast-ch req))))

(defn handle-stream
  "Builds the Ring response for a streaming endpoint: the body produced by
  `stream-generator` for `broadcast-ch` and `req`, served with the given
  `content-type` and with caching and proxy buffering switched off through
  the `Cache-Control` and `X-Accel-Buffering` headers."
  [broadcast-ch stream-generator content-type req]
  (let [body     (stream-generator broadcast-ch req)
        response (resp/content-type (resp/response body) content-type)
        uncached (resp/header response "Cache-Control" "no-cache")]
    (resp/header uncached "X-Accel-Buffering" "no")))

(defn stream-handler
  "Route data for a streaming endpoint named `k`. Requests are answered by
  `handle-stream` with the given `generator` and `content-type`; the optional
  query parameter `epm` must be an integer greater than zero."
  [broadcast-ch k generator content-type]
  (let [handler (fn [req]
                  (handle-stream broadcast-ch generator content-type req))]
    {:name       k
     :handler    handler
     :parameters {:query [:map [:epm {:optional true} [:and :int [:> 0]]]]}}))

(def index-handler
  "Route data for the index page: always answers with the response built once
  from `html/index`, served as `text/html`."
  (let [response (resp/content-type (resp/response html/index) "text/html")]
    {:name    :index
     :handler (constantly response)}))

;; Builds the Ring handler of the service. All routes live below
;; `env/http-context-path` and share `handler-options`:
;;   "/"            index page
;;   "/api/events"  stream of `broadcast-ch` as text/event-stream
;;   "/api/jsonl"   stream of `broadcast-ch` as text/jsonl
;; Requests not matched by a route fall through to, in order, the
;; trailing-slash redirect handler, the file handler and reitit's default
;; handler.
(defn ring-handler
[broadcast-ch]
(reitit.ring/ring-handler
(reitit.ring/router
[env/http-context-path handler-options
[["/" index-handler]
["/api"
["/events" (stream-handler broadcast-ch :events text-event-stream
"text/event-stream")]
["/jsonl" (stream-handler broadcast-ch :jsonl jsonl-stream
"text/jsonl")]]]])
(reitit.ring/routes
(reitit.ring/redirect-trailing-slash-handler)
(reitit.ring/create-file-handler {:path env/http-context-path})
(reitit.ring/create-default-handler))))

(defn stop-server!
  "Stops the Jetty `server` and waits for it to finish. Returns nil."
  [^Server server]
  (doto server
    (.stop)
    (.join))
  nil)

(require 'ring.adapter.jetty)

(defn start-server!
  "Starts a non-blocking Jetty server on `env/http-port` that serves
  `ring-handler` for `broadcast-ch`, with an output buffer of 1024 bytes.
  Returns a function that stops the server."
  [broadcast-ch]
  (log/infof "Starting HTTP server @ %d/tcp" env/http-port)
  (let [handler (ring-handler broadcast-ch)
        options {:port               env/http-port
                 :output-buffer-size 1024
                 :join?              false}
        server  (ring.adapter.jetty/run-jetty handler options)]
    (partial stop-server! server)))
Loading

0 comments on commit 513c7c6

Please sign in to comment.