services - application services¶
- class TaskService[source]¶
Bases: SQLService[str, Task], PermHook, PublicInterface
Tasks public interface.
This service should be used to create, view or modify tasks. The interface is the same as in SQLService.
Task service is not required by executors so you can have a single task service for different executor applications.
- service_name: str = 'tasks'¶
you may define a custom service name here
- async delete_old_tasks(interval_days: int = 7) None[source]¶
Delete old tasks and notifications (excluding periodic and system tasks).
RPC method: tasks.delete_old_tasks
Use this method to remove finished tasks that are no longer needed from the database. It will not delete cron tasks or tasks marked as system. The recommended way to use it is to create a cron task that calls this method.
- Parameters:
interval_days – delete tasks and notifications older than interval_days days
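As a rough illustration of the interval_days semantics, the sketch below computes the cutoff moment that such a cleanup would compare against. cleanup_cutoff is a hypothetical helper and not part of the service; the query the service actually runs is not shown in this reference.

```python
from datetime import datetime, timedelta, timezone


def cleanup_cutoff(now: datetime, interval_days: int = 7) -> datetime:
    """Return the moment before which records count as old.

    Hypothetical helper for illustration only.
    """
    if interval_days < 0:
        raise ValueError("interval_days must be non-negative")
    return now - timedelta(days=interval_days)


now = datetime(2024, 1, 15, tzinfo=timezone.utc)
cutoff = cleanup_cutoff(now)  # seven days before `now` with the default interval
```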
- async reset_task(id: str) bool[source]¶
Reset task to IDLE.
RPC method: tasks.reset
All results and execution data will be removed and a new job_id will be assigned to the task.
- Parameters:
id – task id
- Returns:
True if task has been restarted, False if it can’t be restarted
- async write_message(id: str, data: dict) bool[source]¶
Send a message to a running task.
RPC method: tasks.write_message
The task must be WAITING. The task will continue to the next stage only after it receives a message.
- Parameters:
id – task id
data – message data; it will be available in the task results for the next stage
- Returns:
True if a task has received the message
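A minimal sketch of what a tasks.write_message call could look like on the wire, assuming the RPC layer accepts a JSON-RPC 2.0 envelope (an assumption; this reference only names the method). The helper make_rpc_request, the task id and the payload are made up for the example.

```python
import json
from itertools import count

_request_ids = count(1)  # simple local request id generator


def make_rpc_request(method: str, params: dict) -> dict:
    """Build a JSON-RPC 2.0 request object (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
        "params": params,
    }


request = make_rpc_request(
    "tasks.write_message",
    {"id": "my_task", "data": {"approved": True}},  # example values only
)
body = json.dumps(request)  # serialized body to send over your transport
```

The same helper would work for tasks.reset or tasks.delete_old_tasks by changing the method name and params.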
- DEFAULT_ROW_LIMIT = 24¶
default LIMIT for queries
- MAX_ROW_LIMIT = 1000¶
maximum LIMIT for queries
- __init__(app, database_service: DatabaseService | str = None, logger=None)¶
Initialize.
- async create(data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict=None, on_conflict_keys=None, on_conflict_values=None) _Row¶
Create a single object.
- Parameters:
data – objects data
columns – columns to return, None for no return
on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)
on_conflict_keys – list of on conflict constraints
on_conflict_values – an object with on conflict values, used only by do_update clause
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted object
- async delete(id: _Id, columns: Collection[str] | Literal['*'] | None = None, _connection=None) _Row¶
Remove a single object from a table.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.delete({'id': 1, 'key': 'abc'}, ...)
- Raises:
NotFound – if the object doesn’t exist or was already deleted
- static delete_condition_hook(sql)¶
Set up specific delete conditions.
- discover_service(name: str | Service | False | None, cls: str | type | Iterable[str | type] = None, required=True)¶
Discover a service using specified name and/or service class.
- Parameters:
name – a service name or a service instance (an instance is returned as is). False means that nothing will be returned, i.e. the service is disabled
cls – a service class. If name is not specified, the first service matching the given class is returned. If both name and class are specified, a type check is performed on the discovered service
required – if True, an exception is raised when the service doesn’t exist; otherwise None is returned
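The lookup rules above can be sketched roughly as follows. This is an in-memory illustration, not the library code: the registry dict, the discover function and ServiceNotFound are hypothetical, and only a single class (not a string or an iterable of classes) is handled for cls.

```python
from typing import Optional


class ServiceNotFound(Exception):
    """Hypothetical error type used by this sketch."""


def discover(registry: dict, name=None, cls: Optional[type] = None, required: bool = True):
    """Resolve a service following the rules described above (illustrative only)."""
    if name is False:
        return None  # service explicitly disabled
    if name is not None and not isinstance(name, str):
        return name  # an instance is returned as is
    if name is not None:
        service = registry.get(name)
        if service is not None and cls is not None and not isinstance(service, cls):
            raise TypeError(f"service {name!r} is not an instance of {cls.__name__}")
    elif cls is not None:
        # no name given: take the first registered service of the given class
        service = next((s for s in registry.values() if isinstance(s, cls)), None)
    else:
        service = None
    if service is None and required:
        raise ServiceNotFound(name or cls)
    return service


class Cache:
    pass


class Db:
    pass


registry = {"cache": Cache(), "db": Db()}
db = discover(registry, "db", cls=Db)               # by name, with a type check
cache = discover(registry, cls=Cache)               # first service of this class
nothing = discover(registry, "missing", required=False)  # None instead of an error
```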
- async exists(id: _Id, _connection=None) bool¶
Return True if the object exists, False otherwise.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.exists({'id': 1, 'key': 'abc'})
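To make the two id forms concrete, here is a small sketch that maps either form to a column/value dict. normalize_id and the primary_keys argument are hypothetical names for this example; how the service resolves ids internally is not described in this reference.

```python
from typing import Any, Dict, Sequence


def normalize_id(id_: Any, primary_keys: Sequence[str]) -> Dict[str, Any]:
    """Map an id value to a {column: value} dict (hypothetical helper)."""
    if isinstance(id_, dict):
        missing = set(primary_keys) - set(id_)
        if missing:
            raise ValueError(f"missing primary key columns: {sorted(missing)}")
        return {key: id_[key] for key in primary_keys}
    if len(primary_keys) != 1:
        raise ValueError("a composite primary key requires a dictionary id")
    return {primary_keys[0]: id_}


single = normalize_id(1, ["id"])  # plain value for a single-column key
composite = normalize_id({"id": 1, "key": "abc"}, ["id", "key"])  # dict for a composite key
```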
- async get(id: _Id, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row¶
Return information about an object.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.get({'id': 1, 'key': 'abc'}, ...)
- Raises:
NotFound – if the object doesn’t exist
- static get_condition_hook(sql)¶
Set up specific get conditions.
- get_request_context() RequestContext | None¶
Get current user request context.
- get_session()¶
Get current user session.
- get_user_id()¶
Return current session user id.
- has_permission(permission: str) bool¶
Check if a user session has a particular permission.
- insert_columns = None¶
you may specify insert columns here
- static insert_condition_hook(sql)¶
Set up specific insert conditions.
- async iter(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, columns: Collection[str] | Literal['*'] | None = '*') AsyncGenerator[List[_Row], None]¶
Iterate over listed data.
Almost the same as SQLService.list but returns a generator which iterates over the query content. It’s not intended to be used by a client but inside the app or the service itself.
See SQLService.list for info about params.
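The return annotation suggests that the generator yields lists of rows. The sketch below shows how such a batched async generator is typically consumed; iter_pages is an in-memory stand-in written for this example, and batching by limit is an assumption rather than documented behaviour.

```python
import asyncio
from typing import AsyncGenerator, List


async def iter_pages(rows: List[dict], offset: int = 0, limit: int = 24) -> AsyncGenerator[List[dict], None]:
    """Yield consecutive batches of at most `limit` rows (in-memory stand-in)."""
    if limit <= 0:
        raise ValueError("limit must be positive")
    while offset < len(rows):
        yield rows[offset:offset + limit]
        offset += limit


async def collect_ids(rows: List[dict]) -> List[int]:
    """Consume the generator batch by batch, as app code would."""
    ids = []
    async for batch in iter_pages(rows, limit=10):
        ids.extend(row["id"] for row in batch)
    return ids


rows = [{"id": i} for i in range(25)]
ids = asyncio.run(collect_ids(rows))
```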
- async list(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, count: bool = True, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _List¶
List rows with pagination and conditions.
- Parameters:
conditions – optional query conditions
sort – optional row ordering
offset – optional row offset
limit – optional row limit
count – calculate page count
columns – columns to return
_connection – optional connection object (when using inside a transactional block)
- Returns:
This method may return different data depending on the provided params
Condition example:
service.list(
    conditions={
        'tag': ['tag_1', 'tag_2', 'tag_3'],  # IN condition
        'active': True,  # EQ condition
        'value': {'gt': 41, 'le': 42},  # numeric conditions
        'text': {'like': 'sht'},  # text field "likeness"
    }
)
Available numeric conditions: gt, lt, ge, le, eq
Available other conditions: like
Sort example:
service.list(
    sort=['tag', {'desc': 'timestamp'}]  # order matters
)
Available sorting conditions: desc, asc (default)
You can use this method for counting without returning any results. Just set the limit to zero. Optionally, you can also set the counting precision.
service.list(
    conditions={ ... },
    precision=50,
    limit=0
)
Conversely, if you don’t need counting, you can disable it. No count / page data will be available then.
service.list(
    conditions={ ... },
    count=False
)
Precision uses a number of table samples to estimate the count. If the precision is set to 0 or None, then the exact count will be performed.
Attention
Precision is not working at the moment (the cause is unknown).
If count argument is False, then count, page and pages result values will be None.
If columns is None, then data will be None and on_page will be zero.
{
    count: Optional[int]  #: total rows matching the query, None if count hasn't been requested
    offset: int  #: row offset for this selection
    page: Optional[int]  #: current page number, None if count hasn't been requested
    pages: Optional[int]  #: total pages, None if count hasn't been requested
    on_page: int  #: number of rows on this page
    data: Optional[List[dict]]  #: returned rows, None if limit was set to 0
}
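To make the condition syntax concrete, the sketch below evaluates the same kind of conditions dict against plain Python dicts. It only illustrates the semantics: the service builds SQL instead, the matches function is hypothetical, like is approximated by a case-insensitive substring test, and the list-of-dicts form of conditions is not covered.

```python
import operator

# comparison operators listed above
_COMPARE = {
    "gt": operator.gt,
    "lt": operator.lt,
    "ge": operator.ge,
    "le": operator.le,
    "eq": operator.eq,
}


def matches(row: dict, conditions: dict) -> bool:
    """Check a row against a conditions dict (in-memory illustration only)."""
    for key, cond in conditions.items():
        value = row.get(key)
        if isinstance(cond, (list, tuple, set)):
            if value not in cond:  # IN condition
                return False
        elif isinstance(cond, dict):
            for op, operand in cond.items():
                if value is None:
                    return False
                if op == "like":
                    # assumption: approximated as a case-insensitive substring test
                    if str(operand).lower() not in str(value).lower():
                        return False
                elif not _COMPARE[op](value, operand):
                    return False
        elif value != cond:  # EQ condition
            return False
    return True


rows = [
    {"tag": "tag_1", "active": True, "value": 42, "text": "a short text"},
    {"tag": "tag_9", "active": True, "value": 42, "text": "a short text"},
]
conditions = {
    "tag": ["tag_1", "tag_2"],
    "active": True,
    "value": {"gt": 41, "le": 42},
    "text": {"like": "sho"},
}
selected = [row for row in rows if matches(row, conditions)]
```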
- async m_create(data: Collection[dict], columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict: str = None, on_conflict_keys: list = None, on_conflict_values: dict = None) list[_Row]¶
Create multiple objects.
- Parameters:
data – list of objects data
columns – columns to return, None for no return
on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)
on_conflict_keys – list of on conflict constraints
on_conflict_values – an object with on conflict values, used only by do_update clause
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted objects
- async m_delete(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = None, _connection=None) list[_Row]¶
Remove multiple objects from a table. Non-existing objects will be skipped.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_delete([{'id': 1, 'key': 'abc'}, ...], ...)
- async m_exists(id: Collection[_Id], _connection=None) frozenset[_Id]¶
Return a set of existing IDs for a list of IDs.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_exists([{'id': 1, 'key': 'abc'}, ...])
- async m_get(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]¶
Return multiple objects.
Objects that don’t exist will be skipped. Returns all data at once without pagination. Use SQLService.list if you want pagination or sorting.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_get([{'id': 1, 'key': 'abc'}, ...], ...)
- async m_update(id: Collection[_Id], data: dict, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]¶
Update multiple objects with the same data. Non-existing objects will be skipped.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_update([{'id': 1, 'key': 'abc'}, ...], ...)
- property permission_modify: str¶
Get a modify permission key.
- property permission_view: str¶
Get a view permission key.
- select_columns = None¶
you can specify a whitelist of output columns here
- system_user() bool¶
Check if user session has the system scope.
- async update(id: _Id, data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row¶
Update a single object. Raises error if object doesn’t exist.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.update({'id': 1, 'key': 'abc'}, ...)
- update_columns = None¶
you may specify columns for update here
- static update_condition_hook(sql)¶
Set up specific update conditions.
- class NotificationService[source]¶
Bases:
SQLService[UUID, Notification], PermHook, PublicInterface
Task notifications interface.
Notification service stores task job history for notify tasks. Its interface is similar to SQLService.
Executors do not require notification service.
- service_name: str = 'notifications'¶
you may define a custom service name here
- update_columns = {'enabled'}¶
you may specify columns for update here
- DEFAULT_ROW_LIMIT = 24¶
default LIMIT for queries
- MAX_ROW_LIMIT = 1000¶
maximum LIMIT for queries
- __init__(app, database_service: DatabaseService | str = None, logger=None)¶
Initialize.
- async create(data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict=None, on_conflict_keys=None, on_conflict_values=None) _Row¶
Create a single object.
- Parameters:
data – objects data
columns – columns to return, None for no return
on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)
on_conflict_keys – list of on conflict constraints
on_conflict_values – an object with on conflict values, used only by do_update clause
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted object
- async delete(id: _Id, columns: Collection[str] | Literal['*'] | None = None, _connection=None) _Row¶
Remove a single object from a table.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.delete({'id': 1, 'key': 'abc'}, ...)
- Raises:
NotFound – if the object doesn’t exist or was already deleted
- static delete_condition_hook(sql)¶
Set up specific delete conditions.
- discover_service(name: str | Service | False | None, cls: str | type | Iterable[str | type] = None, required=True)¶
Discover a service using specified name and/or service class.
- Parameters:
name – a service name or a service instance (an instance is returned as is). False means that nothing will be returned, i.e. the service is disabled
cls – a service class. If name is not specified, the first service matching the given class is returned. If both name and class are specified, a type check is performed on the discovered service
required – if True, an exception is raised when the service doesn’t exist; otherwise None is returned
- async exists(id: _Id, _connection=None) bool¶
Return True if the object exists, False otherwise.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.exists({'id': 1, 'key': 'abc'})
- async get(id: _Id, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row¶
Return information about an object.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.get({'id': 1, 'key': 'abc'}, ...)
- Raises:
NotFound – if the object doesn’t exist
- static get_condition_hook(sql)¶
Set up specific get conditions.
- get_request_context() RequestContext | None¶
Get current user request context.
- get_session()¶
Get current user session.
- get_user_id()¶
Return current session user id.
- has_permission(permission: str) bool¶
Check if a user session has a particular permission.
- insert_columns = None¶
you may specify insert columns here
- static insert_condition_hook(sql)¶
Set up specific insert conditions.
- async iter(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, columns: Collection[str] | Literal['*'] | None = '*') AsyncGenerator[List[_Row], None]¶
Iterate over listed data.
Almost the same as SQLService.list but returns a generator which iterates over the query content. It’s not intended to be used by a client but inside the app or the service itself.
See SQLService.list for info about params.
- async list(conditions: dict | list[dict] | None = None, sort: list[Union[kaiju_db.services._SortDesc, kaiju_db.services._SortAsc, str]] | None = None, offset: int = 0, limit: int = DEFAULT_ROW_LIMIT, count: bool = True, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _List¶
List rows with pagination and conditions.
- Parameters:
conditions – optional query conditions
sort – optional row ordering
offset – optional row offset
limit – optional row limit
count – calculate page count
columns – columns to return
_connection – optional connection object (when using inside a transactional block)
- Returns:
This method may return different data depending on the provided params
Condition example:
service.list(
    conditions={
        'tag': ['tag_1', 'tag_2', 'tag_3'],  # IN condition
        'active': True,  # EQ condition
        'value': {'gt': 41, 'le': 42},  # numeric conditions
        'text': {'like': 'sht'},  # text field "likeness"
    }
)
Available numeric conditions: gt, lt, ge, le, eq
Available other conditions: like
Sort example:
service.list(
    sort=['tag', {'desc': 'timestamp'}]  # order matters
)
Available sorting conditions: desc, asc (default)
You can use this method for counting without returning any results. Just set the limit to zero. Optionally, you can also set the counting precision.
service.list(
    conditions={ ... },
    precision=50,
    limit=0
)
Conversely, if you don’t need counting, you can disable it. No count / page data will be available then.
service.list(
    conditions={ ... },
    count=False
)
Precision uses a number of table samples to estimate the count. If the precision is set to 0 or None, then the exact count will be performed.
Attention
Precision is not working at the moment (the cause is unknown).
If count argument is False, then count, page and pages result values will be None.
If columns is None, then data will be None and on_page will be zero.
{
    count: Optional[int]  #: total rows matching the query, None if count hasn't been requested
    offset: int  #: row offset for this selection
    page: Optional[int]  #: current page number, None if count hasn't been requested
    pages: Optional[int]  #: total pages, None if count hasn't been requested
    on_page: int  #: number of rows on this page
    data: Optional[List[dict]]  #: returned rows, None if limit was set to 0
}
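The relation between count, offset, limit and the page fields can be sketched as below. The formula is an assumption (1-based page numbers, ceiling division for pages) and page_info is a hypothetical helper; check the values returned by the service before relying on it.

```python
from math import ceil
from typing import Optional


def page_info(count: Optional[int], offset: int, limit: int) -> dict:
    """Derive `page` and `pages` from a row count (assumed formula)."""
    if count is None or limit <= 0:
        # no count requested, or a count-only query: no page data in this sketch
        return {"count": count, "offset": offset, "page": None, "pages": None}
    return {
        "count": count,
        "offset": offset,
        "page": offset // limit + 1,  # assumption: pages are numbered from 1
        "pages": ceil(count / limit),
    }


info = page_info(count=50, offset=24, limit=24)  # second page out of three
```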
- async m_create(data: Collection[dict], columns: Collection[str] | Literal['*'] | None = '*', _connection=None, on_conflict: str = None, on_conflict_keys: list = None, on_conflict_values: dict = None) list[_Row]¶
Create multiple objects.
- Parameters:
data – list of objects data
columns – columns to return, None for no return
on_conflict – on conflict clause if required (‘do_nothing’, ‘do_update’)
on_conflict_keys – list of on conflict constraints
on_conflict_values – an object with on conflict values, used only by do_update clause
_connection – optional connection object (when using inside a transactional block)
- Returns:
inserted objects
- async m_delete(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = None, _connection=None) list[_Row]¶
Remove multiple objects from a table. Non-existing objects will be skipped.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_delete([{'id': 1, 'key': 'abc'}, ...], ...)
- async m_exists(id: Collection[_Id], _connection=None) frozenset[_Id]¶
Return a set of existing IDs for a list of IDs.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_exists([{'id': 1, 'key': 'abc'}, ...])
- async m_get(id: Collection[_Id] = None, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]¶
Return multiple objects.
Objects that don’t exist will be skipped. Returns all data at once without pagination. Use SQLService.list if you want pagination or sorting.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_get([{'id': 1, 'key': 'abc'}, ...], ...)
- async m_update(id: Collection[_Id], data: dict, conditions: dict | list[dict] | None = None, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) list[_Row]¶
Update multiple objects with the same data. Non-existing objects will be skipped.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.m_update([{'id': 1, 'key': 'abc'}, ...], ...)
- property permission_modify: str¶
Get a modify permission key.
- property permission_view: str¶
Get a view permission key.
- static prepare_insert_data(data: dict)¶
Define your custom row init logic here.
- static prepare_update_data(data: dict)¶
You may define your custom row update logic here.
- select_columns = None¶
you can specify a whitelist of output columns here
- system_user() bool¶
Check if user session has the system scope.
- async update(id: _Id, data: dict, columns: Collection[str] | Literal['*'] | None = '*', _connection=None) _Row¶
Update a single object. Raises error if object doesn’t exist.
If you have composite primary keys you should pass dictionary objects in id field, like this:
await service.update({'id': 1, 'key': 'abc'}, ...)
- static update_condition_hook(sql)¶
Set up specific update conditions.
- class TaskManager[source]¶
Bases: ContextableService, PublicInterface
Task manager schedules task execution.
- service_name = 'taskman'¶
you may define a custom service name here
- __init__(app, stream_client: StreamRPCClient = None, database_service: DatabaseService = None, redis_transport: RedisTransportService = None, notification_service: NotificationService = None, scheduler_service: Scheduler = None, executor_topic: str = Topic.EXECUTOR, refresh_rate: int = 1, suspended_task_lifetime_hours: int = 24, logger=None)[source]¶
Initialize.
- Parameters:
app – web app
database_service – database connector instance or service name
stream_client – stream client to the executor
redis_transport – a cache for executor states
notification_service – notification service instance or service name
scheduler_service – internal loop scheduler
refresh_rate – watcher loop refresh rate in seconds
executor_topic – optional topic name for executor
suspended_task_lifetime_hours – if task was last queued before this interval, it won’t be executed again
logger – optional logger
- async list_active_executors() dict[source]¶
Return executor ids and their last ping time.
RPC method: manager.list_active_executors
- async ping(executor_id: UUID) None[source]¶
Receive ping from an executor.
RPC method: manager.ping
This method is used by executors to tell the manager that they are alive.
- Parameters:
executor_id – executor instance app id
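A minimal sketch of the liveness bookkeeping that ping and list_active_executors imply. ExecutorRegistry and its timeout value are hypothetical; per the constructor above, the manager keeps executor states in a Redis-backed cache, which this in-memory stand-in does not attempt to reproduce.

```python
import time
from typing import Dict, Optional
from uuid import UUID, uuid4


class ExecutorRegistry:
    """Keep the last ping time per executor (hypothetical stand-in)."""

    def __init__(self, timeout: float = 5.0) -> None:
        # seconds without a ping before an executor is considered gone (made-up value)
        self.timeout = timeout
        self._last_ping: Dict[UUID, float] = {}

    def ping(self, executor_id: UUID, now: Optional[float] = None) -> None:
        """Record that the executor is alive."""
        self._last_ping[executor_id] = time.time() if now is None else now

    def list_active(self, now: Optional[float] = None) -> Dict[UUID, float]:
        """Return executor ids and last ping times that are still within the timeout."""
        now = time.time() if now is None else now
        return {k: t for k, t in self._last_ping.items() if now - t <= self.timeout}


registry = ExecutorRegistry(timeout=5.0)
first, second = uuid4(), uuid4()
registry.ping(first, now=100.0)
registry.ping(second, now=103.0)
active = registry.list_active(now=106.0)  # `first` is 6 s stale, `second` is 3 s old
```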
- async suspend_executor(executor_id: UUID) None[source]¶
Suspend an executor and its tasks.
RPC method: manager.suspend_executor
This method is used by an executor when it’s about to exit.
- Parameters:
executor_id – executor instance app id
- async execute_stage(task_id: str, executor_id: UUID, stage: int, timestamp: int) None[source]¶
Accept an executor report on stage execution.
RPC method: manager.execute_stage
This method is used by an executor before each stage to notify the manager which stage is being executed.
- Parameters:
task_id – task id
executor_id – executor instance app id
stage – current stage
timestamp – executor UNIX time
- async wait_for_timer(task_id: str, executor_id: UUID, stage: int, timer: int, timestamp: int) None[source]¶
Require a task to wait for a timer to continue.
RPC method: manager.wait_for_timer
The task will continue after a certain timeout. This method is used by an executor when it encounters a Timer special command.
- Parameters:
task_id – task id
executor_id – executor instance app id
stage – current stage
timer – timer value in seconds
timestamp – executor UNIX time
- async wait_for_message(task_id: str, executor_id: UUID, stage: int, timestamp: int) None[source]¶
Require a task to wait for a message to continue.
RPC method: manager.wait_for_message
The task will continue when a message to this task is received by the TaskService. This method is used by an executor when it encounters a Message special command.
- Parameters:
task_id – task id
executor_id – executor instance app id
stage – current stage
timestamp – executor UNIX time
- async write_stage(task_id: str, executor_id: UUID, stage: int, stages: int, result: Any, error: bool, timestamp: int) None[source]¶
Write stage result to the task table.
RPC method: manager.write_stage
This method is used by executors to return the results of each stage to the manager. If it’s the last stage or there was an error, the task will be finished and a notification may be created.
- Parameters:
task_id – task id
executor_id – executor instance app id
stage – current stage
stages – total number of stages
result – result value
error – result is an error
timestamp – executor UNIX time
- class TaskExecutor[source]¶
Bases: ContextableService, PublicInterface
Task executor receives and processes tasks from a manager.
An executor can belong to a different app than the manager. Executors do not require access to the tasks table or the task / notification services, but they do require a configured RPC stream to the manager. They also require an executor stream listener that exposes the TaskExecutor.run_task method.
- __init__(app, stream_client: StreamRPCClient = None, manager_app: str = None, manager_topic: str = Topic.MANAGER, rpc_service: JSONRPCServer = None, scheduler: Scheduler = None, data: dict = None, logger=None)[source]¶
Initialize.
- Parameters:
app – web app
manager_app – manager app name (by default: same as executor)
manager_topic – manager topic name
rpc_service – local rpc server name or instance
scheduler – local scheduler
stream_client – stream client to the manager
data – additional data for command templates
logger – optional logger instance
- async run_task(data: ExecutorTask) None[source]¶
Run a task locally.
RPC method: executor.run_task
This method is used by the task manager to send tasks to executors and should be exposed as public in the executor RPC stream.
An executor runs a task stage by stage in a local RPC server with correlation_id equal to the task job_id. An executor sends an execute_stage message to the manager before each stage and a write_stage message with the stage results after the stage is completed.
The method returns nothing because all task data is sent in separate stream requests.
- Parameters:
data – task partial data
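The reporting sequence described above can be sketched as follows. RecordingManager stands in for the RPC stream to the manager and only records calls; run_stages, zero-based stage numbers and stopping after a failed stage are assumptions of this sketch, not documented executor behaviour.

```python
import asyncio
import time
from typing import Awaitable, Callable, List
from uuid import UUID, uuid4


class RecordingManager:
    """Stand-in for the manager RPC stream; it only records calls."""

    def __init__(self) -> None:
        self.calls: List[tuple] = []

    async def execute_stage(self, task_id, executor_id, stage, timestamp) -> None:
        self.calls.append(("execute_stage", task_id, stage))

    async def write_stage(self, task_id, executor_id, stage, stages, result, error, timestamp) -> None:
        self.calls.append(("write_stage", task_id, stage, result, error))


async def run_stages(manager, executor_id: UUID, task_id: str,
                     commands: List[Callable[[], Awaitable]]) -> None:
    """Run commands in order, reporting before and after each stage."""
    stages = len(commands)
    for stage, command in enumerate(commands):  # assumption: stages are numbered from 0
        await manager.execute_stage(task_id, executor_id, stage, int(time.time()))
        try:
            result, error = await command(), False
        except Exception as exc:
            result, error = repr(exc), True
        await manager.write_stage(task_id, executor_id, stage, stages, result, error, int(time.time()))
        if error:
            break  # assumption: a failed stage ends the run


async def first_stage():
    return 1


async def second_stage():
    return 2


manager = RecordingManager()
asyncio.run(run_stages(manager, uuid4(), "my_task", [first_stage, second_stage]))
```

Nothing is returned from run_stages, mirroring the note above that all task data travels in separate requests to the manager.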