Operation guide
===============

Manager loop
____________

:py:class:`~kaiju_tasks.services.TaskManager` schedules tasks from the tasks table. One manager can schedule
tasks for multiple executors and even for multiple applications.

The core element of the process is :py:meth:`~kaiju_tasks.services.TaskManager.loop`. Each cycle of the loop
performs the following steps.

- The task manager checks all executors and expels dead ones (see `Executor state monitoring`_).
- :py:attr:`~kaiju_tasks.types.TaskStatus.QUEUED` or :py:attr:`~kaiju_tasks.types.TaskStatus.EXECUTED` tasks
  which have reached their timeout but have not been cancelled by their executors are set to
  :py:attr:`~kaiju_tasks.types.TaskStatus.FAILED` with :py:class:`~kaiju_tasks.types.Task`.exit_code 130.
- Cron tasks in the :py:attr:`~kaiju_tasks.types.TaskStatus.FINISHED` and
  :py:attr:`~kaiju_tasks.types.TaskStatus.FAILED` states are restarted.
- :py:attr:`~kaiju_tasks.types.TaskStatus.SUSPENDED` tasks are queued and sent back to executor streams.
- :py:attr:`~kaiju_tasks.types.TaskStatus.IDLE` tasks with :py:class:`~kaiju_tasks.types.Task`.next_run < now()
  are set to :py:attr:`~kaiju_tasks.types.TaskStatus.QUEUED` and sent to executors as well.
- :py:attr:`~kaiju_tasks.types.TaskStatus.FAILED` tasks with available retries are queued and their retry
  counter is incremented.

For a full list of task states see :py:class:`~kaiju_tasks.types.TaskStatus`. See the diagram below for details.

.. image:: diagrams/manager_loop.svg
   :width: 600
   :alt: manager loop cycle

Task execution
______________

A queued task is sent to the executor stream as a :py:meth:`~kaiju_tasks.services.TaskExecutor.run_task`
RPC command. The namespace of the stream is determined by the ``app_name`` attribute of the task. Whichever
executor pulls the command from this stream executes it. The task is executed stage by stage, sequentially.
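
The routing described above can be sketched with in-memory queues. This is a minimal illustration under
stated assumptions, not the kaiju_tasks implementation: a ``deque`` per ``app_name`` stands in for the Redis
stream, and ``SketchTask``, ``send_task``, ``pull_and_run`` and the message dict layout are hypothetical
names chosen for this example. It shows two properties from the text: the task's ``app_name`` selects the
stream, and a command is consumed by a single executor, which then runs the stages in order.

.. code-block:: python

   from collections import defaultdict, deque
   from dataclasses import dataclass, field
   from typing import Callable, Deque, Dict, List, Optional


   @dataclass
   class SketchTask:
       """Hypothetical, simplified task record (not the real kaiju_tasks Task type)."""

       id: str
       app_name: str
       stages: List[Callable[[], object]]  # each stage is a plain callable here
       results: List[object] = field(default_factory=list)


   # One in-memory queue per application namespace, standing in for a Redis stream.
   streams: Dict[str, Deque[dict]] = defaultdict(deque)


   def send_task(task: SketchTask) -> None:
       """Manager side: push a run_task command to the stream chosen by app_name."""
       streams[task.app_name].append({"method": "run_task", "params": {"task": task}})


   def pull_and_run(app_name: str) -> Optional[SketchTask]:
       """Executor side: consume the next run_task command, if any, and run it."""
       stream = streams[app_name]
       if not stream:
           return None
       message = stream.popleft()  # each command is consumed by exactly one executor
       task = message["params"]["task"]
       for stage in task.stages:  # stages run one after another, in order
           task.results.append(stage())
       return task


   send_task(SketchTask("t1", "my_app", [lambda: 1, lambda: 2]))
   send_task(SketchTask("t2", "other_app", [lambda: "x"]))
   done = pull_and_run("my_app")  # only executors of my_app can receive t1
   print(done.id, done.results)  # t1 [1, 2]
   print(pull_and_run("my_app"))  # None: the command was already consumed

In a real deployment the Redis consumer group (``group_id`` in the listener settings) plays the role of
``popleft()`` here, ensuring that one executor instance handles each command.
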
At each stage an RPC message is sent to :py:meth:`~kaiju_tasks.services.TaskManager.execute_stage` to signal
that the stage is being executed. Then :py:class:`~kaiju_tasks.services.TaskExecutor` performs the stage
command using its own internal RPC server. At the end of each stage the stage result is sent to the
:py:meth:`~kaiju_tasks.services.TaskManager.write_stage` method.

The executor may receive a *SIGTERM* while executing a task. In that case it suspends itself and all of its
tasks with the :py:meth:`~kaiju_tasks.services.TaskManager.suspend_executor` command.

.. image:: diagrams/task_execution.svg
   :width: 600
   :alt: task execution process

Executor state monitoring
_________________________

:py:class:`~kaiju_tasks.services.TaskManager` monitors executor state using ping messages from executors.
An active executor sends pings to :py:meth:`~kaiju_tasks.services.TaskManager.ping` through the RPC stream.
If a particular executor doesn't ping for too long, the manager moves all tasks assigned to this executor into
the :py:attr:`~kaiju_tasks.types.TaskStatus.SUSPENDED` state (they are rescheduled in the next cycle). If the
supposedly dead executor later manages to send results for these tasks, the manager ignores them.

Ping intervals and executor lifetime limits are described in :py:class:`~kaiju_tasks.types.Limit`.

Configuration
_____________

The executor-manager system is stream-based and does not require direct HTTP connections. You can use
multiple executor apps and instances with a single manager or with multiple managers. However, since service
tasks are not time-critical, it is usually reasonable to run a single manager instance and simply restart it
on failure.

A simplified minimal config for an executor is shown below. Basically it requires a working RPC server and
a stream listener connected to a specific topic. You don't need to configure the task or notification
interfaces for an executor.
You should also provide the ``manager_topic`` name to the executor so that it can send data to the manager.
If you'd like to configure a single manager for multiple applications, you should also provide
``manager_namespace`` specifying the manager app name (otherwise the executor's own app namespace is used).

.. code-block:: yaml

   - cls: RedisStreamRPCClient
   - cls: RedisListener          # executor stream
     name: redis_stream.executor
     enabled: "[services_tasks_executor_enabled]"
     settings:
       group_id: "[main_name]"
       topic: executor
       max_batch_size: 10
       max_parallel_batches: 1
       scope: SYSTEM
   - cls: TaskExecutor           # user task executor
     enabled: "[services_tasks_executor_enabled]"

A manager requires database access as well as the notification and task interfaces.

.. code-block:: yaml

   - cls: NotificationService
   - cls: RedisStreamRPCClient
   - cls: RedisListener          # manager stream
     name: redis_stream.manager
     enabled: "[services_tasks_manager_enabled]"
     settings:
       group_id: "[main_name]"
       topic: manager
       scope: SYSTEM
   - cls: TaskManager            # user task manager
     enabled: "[services_tasks_manager_enabled]"

In this configuration the *my_app* executor listens to the *dev.my_app.executor* stream and sends to the
*dev.manager_app.manager* stream, while the manager listens to *dev.manager_app.manager* and sends tasks to
*dev..executor* streams depending on the :py:class:`~kaiju_tasks.types.Task`.app_name value.
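
The stream names in this example appear to follow an ``<env>.<namespace>.<topic>`` pattern. The sketch below
spells out that resolution, including the ``manager_namespace`` fallback described above. It is an
assumption inferred from the example names only: ``executor_stream`` and ``manager_stream`` are hypothetical
helpers, not part of kaiju_tasks, and the ``dev`` prefix is taken from the example.

.. code-block:: python

   from typing import Optional


   def executor_stream(env: str, app_name: str, topic: str = "executor") -> str:
       """Hypothetical helper: stream that executors of ``app_name`` listen to."""
       return f"{env}.{app_name}.{topic}"


   def manager_stream(
       env: str,
       app_name: str,
       manager_topic: str = "manager",
       manager_namespace: Optional[str] = None,
   ) -> str:
       """Hypothetical helper: stream an executor reports to.

       Falls back to the executor's own app namespace when no manager_namespace is set.
       """
       return f"{env}.{manager_namespace or app_name}.{manager_topic}"


   print(executor_stream("dev", "my_app"))  # dev.my_app.executor
   print(manager_stream("dev", "my_app", manager_namespace="manager_app"))  # dev.manager_app.manager
   print(manager_stream("dev", "my_app"))  # dev.my_app.manager

Keeping the manager namespace separate from the executor namespace is what lets several applications share
one manager while each application still receives only its own tasks.
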