Skip to content
This repository has been archived by the owner on Jan 6, 2023. It is now read-only.

async checkpoint recovery. #850

Open
wants to merge 2 commits into
base: 0.12.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 58 additions & 41 deletions src/onyx/peer/task_lifecycle.clj
Original file line number Diff line number Diff line change
Expand Up @@ -441,19 +441,29 @@
(advance (update-event! state f)))))

(defn recover-input
  "Lifecycle step that restores the input pipeline from its checkpoint without
  blocking the calling thread on the checkpoint read.

  The checkpoint is fetched in a future that is kept in the task context under
  :restored-fut, so repeated invocations of this step poll the same fetch.

  * Fetch finished and the input pipeline is synced for the current epoch:
    the stored checkpoint is handed to p/recover!, the context is cleared and
    the state machine advances.
  * Otherwise, on the first poll the future and a :start-time (nanoTime) are
    stored in the context.
  * On later polls the state is returned unchanged, unless more than the
    tolerance has elapsed since :start-time, in which case an ex-info with
    the message \"Past recover tolerance time.\" is thrown."
  [state]
  (let [{:keys [recover-coordinates restored-fut start-time] :as context} (get-context state)
        input-pipeline (get-input-pipeline state)
        ;; Read the event on the calling thread so the future never touches
        ;; the task state itself (recover-state captures its event the same way).
        event (get-event state)
        ;; NOTE(review): the tolerance is hard-coded to 700 ms; it bounds the
        ;; checkpoint fetch *and* the wait for the pipeline to sync. Consider
        ;; promoting it to a peer-config option.
        tolerance-ns (* 700 1000000)
        ;; Reuse the fetch started by an earlier poll; start one otherwise.
        ;; The fetch starts immediately: an artificial delay here longer than
        ;; the tolerance would make every recovery attempt time out.
        restored-fut (or restored-fut
                         (future (res/recover-input event recover-coordinates)))]
    (cond (and (future-done? restored-fut)
               (p/synced? input-pipeline (t/epoch state)))
          (let [stored @restored-fut]
            (info (:onyx.core/log-prefix event) "Recover pipeline checkpoint:" stored)
            (p/recover! input-pipeline (t/replica-version state) stored)
            (-> state
                (set-context! nil)
                (advance)))

          ;; A fetch is already in flight: keep waiting until the tolerance expires.
          start-time
          (if (> (System/nanoTime) (+ start-time tolerance-ns))
            (throw (ex-info "Past recover tolerance time." {}))
            state)

          :else
          ;; ensure we don't try to recover input again before synced and the checkpoint is fetched
          (set-context! state (assoc context
                                     :restored-fut restored-fut
                                     :start-time (System/nanoTime))))))

(defn cleanup-previous-state-store! [state]
(when-let [state-store (get-state-store state)]
Expand All @@ -466,37 +476,42 @@
(defn recover-state
  "Lifecycle step that restores windowed state from its checkpoint without
  blocking the calling thread on the checkpoint read.

  On the first invocation (no :restored-fut in the task context) the previous
  state store is cleaned up, a fresh store is created and installed on the
  state, and res/recover-windows is started in a future. While that future is
  still running, it is saved in the context under :restored-fut so later
  invocations poll it instead of starting over.

  Once the future is done, the recovered windows are installed, windows are
  assigned with a :recovered state event for the current replica version and
  epoch, the context is cleared and the state machine advances."
  [state]
  (let [{:keys [onyx.core/peer-opts] :as event} (get-event state)
        {:keys [recover-coordinates restored-fut] :as context} (get-context state)
        restored-fut (or restored-fut
                         ;; The store is swapped synchronously; only the
                         ;; checkpoint read runs in the future.
                         (let [_ (cleanup-previous-state-store! state)
                               state-serializers (onyx.state.serializers.utils/event->state-serializers event)
                               ;; named store-name so the db-name fn is not shadowed
                               store-name (db-name event)
                               state-store (db/create-db peer-opts store-name state-serializers)
                               _ (set-state-store! state state-store)]
                           (future (res/recover-windows event state-store recover-coordinates))))]
    (if (future-done? restored-fut)
      (-> state
          (set-windows-state! @restored-fut)
          (ws/assign-windows (onyx.types/new-state-event :recovered
                                                         event
                                                         (t/replica-version state)
                                                         (t/epoch state)))
          (set-context! nil)
          (advance))
      ;; remember the in-flight fetch so the next poll does not recreate the store
      (set-context! state (assoc context :restored-fut restored-fut)))))

(defn recover-output
  "Lifecycle step that restores the output pipeline from its checkpoint without
  blocking the calling thread on the checkpoint read.

  The checkpoint is fetched in a future kept in the task context under
  :restored-fut. Once the fetch has finished and the output pipeline is synced
  for the current epoch, the stored checkpoint is handed to p/recover!, the
  context is cleared and the state machine advances. Until then the future is
  saved back into the context so later invocations poll the same fetch."
  [state]
  (let [{:keys [recover-coordinates restored-fut] :as context} (get-context state)
        output-pipeline (get-output-pipeline state)
        ;; Read the event on the calling thread so the future never touches
        ;; the task state itself (recover-state captures its event the same way).
        event (get-event state)
        ;; output recovery is only supported with onyx/n-peers set
        ;; as we can't currently scale slot recovery up and down
        restored-fut (or restored-fut
                         (future (res/recover-output event recover-coordinates)))]
    (if (and (future-done? restored-fut)
             (p/synced? output-pipeline (t/epoch state)))
      (let [stored @restored-fut]
        ;; Keep "output" in the message so it can be told apart from recover-input's log line.
        (info (:onyx.core/log-prefix event) "Recover output pipeline checkpoint:" stored)
        (p/recover! output-pipeline (t/replica-version state) stored)
        (-> state
            (set-context! nil)
            (advance)))
      ;; ensure we don't try to recover output again before synced and the checkpoint is fetched
      (set-context! state (assoc context :restored-fut restored-fut)))))

(defn event->pub-liveness [event]
(ms->ns (arg-or-default :onyx.peer/publisher-liveness-timeout-ms
Expand Down Expand Up @@ -852,6 +867,7 @@
(when messenger (component/stop messenger))
(when coordinator (coordinator/stop coordinator scheduler-event))
(some-> event :onyx.core/storage cp/stop)
(some-> this get-context :restored-fut future-cancel)
(some-> input-pipeline (p/stop event))
(some-> output-pipeline (p/stop event))
this)
Expand Down Expand Up @@ -1058,6 +1074,7 @@
this)
(goto-recover! [this]
  ;; Point the state machine at the recovery step.
  (.set idx recover-idx)
  ;; Cancel any checkpoint fetch still referenced by the context before the
  ;; context is dropped below.
  (when-some [pending-fetch (:restored-fut (get-context this))]
    (future-cancel pending-fetch))
  ;; Clear the context, then reset the event.
  (reset-event! (set-context! this nil)))
Expand Down
8 changes: 3 additions & 5 deletions test/onyx/windowing/basic_windowing_crash_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,9 @@
(let [{:keys [job-id]} (onyx.api/submit-job peer-config job)
_ (onyx.test-helper/feedback-exception! peer-config job-id)
results (take-segments! @out-chan 5)]
(println "COORDS" (onyx.api/job-snapshot-coordinates peer-config id job-id))
(println "Snapshot coordinates:" (onyx.api/job-snapshot-coordinates peer-config id job-id))
(println "GC checkpoints.")
(onyx.api/gc-checkpoints peer-config id job-id)
(println "Try again" )
(println "GC checkpoints a second time.")
(onyx.api/gc-checkpoints peer-config id job-id)
;; FIXME: improve test by pulling in old coordinates, recovering and then starting gc again.
;; TODO, allow gc to take part over tenancy history.

(is (= expected-windows (output->final-counts @test-state))))))))