Source code for xrpc.dsl

import signal as signalz
from enum import Enum
from typing import NamedTuple, Tuple, Optional


[docs]class RPCType(Enum): """The calling convention of the RPC point""" Repliable = 1 """Reply is expected from the receiver (OK-RECEIVED) Beware this does dead-lock services when they both try to send a repliable request at the same time to each other """ Durable = 2 """we only make sure the packet is received and do not wait for reply (UNDECIDED-RECEIVED)""" Signalling = 3 """we don't care if the packet is received (UNDECIDED-UNDECIDED)""" def __repr__(self): return str(self.name)
ATTR_RPC = 'rpc_conf' ATTR_REGULAR = 'rpc_regular' ATTR_SIGNAL = 'rpc_signal' ATTR_STARTUP = 'rpc_startup' ATTR_SOCKETIO = 'rpc_socketio' DEFAULT_GROUP = 'default'
[docs]class rpc(NamedTuple): type: RPCType = RPCType.Repliable """Selected calling convention for the RPC call""" group: str = DEFAULT_GROUP exc: bool = False """If an exception is raised while processing the packet from the transport, run this one""" # add 'RPC GROUPS' to allow certain ports to be used differently # todo: rpcs returning iterables def __call__(self, fn): setattr(fn, ATTR_RPC, self) return fn
[docs]class regular(NamedTuple): initial: Optional[float] = 0. """Initial wait time in seconds""" tick: bool = False """Run this function on every tick (this should affect the wait times)""" def __call__(self, fn): setattr(fn, ATTR_REGULAR, self) return fn
[docs]class signal(NamedTuple): codes: Tuple[int] = (signalz.SIGTERM, signalz.SIGINT) """Connect to this signal number""" def __call__(self, fn): setattr(fn, ATTR_SIGNAL, self) return fn
[docs]class startup(NamedTuple): empty: int = 0 def __call__(self, fn): setattr(fn, ATTR_STARTUP, self) return fn
# wait for a decision on file descriptor. class socketio(): empty: int = 0 def __call__(self, fn): setattr(fn, ATTR_SOCKETIO, self) return fn # todo unhandled_exception handler # todo startup handler # todo exit handler