Class TaskSpec
- All Implemented Interfaces:
com.codahale.metrics.Metric, com.codahale.metrics.MetricSet
The TaskSpec defines the work for a step or series of steps in a pipeline. This is just a blueprint for the work that needs to be done. One or more WorkerTasks are then created for the TaskSpec, and these actually carry out the work.
-
Field Summary
Modifier and Type, Field, Description:
static final int NO_MINIMUM: Constant for getProcessingResultsFrom(Class, Class, int)
progressMetrics: A map of Metrics that give an indication of progress through this task.
-
Constructor Summary
Constructor, Description:
TaskSpec(Class<? extends WorkerTask> impl, List<nz.org.riskscape.engine.pipeline.RealizedStep> forSteps, ReadPageBuffer input, WritePageBuffer output, boolean parallelizable, nz.org.riskscape.engine.output.PipelineJobContext context)
-
Method Summary
Modifier and Type, Method, Description:
void addDependency(TaskSpec task): Adds a hard dependency that requires another task to have run to completion, while allowing arbitrary objects (called processing results) to be passed to dependents before they themselves execute.
void addProcessingResultFromDependency(TaskSpec fromSpec, Object processingResult): Pass the result of execution from a dependency to this task, so that one or more future WorkerTasks can use the results as part of their execution.
boolean allWorkersMatch(Predicate<WorkerTask> predicate)
void changeState(TaskState newState)
void close()
boolean equals
nz.org.riskscape.engine.output.PipelineOutputContainer getContainer()
nz.org.riskscape.engine.pipeline.ExecutionContext getExecutionContext()
nz.org.riskscape.engine.pipeline.RealizedStep getFirstStep()
List<nz.org.riskscape.engine.pipeline.RealizedStep> getForSteps: The underlying pipeline step(s) that this task is executing
getInput()
nz.org.riskscape.engine.output.PipelineJobContext getJobContext()
nz.org.riskscape.engine.pipeline.RealizedStep getLastStep()
<T> T getProcessingResultFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType): A version of getProcessingResultsFrom(Class, Class) that ensures that exactly one processing result exists.
<T> List<T> getProcessingResultsFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType): Shortcut for getProcessingResultsFrom(taskType, expectedType, NO_MINIMUM)
<T> List<T> getProcessingResultsFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType, int min): Returns all of the processing results from dependencies of the given task type, or an empty list if there is no dependency of that type, or no results of that type.
getProgressMetrics: A map of Metrics that give an indication of progress through this task.
getState()
Class<? extends WorkerTask> getWorkerTaskClass
boolean hadDependency(TaskSpec taskSpec)
boolean hasDependency(TaskSpec taskSpec)
int hashCode()
boolean hasOutstandingDependencies()
boolean isComplete()
boolean isCreated()
boolean isParallelizable()
boolean isStarted()
protected <T extends com.codahale.metrics.Metric> T newMetric: Create or return an existing Metric with the given task-local name.
protected <T extends com.codahale.metrics.Metric> T newMetric
WorkerTask newWorkerTask(Scheduler scheduler): Creates a new WorkerTask for the given Scheduler.
void satisfyDependency(TaskSpec dependency): Marks the given dependency as completed.
toString()
-
Field Details
-
NO_MINIMUM
public static final int NO_MINIMUM
Constant for getProcessingResultsFrom(Class, Class, int)
-
progressMetrics
A map of Metrics that give an indication of progress through this task. Metrics from this map will be added to ExecutionOptions.getProgressMetrics() whilst the task is being processed.
-
-
Constructor Details
-
TaskSpec
public TaskSpec(Class<? extends WorkerTask> impl, List<nz.org.riskscape.engine.pipeline.RealizedStep> forSteps, ReadPageBuffer input, WritePageBuffer output, boolean parallelizable, nz.org.riskscape.engine.output.PipelineJobContext context)
-
-
Method Details
-
getInput
-
getOutput
-
getWorkerTaskClass
- Returns:
- the subclass of WorkerTask that is used to fulfill this TaskSpec's processing. One or more instances of this class are created, depending on the type of task, to consume tuples, produce tuples, or both as part of execution.
-
addDependency
Adds a hard dependency that requires another task to have run to completion, while allowing arbitrary objects (called processing results) to be passed to dependents before they themselves execute.
WorkerTasks can return 'processing results' when they are complete by setting WorkerTask.processingResult. The Scheduler picks these up once a WorkerTask is complete by calling WorkerTask.consumeProcessingResult(). This result is then passed to the dependents by calling addProcessingResultFromDependency(TaskSpec, Object). This is all done from the scheduler thread on tasks that are either waiting to run or have finished running, so no locking is required.
When a WorkerTask that has dependencies runs, it can access the results of its dependencies by calling getProcessingResultsFrom(Class, Class) or getProcessingResultFrom(Class, Class).
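The hand-off described for addDependency can be sketched with a small stand-in model. This is only an illustration under stated assumptions: SketchSpec and DependencyFlowSketch are hypothetical classes written for this example, not the RiskScape implementation, and the sketch ignores task types, state changes and worker creation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for TaskSpec that models only dependency bookkeeping. */
class SketchSpec {
    final String name;
    private final Set<SketchSpec> outstanding = new HashSet<>();
    private final List<Object> results = new ArrayList<>();

    SketchSpec(String name) {
        this.name = name;
    }

    /** This spec may not run until the given dependency has been satisfied. */
    void addDependency(SketchSpec dependency) {
        outstanding.add(dependency);
    }

    /** Stores a result handed over from a finished dependency. */
    void addProcessingResultFromDependency(SketchSpec from, Object result) {
        results.add(result);
    }

    /** Marks the dependency as done; call only after its results were handed over. */
    void satisfyDependency(SketchSpec dependency) {
        outstanding.remove(dependency);
    }

    boolean hasOutstandingDependencies() {
        return !outstanding.isEmpty();
    }

    List<Object> results() {
        return results;
    }
}

class DependencyFlowSketch {
    /** Plays the scheduler's role on a single thread, so no locking is needed. */
    static List<Object> run() {
        SketchSpec producer = new SketchSpec("producer");
        SketchSpec consumer = new SketchSpec("consumer");
        consumer.addDependency(producer);

        // Pretend the producer's worker finished and left a processing result behind.
        Object processingResult = "index-built";

        // Distribute the result first, then mark the dependency as satisfied.
        consumer.addProcessingResultFromDependency(producer, processingResult);
        consumer.satisfyDependency(producer);

        if (consumer.hasOutstandingDependencies()) {
            throw new IllegalStateException("consumer should be ready to run");
        }
        return consumer.results();
    }
}
```

The ordering in run() mirrors the text: results are distributed before the dependency is marked as satisfied, so a dependent never starts with an incomplete set of results.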
-
hasDependency
- Returns:
- true if this task depends on the given task spec being complete
-
satisfyDependency
Marks the given dependency as completed. This should be called once all WorkerTasks from that dependency have run to completion and their processing results have been distributed appropriately, because a task may start running as soon as all of its dependencies have been satisfied. If it doesn't have all the results it needs, the results of execution are likely to be incomplete.
-
hasOutstandingDependencies
public boolean hasOutstandingDependencies()
- Returns:
- true if this task has dependencies that are preventing it from running.
-
addProcessingResultFromDependency
Pass the result of execution from a dependency to this task, so that one or more future WorkerTasks can use the results as part of their execution. WorkerTasks should use getProcessingResultsFrom(Class, Class) or getProcessingResultFrom(Class, Class) to access these results within the call to WorkerTask.run().
-
getProcessingResultsFrom
public <T> List<T> getProcessingResultsFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType)
Shortcut for getProcessingResultsFrom(taskType, expectedType, NO_MINIMUM)
-
getProcessingResultsFrom
public <T> List<T> getProcessingResultsFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType, int min)
Returns all of the processing results from dependencies of the given task type, or an empty list if there is no dependency of that type, or no results of that type.
- Parameters:
taskType - the type of the dependency we're expecting there to be. No checking is done that there ever was a dependency of this type at this point, but the idea is that future use cases might depend on more than one task type and so this is here to distinguish the dependencies.
expectedType - the expected type of processing results from the given task type.
min - the minimum number of results expected
- Returns:
- A list of the processing results, or an empty list if no results were found
- Throws:
ClassCastException - if there are results but they are not of expectedType
IllegalStateException - if there are fewer results than the specified minimum
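The contract above (typed results, a ClassCastException on a type mismatch, an IllegalStateException when the minimum is not met) can be illustrated with a minimal sketch. ProcessingResultsSketch and resultsOfType are hypothetical names used only here. The filtering by taskType is left out. The value of NO_MINIMUM is not shown on this page, so 0 is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessingResultsSketch {
    // Assumption: the real value of TaskSpec.NO_MINIMUM is not documented here; 0 is used.
    static final int NO_MINIMUM = 0;

    /**
     * Casts every stored result to the expected type and enforces a minimum count.
     * Hypothetical helper, not the library's implementation.
     */
    static <T> List<T> resultsOfType(List<?> stored, Class<T> expectedType, int min) {
        List<T> typed = new ArrayList<>();
        for (Object result : stored) {
            // Class.cast throws ClassCastException if the result has the wrong type.
            typed.add(expectedType.cast(result));
        }
        if (typed.size() < min) {
            throw new IllegalStateException(
                "expected at least " + min + " results but found " + typed.size());
        }
        return typed;
    }
}
```

With NO_MINIMUM an empty input simply yields an empty list, which matches the documented "empty list if no results were found" behaviour.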
-
getProcessingResultFrom
A version of getProcessingResultsFrom(Class, Class) that ensures that exactly one processing result exists.
- Throws:
IllegalStateException - if there isn't exactly one result. This will halt execution and is considered a programming error, not a user error.
-
newWorkerTask
public WorkerTask newWorkerTask(Scheduler scheduler) throws nz.org.riskscape.problem.ProblemException
Creates a new WorkerTask for the given Scheduler. The Scheduler needs to translate a TaskSpec into one or more WorkerTasks that will actually carry out the work.
- Throws:
nz.org.riskscape.problem.ProblemException
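The blueprint-to-worker relationship can be sketched as follows. All class names here are hypothetical stand-ins. The policy of one worker per available thread for parallelizable specs is an assumption made for this example, since this page does not say how the real Scheduler decides how many WorkerTasks to create.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical blueprint: describes the work but never performs it. */
class SketchTaskSpec {
    final String stepName;
    final boolean parallelizable;

    SketchTaskSpec(String stepName, boolean parallelizable) {
        this.stepName = stepName;
        this.parallelizable = parallelizable;
    }

    /** Each call yields a fresh worker bound to this blueprint. */
    SketchWorker newWorkerTask(int workerId) {
        return new SketchWorker(this, workerId);
    }
}

/** Hypothetical worker: the object that would actually do the work. */
class SketchWorker {
    final SketchTaskSpec spec;
    final int id;

    SketchWorker(SketchTaskSpec spec, int id) {
        this.spec = spec;
        this.id = id;
    }

    String describe() {
        return spec.stepName + "#" + id;
    }
}

class SketchScheduler {
    /** Assumed policy: one worker per thread if parallelizable, otherwise exactly one. */
    static List<SketchWorker> workersFor(SketchTaskSpec spec, int threads) {
        int count = spec.parallelizable ? Math.max(1, threads) : 1;
        List<SketchWorker> workers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            workers.add(spec.newWorkerTask(i));
        }
        return workers;
    }
}
```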
-
newMetric
-
newMetric
protected <T extends com.codahale.metrics.Metric> T newMetric(String metricName, String progressName, Supplier<T> constructor)
Create or return an existing Metric with the given task-local name.
Creating a metric through this method registers it against the execution context's metrics with a name that identifies it uniquely within the executed pipeline (using the task spec's steps to generate an id). For parallelizable tasks, a pre-existing metric may be returned. This allows the same metric to record activity across a set of tasks related to the same TaskSpec.
All calls to newMetric should be made from the scheduler thread, not from a worker thread.
- Parameters:
progressName - a possibly alternative name to use to register this metric as a progress metric. See getProgressMetrics() for more info. Set to null if this isn't an important progress indicator metric.
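The create-or-return behaviour can be sketched with a plain map and a Supplier. MetricLookupSketch is a hypothetical class for illustration only. It uses AtomicLong as a stand-in for a counter-style metric, and the way the qualified name is built from a task id is an assumption. A plain HashMap is used because the text says all calls come from the scheduler thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class MetricLookupSketch {
    private final String taskId;
    private final Map<String, Object> metrics = new HashMap<>();
    private final Map<String, Object> progressMetrics = new HashMap<>();

    MetricLookupSketch(String taskId) {
        this.taskId = taskId;
    }

    /**
     * Returns the metric already registered under the task-local name, or creates
     * one with the supplier. A non-null progressName also exposes it as a progress metric.
     */
    @SuppressWarnings("unchecked")
    <T> T newMetric(String metricName, String progressName, Supplier<T> constructor) {
        // Assumed naming scheme: prefix the task-local name with an id for the task.
        String qualifiedName = taskId + "." + metricName;
        T metric = (T) metrics.computeIfAbsent(qualifiedName, key -> constructor.get());
        if (progressName != null) {
            progressMetrics.put(progressName, metric);
        }
        return metric;
    }

    Map<String, Object> progressMetrics() {
        return progressMetrics;
    }
}
```

Two workers created from the same spec would call newMetric with the same name and receive the same instance, so their activity accumulates in one metric.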
-
getCombinedStepsName
- Returns:
- a combined string containing all of the step names that this task handles
-
getStepsSummary
-
toString
-
getLastStep
public nz.org.riskscape.engine.pipeline.RealizedStep getLastStep() -
getFirstStep
public nz.org.riskscape.engine.pipeline.RealizedStep getFirstStep() -
changeState
-
isCreated
public boolean isCreated() -
isStarted
public boolean isStarted() -
isComplete
public boolean isComplete() -
close
public void close() -
hadDependency
-
allWorkersMatch
-
getExecutionContext
public nz.org.riskscape.engine.pipeline.ExecutionContext getExecutionContext() -
getContainer
public nz.org.riskscape.engine.output.PipelineOutputContainer getContainer() -
equals
-
hashCode
public int hashCode() -
getForSteps
The underlying pipeline step(s) that this task is executing
-
getDependsOn
-
isParallelizable
public boolean isParallelizable() -
getJobContext
public nz.org.riskscape.engine.output.PipelineJobContext getJobContext() -
getMetrics
- Specified by:
getMetrics in interface com.codahale.metrics.MetricSet
-
getWorkerTasks
-
getProgressMetrics
A map of Metrics that give an indication of progress through this task. Metrics from this map will be added to ExecutionOptions.getProgressMetrics() whilst the task is being processed.
-
getState
-