module documentation

Undocumented

Function  get_callable_name                    Undocumented
Function  get_operation_factory                Return the OperationHandler factory function for the object.
Function  is_async_callable                    Return True if obj is an async callable.
Function  is_callable                          Return True if obj is a callable.
Function  is_subtype                           Undocumented
Function  set_operation_factory                Set the OperationHandler factory for this object.
Function  set_service_definition               Set the nexusrpc.ServiceDefinition for this class.
Constant  _NEXUS_OPERATION_ATTR_NAME           Undocumented
Constant  _NEXUS_OPERATION_FACTORY_ATTR_NAME   Undocumented
Constant  _NEXUS_SERVICE_DEFINITION_ATTR_NAME  Undocumented

def get_callable_name(fn: Callable[..., Any]) -> str:

Undocumented

def get_operation_factory(obj: Any) -> Callable[[Any], OperationHandler[Any, Any]] | None:

Return the OperationHandler factory function for the object.

def is_async_callable(obj: Any) -> TypeGuard[Callable[..., Awaitable[Any]]]:

Return True if obj is an async callable.

Supports partials of async callable class instances.
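The following is a minimal sketch of how a check like this could be written, not the module's actual implementation. The name is_async_callable_sketch and the example classes and functions are hypothetical. It assumes that "async callable" means an `async def` function or method, an instance whose class defines an `async def __call__`, or a functools.partial wrapping either of those.

```python
import functools
import inspect
from typing import Any


def is_async_callable_sketch(obj: Any) -> bool:
    """Hypothetical re-implementation for illustration only."""
    # Peel off any (possibly nested) functools.partial wrappers.
    while isinstance(obj, functools.partial):
        obj = obj.func
    # Covers plain `async def` functions and bound async methods.
    if inspect.iscoroutinefunction(obj):
        return True
    # Covers instances whose class defines `async def __call__`.
    # Looking on type(obj) avoids treating the class itself as async.
    call = getattr(type(obj), "__call__", None)
    return callable(obj) and inspect.iscoroutinefunction(call)


class AsyncGreeter:  # hypothetical async callable class
    async def __call__(self, name: str) -> str:
        return f"hello {name}"


async def async_fn(x: int) -> int:  # hypothetical
    return x


def sync_fn(x: int) -> int:  # hypothetical
    return x
```

Under these assumptions, `functools.partial(AsyncGreeter(), "a")` is reported as async, while `sync_fn` and the `AsyncGreeter` class object itself are not.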

def is_callable(obj: Any) -> TypeGuard[Callable[..., Any]]:

Return True if obj is a callable.

def is_subtype(type1: type[Any], type2: type[Any]) -> bool:

Undocumented

def set_operation_factory(obj: Any, operation_factory: Callable[[Any], OperationHandler[InputT, OutputT]]):

Set the OperationHandler factory for this object.

obj should be an operation start method.
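As a rough illustration of how set_operation_factory and get_operation_factory could pair up, the sketch below stores the factory as an attribute on the start method and reads it back with a None default. This is an assumption about the mechanism, not the module's confirmed implementation. The attribute name is taken from the _NEXUS_OPERATION_FACTORY_ATTR_NAME value listed for this module; the function names, MyService, and the factory are hypothetical.

```python
from typing import Any, Callable, Optional

# Value as listed for _NEXUS_OPERATION_FACTORY_ATTR_NAME in this module.
FACTORY_ATTR = "__nexus_operation_factory__"


def set_factory_sketch(obj: Any, factory: Callable[[Any], Any]) -> None:
    """Hypothetical: attach the factory to the operation start method."""
    setattr(obj, FACTORY_ATTR, factory)


def get_factory_sketch(obj: Any) -> Optional[Callable[[Any], Any]]:
    """Hypothetical: return the attached factory, or None if absent."""
    return getattr(obj, FACTORY_ATTR, None)


class MyService:  # hypothetical service class
    def start_op(self) -> str:
        return "started"


def my_factory(service: Any) -> str:  # hypothetical factory
    return "handler"


# Attach to the plain function on the class, not to a bound method:
# bound method objects do not accept new attributes.
set_factory_sketch(MyService.start_op, my_factory)
```

With this approach the attribute is also visible through bound methods, because attribute lookup on a bound method falls through to the underlying function.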

def set_service_definition(cls: type[ServiceT], service_definition: nexusrpc.ServiceDefinition):

Set the nexusrpc.ServiceDefinition for this class.

_NEXUS_OPERATION_ATTR_NAME: str = '__nexus_operation__'

Undocumented

_NEXUS_OPERATION_FACTORY_ATTR_NAME: str = '__nexus_operation_factory__'

Undocumented

_NEXUS_SERVICE_DEFINITION_ATTR_NAME: str = '__nexus_service_definition__'

Undocumented