Source code for evmos.proto.autogen.py.cosmos.authz.v1beta1

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: cosmos/authz/v1beta1/authz.proto, cosmos/authz/v1beta1/event.proto, cosmos/authz/v1beta1/genesis.proto, cosmos/authz/v1beta1/query.proto, cosmos/authz/v1beta1/tx.proto
# plugin: python-betterproto
# This file has been @generated
from dataclasses import dataclass
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    Dict,
    List,
    Optional,
)

import betterproto
import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase

from ...base.query import v1beta1 as __base_query_v1_beta1__


if TYPE_CHECKING:
    import grpclib.server
    from betterproto.grpc.grpclib_client import MetadataLike
    from grpclib.metadata import Deadline


[docs]@dataclass(eq=False, repr=False) class EventGrant(betterproto.Message): """EventGrant is emitted on Msg/Grant""" msg_type_url: str = betterproto.string_field(2) """Msg type URL for which an autorization is granted""" granter: str = betterproto.string_field(3) """Granter account address""" grantee: str = betterproto.string_field(4) """Grantee account address"""
[docs]@dataclass(eq=False, repr=False) class EventRevoke(betterproto.Message): """EventRevoke is emitted on Msg/Revoke""" msg_type_url: str = betterproto.string_field(2) """Msg type URL for which an autorization is revoked""" granter: str = betterproto.string_field(3) """Granter account address""" grantee: str = betterproto.string_field(4) """Grantee account address"""
[docs]@dataclass(eq=False, repr=False) class GenericAuthorization(betterproto.Message): """ GenericAuthorization gives the grantee unrestricted permissions to execute the provided method on behalf of the granter's account. """ msg: str = betterproto.string_field(1) """ Msg, identified by it's type URL, to grant unrestricted permissions to execute """
[docs]@dataclass(eq=False, repr=False) class Grant(betterproto.Message): """ Grant gives permissions to execute the provide method with expiration time. """ authorization: 'betterproto_lib_google_protobuf.Any' = betterproto.message_field(1) expiration: datetime = betterproto.message_field(2)
[docs]@dataclass(eq=False, repr=False) class GrantAuthorization(betterproto.Message): """ GrantAuthorization extends a grant with both the addresses of the grantee and granter. It is used in genesis.proto and query.proto Since: cosmos-sdk 0.45.2 """ granter: str = betterproto.string_field(1) grantee: str = betterproto.string_field(2) authorization: 'betterproto_lib_google_protobuf.Any' = betterproto.message_field(3) expiration: datetime = betterproto.message_field(4)
[docs]@dataclass(eq=False, repr=False) class QueryGrantsRequest(betterproto.Message): """QueryGrantsRequest is the request type for the Query/Grants RPC method.""" granter: str = betterproto.string_field(1) grantee: str = betterproto.string_field(2) msg_type_url: str = betterproto.string_field(3) """ Optional, msg_type_url, when set, will query only grants matching given msg type. """ pagination: '__base_query_v1_beta1__.PageRequest' = betterproto.message_field(4) """pagination defines an pagination for the request."""
[docs]@dataclass(eq=False, repr=False) class QueryGrantsResponse(betterproto.Message): """ QueryGrantsResponse is the response type for the Query/Authorizations RPC method. """ grants: List['Grant'] = betterproto.message_field(1) """authorizations is a list of grants granted for grantee by granter.""" pagination: '__base_query_v1_beta1__.PageResponse' = betterproto.message_field(2) """pagination defines an pagination for the response."""
[docs]@dataclass(eq=False, repr=False) class QueryGranterGrantsRequest(betterproto.Message): """ QueryGranterGrantsRequest is the request type for the Query/GranterGrants RPC method. """ granter: str = betterproto.string_field(1) pagination: '__base_query_v1_beta1__.PageRequest' = betterproto.message_field(2) """pagination defines an pagination for the request."""
[docs]@dataclass(eq=False, repr=False) class QueryGranterGrantsResponse(betterproto.Message): """ QueryGranterGrantsResponse is the response type for the Query/GranterGrants RPC method. """ grants: List['GrantAuthorization'] = betterproto.message_field(1) """grants is a list of grants granted by the granter.""" pagination: '__base_query_v1_beta1__.PageResponse' = betterproto.message_field(2) """pagination defines an pagination for the response."""
[docs]@dataclass(eq=False, repr=False) class QueryGranteeGrantsRequest(betterproto.Message): """ QueryGranteeGrantsRequest is the request type for the Query/IssuedGrants RPC method. """ grantee: str = betterproto.string_field(1) pagination: '__base_query_v1_beta1__.PageRequest' = betterproto.message_field(2) """pagination defines an pagination for the request."""
[docs]@dataclass(eq=False, repr=False) class QueryGranteeGrantsResponse(betterproto.Message): """ QueryGranteeGrantsResponse is the response type for the Query/GranteeGrants RPC method. """ grants: List['GrantAuthorization'] = betterproto.message_field(1) """grants is a list of grants granted to the grantee.""" pagination: '__base_query_v1_beta1__.PageResponse' = betterproto.message_field(2) """pagination defines an pagination for the response."""
[docs]@dataclass(eq=False, repr=False) class GenesisState(betterproto.Message): """GenesisState defines the authz module's genesis state.""" authorization: List['GrantAuthorization'] = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False) class MsgGrant(betterproto.Message): """ MsgGrant is a request type for Grant method. It declares authorization to the grantee on behalf of the granter with the provided expiration time. """ granter: str = betterproto.string_field(1) grantee: str = betterproto.string_field(2) grant: 'Grant' = betterproto.message_field(3)
[docs]@dataclass(eq=False, repr=False) class MsgExecResponse(betterproto.Message): """MsgExecResponse defines the Msg/MsgExecResponse response type.""" results: List[bytes] = betterproto.bytes_field(1)
[docs]@dataclass(eq=False, repr=False) class MsgExec(betterproto.Message): """ MsgExec attempts to execute the provided messages using authorizations granted to the grantee. Each message should have only one signer corresponding to the granter of the authorization. """ grantee: str = betterproto.string_field(1) msgs: List['betterproto_lib_google_protobuf.Any'] = betterproto.message_field(2) """ Authorization Msg requests to execute. Each msg must implement Authorization interface The x/authz will try to find a grant matching (msg.signers[0], grantee, MsgTypeURL(msg)) triple and validate it. """
[docs]@dataclass(eq=False, repr=False) class MsgGrantResponse(betterproto.Message): """MsgGrantResponse defines the Msg/MsgGrant response type.""" pass
[docs]@dataclass(eq=False, repr=False) class MsgRevoke(betterproto.Message): """ MsgRevoke revokes any authorization with the provided sdk.Msg type on the granter's account with that has been granted to the grantee. """ granter: str = betterproto.string_field(1) grantee: str = betterproto.string_field(2) msg_type_url: str = betterproto.string_field(3)
[docs]@dataclass(eq=False, repr=False) class MsgRevokeResponse(betterproto.Message): """MsgRevokeResponse defines the Msg/MsgRevokeResponse response type.""" pass
[docs]class QueryStub(betterproto.ServiceStub):
[docs] async def grants( self, query_grants_request: 'QueryGrantsRequest', *, timeout: Optional[float] = None, deadline: Optional['Deadline'] = None, metadata: Optional['MetadataLike'] = None ) -> 'QueryGrantsResponse': return await self._unary_unary( '/cosmos.authz.v1beta1.Query/Grants', query_grants_request, QueryGrantsResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def granter_grants( self, query_granter_grants_request: 'QueryGranterGrantsRequest', *, timeout: Optional[float] = None, deadline: Optional['Deadline'] = None, metadata: Optional['MetadataLike'] = None ) -> 'QueryGranterGrantsResponse': return await self._unary_unary( '/cosmos.authz.v1beta1.Query/GranterGrants', query_granter_grants_request, QueryGranterGrantsResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def grantee_grants( self, query_grantee_grants_request: 'QueryGranteeGrantsRequest', *, timeout: Optional[float] = None, deadline: Optional['Deadline'] = None, metadata: Optional['MetadataLike'] = None ) -> 'QueryGranteeGrantsResponse': return await self._unary_unary( '/cosmos.authz.v1beta1.Query/GranteeGrants', query_grantee_grants_request, QueryGranteeGrantsResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs]class MsgStub(betterproto.ServiceStub):
[docs] async def grant( self, msg_grant: 'MsgGrant', *, timeout: Optional[float] = None, deadline: Optional['Deadline'] = None, metadata: Optional['MetadataLike'] = None ) -> 'MsgGrantResponse': return await self._unary_unary( '/cosmos.authz.v1beta1.Msg/Grant', msg_grant, MsgGrantResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def exec( self, msg_exec: 'MsgExec', *, timeout: Optional[float] = None, deadline: Optional['Deadline'] = None, metadata: Optional['MetadataLike'] = None ) -> 'MsgExecResponse': return await self._unary_unary( '/cosmos.authz.v1beta1.Msg/Exec', msg_exec, MsgExecResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs] async def revoke( self, msg_revoke: 'MsgRevoke', *, timeout: Optional[float] = None, deadline: Optional['Deadline'] = None, metadata: Optional['MetadataLike'] = None ) -> 'MsgRevokeResponse': return await self._unary_unary( '/cosmos.authz.v1beta1.Msg/Revoke', msg_revoke, MsgRevokeResponse, timeout=timeout, deadline=deadline, metadata=metadata, )
[docs]class QueryBase(ServiceBase):
[docs] async def grants( self, query_grants_request: 'QueryGrantsRequest' ) -> 'QueryGrantsResponse': raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def granter_grants( self, query_granter_grants_request: 'QueryGranterGrantsRequest' ) -> 'QueryGranterGrantsResponse': raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def grantee_grants( self, query_grantee_grants_request: 'QueryGranteeGrantsRequest' ) -> 'QueryGranteeGrantsResponse': raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_grants( self, stream: 'grpclib.server.Stream[QueryGrantsRequest, QueryGrantsResponse]' ) -> None: request = await stream.recv_message() response = await self.grants(request) await stream.send_message(response) async def __rpc_granter_grants( self, stream: 'grpclib.server.Stream[QueryGranterGrantsRequest, QueryGranterGrantsResponse]', ) -> None: request = await stream.recv_message() response = await self.granter_grants(request) await stream.send_message(response) async def __rpc_grantee_grants( self, stream: 'grpclib.server.Stream[QueryGranteeGrantsRequest, QueryGranteeGrantsResponse]', ) -> None: request = await stream.recv_message() response = await self.grantee_grants(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { '/cosmos.authz.v1beta1.Query/Grants': grpclib.const.Handler( self.__rpc_grants, grpclib.const.Cardinality.UNARY_UNARY, QueryGrantsRequest, QueryGrantsResponse, ), '/cosmos.authz.v1beta1.Query/GranterGrants': grpclib.const.Handler( self.__rpc_granter_grants, grpclib.const.Cardinality.UNARY_UNARY, QueryGranterGrantsRequest, QueryGranterGrantsResponse, ), '/cosmos.authz.v1beta1.Query/GranteeGrants': grpclib.const.Handler( self.__rpc_grantee_grants, grpclib.const.Cardinality.UNARY_UNARY, QueryGranteeGrantsRequest, QueryGranteeGrantsResponse, ), }
[docs]class MsgBase(ServiceBase):
[docs] async def grant(self, msg_grant: 'MsgGrant') -> 'MsgGrantResponse': raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def exec(self, msg_exec: 'MsgExec') -> 'MsgExecResponse': raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def revoke(self, msg_revoke: 'MsgRevoke') -> 'MsgRevokeResponse': raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_grant( self, stream: 'grpclib.server.Stream[MsgGrant, MsgGrantResponse]' ) -> None: request = await stream.recv_message() response = await self.grant(request) await stream.send_message(response) async def __rpc_exec( self, stream: 'grpclib.server.Stream[MsgExec, MsgExecResponse]' ) -> None: request = await stream.recv_message() response = await self.exec(request) await stream.send_message(response) async def __rpc_revoke( self, stream: 'grpclib.server.Stream[MsgRevoke, MsgRevokeResponse]' ) -> None: request = await stream.recv_message() response = await self.revoke(request) await stream.send_message(response) def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { '/cosmos.authz.v1beta1.Msg/Grant': grpclib.const.Handler( self.__rpc_grant, grpclib.const.Cardinality.UNARY_UNARY, MsgGrant, MsgGrantResponse, ), '/cosmos.authz.v1beta1.Msg/Exec': grpclib.const.Handler( self.__rpc_exec, grpclib.const.Cardinality.UNARY_UNARY, MsgExec, MsgExecResponse, ), '/cosmos.authz.v1beta1.Msg/Revoke': grpclib.const.Handler( self.__rpc_revoke, grpclib.const.Cardinality.UNARY_UNARY, MsgRevoke, MsgRevokeResponse, ), }