# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: ibc/applications/interchain_accounts/controller/v1/controller.proto, ibc/applications/interchain_accounts/controller/v1/query.proto, ibc/applications/interchain_accounts/controller/v1/tx.proto
# plugin: python-betterproto
# This file has been @generated
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Dict,
Optional,
)
import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase
from .....core.channel import v1 as ____core_channel_v1__
from ... import v1 as __v1__
if TYPE_CHECKING:
import grpclib.server
from betterproto.grpc.grpclib_client import MetadataLike
from grpclib.metadata import Deadline
[docs]
@dataclass(eq=False, repr=False)
class Params(betterproto.Message):
"""
Params defines the set of on-chain interchain accounts parameters.
The following parameters may be used to disable the controller submodule.
"""
controller_enabled: bool = betterproto.bool_field(1)
"""controller_enabled enables or disables the controller submodule."""
[docs]
@dataclass(eq=False, repr=False)
class QueryInterchainAccountRequest(betterproto.Message):
"""
QueryInterchainAccountRequest is the request type for the Query/InterchainAccount
RPC method.
"""
owner: str = betterproto.string_field(1)
connection_id: str = betterproto.string_field(2)
[docs]
@dataclass(eq=False, repr=False)
class QueryInterchainAccountResponse(betterproto.Message):
"""
QueryInterchainAccountResponse the response type for the Query/InterchainAccount RPC
method.
"""
address: str = betterproto.string_field(1)
[docs]
@dataclass(eq=False, repr=False)
class QueryParamsRequest(betterproto.Message):
"""QueryParamsRequest is the request type for the Query/Params RPC method."""
pass
[docs]
@dataclass(eq=False, repr=False)
class QueryParamsResponse(betterproto.Message):
"""QueryParamsResponse is the response type for the Query/Params RPC method."""
params: "Params" = betterproto.message_field(1)
"""params defines the parameters of the module."""
[docs]
@dataclass(eq=False, repr=False)
class MsgRegisterInterchainAccount(betterproto.Message):
"""MsgRegisterInterchainAccount defines the payload for Msg/RegisterAccount"""
owner: str = betterproto.string_field(1)
connection_id: str = betterproto.string_field(2)
version: str = betterproto.string_field(3)
ordering: "____core_channel_v1__.Order" = betterproto.enum_field(4)
[docs]
@dataclass(eq=False, repr=False)
class MsgRegisterInterchainAccountResponse(betterproto.Message):
"""
MsgRegisterInterchainAccountResponse defines the response for Msg/RegisterAccount
"""
channel_id: str = betterproto.string_field(1)
port_id: str = betterproto.string_field(2)
[docs]
@dataclass(eq=False, repr=False)
class MsgSendTx(betterproto.Message):
"""MsgSendTx defines the payload for Msg/SendTx"""
owner: str = betterproto.string_field(1)
connection_id: str = betterproto.string_field(2)
packet_data: "__v1__.InterchainAccountPacketData" = betterproto.message_field(3)
relative_timeout: int = betterproto.uint64_field(4)
"""
Relative timeout timestamp provided will be added to the current block time during
transaction execution.
The timeout timestamp must be non-zero.
"""
[docs]
@dataclass(eq=False, repr=False)
class MsgSendTxResponse(betterproto.Message):
"""MsgSendTxResponse defines the response for MsgSendTx"""
sequence: int = betterproto.uint64_field(1)
[docs]
@dataclass(eq=False, repr=False)
class MsgUpdateParams(betterproto.Message):
"""MsgUpdateParams defines the payload for Msg/UpdateParams"""
signer: str = betterproto.string_field(1)
"""signer address"""
params: "Params" = betterproto.message_field(2)
"""
params defines the 27-interchain-accounts/controller parameters to update.
NOTE: All parameters must be supplied.
"""
[docs]
@dataclass(eq=False, repr=False)
class MsgUpdateParamsResponse(betterproto.Message):
"""MsgUpdateParamsResponse defines the response for Msg/UpdateParams"""
pass
[docs]
class QueryStub(betterproto.ServiceStub):
[docs]
async def interchain_account(
self,
query_interchain_account_request: "QueryInterchainAccountRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "QueryInterchainAccountResponse":
return await self._unary_unary(
"/ibc.applications.interchain_accounts.controller.v1.Query/InterchainAccount",
query_interchain_account_request,
QueryInterchainAccountResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def params(
self,
query_params_request: "QueryParamsRequest",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "QueryParamsResponse":
return await self._unary_unary(
"/ibc.applications.interchain_accounts.controller.v1.Query/Params",
query_params_request,
QueryParamsResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
class MsgStub(betterproto.ServiceStub):
[docs]
async def register_interchain_account(
self,
msg_register_interchain_account: "MsgRegisterInterchainAccount",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "MsgRegisterInterchainAccountResponse":
return await self._unary_unary(
"/ibc.applications.interchain_accounts.controller.v1.Msg/RegisterInterchainAccount",
msg_register_interchain_account,
MsgRegisterInterchainAccountResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def send_tx(
self,
msg_send_tx: "MsgSendTx",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "MsgSendTxResponse":
return await self._unary_unary(
"/ibc.applications.interchain_accounts.controller.v1.Msg/SendTx",
msg_send_tx,
MsgSendTxResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
async def update_params(
self,
msg_update_params: "MsgUpdateParams",
*,
timeout: Optional[float] = None,
deadline: Optional["Deadline"] = None,
metadata: Optional["MetadataLike"] = None,
) -> "MsgUpdateParamsResponse":
return await self._unary_unary(
"/ibc.applications.interchain_accounts.controller.v1.Msg/UpdateParams",
msg_update_params,
MsgUpdateParamsResponse,
timeout=timeout,
deadline=deadline,
metadata=metadata,
)
[docs]
class QueryBase(ServiceBase):
[docs]
async def interchain_account(
self, query_interchain_account_request: "QueryInterchainAccountRequest"
) -> "QueryInterchainAccountResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def params(
self, query_params_request: "QueryParamsRequest"
) -> "QueryParamsResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_interchain_account(
self,
stream: "grpclib.server.Stream[QueryInterchainAccountRequest, QueryInterchainAccountResponse]",
) -> None:
request = await stream.recv_message()
response = await self.interchain_account(request)
await stream.send_message(response)
async def __rpc_params(
self, stream: "grpclib.server.Stream[QueryParamsRequest, QueryParamsResponse]"
) -> None:
request = await stream.recv_message()
response = await self.params(request)
await stream.send_message(response)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
"/ibc.applications.interchain_accounts.controller.v1.Query/InterchainAccount": grpclib.const.Handler(
self.__rpc_interchain_account,
grpclib.const.Cardinality.UNARY_UNARY,
QueryInterchainAccountRequest,
QueryInterchainAccountResponse,
),
"/ibc.applications.interchain_accounts.controller.v1.Query/Params": grpclib.const.Handler(
self.__rpc_params,
grpclib.const.Cardinality.UNARY_UNARY,
QueryParamsRequest,
QueryParamsResponse,
),
}
[docs]
class MsgBase(ServiceBase):
[docs]
async def register_interchain_account(
self, msg_register_interchain_account: "MsgRegisterInterchainAccount"
) -> "MsgRegisterInterchainAccountResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def send_tx(self, msg_send_tx: "MsgSendTx") -> "MsgSendTxResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs]
async def update_params(
self, msg_update_params: "MsgUpdateParams"
) -> "MsgUpdateParamsResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_register_interchain_account(
self,
stream: "grpclib.server.Stream[MsgRegisterInterchainAccount, MsgRegisterInterchainAccountResponse]",
) -> None:
request = await stream.recv_message()
response = await self.register_interchain_account(request)
await stream.send_message(response)
async def __rpc_send_tx(
self, stream: "grpclib.server.Stream[MsgSendTx, MsgSendTxResponse]"
) -> None:
request = await stream.recv_message()
response = await self.send_tx(request)
await stream.send_message(response)
async def __rpc_update_params(
self, stream: "grpclib.server.Stream[MsgUpdateParams, MsgUpdateParamsResponse]"
) -> None:
request = await stream.recv_message()
response = await self.update_params(request)
await stream.send_message(response)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
"/ibc.applications.interchain_accounts.controller.v1.Msg/RegisterInterchainAccount": grpclib.const.Handler(
self.__rpc_register_interchain_account,
grpclib.const.Cardinality.UNARY_UNARY,
MsgRegisterInterchainAccount,
MsgRegisterInterchainAccountResponse,
),
"/ibc.applications.interchain_accounts.controller.v1.Msg/SendTx": grpclib.const.Handler(
self.__rpc_send_tx,
grpclib.const.Cardinality.UNARY_UNARY,
MsgSendTx,
MsgSendTxResponse,
),
"/ibc.applications.interchain_accounts.controller.v1.Msg/UpdateParams": grpclib.const.Handler(
self.__rpc_update_params,
grpclib.const.Cardinality.UNARY_UNARY,
MsgUpdateParams,
MsgUpdateParamsResponse,
),
}