# Source code for evmos.proto.autogen.py.ibc.applications.interchain_accounts.host.v1

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: ibc/applications/interchain_accounts/host/v1/host.proto, ibc/applications/interchain_accounts/host/v1/query.proto, ibc/applications/interchain_accounts/host/v1/tx.proto
# plugin: python-betterproto
# This file has been @generated

from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Dict,
    List,
    Optional,
)

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase


if TYPE_CHECKING:
    import grpclib.server
    from betterproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


@dataclass(eq=False, repr=False)
class Params(betterproto.Message):
    """
    On-chain parameter set for interchain accounts.

    These parameters may be used to disable the host submodule.

    Attributes:
        host_enabled: Enables or disables the host submodule.
        allow_messages: List of sdk message typeURLs allowed to be executed
            on a host chain.
    """

    host_enabled: bool = betterproto.bool_field(1)
    allow_messages: List[str] = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class QueryRequest(betterproto.Message):
    """
    Parameters of one query request made by an interchain account.

    Both fields follow ADR-021:
    https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-021-protobuf-query-encoding.md#custom-query-registration-and-routing

    Attributes:
        path: Path of the query request as defined by ADR-021.
        data: Payload of the query request as defined by ADR-021.
    """

    path: str = betterproto.string_field(1)
    data: bytes = betterproto.bytes_field(2)
@dataclass(eq=False, repr=False)
class QueryParamsRequest(betterproto.Message):
    """Request type for the Query/Params RPC method; it declares no fields."""
@dataclass(eq=False, repr=False)
class QueryParamsResponse(betterproto.Message):
    """
    Response type for the Query/Params RPC method.

    Attributes:
        params: The parameters of the module.
    """

    params: "Params" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class MsgUpdateParams(betterproto.Message):
    """
    Payload for Msg/UpdateParams.

    Attributes:
        signer: Signer address.
        params: The 27-interchain-accounts/host parameters to update.
            NOTE: All parameters must be supplied.
    """

    signer: str = betterproto.string_field(1)
    params: "Params" = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class MsgUpdateParamsResponse(betterproto.Message):
    """Response for Msg/UpdateParams; it declares no fields."""
@dataclass(eq=False, repr=False)
class MsgModuleQuerySafe(betterproto.Message):
    """
    Payload for Msg/ModuleQuerySafe.

    Attributes:
        signer: Signer address.
        requests: The module safe queries to execute.
    """

    signer: str = betterproto.string_field(1)
    requests: List["QueryRequest"] = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class MsgModuleQuerySafeResponse(betterproto.Message):
    """
    Response for Msg/ModuleQuerySafe.

    Attributes:
        height: Height at which the responses were queried.
        responses: Protobuf encoded responses for each query.
    """

    height: int = betterproto.uint64_field(1)
    responses: List[bytes] = betterproto.bytes_field(2)
class QueryStub(betterproto.ServiceStub):
    """Client stub exposing the host v1 ``Query`` service routes."""

    async def params(
        self,
        query_params_request: "QueryParamsRequest",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "QueryParamsResponse":
        """
        Send ``query_params_request`` to the Query/Params route.

        ``timeout``, ``deadline`` and ``metadata`` are forwarded unchanged to
        ``_unary_unary``; the awaited result of that call is returned.
        """
        route = "/ibc.applications.interchain_accounts.host.v1.Query/Params"
        reply = await self._unary_unary(
            route,
            query_params_request,
            QueryParamsResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply
class MsgStub(betterproto.ServiceStub):
    """Client stub exposing the host v1 ``Msg`` service routes."""

    async def update_params(
        self,
        msg_update_params: "MsgUpdateParams",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MsgUpdateParamsResponse":
        """
        Send ``msg_update_params`` to the Msg/UpdateParams route.

        ``timeout``, ``deadline`` and ``metadata`` are forwarded unchanged to
        ``_unary_unary``; the awaited result of that call is returned.
        """
        route = "/ibc.applications.interchain_accounts.host.v1.Msg/UpdateParams"
        reply = await self._unary_unary(
            route,
            msg_update_params,
            MsgUpdateParamsResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply

    async def module_query_safe(
        self,
        msg_module_query_safe: "MsgModuleQuerySafe",
        *,
        timeout: Optional[float] = None,
        deadline: Optional["Deadline"] = None,
        metadata: Optional["MetadataLike"] = None,
    ) -> "MsgModuleQuerySafeResponse":
        """
        Send ``msg_module_query_safe`` to the Msg/ModuleQuerySafe route.

        ``timeout``, ``deadline`` and ``metadata`` are forwarded unchanged to
        ``_unary_unary``; the awaited result of that call is returned.
        """
        route = "/ibc.applications.interchain_accounts.host.v1.Msg/ModuleQuerySafe"
        reply = await self._unary_unary(
            route,
            msg_module_query_safe,
            MsgModuleQuerySafeResponse,
            timeout=timeout,
            deadline=deadline,
            metadata=metadata,
        )
        return reply
class QueryBase(ServiceBase):
    """
    Server-side base for the host v1 ``Query`` service.

    The ``params`` handler defined here raises UNIMPLEMENTED; presumably a
    concrete service subclasses this and overrides it.
    """

    async def params(
        self, query_params_request: "QueryParamsRequest"
    ) -> "QueryParamsResponse":
        """Raise ``grpclib.GRPCError`` with status UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def __rpc_params(
        self, stream: "grpclib.server.Stream[QueryParamsRequest, QueryParamsResponse]"
    ) -> None:
        """Receive one message, pass it to ``params``, and send the result back."""
        incoming = await stream.recv_message()
        outgoing = await self.params(incoming)
        await stream.send_message(outgoing)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route-to-handler table for this service."""
        params_handler = grpclib.const.Handler(
            self.__rpc_params,
            grpclib.const.Cardinality.UNARY_UNARY,
            QueryParamsRequest,
            QueryParamsResponse,
        )
        return {
            "/ibc.applications.interchain_accounts.host.v1.Query/Params": params_handler,
        }
class MsgBase(ServiceBase):
    """
    Server-side base for the host v1 ``Msg`` service.

    Both public handlers defined here raise UNIMPLEMENTED; presumably a
    concrete service subclasses this and overrides them.
    """

    async def update_params(
        self, msg_update_params: "MsgUpdateParams"
    ) -> "MsgUpdateParamsResponse":
        """Raise ``grpclib.GRPCError`` with status UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def module_query_safe(
        self, msg_module_query_safe: "MsgModuleQuerySafe"
    ) -> "MsgModuleQuerySafeResponse":
        """Raise ``grpclib.GRPCError`` with status UNIMPLEMENTED."""
        unimplemented = grpclib.const.Status.UNIMPLEMENTED
        raise grpclib.GRPCError(unimplemented)

    async def __rpc_update_params(
        self, stream: "grpclib.server.Stream[MsgUpdateParams, MsgUpdateParamsResponse]"
    ) -> None:
        """Receive one message, pass it to ``update_params``, and send the result back."""
        incoming = await stream.recv_message()
        outgoing = await self.update_params(incoming)
        await stream.send_message(outgoing)

    async def __rpc_module_query_safe(
        self,
        stream: "grpclib.server.Stream[MsgModuleQuerySafe, MsgModuleQuerySafeResponse]",
    ) -> None:
        """Receive one message, pass it to ``module_query_safe``, and send the result back."""
        incoming = await stream.recv_message()
        outgoing = await self.module_query_safe(incoming)
        await stream.send_message(outgoing)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route-to-handler table for this service."""
        unary_unary = grpclib.const.Cardinality.UNARY_UNARY
        update_params_handler = grpclib.const.Handler(
            self.__rpc_update_params,
            unary_unary,
            MsgUpdateParams,
            MsgUpdateParamsResponse,
        )
        module_query_safe_handler = grpclib.const.Handler(
            self.__rpc_module_query_safe,
            unary_unary,
            MsgModuleQuerySafe,
            MsgModuleQuerySafeResponse,
        )
        return {
            "/ibc.applications.interchain_accounts.host.v1.Msg/UpdateParams": update_params_handler,
            "/ibc.applications.interchain_accounts.host.v1.Msg/ModuleQuerySafe": module_query_safe_handler,
        }