"""Hashing utilities for EIP-712 messages.
Copied under the MIT license from
https://github.com/ethereum/eth-account/blob/main/eth_account/_utils/encode_typed_data/encoding_and_hashing.py
with minor modifications.
"""
from __future__ import annotations
from collections.abc import Iterable, Iterator, Mapping, Sequence
from itertools import chain, groupby
from operator import itemgetter
from typing import Any, TypedDict
from eth_abi import encode, is_encodable, is_encodable_type
from eth_abi.grammar import parse
from eth_utils import keccak, to_tuple
from typing_extensions import TypeAlias
class _FieldDef(TypedDict):
name: str
type: str
_MsgTypes: TypeAlias = Mapping[str, Sequence[_FieldDef]]
[docs]
def get_dependencies(primary_type: str, types: _MsgTypes) -> tuple[str, ...]:
"""Perform DFS to get all the dependencies of the `primary_type`."""
deps = set()
struct_names_yet_to_be_expanded = [primary_type]
while struct_names_yet_to_be_expanded:
struct_name = struct_names_yet_to_be_expanded.pop()
deps.add(struct_name)
fields = types[struct_name]
for field in fields:
field_type = field["type"]
# Handle array types
if is_array_type(field_type):
field_type = field_type[: field_type.index("[")]
if field_type in types and field_type not in deps:
# We don't need to expand types that are not user defined (customized)
# Also skip types that we have already encountered
struct_names_yet_to_be_expanded.append(field_type)
# Don't need to make a struct as dependency of itself
deps.remove(primary_type)
return tuple(deps)
[docs]
def field_identifier(field: _FieldDef) -> str:
"""Stringify a field in ``'TYPE NAME'`` format."""
return f"{field['type']} {field['name']}"
[docs]
def encode_struct(struct_name: str, struct_field_types: Iterable[_FieldDef]) -> str:
"""Stringify a single struct in ``'NAME(type1 name1,type2 name2,...)'`` format."""
return "{name}({args})".format(
name=struct_name,
args=",".join(map(field_identifier, struct_field_types)),
)
[docs]
def encode_type(primary_type: str, types: _MsgTypes) -> str:
"""Encode type as concatenation of itself and all dependencies (alphabetical order).
The type of a struct is encoded as
.. code-block:: text
name ‖ "(" ‖ member₁ ‖ "," ‖ member₂ ‖ "," ‖ … ‖ memberₙ ")"
where each member is written as ``type ‖ " " ‖ name``.
"""
# Getting the dependencies and sorting them alphabetically as per EIP712
deps = get_dependencies(primary_type, types)
return "".join(
encode_struct(struct_name, types[struct_name])
for struct_name in chain((primary_type,), sorted(deps))
)
[docs]
def hash_struct_type(primary_type: str, types: _MsgTypes) -> bytes:
"""Hash string representation of type of struct."""
return keccak(text=encode_type(primary_type, types))
[docs]
def is_array_type(type_: str) -> bool:
"""Identify if type such as ``person[]`` or ``person[2]`` is an array."""
return type_.endswith("]")
[docs]
@to_tuple
def get_depths_and_dimensions(data: Any, depth: int) -> Iterator[tuple[int, int]]:
"""Generate tuples of depth and dimension of each element at that depth."""
if not isinstance(data, (list, tuple)):
# Not checking for Iterable instance, because even Dictionaries and strings
# are considered as iterables, but that's not what we want the condition to be.
return
yield depth, len(data)
for item in data:
# iterating over all 1 dimension less sub-data items
yield from get_depths_and_dimensions(item, depth + 1)
[docs]
def get_array_dimensions(data: Any) -> tuple[int | str, ...]:
"""Given an data item, check that it is an array and return the dimensions.
Examples:
>>> get_array_dimensions([[1, 2, 3], [4, 5, 6]])
(3, 2)
"""
depths_and_dimensions = get_depths_and_dimensions(data, 0)
# re-form as a dictionary with `depth` as key, and all of the dimensions
# found at that depth.
grouped_by_depth = {
depth: tuple(dimension for _, dimension in group)
for depth, group in groupby(depths_and_dimensions, itemgetter(0))
}
return tuple(
# check that all dimensions are the same, else use "dynamic"
dimensions[0] if all(dim == dimensions[0] for dim in dimensions) else "dynamic"
for _depth, dimensions in sorted(grouped_by_depth.items(), reverse=True)
)
def _check_dimensions(field_type: str, value: Any) -> None:
# Get the dimensions from the value
array_dimensions = get_array_dimensions(value)
# Get the dimensions from what was declared in the schema
parsed_field_type = parse(field_type)
for given, expected in zip(array_dimensions, parsed_field_type.arrlist):
if not expected:
# Skip empty or dynamically declared dimensions
continue
if given != expected[0]:
# Dimensions should match with declared schema
expected_dimensions_repr = tuple(
x[0] if x else "dynamic" for x in parsed_field_type.arrlist
)
raise TypeError(
f"Array data `{value}` has dimensions `{array_dimensions}`"
f" whereas the schema has dimensions `{expected_dimensions_repr}`"
)
def _encode_array_field(
types: _MsgTypes, name: str, field_type: str, value: Any
) -> bytes:
_check_dimensions(field_type, value)
if value:
field_type_of_inside_array = field_type[: field_type.rindex("[")]
field_type_value_pairs = [
encode_field(types, name, field_type_of_inside_array, item)
for item in value
]
data_types, data_hashes = zip(*field_type_value_pairs)
else:
data_types = data_hashes = ()
return keccak(encode(data_types, data_hashes))
[docs]
def encode_field(
types: _MsgTypes, name: str, field_type: str, value: Any
) -> tuple[str, bytes]:
"""Encode field according to given type.
Args:
types: mapping with all known types
name: field name
field_type: type of field (must be present in ``types`` or be basic type)
value: value to encode
Returns:
tuple of form (result_type, encoded_bytes)
"""
if value is None:
raise ValueError(f"Missing value for field {name} of type {field_type}")
if field_type in types:
return ("bytes32", keccak(encode_data(field_type, types, value)))
if field_type == "bytes":
if not isinstance(value, bytes):
raise TypeError(
f"Value of field `{name}` ({value}) is of the type `{type(value)}`, "
f"but expected bytes value"
)
return ("bytes32", keccak(value))
if field_type == "string":
if not isinstance(value, str):
raise TypeError(
f"Value of field `{name}` ({value}) is of the type `{type(value)}`, "
f"but expected string value"
)
return ("bytes32", keccak(text=value))
if is_array_type(field_type):
encoded = _encode_array_field(types, name, field_type, value)
return ("bytes32", encoded)
# First checking to see if field_type is valid as per abi
if not is_encodable_type(field_type):
raise TypeError(f"Received Invalid type `{field_type}` in field `{name}`")
# Next, see if the value is encodable as the specified field_type
if is_encodable(field_type, value):
# field_type is a valid type and the provided value is encodable as that type
return (field_type, value)
raise TypeError(
f"Value of `{name}` ({value}) is not encodable as type `{field_type}`. "
f"If the base type is correct, verify that the value does not "
f"exceed the specified size for the type."
)
[docs]
def encode_data(primary_type: str, types: _MsgTypes, data: Any) -> bytes:
"""Encode data defined by ``primary_type``."""
encoded_types = ["bytes32"]
encoded_values = [hash_struct_type(primary_type, types)]
for field in types[primary_type]:
type_, value = encode_field(
types, field["name"], field["type"], data[field["name"]]
)
encoded_types.append(type_)
encoded_values.append(value)
return encode(encoded_types, encoded_values)