User guide
==========

Creating tasks
______________

The best way to manage tasks is by using :py:class:`~kaiju_tasks.services.TaskService`.
Each new task must have :py:attr:`~kaiju_tasks.types.Task.commands` with a list of JSONRPC request body templates
(see :py:class:`~kaiju_tasks.types.TaskCommand`). Each request in the list of commands is considered a *stage*.
The stages are executed sequentially. The result of each stage is sent back to the manager and stored in the task
result field. The maximum number of commands per task is regulated by :py:obj:`~kaiju_tasks.types.Limit.MAX_STAGES`.

.. code-block:: python

    from kaiju_tasks import Task, TaskCommand, TaskService

    # task_service is a TaskService instance
    task = await task_service.create(Task(
        app=self.app.name,
        id='orders.delete_expired',
        commands=[
            TaskCommand(method='orders.delete', params={'conditions': {'expired': True}}),
            TaskCommand(method='cache.clear', params=None)
        ]
    ))

It is recommended to manage parameters using :py:attr:`~kaiju_tasks.types.Task.kws` and templates, see
`Sharing data between stages`_ for more info.

It's reasonable to include :py:attr:`~kaiju_tasks.types.Task.app_name` to tell the manager which app should run
this task. You can access the current app name using `self.app.name` from inside your service.

It's also reasonable to set :py:attr:`~kaiju_tasks.types.Task.id` manually when creating a periodic or triggered
task, so that it is easier to find and manage in the tasks table. The id can be any string.

See :py:obj:`~kaiju_tasks.services.task_schema` and :py:obj:`~kaiju_tasks.types.Task` for the complete list of
attributes.

Task status
___________

Each task has its state stored in the :py:attr:`~kaiju_tasks.types.Task.status` field of the tasks table.
All new tasks are created in the :py:attr:`~kaiju_tasks.types.TaskStatus.IDLE` state. Before sending a task into
the stream, the manager switches its status to :py:attr:`~kaiju_tasks.types.TaskStatus.QUEUED`.
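For orientation, the lifecycle described throughout this guide can be summarized as a transition table.
The sketch below is a simplified, hypothetical model: `Status` is a standalone stand-in that mirrors the names of
:py:class:`~kaiju_tasks.types.TaskStatus`, and `TRANSITIONS` / `can_move` are illustrative helpers, not part of
kaiju_tasks.

.. code-block:: python

    from enum import Enum


    class Status(str, Enum):
        """Standalone stand-in mirroring the TaskStatus names used in this guide."""
        IDLE = 'IDLE'
        QUEUED = 'QUEUED'
        EXECUTED = 'EXECUTED'
        FINISHED = 'FINISHED'
        FAILED = 'FAILED'
        SUSPENDED = 'SUSPENDED'
        WAITING = 'WAITING'


    # Simplified transitions as described in this guide
    # (hypothetical model, not the manager's actual implementation).
    TRANSITIONS = {
        Status.IDLE: {Status.QUEUED},          # manager sends the task into the stream
        Status.QUEUED: {Status.EXECUTED},      # executor acknowledges the task
        Status.EXECUTED: {
            Status.FINISHED, Status.FAILED,    # stages completed or an error occurred
            Status.SUSPENDED,                  # executor exited or stopped responding
            Status.WAITING,                    # a Timer or Message stage was reached
        },
        Status.FAILED: {Status.QUEUED},        # only while retries are left
        Status.FINISHED: {Status.QUEUED},      # only for periodic tasks
        Status.SUSPENDED: {Status.QUEUED},     # re-queued by the manager
        Status.WAITING: {Status.QUEUED},       # timer elapsed or message received
    }


    def can_move(src: Status, dst: Status) -> bool:
        """Return True if the simplified model allows a src -> dst transition."""
        return dst in TRANSITIONS.get(src, set())

The real conditions (remaining retries, cron settings, disabled tasks) are checked by the manager; the table only
records which moves this guide describes.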
When an executor reports the task id back to the manager, the task is switched to
:py:attr:`~kaiju_tasks.types.TaskStatus.EXECUTED`, acknowledging that it is now in progress. Eventually it becomes
:py:attr:`~kaiju_tasks.types.TaskStatus.FINISHED` or :py:attr:`~kaiju_tasks.types.TaskStatus.FAILED` depending on
the result. Failed tasks can be restarted (see `Retries`_). Finished tasks can be restarted too if they have run
intervals configured (see `Periodic tasks`_).

Occasionally a task may become :py:attr:`~kaiju_tasks.types.TaskStatus.SUSPENDED`. This means that the executor
that was running the task has exited or is not responding. The manager will automatically try to re-queue such
tasks as soon as possible.

See :py:obj:`~kaiju_tasks.types.TaskStatus` for a list of task states.

.. image:: diagrams/task_state_activity.svg
    :width: 600
    :alt: task state activity

Timeouts
________

Each task has an assigned :py:attr:`~kaiju_tasks.types.Task.max_exec_timeout`, which determines the total timeout in
seconds for all task stages. The timeout interval is counted from :py:attr:`~kaiju_tasks.types.Task.queued_at`
(queue time), and the resulting deadline is stored in :py:attr:`~kaiju_tasks.types.Task.exec_deadline`.
When a task reaches its deadline, it is automatically aborted in the executor with a `TimeoutError`.

.. note::

    It's recommended to configure the timeout of each task manually, depending on how long the task may
    theoretically take to complete. Note, however, that the executor's internal RPC server may have a *max_timeout*
    value lower than the one you have set for the task.

The default timeout is given by :py:obj:`~kaiju_tasks.types.Limit.DEFAULT_T`, and its bounds by
:py:obj:`~kaiju_tasks.types.Limit.MIN_T` and :py:obj:`~kaiju_tasks.types.Limit.MAX_T`.

Additionally, each command supports its own timeout via the optional `max_timeout` argument.

.. code-block:: python

    TaskCommand(method='do.job', params={'a': 1, 'b': 2}, max_timeout=100)

.. note::

    Command timeouts do not override the total task timeout. The task will be aborted once the task deadline is
    reached, even if a command could technically still be executed within its own timeout.

Retries
_______

To allow retries, you can set :py:attr:`~kaiju_tasks.types.Task.max_retries`. When a task with retries fails, it
will be re-queued as soon as possible and the :py:attr:`~kaiju_tasks.types.Task.retries` counter will be
incremented until no retries are left or the task has succeeded.

You can set the stage from which the task should be restarted using
:py:attr:`~kaiju_tasks.types.Task.restart_policy`, which can be one of:

- :py:attr:`~kaiju_tasks.types.RestartPolicy.CURRENT` - default, restart from the current stage;
- :py:attr:`~kaiju_tasks.types.RestartPolicy.FIRST` - discard the results and restart from the first stage.

The absolute limit on the number of retries is regulated by :py:obj:`~kaiju_tasks.types.Limit.MAX_RETRIES`.

.. code-block:: python

    task = await task_service.create(Task(
        id='server.sync',
        commands=[{'method': 'server.sync'}],
        max_retries=3,
        restart_policy='CURRENT'  # CURRENT or FIRST
    ))

Periodic tasks
______________

Use the :py:attr:`~kaiju_tasks.types.Task.cron` field to execute tasks periodically.
See `cron format `_ for more details.

.. code-block:: python

    task = await task_service.create(Task(
        id='cache.update',
        commands=[TaskCommand(method='cache.update')],
        cron='* * * * 1'
    ))

.. note::

    The task manager may not always schedule cron tasks at precise intervals. Possible reasons include a run
    taking longer than the interval between iterations (if :py:attr:`~kaiju_tasks.types.Task.max_exec_timeout`
    allows it), the task being restarted, etc. As a result, the actual intervals may be longer than specified in
    the cron settings.

Sharing data between stages
___________________________

Template syntax is fully supported in task commands.
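Conceptually, a bracketed placeholder such as `[kws.command_name]` is a dot-separated path into the task template
data. The sketch below is a hypothetical, deliberately minimal resolver: it only walks nested dicts and replaces
values that consist of a single placeholder. It is not the kaiju-tools `Template` implementation and ignores
features such as lookups over lists.

.. code-block:: python

    import re
    from typing import Any

    # Matches a value that is entirely one placeholder, e.g. '[kws.command_name]'.
    PLACEHOLDER = re.compile(r'^\[([^\[\]]+)\]$')


    def lookup(data: dict, path: str) -> Any:
        """Walk nested dicts along a dot-separated path such as 'kws.command_name'."""
        value: Any = data
        for key in path.split('.'):
            value = value[key]  # raises KeyError/TypeError if the path is missing
        return value


    def render(obj: Any, data: dict) -> Any:
        """Recursively replace whole-string placeholders in obj with values from data."""
        if isinstance(obj, str):
            match = PLACEHOLDER.match(obj)
            return lookup(data, match.group(1)) if match else obj
        if isinstance(obj, dict):
            return {key: render(value, data) for key, value in obj.items()}
        if isinstance(obj, list):
            return [render(item, data) for item in obj]
        return obj


    template_data = {
        'id': 'my_task',
        'kws': {'command_name': 'items.m_delete'},
        '0': {'count': 3},  # result of the first stage
    }
    command = {'method': '[kws.command_name]', 'params': {'task': '[id]', 'n': '[0.count]'}}
    print(render(command, template_data))
    # {'method': 'items.m_delete', 'params': {'task': 'my_task', 'n': 3}}

Stage results are keyed by string indices, which is why a path like `0.count` works with a plain `split('.')`.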
At the start of a stage, its command is evaluated against the task template data, which consists of:

- `id` - the task id;
- `kws` - the dict from :py:attr:`~kaiju_tasks.types.Task.kws`;
- `executor` - executor-specific data defined in the :py:attr:`~kaiju_tasks.services.TaskExecutor.data` dict;
- the results of previous stages, under the keys `"0"`, `"1"` and so on.

.. code-block:: python

    template_data = {
        'id': 'my_task',
        'kws': {'command_name': 'items.delete'},
        'executor': {'some_value_from_executor_config': True},
        '0': {
            'data': [
                {'id': '001'},
                {'id': '005'},
                {'id': '101'}
            ]
        }
    }

You can use field names in square brackets to access these values. Note that you can use templates for both the
method name and the params. See `Template `_ from the kaiju-tools package for details.

.. code-block:: python

    task = await task_service.create(Task(
        id='my_task',
        commands=[
            TaskCommand(method='items.m_get', params={'conditions': {'marked': True}}),
            TaskCommand(method='[kws.command_name]', params={'id': '[0.data.id]'})
        ],
        kws={'command_name': 'items.m_delete'}
    ))

Conditions
__________

You can specify a condition for a single command using the `Condition `_ syntax. The command is executed only if
the condition, evaluated against the task template data, is `True`. Otherwise the executor skips the command and
writes `None` as its result.

Note that from the executor/manager perspective a skipped stage is still considered executed, just with no actual
command submitted to the RPC server.

.. code-block:: python

    from kaiju_tasks import Task, TaskCommand
    from kaiju_tools.templates import Condition

    task = await task_service.create(Task(
        app=self.app.name,
        id='products.update_prices',
        commands=[
            # suppose it returns a 'count' of actually updated items
            TaskCommand(method='products.update_attributes'),  # -> {'count': 1000}
            # this command will be executed only if the previous stage 'count' > 0
            TaskCommand(
                method='products_cache.refresh',
                condition=Condition({'0.count': {'gt': 0}})
            )
        ]
    ))

You can build even more complex conditions using templates (see `Sharing data between stages`_).

.. code-block:: python

    task = await task_service.create(Task(
        app=self.app.name,
        id='products.update_prices',
        commands=[
            # suppose the settings contain 'update_threshold' - the min number of updated rows
            # required for a cache refresh
            TaskCommand(method='settings.get'),  # -> {..., 'cache': {'update_threshold': 42, ...}}
            # suppose it returns a 'count' of actually updated items
            TaskCommand(method='products.update_attributes'),  # -> {'count': 1000}
            # refresh the cache only if the updated item 'count' > 'update_threshold'
            TaskCommand(
                method='products_cache.refresh',
                condition=Condition({'1.count': {'gt': '[0.cache.update_threshold]'}})
            )
        ]
    ))

Timers
______

Suppose you have two sequential commands and the first one requires additional grace time before the second one
starts. In this case you can insert a :py:class:`~kaiju_tasks.types.Timer` between them with a timeout specified in
seconds. The task will become :py:attr:`~kaiju_tasks.types.TaskStatus.WAITING`, and the executor will be free to
process other tasks. Once the timer has elapsed, the task will be queued again starting from the next stage
(similarly to suspended tasks). See the example below.

.. code-block:: python

    from kaiju_tasks import Task, TaskCommand, Timer

    task = await task_service.create(Task(
        app=self.app.name,
        id='orders.delete_expired',
        commands=[
            TaskCommand(method='orders.delete', params={'conditions': {'expired': True}}),
            Timer(100),  # wait 100 seconds before the next stage
            TaskCommand(method='cache.clear', params=None)
        ]
    ))

.. note::

    Timers do not shield tasks from cancellation once the task timeout is reached. You should estimate the possible
    durations of your tasks and specify appropriate :py:attr:`~kaiju_tasks.types.Task.max_exec_timeout` values to
    prevent it.

Messages (hooks)
________________

You can use the :py:class:`~kaiju_tasks.types.Message` command to specify that a task needs user input (that is, a
received message) to continue.
This command tells the executor to drop the task (which becomes :py:attr:`~kaiju_tasks.types.TaskStatus.WAITING`),
and the task will not continue until it receives a message via
:py:meth:`~kaiju_tasks.services.TaskService.write_message`. Once a message is received, the task is queued again,
and the message data is available in the stage results under the message stage index.

You can use it to wait for intermediate data that would take too long to acquire, or when your task needs actual
human input to continue. See the example below.

.. code-block:: python

    from kaiju_tasks.types import Message, Task, TaskCommand

    task = await task_service.create(Task(
        app=self.app.name,
        id='orders.delete_expired',
        commands=[
            # ask for a data report file
            TaskCommand(method='statistics.request_report', params={'callback_id': '[id]'}),
            # wait until TaskService.write_message receives a message for this task
            Message(max_timeout=None),
            # the message will be available in the results of stage 1
            TaskCommand(method='files.download', params={'url': '[1.url]'})
        ]
    ))

Example of a message:

.. code-block::

    POST /public/rpc

    {
        "method": "tasks.write_message",
        "params": {
            "id": "my_task_id",
            "data": {"url": "http://statistics.example/reports/324934"}
        }
    }

.. note::

    Messages do not shield tasks from cancellation once the task timeout is reached. You should estimate the
    possible durations of your tasks and specify appropriate :py:attr:`~kaiju_tasks.types.Task.max_exec_timeout`
    values to prevent it.

Results
_______

Once a stage is completed, its result is stored in the :py:attr:`~kaiju_tasks.types.Task.result` list. An error is
stored in :py:attr:`~kaiju_tasks.types.Task.error`. There's also :py:attr:`~kaiju_tasks.types.Task.exit_code`, an
`exit code `_ where 0 means OK and any other value indicates an error. See
:py:class:`~kaiju_tasks.types.ExitCode` for a list of used exit codes.

You may use :py:class:`~kaiju_tasks.services.TaskService` to fetch task results and other data.

.. code-block:: python

    # task_service is a TaskService instance
    data = await task_service.get(
        id=my_task['id'],
        columns=['result', 'exit_code', 'error']
    )
    # exit_code != 0 means an error, exit_code is None means that the task has yet to be executed

Once a task is restarted (for any reason), its results and exit code are cleared and a new
:py:attr:`~kaiju_tasks.types.Task.job_id` is assigned to the task. You can still access previous results from the
notifications table if you have enabled :py:attr:`~kaiju_tasks.types.Task.notify`. See
:py:obj:`~kaiju_tasks.services.NotificationService` and :py:obj:`~kaiju_tasks.types.Notification` for details
about notification attributes.

.. code-block:: python

    # notifications is a NotificationService instance

    # all results for the task sorted by date
    await notifications.list(
        conditions={'task_id': task['id']},
        sort=[{'desc': 'created'}],
        columns=['result']
    )

    # a result for a particular job
    data = await notifications.list(
        conditions={'task_id': task['id'], 'job_id': task['job_id']},
        columns=['result']
    )

Chaining tasks
______________

You can chain tasks by providing the :py:attr:`~kaiju_tasks.types.Task.next_task` parameter when adding a new task.
The value must be the id of an existing task. The referenced task will be scheduled for execution once the main
task has finished. There are a few rules for task chaining:

- The task must finish successfully. A failed task does not trigger the next one.
- If the next task is already :py:attr:`~kaiju_tasks.types.TaskStatus.QUEUED`,
  :py:attr:`~kaiju_tasks.types.TaskStatus.EXECUTED` or :py:attr:`~kaiju_tasks.types.TaskStatus.SUSPENDED`, it will
  not be re-scheduled.
- A disabled next task is not executed either.

An example of task chaining is shown below.

.. code-block:: python

    next_task = await task_service.create(
        Task(commands=[
            TaskCommand(method='do.something.else')
        ])
    )
    task = await task_service.create(
        Task(commands=[
            TaskCommand(method='do.something')
        ], next_task=next_task['id'])
    )

Permissions
___________

Permission management for tasks and notifications is based on checking the current user id when a user makes a
request to :py:class:`~kaiju_tasks.services.TaskService` or :py:class:`~kaiju_tasks.services.NotificationService`.
Unless a user has a specific permission (see :py:class:`~kaiju_tasks.services.TaskService.Permission`), they can
view and edit only the tasks and notifications whose `user_id` equals the current user id. System users can access
all tasks and notifications.

Resetting tasks
_______________

It's unlikely that you will need to reset a task. However, you can use
:py:meth:`~kaiju_tasks.services.TaskService.reset_task` ("tasks.reset" in RPC) to reset a particular task to its
initial state. The task will then be re-queued by the manager.

.. code-block:: python

    # task_service is a TaskService instance
    await task_service.reset(id='my_task_id')

.. code-block::

    POST

    {"method": "tasks.reset", "params": {"id": "my_task_id"}}

Log tracing
___________

A new random :py:attr:`~kaiju_tasks.types.Task.job_id` is assigned to a task when it becomes
:py:attr:`~kaiju_tasks.types.TaskStatus.QUEUED` (unless it was retried or suspended). This id is both written to the
tasks table and sent as a correlation id in related requests. It is also saved in
:py:attr:`~kaiju_tasks.types.Notification.job_id` of this task's notifications. You can use this id to get the
complete log of a particular task run.
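As an illustration, the sketch below collects the records of one task run from a log, assuming (hypothetically)
that your services write JSON lines and store the correlation id under a `cid` key. The key name, the log source
and the `trace_job` helper are assumptions; adjust them to your actual logging setup, since nothing here is part
of kaiju_tasks.

.. code-block:: python

    import json
    from typing import Iterable, Iterator


    def trace_job(lines: Iterable[str], job_id: str, key: str = 'cid') -> Iterator[dict]:
        """Yield JSON log records whose correlation id field equals job_id.

        'cid' is a hypothetical field name; use whatever your log format uses.
        """
        for line in lines:
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip lines that are not valid JSON
            if isinstance(record, dict) and record.get(key) == job_id:
                yield record


    logs = [
        '{"cid": "job-1", "msg": "stage 0 started"}',
        'not a json line',
        '{"cid": "job-2", "msg": "unrelated request"}',
        '{"cid": "job-1", "msg": "stage 0 finished"}',
    ]
    print([record['msg'] for record in trace_job(logs, 'job-1')])
    # ['stage 0 started', 'stage 0 finished']

In practice, you would take the `job_id` from the tasks table or from a notification and pass an open log file
(or any other iterable of lines) as `lines`.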