com.walmartlabs.active-status.workers

added in 0.1.15

Manage a pool for worker CSPs, including representing the workers on a status board.

This is a common use case for the status board and an asynchronous work flow: to spread some form of work across multiple CSPs to achieve better throughput than is possible using a single thread (because the CSPs will spend most of their time parked, waiting for some form of disk or network I/O to complete).

run-workers

(run-workers worker-count worker-constructor)

Runs a number of workers CSPs in parallel.

The worker-constructor function is a constructor for CSPs. It is passed a worker id, and creates a worker CSP and returns a channel. Worker ids are numbered from 1.

The channels are merged together; the merged channel is returned.

Typically, each worker CSP is a go block that returns no value; the merge of that is a channel that closes only after every worker terminates.

Alternately, the workers can convey a series of values through their channels before closing their individual channels. The order in which such values are conveyed through the merged channel is not deterministic.

wrap-with-status

(wrap-with-status status-board worker-constructor)(wrap-with-status status-board worker-constructor options)

Wraps a worker constructor as a worker builder that reports status using the status-board.

A worker constructor is passed a status-board job channel (instead of a worker id), and returns a result channel. wrap-with-status exists to create a constructor that’s compatible with run-workers.

status-board is an Active Status Board; a job will be added for each worker, and ultimately closed (after the worker CSP completes).

The job’s channel is passed to the worker-constructor function, which returns a channel. The worker process may simply close the channel when work is complete, or may convey a series of values through the channel.

The worker process can write its own updates to the status-board job channel passed to it; typically this identifies the work being performed.

Returns a channel that conveys any result values from the worker process, and closes when the worker process result channel closes.

Usage:

(defn migrate-data
  [connection work-ch job-ch]
  (go
    (loop []
      (when-let [v (<! work-ch)]
        (>! job-ch v) ; Update status board with current work
        ...           ; do some work
        (recur)))))

(defn driver
  "Does all the work on the provided work-ch, using the connection, and 10
  worker processes. Returns a channel that closes when all work is complete."
  [status-board connection work-ch]
  (run-workers 10 (wrap-with-status status-board #(migrate-data connection work-ch %)))

This is, of course, just a sketch, as it doesn’t show where the channel of work comes from, or what the actual work is.

options:

:prefix (default worker %2d:)
Status job prefix, generated from the worker id (numbered from 1).
:initial-text (default waiting)
Initial text for the worker (the worker receives the job channel and may make any other updates to it).
:complete-text (default done)
Text written to the worker’s job channel after the worker process closes its channel.