services - application services

class TaskService[source]

Bases: SQLService[str, Task], PermHook, PublicInterface

Tasks public interface.

This service should be used to create, view or modify tasks. The interface is the same as in SQLService.

Task service is not required by executors so you can have a single task service for different executor applications.

service_name: str = 'tasks'

you may define a custom service name here

async delete_old_tasks(interval_days: int = 7) None[source]

Delete old tasks and notifications (excluding periodic and system tasks).

RPC method: tasks.delete_old_tasks

Use this method to remove finished tasks that are no longer needed from the database. It will not delete cron tasks or tasks marked as system. The recommended way to use it is to create a cron task that calls this method.

Parameters:

interval_days – delete tasks and notifications older than interval_days days
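The cleanup rule described above can be pictured with a small standalone sketch. It is not the service's actual query: the field names `created`, `cron` and `system` are hypothetical, and the source does not say which timestamp the age is measured from.

```python
from datetime import datetime, timedelta, timezone


def is_deletable(task: dict, now: datetime, interval_days: int = 7) -> bool:
    """Return True if a task falls under the described cleanup rule.

    Assumption: `created`, `cron` and `system` are illustrative field names.
    """
    if task['cron'] or task['system']:
        return False  # periodic and system tasks are never removed
    cutoff = now - timedelta(days=interval_days)
    return task['created'] < cutoff


now = datetime(2024, 1, 15, tzinfo=timezone.utc)
old = now - timedelta(days=30)
recent = now - timedelta(days=1)

tasks = [
    {'id': 'old_regular', 'created': old, 'cron': False, 'system': False},
    {'id': 'old_cron', 'created': old, 'cron': True, 'system': False},
    {'id': 'old_system', 'created': old, 'cron': False, 'system': True},
    {'id': 'recent_regular', 'created': recent, 'cron': False, 'system': False},
]
deletable = [task['id'] for task in tasks if is_deletable(task, now)]
```

Only the old, non-periodic, non-system task is selected here.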

async reset_task(id: str) bool[source]

Reset task to IDLE.

RPC method: tasks.reset

All results and execution data will be removed and a new job_id will be assigned to the task.

Parameters:

id – task id

Returns:

True if task has been restarted, False if it can’t be restarted

async write_message(id: str, data: dict) bool[source]

Send a message to a running task.

RPC method: tasks.write_message

The task must be WAITING. The task will continue to the next stage only after it receives a message.

Parameters:
  • id – task id

  • data – message data; it will be available in the task results for the next stage

Returns:

True if a task has received the message
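The RPC method name above suggests the call can also be made remotely. A minimal sketch of building such a request, assuming a JSON-RPC 2.0 endpoint that accepts named parameters; the task id, the message content and the helper name `make_rpc_request` are hypothetical, and the transport is left out:

```python
import json


def make_rpc_request(method: str, params: dict, request_id: int = 1) -> str:
    """Serialize a JSON-RPC 2.0 request (hypothetical helper, not part of the library)."""
    return json.dumps({
        'jsonrpc': '2.0',
        'id': request_id,
        'method': method,
        'params': params,
    })


# Parameter names follow the write_message signature: id and data.
request = make_rpc_request(
    'tasks.write_message',
    {'id': 'my_task', 'data': {'approved': True}},
)
decoded = json.loads(request)
```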

prepare_insert_data(data: dict)[source]

Prepare task object.

prepare_update_data(data: dict)[source]

Prepare task object.

DEFAULT_ROW_LIMIT = 24

default LIMIT value for queries

MAX_ROW_LIMIT = 1000

maximum row limit for a query

__init__(app, database_service: DatabaseService | str = None, logger=None)

Initialize.

async create(data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict=None, on_conflict_keys=None, on_conflict_values=None) _Row

Create a single object.

Parameters:
  • data – objects data

  • columns – columns to return, None for no return

  • on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)

  • on_conflict_keys – list of on conflict constraints

  • on_conflict_values – an object with on conflict values, used only by do_update clause

  • _connection – optional connection object (when using inside a transactional block)

Returns:

inserted object

async delete(id: _Id, columns: Collection[str] | Literal['*'] | None = None, _connection=None) _Row

Remove a single object from a table.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.delete({'id': 1, 'key': 'abc'}, ...)

Raises:

NotFound – if the object doesn’t exist or was already deleted

static delete_condition_hook(sql)

Set up specific delete conditions.

discover_service(name: str | Service | False | None, cls: str | type | Iterable[str | type] = None, required=True)

Discover a service using specified name and/or service class.

Parameters:
  • name – a service name or a service instance (in the latter case it will be returned as is). False means that nothing will be returned, i.e. the service will be disabled

  • cls – specify service class. If name wasn’t specified, then the first service matching given class will be returned. If name and class both were specified, then the type check will be performed on a newly discovered service

  • required – if True, an exception will be raised when the service doesn’t exist; otherwise None will be returned in that case

async exists(id: _Id, _connection=None) bool

Return True if object exists. False otherwise.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.exists({'id': 1, 'key': 'abc'})

async get(id: _Id, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row

Return information about an object.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.get({'id': 1, 'key': 'abc'}, ...)

Raises:

NotFound

static get_condition_hook(sql)

Set up specific get conditions.

get_request_context() RequestContext | None

Get current user request context.

get_session()

Get current user session.

get_user_id()

Return current session user id.

has_permission(permission: str) bool

Check if a user session has a particular permission.

insert_columns = None

you may specify insert columns here

static insert_condition_hook(sql)

Set up specific insert conditions.

async iter(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, columns: Collection[str] | Literal['*'] | None = '*') AsyncGenerator[List[_Row], None]

Iterate over listed data.

Almost the same as SQLService.list but returns a generator which iterates over the query content. It’s not intended to be used by a client but inside the app or the service itself.

See SQLService.list for info about params.
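Since the return type is an async generator of row lists, the method is consumed with `async for`, one batch at a time. A minimal sketch of that pattern; `fake_iter` is a hypothetical stand-in for `service.iter(...)` so the example runs without a database:

```python
import asyncio
from typing import AsyncGenerator, List


async def fake_iter(rows: List[dict], limit: int = 2) -> AsyncGenerator[List[dict], None]:
    """Hypothetical stand-in for service.iter(): yields rows in batches of `limit`."""
    for start in range(0, len(rows), limit):
        yield rows[start:start + limit]


async def collect_ids(batches: AsyncGenerator[List[dict], None]) -> List[int]:
    """Walk over all batches and gather the row ids."""
    ids = []
    async for batch in batches:  # each item is a list of rows
        ids.extend(row['id'] for row in batch)
    return ids


rows = [{'id': i} for i in range(5)]
ids = asyncio.run(collect_ids(fake_iter(rows)))
```

With a real service the loop body stays the same; only `fake_iter(rows)` would be replaced by the `iter` call.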

async list(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, count: bool = True, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _List

List rows with pagination and conditions.

Parameters:
  • conditions – optional query conditions

  • sort – optional row ordering

  • offset – optional row offset

  • limit – optional row limit

  • count – calculate page count

  • columns – columns to return

  • _connection – optional connection object (when using inside a transactional block)

Returns:

This method may return different data depending on the provided params

Condition example:

service.list(
    conditions={
        'tag': ['tag_1', 'tag_2', 'tag_3'],  # IN condition
        'active': True,                      # EQ condition
        'value': {'gt': 41, 'le': 42},       # numeric conditions
        'text': {'like': 'sht'},             # text field "likeness"
    }
)

Available numeric conditions: gt, lt, ge, le, eq

Available other conditions: like
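To make the condition format concrete, here is a small in-memory sketch that applies such a dictionary to plain rows. It only illustrates how the example reads and is not the service's SQL builder. It assumes that conditions on different columns are combined with AND, and it leaves out `like` because its matching rules are not stated here. The function name `matches` is hypothetical.

```python
import operator

# Numeric operators listed in the documentation, mapped to Python comparisons.
OPERATORS = {
    'gt': operator.gt,
    'lt': operator.lt,
    'ge': operator.ge,
    'le': operator.le,
    'eq': operator.eq,
}


def matches(row: dict, conditions: dict) -> bool:
    """Check a row against a conditions dict (illustrative, AND semantics assumed)."""
    for column, expected in conditions.items():
        value = row[column]
        if isinstance(expected, (list, tuple, set)):
            if value not in expected:           # IN condition
                return False
        elif isinstance(expected, dict):
            for name, reference in expected.items():
                if not OPERATORS[name](value, reference):
                    return False                # numeric condition failed
        elif value != expected:                 # EQ condition
            return False
    return True


rows = [
    {'tag': 'tag_1', 'active': True, 'value': 42},
    {'tag': 'tag_9', 'active': True, 'value': 42},
    {'tag': 'tag_2', 'active': True, 'value': 41},
]
conditions = {
    'tag': ['tag_1', 'tag_2', 'tag_3'],
    'active': True,
    'value': {'gt': 41, 'le': 42},
}
selected = [row for row in rows if matches(row, conditions)]
```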

Sort example:

service.list(
    sort=['tag', {'desc': 'timestamp'}]     # order matters
)

Available sorting conditions: desc, asc (default)
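The "order matters" remark can be illustrated with plain Python. The sketch below interprets a sort list the way a multi-column ORDER BY would, using stable sorts applied from the last key to the first. It is an illustration on in-memory rows, not the service's implementation, and `apply_sort` is a hypothetical name.

```python
from typing import List, Union


def apply_sort(rows: List[dict], sort: List[Union[str, dict]]) -> List[dict]:
    """Order rows by several keys; earlier entries in `sort` take priority."""
    result = list(rows)
    # Sorting by the least significant key first works because list.sort is stable.
    for item in reversed(sort):
        if isinstance(item, str):
            key, descending = item, False       # a bare column name means asc
        else:
            (direction, key), = item.items()    # e.g. {'desc': 'timestamp'}
            descending = direction == 'desc'
        result.sort(key=lambda row, k=key: row[k], reverse=descending)
    return result


rows = [
    {'tag': 'b', 'timestamp': 1},
    {'tag': 'a', 'timestamp': 1},
    {'tag': 'a', 'timestamp': 2},
]
ordered = apply_sort(rows, ['tag', {'desc': 'timestamp'}])
```

Rows are grouped by `tag` first, and within the same tag the newest `timestamp` comes first.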

You can use this method for counting without returning any results. Just set the limit to zero. Optionally, you can also set the counting precision.

service.list(
    conditions={ ... },
    precision=50,
    limit=0
)

Conversely, if you don’t need counting, you can disable it. No count or page data will be available then.

service.list(
    conditions={ ... },
    count=False
)

Precision uses a number of table samples to estimate the count. If the precision is set to 0 or None, then the exact count will be performed.

Attention

Precision is not working at the moment; the cause is unknown.

If count argument is False, then count, page and pages result values will be None.

If columns is None, then data will be None and on_page will be zero.

{
    count: Optional[int]          #: total rows matching the query, None if count hasn't been requested
    offset: int                   #: row offset for this selection
    page: Optional[int]           #: current page number, None if count hasn't been requested
    pages: Optional[int]          #: total pages, None if count hasn't been requested
    on_page: int                  #: number of rows on this page
    data: Optional[List[dict]]    #: returned rows, None if limit was set to 0
}
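
Because several fields of this structure may be None, calling code has to check them before use. A minimal sketch of reading the result defensively; the sample dictionaries are made-up values shaped like the structure above, and `summarize` is a hypothetical helper:

```python
def summarize(result: dict) -> str:
    """Describe a list() result without assuming count or data are present."""
    rows = result['data'] or []  # data may be None, treat it as no rows
    if result['count'] is None:  # count=False: count, page and pages are None
        return f"{len(rows)} rows from offset {result['offset']} (count not requested)"
    return (
        f"page {result['page']} of {result['pages']}: "
        f"{len(rows)} rows, {result['count']} total"
    )


# Hypothetical results, not produced by a real query.
counted = {
    'count': 50, 'offset': 0, 'page': 1, 'pages': 3,
    'on_page': 24, 'data': [{'id': i} for i in range(24)],
}
uncounted = {
    'count': None, 'offset': 24, 'page': None, 'pages': None,
    'on_page': 2, 'data': [{'id': 1}, {'id': 2}],
}
counted_summary = summarize(counted)
uncounted_summary = summarize(uncounted)
```
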
async m_create(data: Collection[dict], columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict: str = None, on_conflict_keys: list = None, on_conflict_values: dict = None) list[_Row]

Create multiple objects.

Parameters:
  • data – list of objects data

  • columns – columns to return, None for no return

  • on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)

  • on_conflict_keys – list of on conflict constraints

  • on_conflict_values – an object with on conflict values, used only by do_update clause

  • _connection – optional connection object (when using inside a transactional block)

Returns:

inserted objects

async m_delete(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = None, _connection=None) list[_Row]

Remove multiple objects from a table. Non-existing objects will be skipped.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_delete([{'id': 1, 'key': 'abc'}, ...], ...)

async m_exists(id: Collection[_Id], _connection=None) frozenset[_Id]

Return a set of existing IDs for a list of IDs.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_exists([{'id': 1, 'key': 'abc'}, ...])

async m_get(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]

Return multiple objects.

Objects that don’t exist will be skipped. Returns all data at once without pagination. Use SQLService.list if you want pagination or sorting.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_get([{'id': 1, 'key': 'abc'}, ...], ...)

async m_update(id: Collection[_Id], data: dict, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]

Update multiple objects with the same data. Non-existing objects will be skipped.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_update([{'id': 1, 'key': 'abc'}, ...], ...)

property permission_modify: str

Get a modify permission key.

property permission_view: str

Get a view permission key.

select_columns = None

you can specify a whitelist of output columns here

system_user() bool

Check if user session has the system scope.

async update(id: _Id, data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row

Update a single object. Raises error if object doesn’t exist.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.update({'id': 1, 'key': 'abc'}, ...)

update_columns = None

you may specify columns for update here

static update_condition_hook(sql)

Set up specific update conditions.

class NotificationService[source]

Bases: SQLService[UUID, Notification], PermHook, PublicInterface

Task notifications interface.

Notification service stores task job history for notify tasks. Its interface is similar to SQLService.

Executors do not require notification service.

service_name: str = 'notifications'

you may define a custom service name here

update_columns = {'enabled'}

you may specify columns for update here

DEFAULT_ROW_LIMIT = 24

default LIMIT value for queries

MAX_ROW_LIMIT = 1000

maximum row limit for a query

__init__(app, database_service: DatabaseService | str = None, logger=None)

Initialize.

async create(data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict=None, on_conflict_keys=None, on_conflict_values=None) _Row

Create a single object.

Parameters:
  • data – objects data

  • columns – columns to return, None for no return

  • on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)

  • on_conflict_keys – list of on conflict constraints

  • on_conflict_values – an object with on conflict values, used only by do_update clause

  • _connection – optional connection object (when using inside a transactional block)

Returns:

inserted object

async delete(id: _Id, columns: Collection[str] | Literal['*'] | None = None, _connection=None) _Row

Remove a single object from a table.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.delete({'id': 1, 'key': 'abc'}, ...)

Raises:

NotFound – if the object doesn’t exist or was already deleted

static delete_condition_hook(sql)

Set up specific delete conditions.

discover_service(name: str | Service | False | None, cls: str | type | Iterable[str | type] = None, required=True)

Discover a service using specified name and/or service class.

Parameters:
  • name – a service name or a service instance (in the latter case it will be returned as is). False means that nothing will be returned, i.e. the service will be disabled

  • cls – specify service class. If name wasn’t specified, then the first service matching given class will be returned. If name and class both were specified, then the type check will be performed on a newly discovered service

  • required – if True, an exception will be raised when the service doesn’t exist; otherwise None will be returned in that case

async exists(id: _Id, _connection=None) bool

Return True if object exists. False otherwise.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.exists({'id': 1, 'key': 'abc'})

async get(id: _Id, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row

Return information about an object.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.get({'id': 1, 'key': 'abc'}, ...)

Raises:

NotFound

static get_condition_hook(sql)

Set up specific get conditions.

get_request_context() RequestContext | None

Get current user request context.

get_session()

Get current user session.

get_user_id()

Return current session user id.

has_permission(permission: str) bool

Check if a user session has a particular permission.

insert_columns = None

you may specify insert columns here

static insert_condition_hook(sql)

Set up specific insert conditions.

async iter(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, columns: Collection[str] | Literal['*'] | None = '*') AsyncGenerator[List[_Row], None]

Iterate over listed data.

Almost the same as SQLService.list but returns a generator which iterates over the query content. It’s not intended to be used by a client but inside the app or the service itself.

See SQLService.list for info about params.

async list(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, count: bool = True, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _List

List rows with pagination and conditions.

Parameters:
  • conditions – optional query conditions

  • sort – optional row ordering

  • offset – optional row offset

  • limit – optional row limit

  • count – calculate page count

  • columns – columns to return

  • _connection – optional connection object (when using inside a transactional block)

Returns:

This method may return different data depending on the provided params

Condition example:

service.list(
    conditions={
        'tag': ['tag_1', 'tag_2', 'tag_3'],  # IN condition
        'active': True,                      # EQ condition
        'value': {'gt': 41, 'le': 42},       # numeric conditions
        'text': {'like': 'sht'},             # text field "likeness"
    }
)

Available numeric conditions: gt, lt, ge, le, eq

Available other conditions: like

Sort example:

service.list(
    sort=['tag', {'desc': 'timestamp'}]     # order matters
)

Available sorting conditions: desc, asc (default)

You can use this method for counting without returning any results. Just set the limit to zero. Optionally, you can also set the counting precision.

service.list(
    conditions={ ... },
    precision=50,
    limit=0
)

Conversely, if you don’t need counting, you can disable it. No count or page data will be available then.

service.list(
    conditions={ ... },
    count=False
)

Precision uses a number of table samples to estimate the count. If the precision is set to 0 or None, then the exact count will be performed.

Attention

Precision is not working at the moment; the cause is unknown.

If count argument is False, then count, page and pages result values will be None.

If columns is None, then data will be None and on_page will be zero.

{
    count: Optional[int]          #: total rows matching the query, None if count hasn't been requested
    offset: int                   #: row offset for this selection
    page: Optional[int]           #: current page number, None if count hasn't been requested
    pages: Optional[int]          #: total pages, None if count hasn't been requested
    on_page: int                  #: number of rows on this page
    data: Optional[List[dict]]    #: returned rows, None if limit was set to 0
}

async m_create(data: Collection[dict], columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict: str = None, on_conflict_keys: list = None, on_conflict_values: dict = None) list[_Row]

Create multiple objects.

Parameters:
  • data – list of objects data

  • columns – columns to return, None for no return

  • on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)

  • on_conflict_keys – list of on conflict constraints

  • on_conflict_values – an object with on conflict values, used only by do_update clause

  • _connection – optional connection object (when using inside a transactional block)

Returns:

inserted objects

async m_delete(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = None, _connection=None) list[_Row]

Remove multiple objects from a table. Non-existing objects will be skipped.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_delete([{'id': 1, 'key': 'abc'}, ...], ...)

async m_exists(id: Collection[_Id], _connection=None) frozenset[_Id]

Return a set of existing IDs for a list of IDs.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_exists([{'id': 1, 'key': 'abc'}, ...])

async m_get(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]

Return multiple objects.

Objects that don’t exist will be skipped. Returns all data at once without pagination. Use SQLService.list if you want pagination or sorting.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_get([{'id': 1, 'key': 'abc'}, ...], ...)

async m_update(id: Collection[_Id], data: dict, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]

Update multiple objects with the same data. Non-existing objects will be skipped.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.m_update([{'id': 1, 'key': 'abc'}, ...], ...)

property permission_modify: str

Get a modify permission key.

property permission_view: str

Get a view permission key.

static prepare_insert_data(data: dict)

Define your custom row init logic here.

static prepare_update_data(data: dict)

You may define your custom row update logic here.

select_columns = None

you can specify a whitelist of output columns here

system_user() bool

Check if user session has the system scope.

async update(id: _Id, data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row

Update a single object. Raises error if object doesn’t exist.

If you have composite primary keys you should pass dictionary objects in id field, like this:

await service.update({'id': 1, 'key': 'abc'}, ...)

static update_condition_hook(sql)

Set up specific update conditions.

class TaskManager[source]

Bases: ContextableService, PublicInterface

Task manager schedules task execution.

service_name = 'taskman'

you may define a custom service name here

__init__(app, stream_client: StreamRPCClient = None, database_service: DatabaseService = None, redis_transport: RedisTransportService = None, notification_service: NotificationService = None, scheduler_service: Scheduler = None, executor_topic: str = Topic.EXECUTOR, refresh_rate: int = 1, suspended_task_lifetime_hours: int = 24, logger=None)[source]

Initialize.

Parameters:
  • app – web app

  • database_service – database connector instance or service name

  • stream_client – stream client to the executor

  • redis_transport – a cache for executor states

  • notification_service – notification service instance or service name

  • scheduler_service – internal loop scheduler

  • refresh_rate – watcher loop refresh rate in seconds

  • executor_topic – optional topic name for executor

  • suspended_task_lifetime_hours – if task was last queued before this interval, it won’t be executed again

  • logger – optional logger

async list_active_executors() dict[source]

Return executor ids and their last ping time.

RPC method: manager.list_active_executors

async ping(executor_id: UUID) None[source]

Receive ping from an executor.

RPC method: manager.ping

This method is used by executors to tell the manager that they are alive.

Parameters:

executor_id – executor instance app id

async suspend_executor(executor_id: UUID) None[source]

Suspend an executor and its tasks.

RPC method: manager.suspend_executor

This method is used by an executor when it’s about to exit.

Parameters:

executor_id – executor instance app id

async execute_stage(task_id: str, executor_id: UUID, stage: int, timestamp: int) None[source]

Accept an executor report on stage execution.

RPC method: manager.execute_stage

This method is used by an executor before each stage to notify the manager which stage is being executed.

Parameters:
  • task_id – task id

  • executor_id – executor instance app id

  • stage – current stage

  • timestamp – executor UNIX time

async wait_for_timer(task_id: str, executor_id: UUID, stage: int, timer: int, timestamp: int) None[source]

Require a task to wait for a timer to continue.

RPC method: manager.wait_for_timer

The task will continue after a certain timeout. This method is used by an executor when it encounters a Timer special command.

Parameters:
  • task_id – task id

  • executor_id – executor instance app id

  • stage – current stage

  • timer – timer value in seconds

  • timestamp – executor UNIX time

async wait_for_message(task_id: str, executor_id: UUID, stage: int, timestamp: int) None[source]

Require a task to wait for a message to continue.

RPC method: manager.wait_for_message

The task will continue when a message to this task is received by the TaskService. This method is used by an executor when it encounters a Message special command.

Parameters:
  • task_id – task id

  • executor_id – executor instance app id

  • stage – current stage

  • timestamp – executor UNIX time

async write_stage(task_id: str, executor_id: UUID, stage: int, stages: int, result: Any, error: bool, timestamp: int) None[source]

Write stage result to the task table.

RPC method: manager.write_stage

This method is used by executors to return the results of each stage to the manager. If it’s the last stage or there was an error, the task will be finished and a notification may be created.

Parameters:
  • task_id – task id

  • executor_id – executor instance app id

  • stage – current stage

  • stages – total number of stages

  • result – result value

  • error – result is an error

  • timestamp – executor UNIX time

class TaskExecutor[source]

Bases: ContextableService, PublicInterface

Task executor receives and processes tasks from a manager.

An executor can belong to a different app than the manager. Executors do not require access to the tasks table or to the task / notification services, but they do require a configured RPC stream to the manager. They also require an executor stream listener with the TaskExecutor.run_task method exposed.

__init__(app, stream_client: StreamRPCClient = None, manager_app: str = None, manager_topic: str = Topic.MANAGER, rpc_service: JSONRPCServer = None, scheduler: Scheduler = None, data: dict = None, logger=None)[source]

Initialize.

Parameters:
  • app – web app

  • manager_app – manager app name (by default: same as executor)

  • manager_topic – manager topic name

  • rpc_service – local rpc server name or instance

  • scheduler – local scheduler

  • stream_client – stream client to the manager

  • data – additional data for command templates

  • logger – optional logger instance

async run_task(data: ExecutorTask) None[source]

Run a task locally.

RPC method: executor.run_task

This method is used by the task manager to send tasks to executors and should be exposed as public in the executor RPC stream.

An executor runs a task stage by stage in a local RPC server with correlation_id equal to the task job_id. An executor sends an execute_stage message to the manager before each stage and a write_stage message with the stage results after it’s completed.

The method returns nothing because all task data is sent in separate stream requests.

Parameters:

data – task partial data
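
The stage-by-stage reporting described above can be sketched with a stand-in for the manager stream. This is an illustration of the call order only, not the executor's implementation: `RecordingManager` and `run_stages` are hypothetical, stages are plain callables instead of RPC commands, and zero-based stage numbering is an assumption.

```python
import asyncio
import time
from typing import Any, Callable, List


class RecordingManager:
    """Hypothetical stand-in for the RPC stream to the manager; it records calls."""

    def __init__(self) -> None:
        self.calls: List[tuple] = []

    async def execute_stage(self, task_id, executor_id, stage, timestamp) -> None:
        self.calls.append(('execute_stage', stage))

    async def write_stage(self, task_id, executor_id, stage, stages,
                          result, error, timestamp) -> None:
        self.calls.append(('write_stage', stage, result, error))


async def run_stages(manager, task_id: str, executor_id: str,
                     commands: List[Callable[[], Any]]) -> None:
    """Report each stage before it runs and write its result afterwards."""
    stages = len(commands)
    for stage, command in enumerate(commands):
        await manager.execute_stage(task_id, executor_id, stage, int(time.time()))
        try:
            result, error = command(), False
        except Exception as exc:
            result, error = repr(exc), True
        await manager.write_stage(task_id, executor_id, stage, stages,
                                  result, error, int(time.time()))
        if error:
            break  # an error finishes the task, later stages are not run


manager = RecordingManager()
commands = [lambda: 1, lambda: 1 / 0, lambda: 3]
asyncio.run(run_stages(manager, 'my_task', 'executor-1', commands))
```

The second stage fails here, so the third one is never reported or executed.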