Source code for kaiju_tasks.types

"""Data types."""
import abc
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import ClassVar, TypedDict
from uuid import UUID

from kaiju_tools.encoding import Serializable
from kaiju_tools.registry import ClassRegistry
from kaiju_tools.rpc import RPCRequest
from kaiju_tools.templates import Condition as Condition


__all__ = [
    'TaskStatus',
    'RestartPolicy',
    'Limit',
    'TaskCommand',
    'Task',
    'Notification',
    'ExecutorTask',
    'ExitCode',
    'Message',
    'Timer',
    'SPECIAL_COMMANDS',
]


[docs]class TaskCommand(TypedDict, total=False): """Task command. It's just a typed dict providing an interfaces for task command format. .. code-block:: python Task(commands=[ TaskCommand(method='do.something', params={'value': True}) ]) """ method: str #: method name as in RPC request params: dict | None #: params as in RPC request max_timeout: int | None #: optional command execution timeout in seconds condition: Condition | dict #: optional command exec condition (must be True to execute)
class _SpecialCommand(Serializable, abc.ABC): """Special instructions for a task executor (base class).""" method: ClassVar[str] #: command name is required @abc.abstractmethod def repr(self) -> TaskCommand: """Get a task command dict."""
[docs]@dataclass(slots=True) class Message(_SpecialCommand): """Message hook. A task must wait until a message is received for this task to continue. .. code-block:: python Task(commands=[ TaskCommand(method='do.first'), Message(), TaskCommand(method='do.second') ]) Upon this stage the task will be put in :py:attr:`TaskStatus.WAITING` and the executor will drop the task. The manager will wait until the task service has received a message for this particular task. The message will be saved in the stage results (stage "1" in the example) and the task will continue. This command allows you to arbitrarily send data to a running task, for example to create tasks which depend on user input / external systems. """ method = '_MESSAGE_' max_timeout: int | None = None """Currently not used.""" def repr(self) -> TaskCommand: return TaskCommand(method=self.method, params=None, max_timeout=self.max_timeout)
[docs]@dataclass(slots=True) class Timer(_SpecialCommand): """Timer hook. The task must wait for a timer to continue. Use in your task chain of commands: .. code-block:: python Task(commands=[ TaskCommand(method='do.first'), Timer(60), TaskCommand(method='do.second') ]) Upon this stage the task will be put in :py:attr:`TaskStatus.WAITING` and the executor will drop the task. Once the timer is reached the manager will continue the task execution. Basically the process is similar to suspended tasks but with a wait timeout between stages. Timer always writes None into its stage results. """ method = '_TIMER_' timer: int """Wait time in seconds.""" def repr(self) -> TaskCommand: return TaskCommand(method=self.method, params={'timer': self.timer})
class _SpecialCommandsRegistry(ClassRegistry[str, _SpecialCommand]): """Registry for special task commands.""" @classmethod def get_base_classes(cls) -> tuple[type, ...]: return (_SpecialCommand,) def get_key(self, obj: _SpecialCommand) -> str: return obj.method SPECIAL_COMMANDS = _SpecialCommandsRegistry() SPECIAL_COMMANDS.register_from_namespace(locals())
[docs]class Limit(Enum): """Global task limits and parameters.""" MAX_STAGES = 100 """Max allowed number of commands inside of a `Task.commands` block.""" MAX_RETRIES = 10 """Max allowed number of task restarts set by `Task.max_retries`.""" MIN_T = 10 """Minimum allowed task timeout value in seconds set by `Task.max_exec_timeout`.""" DEFAULT_T = 300 """Default task timeout value in seconds set by `Task.max_exec_timeout`.""" MAX_T = 3600 * 4 """Maximum allowed task timeout value in seconds set by `Task.max_exec_timeout`.""" PING_INTERVAL = 30 """Executor ping interval in seconds. Each executor sends signals to the manager according to this interval. If an executor misses several consequent pings, the manager will suspend its tasks and remove the executor from the list of registered executors.""" SUSPEND_AFTER_PINGS = 3 """Number of pings for an executor to miss to be suspended by the manager."""
[docs]class TaskStatus(Enum): """Task status types.""" IDLE = 'IDLE' """Initial state. All newly created, reset tasks and restarted cron tasks become `IDLE` until they are queued by the manager.""" QUEUED = 'QUEUED' """Task is queued for execution by the manager. This status means that the task has been put to the queue (executor stream) but not yet has been acquired.""" EXECUTED = 'EXECUTED' """Task has been acquired by an executor and is being executed.""" SUSPENDED = 'SUSPENDED' """Executor running this task has been suspended due to an exit signal or because it missed several ping requests. Suspended tasks are `QUEUED` by the manager in the next cycle. """ WAITING = 'WAITING' """Task has encountered a :py:class:`~kaiju_tasks.types.Timer` or :py:class:`~kaiju_tasks.types.Message` special command and has been put on hold by the manager. Executor has dropped the task and it will not continue until the wait condition (either a timer or an external message) has been satisfied. Manager will put waiting tasks back to `QUEUED` once the condition is met. """ FINISHED = 'FINISHED' """Task has finished successfully and the result is available in `Task.result`.""" FAILED = 'FAILED' """Task has finished with an error, the error is available in `Task.error`."""
[docs]class ExitCode(Enum): """Task execution unix style exit codes.""" SUCCESS = 0 """Task is completed.""" EXECUTION_ERROR = 1 """One of the task commands has failed.""" ABORTED = 130 """Task has been aborted by the manager due to timeout or other reason."""
[docs]class RestartPolicy(Enum): """Task restart policy types. This setting can be set in `Task.restart_policy` to tell the manager which stage you want task to be restarted from in case of an error. Note that this setting is useless when `Task.max_retries` is not set. """ CURRENT = 'CURRENT' """Restart from the current (i.e. first failed) stage. Results from the previous stages will be preserved.""" FIRST = 'FIRST' """Clear all results and restart fresh from the first stage."""
[docs]class Task(TypedDict, total=False): """Task object. This is a typed dict which provides hints for task data. """ id: str #: generated / user-defined unique identifier # executor instructions app_name: str #: executor type (app.name) commands: list[TaskCommand | RPCRequest] #: sequential list of commands kws: dict #: additional kws template arguments # manager instructions enabled: bool #: inactive tasks are not processed cron: str #: cron instructions for periodic tasks max_exec_timeout: int #: (s) max allowed execution time in total max_retries: int #: max retries for a failed task (0 for no retries) restart_policy: str #: how the task will be restarted on failure, see :py:class:`~kaiju_tasks.types.RestartPolicy` notify: bool #: notify user about status changes next_task: str | None #: next task to run after finishing of this one system: bool #: system task (should never be removed by cleaning jobs) # meta description: str | None #: task long description, completely optional meta: dict #: task metadata, unused by the services group: str | None #: optional task group group_id: int | None #: optional task group id # managed params status: str #: current task status, see :py:class:`~kaiju_tasks.types.TaskStatus` result: list #: task execution result, a list of stage returns stage: int #: current stage (command) being executed stages: int #: number of commands in this task queued_at: int | None #: UNIX time last queued exec_deadline: int | None #: UNIX time deadline wait_deadline: int | None #: UNIX time deadline for a timer command (see :py:class:`~kaiju_tasks.types.Timer`) next_run: int | None #: UNIX time for next run status_change: int | None #: last change of status user_id: UUID | None #: user created the task executor_id: UUID | None #: which executor has this task job_id: str | None #: updated for each new run retries: int #: current number of retries created: datetime #: when task record was added to the table exit_code: int | None #: exit (error) code similar to UNIX codes (see :py:class:`~kaiju_tasks.types.ExitCode`) error: dict | None #: error.repr() if there's an error
class ExecutorTask(TypedDict): """Task data provided to an executor by the manager.""" id: str #: task id commands: list[TaskCommand | RPCRequest] #: sequential list of commands kws: dict #: additional template arguments result: list #: task execution result, a list of stage returns stage: int #: current stage being executed (or about to execute) stages: int #: total number of stages exec_deadline: int #: UNIX time deadline job_id: str #: current job id for this task
[docs]class Notification(TypedDict, total=False): """Notification object. This is a typed dict which provides hints for task notifications data. """ id: UUID #: generated message: str | None #: human-readable message or tag kws: dict | None #: format keywords created: datetime #: timestamp enabled: bool #: mark as read user_id: UUID | None #: receiver task_id: str | None #: task id job_id: str | None #: job id status: str | None #: task job status result: list | None #: job results exit_code: int | None #: unix style exit code (see :py:class:`~kaiju_tasks.types.ExitCode`) error: dict | None #: error.repr() if the task has failed