Class TaskSpec

java.lang.Object
nz.org.riskscape.engine.task.TaskSpec
All Implemented Interfaces:
com.codahale.metrics.Metric, com.codahale.metrics.MetricSet

public final class TaskSpec extends Object implements com.codahale.metrics.MetricSet

The TaskSpec defines the work for a step, or a series of steps, in a pipeline. It is only a blueprint for the work that needs to be done; one or more WorkerTasks are then created from the TaskSpec to actually carry out that work.

  • Field Details

    • NO_MINIMUM

      public static final int NO_MINIMUM
    • progressMetrics

      protected final Map<String,com.codahale.metrics.Metric> progressMetrics

      A map of Metrics that give an indication of progress through this task. Metrics from this map will be added to ExecutionOptions.getProgressMetrics() whilst the task is being processed.

  • Constructor Details

    • TaskSpec

      public TaskSpec(Class<? extends WorkerTask> impl, List<nz.org.riskscape.engine.pipeline.RealizedStep> forSteps, ReadPageBuffer input, WritePageBuffer output, boolean parallelizable, nz.org.riskscape.engine.output.PipelineJobContext context)

  • Method Details

    • getInput

      public Optional<ReadPageBuffer> getInput()
    • getOutput

      public Optional<WritePageBuffer> getOutput()
    • getWorkerTaskClass

      public Class<? extends WorkerTask> getWorkerTaskClass()
      Returns:
      the subclass of WorkerTask that is used to fulfill this TaskSpec's processing. Depending on the type of task, one or more instances of this class are created to consume tuples, produce tuples, or both as part of execution.
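      As a rough illustration of the blueprint/worker split, the sketch below models a TaskSpec-like blueprint that instantiates its worker class one or more times. Every name in it (Worker, NoOpWorker, Blueprint, newWorkers) is hypothetical, and two behaviours are assumptions rather than documented RiskScape behaviour: that worker classes have a no-arg constructor, and that a parallelizable task gets one worker per thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for WorkerTask: one unit of work that actually runs.
abstract class Worker implements Runnable {
}

// Hypothetical worker implementation with an implicit no-arg constructor.
class NoOpWorker extends Worker {
    @Override
    public void run() {
        // a real worker would consume and/or produce tuples here
    }
}

// Hypothetical stand-in for the blueprint side of TaskSpec.
final class Blueprint {
    private final Class<? extends Worker> impl;
    private final boolean parallelizable;

    Blueprint(Class<? extends Worker> impl, boolean parallelizable) {
        this.impl = impl;
        this.parallelizable = parallelizable;
    }

    Class<? extends Worker> getWorkerClass() {
        return impl;
    }

    // Assumption: one worker per thread when parallelizable, otherwise exactly one.
    List<Worker> newWorkers(int threads) {
        int count = parallelizable ? threads : 1;
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            try {
                // Assumption: the worker class has an accessible no-arg constructor.
                workers.add(impl.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + impl.getName(), e);
            }
        }
        return workers;
    }
}
```

      The blueprint itself never runs anything; it only knows which class to instantiate and how many instances to create.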
    • addDependency

      public void addDependency(TaskSpec task)

      Adds a hard dependency that requires another task to have run to completion, while allowing arbitrary objects (called processing results) to be passed to dependents before they themselves execute.

      WorkerTasks can return 'processing results' when they complete by setting WorkerTask.processingResult. The Scheduler picks these up once a WorkerTask is complete by calling WorkerTask.consumeProcessingResult(), then passes each result on to the dependent tasks by calling addProcessingResultFromDependency(TaskSpec, Object). All of this happens on the scheduler thread, against tasks that are either waiting to run or have finished running, so no locking is required.

      When a WorkerTask that has dependencies runs, it can access the results of its dependencies by calling getProcessingResultsFrom(Class, Class) or getProcessingResultFrom(Class, Class).
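      A minimal sketch of the hand-off described above, assuming everything runs on a single scheduler thread. Spec and MiniScheduler are hypothetical stand-ins, not RiskScape classes: for brevity, results are keyed by the dependency spec rather than looked up by WorkerTask class, and hasDependency is assumed to mean "still outstanding". Note that results are distributed before satisfyDependency is called.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for TaskSpec's dependency bookkeeping.
final class Spec {
    final String name;
    private final Set<Spec> outstanding = new HashSet<>();
    private final Map<Spec, List<Object>> results = new LinkedHashMap<>();

    Spec(String name) {
        this.name = name;
    }

    void addDependency(Spec other) {
        outstanding.add(other);
    }

    // Assumption: "has" means the dependency is still outstanding.
    boolean hasDependency(Spec other) {
        return outstanding.contains(other);
    }

    void satisfyDependency(Spec other) {
        outstanding.remove(other);
    }

    boolean hasOutstandingDependencies() {
        return !outstanding.isEmpty();
    }

    void addProcessingResultFromDependency(Spec from, Object result) {
        results.computeIfAbsent(from, k -> new ArrayList<>()).add(result);
    }

    // Simplified lookup: keyed by the dependency itself rather than its WorkerTask class.
    List<Object> resultsFrom(Spec from) {
        return results.getOrDefault(from, List.of());
    }
}

// Hypothetical scheduler-side hand-off; everything runs on one thread, so no locking.
final class MiniScheduler {
    static void onDependencyFinished(Spec finished, List<Object> workerResults, List<Spec> allSpecs) {
        for (Spec spec : allSpecs) {
            if (spec.hasDependency(finished)) {
                // Distribute every worker's result first...
                for (Object result : workerResults) {
                    spec.addProcessingResultFromDependency(finished, result);
                }
                // ...and only then release the dependent task.
                spec.satisfyDependency(finished);
            }
        }
    }
}
```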

    • hasDependency

      public boolean hasDependency(TaskSpec taskSpec)
      Returns:
      true if this task depends on the given task spec being complete
    • satisfyDependency

      public void satisfyDependency(TaskSpec dependency)

      Marks the given dependency as complete. Call this only once every WorkerTask for that dependency has run to completion and its processingResults have been distributed, because a task may start running as soon as all of its dependencies are satisfied. If the task starts before it has all the results it needs, the results of execution are likely to be incomplete.

    • hasOutstandingDependencies

      public boolean hasOutstandingDependencies()
      Returns:
      true if this task has dependencies that are preventing it from running.
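      The following sketch shows how a scheduler might use hasOutstandingDependencies() to decide what can run next. Node and ReadyLoop are hypothetical, the "run" step only records the task name, and the cycle check is an addition of this example rather than documented behaviour.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a TaskSpec's dependency state.
final class Node {
    final String name;
    final Set<Node> outstanding = new LinkedHashSet<>();

    Node(String name) {
        this.name = name;
    }

    void addDependency(Node other) {
        outstanding.add(other);
    }

    void satisfyDependency(Node other) {
        outstanding.remove(other);
    }

    boolean hasOutstandingDependencies() {
        return !outstanding.isEmpty();
    }
}

final class ReadyLoop {
    // Repeatedly "runs" every task whose dependencies are all satisfied,
    // then satisfies that dependency on every task still waiting.
    static List<String> runAll(List<Node> tasks) {
        List<Node> waiting = new ArrayList<>(tasks);
        List<String> order = new ArrayList<>();
        while (!waiting.isEmpty()) {
            List<Node> ready = new ArrayList<>();
            for (Node n : waiting) {
                if (!n.hasOutstandingDependencies()) {
                    ready.add(n);
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalStateException("dependency cycle among waiting tasks");
            }
            waiting.removeAll(ready);
            for (Node done : ready) {
                order.add(done.name);
                for (Node n : waiting) {
                    n.satisfyDependency(done);
                }
            }
        }
        return order;
    }
}
```

      With c depending on a and b, and b depending on a, runAll returns [a, b, c] regardless of the order in which the tasks are passed in.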
    • addProcessingResultFromDependency

      public void addProcessingResultFromDependency(TaskSpec fromSpec, Object processingResult)

      Passes the result of execution from a dependency to this task, so that one or more future WorkerTasks can use it as part of their execution. WorkerTasks should use getProcessingResultsFrom(Class, Class) or getProcessingResultFrom(Class, Class) to access these results within the call to WorkerTask.run().

    • getProcessingResultsFrom

      public <T> List<T> getProcessingResultsFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType)

      Shortcut for getProcessingResultsFrom(taskType, expectedType, NO_MINIMUM)

    • getProcessingResultsFrom

      public <T> List<T> getProcessingResultsFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType, int min)

      Returns all of the processing results from dependencies of the given task type, or an empty list if there is no dependency of that type, or no results of that type.

      Parameters:
      taskType - the WorkerTask type of the expected dependency. No check is made here that a dependency of this type ever existed; the parameter exists to distinguish between dependencies in case a task comes to depend on more than one task type.
      expectedType - The expected type of processing results from the given task type.
      min - the minimum number of results expected
      Returns:
      A list of the processing results, or an empty list if no results were found
      Throws:
      ClassCastException - if there are results but they are not of expectedType
      IllegalStateException - if there are fewer results than the specified minimum
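      A sketch of the lookup rules listed above: an empty list when nothing matches, a ClassCastException (via Class.cast) when a result has the wrong type, and an IllegalStateException when there are fewer than min results. ResultStore, SamplingWorker and ReducingWorker are hypothetical, and the value 0 used for NO_MINIMUM is an assumption, since the real constant's value is not shown on this page.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical worker types, used only as keys for results.
class SamplingWorker {}
class ReducingWorker {}

// Hypothetical stand-in for the result lookup side of TaskSpec.
final class ResultStore {
    // Assumption: the real value of TaskSpec.NO_MINIMUM is not shown on this page.
    static final int NO_MINIMUM = 0;

    private final Map<Class<?>, List<Object>> byTaskType = new HashMap<>();

    void add(Class<?> taskType, Object result) {
        byTaskType.computeIfAbsent(taskType, k -> new ArrayList<>()).add(result);
    }

    <T> List<T> getProcessingResultsFrom(Class<?> taskType, Class<T> expectedType) {
        return getProcessingResultsFrom(taskType, expectedType, NO_MINIMUM);
    }

    <T> List<T> getProcessingResultsFrom(Class<?> taskType, Class<T> expectedType, int min) {
        // An unknown task type simply yields an empty list.
        List<Object> raw = byTaskType.getOrDefault(taskType, List.of());
        if (raw.size() < min) {
            throw new IllegalStateException("expected at least " + min + " results from "
                + taskType.getSimpleName() + " but found " + raw.size());
        }
        List<T> typed = new ArrayList<>(raw.size());
        for (Object result : raw) {
            // Class.cast throws ClassCastException if a result is not an expectedType.
            typed.add(expectedType.cast(result));
        }
        return typed;
    }
}
```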
    • getProcessingResultFrom

      public <T> T getProcessingResultFrom(Class<? extends WorkerTask> taskType, Class<T> expectedType)

      A version of getProcessingResultsFrom(Class, Class) that ensures that one and only one processing result exists.

      Throws:
      IllegalStateException - if there isn't exactly one result. This will halt execution and is considered a programming error, not a user error
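      A sketch of the "exactly one" rule layered on top of a list of results. SingleResult.only is a hypothetical helper for illustration, not part of TaskSpec.

```java
import java.util.List;

final class SingleResult {
    // Hypothetical helper: enforce exactly one result, then check its type.
    static <T> T only(List<?> results, Class<T> expectedType, String taskName) {
        if (results.size() != 1) {
            // Treated as a programming error, so fail loudly rather than return null.
            throw new IllegalStateException(
                "expected exactly one result from " + taskName + " but found " + results.size());
        }
        return expectedType.cast(results.get(0));
    }
}
```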
    • newWorkerTask

      public WorkerTask newWorkerTask(Scheduler scheduler) throws nz.org.riskscape.problem.ProblemException

      Creates a new WorkerTask for the given Scheduler. The Scheduler needs to translate a TaskSpec into one or more WorkerTasks that will actually carry out the work.

      Throws:
      nz.org.riskscape.problem.ProblemException
    • newMetric

      protected <T extends com.codahale.metrics.Metric> T newMetric(String metricName, Supplier<T> constructor)
    • newMetric

      protected <T extends com.codahale.metrics.Metric> T newMetric(String metricName, String progressName, Supplier<T> constructor)

      Create or return an existing Metric with the given task-local name.

      Creating a metric through this method registers it against the execution context's metrics, using a name that identifies it uniquely within the executed pipeline (the task spec's steps are used to generate an id). For parallelizable tasks, you may be returned a pre-existing metric; this allows the same metric to record activity across the set of tasks related to the same TaskSpec.

      All calls to newMetric should be made from the scheduler thread, not from a worker thread.

      Parameters:
      progressName - the name under which to register this metric as a progress metric, which may differ from metricName. See getProgressMetrics() for more info. Set to null if this isn't an important progress indicator metric.
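      A sketch of the create-or-return behaviour described above. To stay self-contained it uses minimal stand-ins for com.codahale.metrics.Metric and Counter instead of the real library, and the naming scheme (a steps id joined to the task-local name with ".") is an assumption. The plain HashMaps are only acceptable because newMetric is called from the scheduler thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Minimal stand-ins for com.codahale.metrics.Metric and Counter (not the real library).
interface Metric {}

final class Counter implements Metric {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

// Hypothetical owner of task-scoped metrics, modelled on TaskSpec.newMetric.
final class MetricOwner {
    private final String stepsId; // assumption: an id derived from the task's steps
    private final Map<String, Metric> metrics = new HashMap<>();
    private final Map<String, Metric> progressMetrics = new HashMap<>();

    MetricOwner(String stepsId) {
        this.stepsId = stepsId;
    }

    <T extends Metric> T newMetric(String metricName, String progressName, Supplier<T> constructor) {
        String fullName = stepsId + "." + metricName;
        // Create the metric on first use; later calls with the same name share it.
        @SuppressWarnings("unchecked")
        T metric = (T) metrics.computeIfAbsent(fullName, k -> constructor.get());
        if (progressName != null) {
            progressMetrics.put(progressName, metric);
        }
        return metric;
    }

    Map<String, Metric> getMetrics() {
        return metrics;
    }

    Map<String, Metric> getProgressMetrics() {
        return progressMetrics;
    }
}
```

      Two parallel workers asking for the same task-local name receive the same Counter, so their increments accumulate in one place.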
    • getCombinedStepsName

      public String getCombinedStepsName()
      Returns:
      a combined string containing all of the step names that this task handles
    • getStepsSummary

      public String getStepsSummary()
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • getLastStep

      public nz.org.riskscape.engine.pipeline.RealizedStep getLastStep()
    • getFirstStep

      public nz.org.riskscape.engine.pipeline.RealizedStep getFirstStep()
    • changeState

      public void changeState(TaskState newState)
    • isCreated

      public boolean isCreated()
    • isStarted

      public boolean isStarted()
    • isComplete

      public boolean isComplete()
    • close

      public void close()
    • hadDependency

      public boolean hadDependency(TaskSpec taskSpec)
    • allWorkersMatch

      public boolean allWorkersMatch(Predicate<WorkerTask> predicate)
    • getExecutionContext

      public nz.org.riskscape.engine.pipeline.ExecutionContext getExecutionContext()
    • getContainer

      public nz.org.riskscape.engine.output.PipelineOutputContainer getContainer()
    • equals

      public boolean equals(Object o)
      Overrides:
      equals in class Object
    • hashCode

      public int hashCode()
      Overrides:
      hashCode in class Object
    • getForSteps

      public List<nz.org.riskscape.engine.pipeline.RealizedStep> getForSteps()

      The underlying pipeline step(s) that this task is executing

    • getDependsOn

      public List<TaskSpec> getDependsOn()
    • isParallelizable

      public boolean isParallelizable()
    • getJobContext

      public nz.org.riskscape.engine.output.PipelineJobContext getJobContext()
    • getMetrics

      public Map<String,com.codahale.metrics.Metric> getMetrics()
      Specified by:
      getMetrics in interface com.codahale.metrics.MetricSet
    • getWorkerTasks

      public List<WorkerTask> getWorkerTasks()
    • getProgressMetrics

      public Map<String,com.codahale.metrics.Metric> getProgressMetrics()

      A map of Metrics that give an indication of progress through this task. Metrics from this map will be added to ExecutionOptions.getProgressMetrics() whilst the task is being processed.

    • getState

      public TaskState getState()