Source code for evmos.proto.autogen.py.ibc.applications.interchain_accounts.controller.v1

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: ibc/applications/interchain_accounts/controller/v1/controller.proto, ibc/applications/interchain_accounts/controller/v1/query.proto, ibc/applications/interchain_accounts/controller/v1/tx.proto
# plugin: python-betterproto
# This file has been @generated

from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Dict,
    Optional,
)

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase

from .....core.channel import v1 as ____core_channel_v1__
from ... import v1 as __v1__


if TYPE_CHECKING:
    import grpclib.server
    from betterproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


[docs] @dataclass(eq=False, repr=False) class Params(betterproto.Message): """ Params defines the set of on-chain interchain accounts parameters. The following parameters may be used to disable the controller submodule. """ controller_enabled: bool = betterproto.bool_field(1) """controller_enabled enables or disables the controller submodule."""
[docs] @dataclass(eq=False, repr=False) class QueryInterchainAccountRequest(betterproto.Message): """ QueryInterchainAccountRequest is the request type for the Query/InterchainAccount RPC method. """ owner: str = betterproto.string_field(1) connection_id: str = betterproto.string_field(2)
[docs] @dataclass(eq=False, repr=False) class QueryInterchainAccountResponse(betterproto.Message): """ QueryInterchainAccountResponse the response type for the Query/InterchainAccount RPC method. """ address: str = betterproto.string_field(1)
[docs] @dataclass(eq=False, repr=False) class QueryParamsRequest(betterproto.Message): """QueryParamsRequest is the request type for the Query/Params RPC method.""" pass
[docs] @dataclass(eq=False, repr=False) class QueryParamsResponse(betterproto.Message): """QueryParamsResponse is the response type for the Query/Params RPC method.""" params: "Params" = betterproto.message_field(1) """params defines the parameters of the module."""
[docs] @dataclass(eq=False, repr=False) class MsgRegisterInterchainAccount(betterproto.Message): """MsgRegisterInterchainAccount defines the payload for Msg/RegisterAccount""" owner: str = betterproto.string_field(1) connection_id: str = betterproto.string_field(2) version: str = betterproto.string_field(3) ordering: "____core_channel_v1__.Order" = betterproto.enum_field(4)
[docs] @dataclass(eq=False, repr=False) class MsgRegisterInterchainAccountResponse(betterproto.Message): """ MsgRegisterInterchainAccountResponse defines the response for Msg/RegisterAccount """ channel_id: str = betterproto.string_field(1) port_id: str = betterproto.string_field(2)
[docs] @dataclass(eq=False, repr=False) class MsgSendTx(betterproto.Message): """MsgSendTx defines the payload for Msg/SendTx""" owner: str = betterproto.string_field(1) connection_id: str = betterproto.string_field(2) packet_data: "__v1__.InterchainAccountPacketData" = betterproto.message_field(3) relative_timeout: int = betterproto.uint64_field(4) """ Relative timeout timestamp provided will be added to the current block time during transaction execution. The timeout timestamp must be non-zero. """
[docs] @dataclass(eq=False, repr=False) class MsgSendTxResponse(betterproto.Message): """MsgSendTxResponse defines the response for MsgSendTx""" sequence: int = betterproto.uint64_field(1)
[docs] @dataclass(eq=False, repr=False) class MsgUpdateParams(betterproto.Message): """MsgUpdateParams defines the payload for Msg/UpdateParams""" signer: str = betterproto.string_field(1) """signer address""" params: "Params" = betterproto.message_field(2) """ params defines the 27-interchain-accounts/controller parameters to update. NOTE: All parameters must be supplied. """
[docs] @dataclass(eq=False, repr=False) class MsgUpdateParamsResponse(betterproto.Message): """MsgUpdateParamsResponse defines the response for Msg/UpdateParams""" pass
[docs] class QueryStub(betterproto.ServiceStub):
[docs] async def interchain_account( self, query_interchain_account_request: "QueryInterchainAccountRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "QueryInterchainAccountResponse": return await self._unary_unary( "/ibc.applications.interchain_accounts.controller.v1.Query/InterchainAccount", query_interchain_account_request, QueryInterchainAccountResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def params( self, query_params_request: "QueryParamsRequest", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "QueryParamsResponse": return await self._unary_unary( "/ibc.applications.interchain_accounts.controller.v1.Query/Params", query_params_request, QueryParamsResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] class MsgStub(betterproto.ServiceStub):
[docs] async def register_interchain_account( self, msg_register_interchain_account: "MsgRegisterInterchainAccount", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MsgRegisterInterchainAccountResponse": return await self._unary_unary( "/ibc.applications.interchain_accounts.controller.v1.Msg/RegisterInterchainAccount", msg_register_interchain_account, MsgRegisterInterchainAccountResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def send_tx( self, msg_send_tx: "MsgSendTx", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MsgSendTxResponse": return await self._unary_unary( "/ibc.applications.interchain_accounts.controller.v1.Msg/SendTx", msg_send_tx, MsgSendTxResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def update_params( self, msg_update_params: "MsgUpdateParams", *, timeout: Optional[float] = None, deadline: Optional["Deadline"] = None, metadata: Optional["MetadataLike"] = None, ) -> "MsgUpdateParamsResponse": return await self._unary_unary( "/ibc.applications.interchain_accounts.controller.v1.Msg/UpdateParams", msg_update_params, MsgUpdateParamsResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] class QueryBase(ServiceBase):
[docs] async def interchain_account( self, query_interchain_account_request: "QueryInterchainAccountRequest" ) -> "QueryInterchainAccountResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def params( self, query_params_request: "QueryParamsRequest" ) -> "QueryParamsResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_interchain_account( self, stream: "grpclib.server.Stream[QueryInterchainAccountRequest, QueryInterchainAccountResponse]", ) -> None: request = await stream.recv_message() response = await self.interchain_account(request) await stream.send_message(response) async def __rpc_params( self, stream: "grpclib.server.Stream[QueryParamsRequest, QueryParamsResponse]" ) -> None: request = await stream.recv_message() response = await self.params(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/ibc.applications.interchain_accounts.controller.v1.Query/InterchainAccount": grpclib.const.Handler( self.__rpc_interchain_account, grpclib.const.Cardinality.UNARY_UNARY, QueryInterchainAccountRequest, QueryInterchainAccountResponse, ), "/ibc.applications.interchain_accounts.controller.v1.Query/Params": grpclib.const.Handler( self.__rpc_params, grpclib.const.Cardinality.UNARY_UNARY, QueryParamsRequest, QueryParamsResponse, ), }
[docs] class MsgBase(ServiceBase):
[docs] async def register_interchain_account( self, msg_register_interchain_account: "MsgRegisterInterchainAccount" ) -> "MsgRegisterInterchainAccountResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def send_tx(self, msg_send_tx: "MsgSendTx") -> "MsgSendTxResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def update_params( self, msg_update_params: "MsgUpdateParams" ) -> "MsgUpdateParamsResponse": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_register_interchain_account( self, stream: "grpclib.server.Stream[MsgRegisterInterchainAccount, MsgRegisterInterchainAccountResponse]", ) -> None: request = await stream.recv_message() response = await self.register_interchain_account(request) await stream.send_message(response) async def __rpc_send_tx( self, stream: "grpclib.server.Stream[MsgSendTx, MsgSendTxResponse]" ) -> None: request = await stream.recv_message() response = await self.send_tx(request) await stream.send_message(response) async def __rpc_update_params( self, stream: "grpclib.server.Stream[MsgUpdateParams, MsgUpdateParamsResponse]" ) -> None: request = await stream.recv_message() response = await self.update_params(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/ibc.applications.interchain_accounts.controller.v1.Msg/RegisterInterchainAccount": grpclib.const.Handler( self.__rpc_register_interchain_account, grpclib.const.Cardinality.UNARY_UNARY, MsgRegisterInterchainAccount, MsgRegisterInterchainAccountResponse, ), "/ibc.applications.interchain_accounts.controller.v1.Msg/SendTx": grpclib.const.Handler( self.__rpc_send_tx, grpclib.const.Cardinality.UNARY_UNARY, MsgSendTx, MsgSendTxResponse, ), "/ibc.applications.interchain_accounts.controller.v1.Msg/UpdateParams": grpclib.const.Handler( self.__rpc_update_params, grpclib.const.Cardinality.UNARY_UNARY, MsgUpdateParams, MsgUpdateParamsResponse, ), }