
Job Lifecycle

Every maintenance job in Snowpack follows a fenced execution lifecycle. The fence — an attempt_id generated per claim — ensures that at most one worker can write terminal state for a given job, even under pod eviction, network partitions, or stale-reclaim races.

Lifecycle sequence

sequenceDiagram
    participant Client
    participant API as FastAPI API
    participant PG as Postgres
    participant KEDA as KEDA ScaledJob
    participant Worker as Worker Pod
    participant Spark as Spark / Kyuubi

    Client->>API: POST /tables/{db}/{table}/maintenance
    API->>API: Check drain_mode (503 if quiesced)
    API->>API: Validate actions + lookup table in cache
    API->>PG: try_lock_table (ownership-checked)
    Note over PG: 409 if another job holds the lock
    API->>PG: INSERT into jobs
    API->>PG: INSERT into job_queue
    API->>PG: history.save_job (best-effort audit)
    API-->>Client: 202 Accepted + Location header

    Note over KEDA: ~30s later, polls Postgres
    KEDA->>PG: SELECT COUNT(*) FROM job_queue<br/>WHERE claimed_at IS NULL<br/>AND visible_at <= NOW()
    Note over KEDA: depth >= 1 -> spawn worker pod

    KEDA->>Worker: Spawn pod
    Worker->>PG: dequeue_with_claim(worker_id, attempt_id)
    Note over PG: Single-tx CTE:<br/>SELECT ... FOR UPDATE SKIP LOCKED<br/>SET claimed_at, claimed_by, attempt_id,<br/>visible_at = NOW() + 1800s,<br/>attempt_count += 1<br/>UPDATE jobs SET status='running'
    Worker->>Worker: Start shutdown watcher thread
    Worker->>Worker: Start heartbeat thread (60s lease renewal)

    loop For each action (sorted)
        Worker->>Worker: Check _shutdown_requested
        Worker->>Worker: Check cancel (status=cancelled)
        Worker->>Spark: Create engine, run action
        Spark-->>Worker: Action result
    end

    Worker->>PG: store.update(results) -- writes job_actions
    Worker->>PG: store.write_terminal(attempt_id, status, error)
    Note over PG: UPDATE jobs WHERE job_id=?<br/>AND attempt_id=? (fenced)
    Worker->>PG: store.ack(attempt_id)
    Note over PG: DELETE FROM job_queue<br/>WHERE job_id=? AND attempt_id=?
    Worker->>PG: store.unlock_table(db, tbl, job_id)
    Note over PG: DELETE FROM table_locks<br/>WHERE holder = job_id
    Worker->>Worker: _job_cleanup_completed.set()
    Worker->>Worker: Exit 0 or 1
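The claim step in the diagram can be sketched in miniature. This is a hedged stand-in, not the production code: real Snowpack runs a single-transaction Postgres CTE with `FOR UPDATE SKIP LOCKED`, while this sqlite version leans on the `UPDATE`'s rowcount for atomicity. The schema and function signature here are illustrative guesses at the shapes the document describes.

```python
import sqlite3
import time
import uuid

# Minimal stand-in schema for the two tables the claim touches.
SCHEMA = """
CREATE TABLE jobs (job_id TEXT PRIMARY KEY, status TEXT, attempt_id TEXT);
CREATE TABLE job_queue (job_id TEXT PRIMARY KEY, claimed_at REAL,
                        claimed_by TEXT, attempt_id TEXT,
                        visible_at REAL, attempt_count INTEGER DEFAULT 0);
"""

def dequeue_with_claim(conn, worker_id, lease_seconds=1800):
    """Sketch of the atomic claim: pick the next unclaimed, visible row,
    stamp it with a fresh attempt_id fence, push visible_at forward by the
    claim lease, bump attempt_count, and flip the job to running."""
    attempt_id = uuid.uuid4().hex
    now = time.time()
    cur = conn.execute(
        """
        UPDATE job_queue
           SET claimed_at = ?, claimed_by = ?, attempt_id = ?,
               visible_at = ?, attempt_count = attempt_count + 1
         WHERE job_id = (
               SELECT job_id FROM job_queue
                WHERE claimed_at IS NULL AND visible_at <= ?
                ORDER BY visible_at LIMIT 1)
        """,
        (now, worker_id, attempt_id, now + lease_seconds, now),
    )
    if cur.rowcount == 0:
        return None  # nothing claimable right now
    row = conn.execute(
        "SELECT job_id FROM job_queue WHERE attempt_id = ?", (attempt_id,)
    ).fetchone()
    conn.execute(
        "UPDATE jobs SET status = 'running', attempt_id = ? WHERE job_id = ?",
        (attempt_id, row[0]),
    )
    conn.commit()
    return row[0], attempt_id
```

The key property to notice: the lease is just `visible_at` pushed into the future, so a crashed worker needs no explicit release — the row simply becomes visible to the sweeper once the lease lapses.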

Step-by-step walkthrough

  1. Client submits maintenance request. POST /tables/{db}/{table}/maintenance hits the API. The API checks drain mode (returns 503 if quiesced), validates the requested actions, and looks up the table in the cache.

  2. API acquires table lock and enqueues. JobStore.try_lock_table takes an ownership-checked lock on the table key — if another job already holds it, the API returns 409. On success the API inserts a row into jobs (status pending), inserts a row into job_queue, and writes a best-effort audit record via history.save_job. The client receives 202 Accepted with a Location header pointing to the job resource.

  3. KEDA polls for work. The KEDA postgresql scaler runs SELECT COUNT(*) FROM job_queue WHERE claimed_at IS NULL AND visible_at <= NOW() every 30 seconds. When the depth is at least 1, KEDA spawns a worker pod.

  4. Worker claims the job. dequeue_with_claim(worker_id, attempt_id) runs a single-transaction CTE that atomically selects the next unclaimed job (FOR UPDATE SKIP LOCKED), sets claimed_at, claimed_by, attempt_id, pushes visible_at forward by 1800 seconds (the claim lease), increments attempt_count, and flips the job status to running with the matching attempt_id.

  5. Worker executes actions. A heartbeat thread renews the claim lease and table lock every 60 seconds. For each action (sorted), the worker checks _shutdown_requested (SIGTERM path) and status=cancelled (cancel path) before creating a Spark engine and running the action.

  6. Worker writes terminal state. After all actions complete (or on early exit), the worker writes job_actions results, calls write_terminal to set the final job status (fenced by attempt_id), acks the queue row, unlocks the table, and signals _job_cleanup_completed so the shutdown watcher thread becomes a no-op. The worker then exits with code 0 (success) or 1 (failure/SIGTERM).
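The heartbeat thread from step 5 can be sketched as follows. The `store` method names (`heartbeat_claim`, `refresh_table_lock`) are assumptions based on the calls this document mentions, not confirmed API; the pattern shown is the standard `threading.Event`-as-sleep loop, which lets a SIGTERM handler end the loop immediately instead of waiting out a full interval.

```python
import threading

def start_heartbeat(store, job_id, attempt_id, stop: threading.Event,
                    interval: float = 60.0) -> threading.Thread:
    """Sketch of the worker's 60s lease-renewal loop (hypothetical store API).
    stop.wait(interval) doubles as the sleep: it returns False on timeout
    (time to beat) and True when the event is set (time to exit)."""
    def _beat():
        while not stop.wait(interval):
            # heartbeat_claim is fenced by attempt_id server-side; a False
            # return means the claim was rotated out from under this worker.
            if not store.heartbeat_claim(job_id, attempt_id):
                break
            store.refresh_table_lock(job_id)
    t = threading.Thread(target=_beat, daemon=True, name="heartbeat")
    t.start()
    return t
```

Making the thread a daemon matters: if the main thread hard-exits via `os._exit`, a non-daemon heartbeat could not keep the process alive anyway, but daemon status also keeps normal interpreter shutdown from blocking on it.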

Fence semantics (DL-197)

attempt_id (uuid4 hex) is generated per claim and written to both jobs.attempt_id and job_queue.attempt_id. Every post-claim write is predicated on that fence:

  • write_terminal(attempt_id=X) → UPDATE jobs WHERE job_id=? AND attempt_id=X
  • ack(attempt_id=X) → DELETE FROM job_queue WHERE job_id=? AND attempt_id=X
  • heartbeat_claim(attempt_id=X) → UPDATE job_queue ... WHERE attempt_id=X

If the fence is rotated — by cancel writing attempt_id=NULL, or by the sweeper reclaiming the job — the worker’s writes match 0 rows and return False. The worker logs writeback_stale_attempt and exits cleanly.

Invariant: at most one attempt_id value ever reaches terminal state for a given job_id.
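The fence mechanics reduce to one predicate, which a minimal sqlite sketch makes concrete (sqlite stands in for Postgres here; the table columns are trimmed to what the fence needs):

```python
import sqlite3

def write_terminal(conn, job_id, attempt_id, status, error=None):
    """Fenced terminal write: the UPDATE matches only while this worker's
    attempt_id is still the current fence. Returns False on a rotated fence,
    which is the cue to log writeback_stale_attempt and exit cleanly."""
    cur = conn.execute(
        "UPDATE jobs SET status = ?, error = ? "
        "WHERE job_id = ? AND attempt_id = ?",
        (status, error, job_id, attempt_id),
    )
    conn.commit()
    return cur.rowcount == 1
```

Because cancel sets `attempt_id = NULL` and the sweeper stamps a new `attempt_id` on reclaim, a stale worker's `UPDATE ... AND attempt_id = X` matches zero rows either way — no branch in the worker ever needs to know *why* it lost the fence.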

SIGTERM recovery

Every worker installs a SIGTERM handler that sets _shutdown_requested. A daemon “shutdown watcher” thread monitors that event. On SIGTERM the watcher runs fenced cleanup:

  1. write_terminal with status=failed and error="Worker received SIGTERM"
  2. ack the queue row
  3. unlock_table
  4. Force os._exit(1)

This ensures DB state is tidied in approximately 2 seconds, even if the main thread is blocked inside a long-running Spark JDBC call. Without the watcher, the job would remain in running state until the 30-minute sweeper lease expires and reclaims it.
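The watcher pattern can be sketched like so. The store method names and the `_exit` parameter are illustrative: the real worker calls `os._exit(1)` directly, but injecting the exit function keeps the sketch testable without killing the interpreter.

```python
import threading

def start_shutdown_watcher(store, job_id, attempt_id,
                           shutdown_requested: threading.Event,
                           job_cleanup_completed: threading.Event,
                           _exit=lambda code: None):
    """Sketch of the daemon shutdown watcher (hypothetical store API).
    Pass os._exit as _exit in a real worker so cleanup wins even when the
    main thread is blocked inside a long-running Spark JDBC call."""
    def _watch():
        shutdown_requested.wait()
        if job_cleanup_completed.is_set():
            return  # main thread already finished its own cleanup; no-op
        # Every write below is fenced by attempt_id, so a stale worker
        # cannot clobber state that a newer attempt now owns.
        store.write_terminal(job_id, attempt_id, "failed",
                             error="Worker received SIGTERM")
        store.ack(job_id, attempt_id)
        store.unlock_table(job_id)
        _exit(1)
    t = threading.Thread(target=_watch, daemon=True, name="shutdown-watcher")
    t.start()
    return t
```

Note the ordering mirrors the normal success path (terminal write, ack, unlock), so the two paths cannot leave the queue row or table lock in different residual states.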