toolrack.aio

Utilities based on the asyncio library.

exception toolrack.aio.AlreadyRunning

The TimedCall is already running.

exception toolrack.aio.NotRunning

The TimedCall is not running.

class toolrack.aio.PeriodicCall(func: Callable, *args, **kwargs)

A TimedCall called at fixed time intervals.

start(interval: Union[int, float], now: bool = True)

Start calling the function periodically.

Parameters:
  • interval – the time interval in seconds between calls.
  • now – whether to make the first call immediately.
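
The effect of interval and now can be pictured as a schedule of call times. The sketch below is not toolrack's implementation: periodic_times is a hypothetical helper, and it assumes that with now=False the first call falls one full interval after the start time.

```python
import itertools
from typing import Iterator, Union

Number = Union[int, float]


def periodic_times(
    start: Number, interval: Number, now: bool = True
) -> Iterator[Number]:
    """Yield call times spaced `interval` seconds apart (hypothetical helper).

    Assumption: when `now` is False, the first call is made one full
    interval after `start` rather than at `start` itself.
    """
    first_index = 0 if now else 1
    # Multiply instead of repeatedly adding, so rounding errors don't build up.
    for n in itertools.count(first_index):
        yield start + n * interval


# First three call times for a 0.5 second interval starting at t=100.0.
immediate = list(itertools.islice(periodic_times(100.0, 0.5), 3))
delayed = list(itertools.islice(periodic_times(100.0, 0.5, now=False), 3))
```

In a real event loop the start value would typically be loop.time(), so that the generated times use the loop's clock.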

class toolrack.aio.ProcessParserProtocol(future: asyncio.Future, out_parser=None, err_parser=None)

Collect process stdout and stderr.

Line parser functions can be passed for stdout and stderr, and they are called on each full line of output.

When the process ends, the future's result is set to a (stdout, stderr) tuple holding the full output of each stream. A tuple element is None if a parser was passed for that stream.

Parameters:
  • future (asyncio.Future) – a Future whose result is set to a (stdout, stderr) tuple once the process exits.
  • out_parser (callable) – an optional parser for the process standard output.
  • err_parser (callable) – an optional parser for the process standard error.

connection_lost(exc)

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

pipe_data_received(fd, data)

Called when the subprocess writes data into its stdout or stderr pipe.

fd is the integer file descriptor of the pipe. data is a bytes object.
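
The behaviour described for this class can be sketched with a plain asyncio.SubprocessProtocol. OutputCollector and run_and_collect below are hypothetical names, not part of toolrack. The sketch assumes UTF-8 output, assumes that file descriptors 1 and 2 are stdout and stderr, and assumes that an unterminated final line is passed to the parser when the connection closes.

```python
import asyncio


class OutputCollector(asyncio.SubprocessProtocol):
    """Hypothetical stand-in mimicking the documented behaviour."""

    def __init__(self, future, out_parser=None, err_parser=None):
        self.future = future
        # fd 1 is stdout, fd 2 is stderr.
        self._parsers = {1: out_parser, 2: err_parser}
        self._chunks = {1: [], 2: []}
        self._pending = {1: "", 2: ""}

    def pipe_data_received(self, fd, data):
        # Assumes each chunk decodes on its own (true for ASCII output).
        text = data.decode()
        parser = self._parsers.get(fd)
        if parser is None:
            self._chunks[fd].append(text)
            return
        # Hand complete lines to the parser and keep the unfinished tail.
        *lines, self._pending[fd] = (self._pending[fd] + text).split("\n")
        for line in lines:
            parser(line)

    def connection_lost(self, exc):
        results = []
        for fd in (1, 2):
            parser = self._parsers[fd]
            if parser is None:
                results.append("".join(self._chunks[fd]))
            else:
                if self._pending[fd]:
                    parser(self._pending[fd])  # flush a final partial line
                results.append(None)
        if not self.future.done():
            self.future.set_result(tuple(results))


async def run_and_collect(*cmd, out_parser=None, err_parser=None):
    """Run `cmd` and return the (stdout, stderr) tuple from the future."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    transport, _ = await loop.subprocess_exec(
        lambda: OutputCollector(future, out_parser, err_parser),
        *cmd,
        stdin=None,
    )
    try:
        return await future
    finally:
        transport.close()
```

With this sketch, awaiting run_and_collect("ls", "-l", out_parser=print) would print each line of standard output as it arrives and return None in place of the collected stdout.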

class toolrack.aio.StreamHelper(callback: Optional[Callable[[str], None]] = None, separator: str = '\n')

Helper to cache data until full lines of text are received.

This is useful to collect data from a stream and process them when full lines are received. For example:

stream = StreamHelper(callback)
stream.receive_data('line one\nline two')
stream.receive_data(' continues here\n')

would call callback twice, once with 'line one' and once with 'line two continues here'.

Parameters:
  • callback (callable) – an optional function which is called with full lines of text from the stream.
  • separator (str) – the line separator

flush_partial()

Flush and process pending data from a partial line.

get_data()

Return the full content of the stream.

receive_data(data: str)

Receive data and process them.

If a callback has been passed to the class, it’s called for each full line of text.
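
The buffering idea behind receive_data() and flush_partial() can be sketched as follows. LineBuffer is a hypothetical class written for illustration, not toolrack's StreamHelper. It assumes chunks are joined exactly as received, so any space between two chunks has to be part of the data itself.

```python
from typing import Callable, Optional


class LineBuffer:
    """Hypothetical sketch of line buffering; not toolrack's StreamHelper."""

    def __init__(
        self,
        callback: Optional[Callable[[str], None]] = None,
        separator: str = "\n",
    ):
        self.callback = callback
        self.separator = separator
        self._partial = ""

    def receive_data(self, data: str) -> None:
        # Everything before the last separator forms complete lines; the
        # remainder is kept until more data arrives.
        *lines, self._partial = (self._partial + data).split(self.separator)
        for line in lines:
            self._emit(line)

    def flush_partial(self) -> None:
        # Treat whatever is pending as a line, even without a separator.
        if self._partial:
            line, self._partial = self._partial, ""
            self._emit(line)

    def _emit(self, line: str) -> None:
        if self.callback is not None:
            self.callback(line)


seen = []
buf = LineBuffer(seen.append)
buf.receive_data("line one\nline two")
buf.receive_data(" continues here\ntail")
buf.flush_partial()
```

After these calls, seen holds the two complete lines followed by the flushed partial line 'tail'.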

class toolrack.aio.TimedCall(func: Callable, *args, **kwargs)

Call a function based on a timer.

The class takes a function with optional arguments. Upon start(), the function is scheduled at the specified times until stop() is called (or the time iterator is exhausted).

Parameters:
  • func – the function to call periodically.
  • args – arguments to pass to the function.
  • kwargs – keyword arguments to pass to the function.

running

Whether the TimedCall is currently running.

start(times_iter: Iterable[Union[float, int]])

Start calling the function at specified times.

Parameters:
  • times_iter – an iterable yielding times to execute the function at. If the iterator is exhausted, the TimedCall is stopped. Times must be compatible with loop.time().

stop()

Stop calling the function periodically.

It returns an asyncio.Future to wait for the stop to complete.
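
To illustrate what "times compatible with loop.time()" means in practice, here is a minimal sketch built on loop.call_at. SketchTimedCall and demo are hypothetical names and this is not toolrack's implementation. As a simplification, stop() in the sketch is synchronous rather than returning a Future.

```python
import asyncio
from typing import Iterable, Iterator, Optional, Union


class SketchTimedCall:
    """Hypothetical sketch built on loop.call_at; not toolrack's TimedCall."""

    def __init__(self, func, *args, **kwargs):
        self._func = lambda: func(*args, **kwargs)
        self._times: Optional[Iterator[Union[int, float]]] = None
        self._handle: Optional[asyncio.TimerHandle] = None
        self.running = False

    def start(self, times_iter: Iterable[Union[int, float]]) -> None:
        if self.running:
            raise RuntimeError("already running")
        self._times = iter(times_iter)
        self.running = True
        self._schedule_next()

    def stop(self) -> None:
        # Simplification: synchronous, no Future is returned.
        if self._handle is not None:
            self._handle.cancel()
            self._handle = None
        self.running = False

    def _schedule_next(self) -> None:
        try:
            when = next(self._times)
        except StopIteration:
            self.stop()  # an exhausted iterator stops the timed call
            return
        # `when` is an absolute time on the loop's own clock.
        self._handle = asyncio.get_running_loop().call_at(when, self._run)

    def _run(self) -> None:
        self._func()
        if self.running:
            self._schedule_next()


async def demo():
    calls = []
    timed = SketchTimedCall(calls.append, "tick")
    base = asyncio.get_running_loop().time()
    # Absolute times: 10, 20 and 30 milliseconds from now.
    timed.start(base + delay for delay in (0.01, 0.02, 0.03))
    while timed.running:
        await asyncio.sleep(0.01)
    return calls
```

The times are absolute values on the loop's monotonic clock, which is why the example offsets them from loop.time() instead of using time.time().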