User guide

Creating tasks

The best way to manage tasks is by using TaskService.

Each new task must have commands with a list of JSONRPC request body templates (see TaskCommand). Each request in the list of commands is considered a stage. The stages are executed sequentially. The result of each stage is sent back to the manager and stored in the task result field. The maximum number of commands per task is limited by MAX_STAGES.

from kaiju_tasks import Task, TaskCommand, TaskService

# TaskService
task = await task_service.create(Task(
    app=self.app.name,
    id='orders.delete_expired',
    commands=[
        TaskCommand(method='orders.delete', params={'conditions': {'expired': True}}),
        TaskCommand(method='cache.clear', params=None)
    ]
))

It is recommended to manage parameters using kws and templates, see Sharing data between stages for more info.

It’s reasonable to include app_name to tell the manager which app should run this task. You can access the current app name using self.app.name from inside your service.

It’s also reasonable to manually set id when creating a periodic or triggered task so it will be easier to manipulate it in the table. The id can be any string.

See task_schema and Task for the complete list of attributes.

Task status

Each task has its state stored in the status field of the tasks table. All new tasks are created in the IDLE state. Before sending a task into the stream, the manager switches its status to QUEUED. When an executor reports the task id back to the manager, the task is switched to EXECUTED, acknowledging that it is now being processed. Eventually it becomes FINISHED or FAILED depending on the result. Failed tasks can be restarted (see Retries). Finished tasks can be restarted too if they have run intervals configured (see Periodic tasks).

Occasionally a task may become SUSPENDED. This means that the executor that was running the task has exited or is not responding. The manager will automatically try to re-queue such tasks as soon as possible.

See TaskStatus for a list of task states.
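The lifecycle described above can be pictured as a small state machine. The following is only an illustrative sketch: the `Status` enum and the `TRANSITIONS` table are hypothetical names made up for this example, they list only the transitions mentioned in this section, and they are not the library's TaskStatus implementation.

```python
from enum import Enum


class Status(str, Enum):
    """Hypothetical copy of the states named in this section."""
    IDLE = 'IDLE'
    QUEUED = 'QUEUED'
    EXECUTED = 'EXECUTED'
    FINISHED = 'FINISHED'
    FAILED = 'FAILED'
    SUSPENDED = 'SUSPENDED'


# Only the transitions described in the text; the real manager may allow others.
TRANSITIONS = {
    Status.IDLE: {Status.QUEUED},
    Status.QUEUED: {Status.EXECUTED},
    Status.EXECUTED: {Status.FINISHED, Status.FAILED, Status.SUSPENDED},
    Status.FAILED: {Status.QUEUED},     # restarted on retry
    Status.FINISHED: {Status.QUEUED},   # restarted as a periodic task
    Status.SUSPENDED: {Status.QUEUED},  # re-queued by the manager
}


def can_transition(src: Status, dst: Status) -> bool:
    """Return True if the text above describes a move from src to dst."""
    return dst in TRANSITIONS.get(src, set())
```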

Figure: task state activity diagram.

Timeouts

Each task has an assigned max_exec_timeout, which sets the total timeout in seconds for all task stages. The timeout interval is counted from queued_at (the queue time), and the resulting deadline is stored in exec_deadline. When a task reaches its deadline, it is automatically aborted in the executor with a TimeoutError.
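As a rough illustration of the arithmetic, the deadline is the queue time plus the total timeout. The helper functions below are hypothetical and only mirror the field names used in this section; they do not show how the manager computes or stores the value internally.

```python
from datetime import datetime, timedelta, timezone


def exec_deadline(queued_at: datetime, max_exec_timeout: int) -> datetime:
    """Deadline = queue time + total task timeout (in seconds)."""
    return queued_at + timedelta(seconds=max_exec_timeout)


def is_expired(deadline: datetime, now: datetime) -> bool:
    """A task is past its deadline once the current time reaches it."""
    return now >= deadline


# Example: a task queued at 12:00 UTC with a 300 second timeout.
queued = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
deadline = exec_deadline(queued, 300)
```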

Note

It’s recommended to configure timeouts for each task manually depending on how long it may theoretically take to complete. Note however that the internal RPC server of the executor may have a max_timeout value lower than the one you have set for the task.

The default timeout is DEFAULT_T, and the allowed range is set by MIN_T and MAX_T.

Additionally, each command supports its own timeout using the optional max_timeout argument.

TaskCommand(method='do.job', params={'a': 1, 'b': 2}, max_timeout=100)

Note

Timeouts for individual commands do not override the total task timeout. The task will be aborted when the task deadline is reached even if a command could technically still be executed.
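One way to reason about how the two timeouts interact is to treat the time actually available to a command as the smaller of its own max_timeout and the time left until the task deadline. This is a sketch under that assumption; `effective_timeout` is a hypothetical helper, and the executor's real logic may differ.

```python
from typing import Optional


def effective_timeout(command_timeout: Optional[float], seconds_to_deadline: float) -> float:
    """Time a command can really use: never more than what is left for the task."""
    remaining = max(seconds_to_deadline, 0.0)
    if command_timeout is None:
        # No per-command limit: only the task deadline applies.
        return remaining
    return min(command_timeout, remaining)
```

For example, a command with max_timeout=100 in a task that has only 30 seconds left effectively gets 30 seconds.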

Retries

To allow retries you can set max_retries. When a task with retries fails, it is re-queued as soon as possible and its retries counter is incremented until no retries are left or the task has succeeded.

You can choose the stage from which the task is restarted using restart_policy, which can be one of: CURRENT - default, restart from the current stage, or FIRST - discard the results and restart from the first stage.

The absolute limit on the number of retries is set by MAX_RETRIES.

task = await task_service.create(Task(
    id='server.sync',
    commands=[{'method': 'server.sync'}],
    max_retries=3,
    restart_policy='CURRENT'  # CURRENT or FIRST
))
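The difference between the two restart policies can be sketched as follows. The functions are hypothetical and only model the behaviour described above (CURRENT keeps the results of completed stages, FIRST discards them); they are not the manager's implementation.

```python
from typing import Any, List, Tuple


def restart_point(results: List[Any], failed_stage: int,
                  restart_policy: str = 'CURRENT') -> Tuple[int, List[Any]]:
    """Return (stage index to restart from, results kept for the new run)."""
    if restart_policy == 'FIRST':
        # Discard everything and start over.
        return 0, []
    if restart_policy == 'CURRENT':
        # Keep results of the stages completed before the failed one.
        return failed_stage, results[:failed_stage]
    raise ValueError(f'unknown restart policy: {restart_policy}')


def can_retry(retries: int, max_retries: int) -> bool:
    """A failed task is re-queued only while retries are left."""
    return retries < max_retries
```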

Periodic tasks

Use the cron field to execute tasks periodically. See cron format for more details.

task = await task_service.create(Task(
    id='cache.update',
    commands=[TaskCommand(method='cache.update')],
    cron='* * * * 1'
))

Note

Sometimes a task manager will not schedule cron tasks at precise intervals. The reasons may be: task takes too much time between iterations (considering that max_exec_timeout allows that), the task was restarted, etc. This could lead to the actual intervals being larger than specified in the cron settings.

Sharing data between stages

Template syntax is fully supported in task commands. At the start of a stage, its command is evaluated against the task template data. This data consists of the task id, the kws dict taken from the task kws attribute, an executor dict with values specific to each executor (defined in its data dict), and the results of previous stages keyed by stage index starting from “0”.

template_data = {
  'id': 'my_task',
  'kws': {'command_name': 'items.m_delete'},
  'executor': {'some_value_from_executor_config': True},
  '0': {
    'data': [
      {'id': '001'},
      {'id': '005'},
      {'id': '101'}
    ]
  }
}

You can use field names in square brackets to access these values. Note that you can use templates for both the method name and the params. See Template from the kaiju-tools package for details.

task = await task_service.create(Task(
    id='my_task',
    commands=[
        TaskCommand(method='items.m_get', params={'conditions': {'marked': True}}),
        TaskCommand(method='[kws.command_name]', params={'id': '[0.data.id]'})
    ],
    kws={'command_name': 'items.m_delete'}
))
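To make the idea of bracketed field references concrete, here is a deliberately simplified resolver. It is a sketch, not the Template class from kaiju-tools: `resolve` and `lookup` are hypothetical names, only whole-string references such as '[kws.command_name]' are handled, only dict keys are followed, and it does not try to reproduce how the real Template treats lists.

```python
import re
from typing import Any

# Matches a string that consists of exactly one bracketed reference.
_FIELD = re.compile(r'^\[([^\[\]]+)\]$')


def lookup(path: str, data: dict) -> Any:
    """Follow a dotted path such as 'kws.command_name' through nested dicts."""
    current = data
    for key in path.split('.'):
        current = current[key]
    return current


def resolve(value: Any, data: dict) -> Any:
    """Replace bracketed references inside a command with values from data."""
    if isinstance(value, dict):
        return {k: resolve(v, data) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve(v, data) for v in value]
    if isinstance(value, str):
        match = _FIELD.match(value)
        if match:
            return lookup(match.group(1), data)
    return value


example_data = {
    'id': 'my_task',
    'kws': {'command_name': 'items.m_delete'},
    '0': {'count': 3},
}
method = resolve('[kws.command_name]', example_data)
params = resolve({'callback_id': '[id]', 'limit': '[0.count]'}, example_data)
```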

Conditions

You can specify a condition for a single command using Condition syntax. The command is executed if the condition evaluates to True against the task template data. Otherwise the executor skips the command and writes None to its result. Note that from the executor/manager perspective a skipped stage is still considered ‘executed’, but no actual command is submitted to the RPC.

from kaiju_tools.templates import Condition

task = await task_service.create(Task(
  app=self.app.name, id='products.update_prices',
  commands=[
      # suppose it returns a 'count' of actually updated items
      TaskCommand(method='products.update_attributes'), # -> {'count': 1000}
      # this command will be executed only if the previous stage 'count' > 0
      TaskCommand(
        method='products_cache.refresh',
        condition=Condition({'0.count': {'gt': 0}})
      )
  ]
))

You can build more complex conditions using templates (see Sharing data between stages).

task = await task_service.create(Task(
  app=self.app.name, id='products.update_prices',
  commands=[
      # suppose you have 'update_threshold' - min number of updated rows required for a cache refresh
      TaskCommand(method='settings.get'),  # -> {..., 'cache': {'update_threshold': 42, ...}}
      # suppose it returns 'count' of actually updated items
      TaskCommand(method='products.update_attributes'),  # -> {'count': 1000}
      # refresh cache only if updated item 'count' > 'update_threshold'
      TaskCommand(
        method='products_cache.refresh',
        condition=Condition({'1.count': {'gt': '[0.cache.update_threshold]'}})
      )
  ]
))
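The decision an executor makes for a conditional stage can be approximated by the sketch below. It is not the Condition class from kaiju-tools: `check` and `get_path` are hypothetical helpers, only the 'gt' operator from the examples above is implemented, and bracketed operands are resolved with a plain dict lookup.

```python
import operator
from typing import Any

# Only the operator used in the examples above; the real Condition supports its own set.
OPERATORS = {'gt': operator.gt}


def get_path(data: dict, path: str) -> Any:
    """Follow a dotted path such as '1.count' through nested dicts."""
    current = data
    for key in path.split('.'):
        current = current[key]
    return current


def check(condition: dict, data: dict) -> bool:
    """Return True if every comparison in the condition holds for data."""
    for path, comparisons in condition.items():
        value = get_path(data, path)
        for name, operand in comparisons.items():
            # An operand like '[0.cache.update_threshold]' refers to template data.
            if isinstance(operand, str) and operand.startswith('[') and operand.endswith(']'):
                operand = get_path(data, operand[1:-1])
            if not OPERATORS[name](value, operand):
                return False
    return True


stage_results = {
    '0': {'cache': {'update_threshold': 42}},
    '1': {'count': 1000},
}
run_refresh = check({'1.count': {'gt': '[0.cache.update_threshold]'}}, stage_results)
```

With these stage results the third command would run, because 1000 is greater than the threshold of 42.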

Timers

Imagine you have two sequential commands, the first of which requires some additional grace time. To provide it, you can insert a Timer between them with a timeout specified in seconds. The task becomes WAITING, and the executor is free to process other tasks. Once the timer has expired, the task is queued again starting from the next stage (similarly to suspended tasks). See an example below.

from kaiju_tasks import Task, TaskCommand, Timer

task = await task_service.create(Task(
  app=self.app.name, id='orders.delete_expired',
  commands=[
      TaskCommand(method='orders.delete', params={'conditions': {'expired': True}}),
      Timer(100),
      TaskCommand(method='cache.clear', params=None)
  ]
))

Note

Timers do not shield tasks from cancellation once the task max timeout is reached. You should estimate possible durations of your tasks and specify appropriate max_timeout values to prevent it.

Messages (hooks)

You can use the Message command to specify that a task needs user input (i.e. a received message) to continue. This command indicates that the executor must drop the task (which itself becomes WAITING), and the task will not continue until it receives a message via write_message(). Once a message is received, the task is queued again with the message data available in the stage results (under the message stage index).

You can use it to wait for some intermediate data which would take too long to acquire, or when your task needs actual human input to continue.

See the example below.

from kaiju_tasks import Message, Task, TaskCommand

task = await task_service.create(Task(
  app=self.app.name,
  id='orders.delete_expired',
  commands=[
      # ask for a data report file
      TaskCommand(method='statistics.request_report', params={'callback_id': '[id]'}),
      # wait until TaskService.write_message receives a message for this task
      Message(max_timeout=None),
      # message will be available in the results
      TaskCommand(method='files.download', params={'url': '[1.url]'})
  ]
))

Example of a message:

POST /public/rpc
{
  "method": "tasks.write_message",
  "params": {
    "id": "my_task_id",
    "data": {"url": "http://statistics.example/reports/324934"}
  }
}

Note

Messages do not shield tasks from cancellation once the task max timeout is reached. You should estimate possible durations of your tasks and specify appropriate max_timeout values to prevent it.

Results

Once a stage is completed, its result is stored in the result list. An error is stored in Task.error. There’s also exit_code - an exit code where 0 means OK and everything else indicates an error. See ExitCode for a list of used exit codes.

You may use TaskService to fetch task results and other data.

# TaskService
data = await task_service.get(
  id=my_task['id'],
  columns=['result', 'exit_code', 'error']
)
# exit_code != 0 means error, exit_code == None means that the task has yet to be executed
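The three cases for exit_code can be handled with a small helper. `describe_exit_code` is a hypothetical function written for this guide; it relies only on the rules stated above and does not use the library's ExitCode values.

```python
from typing import Optional


def describe_exit_code(exit_code: Optional[int]) -> str:
    """Classify a task by its exit_code as described in this section."""
    if exit_code is None:
        return 'pending'  # the task has not been executed yet
    if exit_code == 0:
        return 'ok'
    return 'error'        # see ExitCode for the meaning of specific codes
```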

Once a task is restarted (for any reason) its results and exit code are cleared and a new job_id is assigned to the task. You can still access previous results from the notifications table if you enabled notify. See NotificationService and Notification for details about notification attributes.

# NotificationService
# all results for the task sorted by date
await notifications.list(
  conditions={'task_id': task['id']},
  sort=[{'desc': 'created'}],
  columns=['result']
)
# a result for a particular job
data = await notifications.list(
  conditions={'task_id': task['id'], 'job_id': task['job_id']},
  columns=['result']
)

Chaining tasks

You can chain tasks by providing the next_task parameter when adding a new task. The value must be the id of an existing task. The referenced task is scheduled for execution once the main task has finished.

There are a few rules for task chaining:

  • The task must finish successfully. A failed task does not trigger the next one.

  • If the next task is already QUEUED, EXECUTED or SUSPENDED it will not be re-scheduled.

  • Disabled tasks are also not executed.

An example of task chaining is shown below.

next_task = await task_service.create(
  Task(commands=[
    TaskCommand(method='do.something.else')
  ])
)
task = await task_service.create(
  Task(commands=[
    TaskCommand(method='do.something')
  ], next_task=next_task['id'])
)
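The chaining rules listed in this section can be summarised as a single predicate. This is a sketch of the rules as stated, with statuses written as plain strings; `should_schedule_next` is a hypothetical name and not part of TaskService.

```python
# Statuses in which the next task is left alone, as listed in the rules.
BUSY_STATUSES = {'QUEUED', 'EXECUTED', 'SUSPENDED'}


def should_schedule_next(main_status: str, next_status: str, next_enabled: bool) -> bool:
    """Decide whether the task referenced by next_task should be scheduled."""
    if main_status != 'FINISHED':
        return False  # a failed task does not trigger the next one
    if next_status in BUSY_STATUSES:
        return False  # already queued or running, do not re-schedule
    return next_enabled  # disabled tasks are not executed
```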

Permissions

Permission management for tasks and notifications includes checking the current user id when a user makes a request to TaskService or NotificationService. Unless a user has a specific permission (see Permission), they can view and edit only tasks and notifications with user_id equal to the current user id. System users can access all tasks and notifications.
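The access rule reads as a simple check. The function below is a hypothetical sketch: the argument names are invented for illustration, and the actual services perform this check internally using their own permission system.

```python
def can_access(record_user_id, current_user_id, *,
               is_system_user: bool = False, has_permission: bool = False) -> bool:
    """Return True if the current user may view or edit the task or notification."""
    if is_system_user or has_permission:
        return True  # system users and users with the specific permission see everything
    return record_user_id == current_user_id
```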

Resetting tasks

It’s unlikely that you would need to reset a task. However, you can use reset_task() (“tasks.reset” in RPC) to reset a particular task to its initial state. The task will then be re-queued by the manager.

# TaskService
await task_service.reset(id='my_task_id')
POST {"method": "tasks.reset", "params": {"id": "my_task_id"}}

Log tracing

A new random job_id is assigned to a task when it’s QUEUED (unless it was retried or suspended). This id is both written to the tasks table and sent as a correlation id in related requests. It is also saved in job_id of this task’s notifications. You can use this id to get the complete log for this task.