push.lib

This section documents the internal library of PusH. It is intended for developers and advanced users.

push.lib.context_switch

class context_switch.ParticleCache(mk_module: Callable, args: List[any], cache_size: int, device: int, lock: allocate_lock)

Bases: object

Loads particles on and off the accelerator.

mk_module

The function to create a new module.

Type:

Callable

args

The arguments to pass to the mk_module function.

Type:

List[any]

cache_size

The maximum cache size.

Type:

int

device

The device id.

Type:

int

lock

The lock for managing pinned particles.

Type:

threading.Lock

contains(pid)

Check if the cache contains a particle.

Parameters:

pid (int) – The particle id.

Returns:

True if the cache contains the particle, False otherwise.

Return type:

bool

create(pid: int, mk_optim: Callable, mk_scheduler: Callable | None = None, prior=False, train_key=None) Module

Create a new module and manage the cache.

Parameters:
  • pid (int) – The particle id.

  • mk_optim (Callable) – The function to create a new optimizer.

  • mk_scheduler (Callable, optional) – The function to create a new scheduler. Defaults to None.

Returns:

The created module.

Return type:

nn.Module

particles() List[int]

Returns a list of particle ids in the cache.

Returns:

A list of particle ids.

Return type:

List[int]

release(pid, thread)

Release a pinned particle.

Parameters:
  • pid – The particle id.

  • thread – The thread associated with the particle.

Returns:

None

save_to_disk(pid, path='./particles')

Saves the module associated with a particle to disk.

Parameters:
  • pid (int) – The particle id.

  • path (str, optional) – The path to save the particle. Defaults to './particles'.

Returns:

None

try_pin(pid: int) bool

Attempt to pin a particle.

Parameters:

pid (int) – The particle id.

Returns:

True if the pin attempt is successful, False otherwise.

Return type:

bool

try_read(pid: int, pin=False, msg=None) Module

Attempt to read a particle.

Parameters:
  • pid (int) – The particle id.

  • pin (bool, optional) – Whether to pin the particle. Defaults to False.

  • msg – Additional message. Defaults to None.

Returns:

The module associated with the particle.

Return type:

nn.Module

unpin(pid: int) None

Unpin a particle.

Parameters:

pid (int) – The particle id.

Returns:

None
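
The pin methods above can be illustrated with a small stand-alone sketch. It is not the ParticleCache implementation: the class name PinTable and the rule that a pin attempt fails while another pin is held are assumptions made for illustration, using only the standard library.

```python
import threading
from typing import Set


class PinTable:
    """Hypothetical helper; the real ParticleCache may track pins differently."""

    def __init__(self) -> None:
        self._lock = threading.Lock()      # guards the set of pinned ids
        self._pinned: Set[int] = set()

    def try_pin(self, pid: int) -> bool:
        # Assumed semantics: succeed only if the particle is not pinned yet.
        with self._lock:
            if pid in self._pinned:
                return False
            self._pinned.add(pid)
            return True

    def unpin(self, pid: int) -> None:
        # Unpinning a particle that is not pinned is a no-op in this sketch.
        with self._lock:
            self._pinned.discard(pid)


pins = PinTable()
first = pins.try_pin(7)
second = pins.try_pin(7)   # attempted while the first pin is still held
pins.unpin(7)
third = pins.try_pin(7)    # attempted after the pin was released
```

In this sketch a caller that receives False is expected to retry later or fall back to another particle. Whether PusH does the same is not stated in this reference.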

class context_switch.ParticleCacheLRU(mk_module: Callable, args: List[any], cache_size: int, device: int)

Bases: object

Loads particles on and off the accelerator.

mk_module

The function to create a new module.

Type:

Callable

args

The arguments to pass to the mk_module function.

Type:

List[any]

cache_size

The maximum cache size.

Type:

int

device

The device id.

Type:

int

contains(pid)

Check if the cache contains a particle.

Parameters:

pid (int) – The particle id.

Returns:

True if the cache contains the particle, False otherwise.

Return type:

bool

create(pid: int, mk_optim: Callable) Module

Create a new module and manage the cache.

Parameters:
  • pid (int) – The particle id.

  • mk_optim (Callable) – The function to create a new optimizer.

Returns:

The created module.

Return type:

nn.Module

particles() List[int]

Get a list of particle ids in the cache.

Returns:

A list of particle ids.

Return type:

List[int]

read(pid: int) Module

Read a particle from cache.

Parameters:

pid (int) – The particle id.

Returns:

The module associated with the particle.

Return type:

nn.Module

write(pid: int, module: Module) None

Write a particle to cache.

Parameters:
  • pid (int) – The particle id.

  • module (nn.Module) – The module to write to cache.

Returns:

None
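
As a rough illustration of the read/write interface above, the following sketch implements a least-recently-used cache keyed by particle id. It assumes, from the class name only, that eviction follows LRU order. TinyLRUCache and the on_evict callback are hypothetical names and not part of PusH.

```python
from collections import OrderedDict
from typing import Any, Callable, List


class TinyLRUCache:
    """Hypothetical stand-in, not push's ParticleCacheLRU."""

    def __init__(self, cache_size: int, on_evict: Callable[[int, Any], None]):
        self.cache_size = cache_size
        self.on_evict = on_evict  # e.g. move the evicted module to host memory
        self._items: "OrderedDict[int, Any]" = OrderedDict()

    def contains(self, pid: int) -> bool:
        return pid in self._items

    def particles(self) -> List[int]:
        # Ordered from least to most recently used.
        return list(self._items.keys())

    def read(self, pid: int) -> Any:
        # A read marks the particle as most recently used.
        self._items.move_to_end(pid)
        return self._items[pid]

    def write(self, pid: int, module: Any) -> None:
        self._items[pid] = module
        self._items.move_to_end(pid)
        # Evict least recently used entries until the cache fits.
        while len(self._items) > self.cache_size:
            old_pid, old_module = self._items.popitem(last=False)
            self.on_evict(old_pid, old_module)


evicted = []
cache = TinyLRUCache(cache_size=2, on_evict=lambda pid, m: evicted.append(pid))
cache.write(0, "module-0")
cache.write(1, "module-1")
cache.read(0)               # particle 0 becomes the most recently used
cache.write(2, "module-2")  # cache is full, so particle 1 is evicted
```

Strings stand in for nn.Module instances so that the sketch runs without PyTorch.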

push.lib.messages

class messages.MSG

Bases: object

Base class for messages in the system.

class messages.NELBroadcastParticlesAckMSG

Bases: MSG

Acknowledgment message for broadcasting particles in the Node Event Loop.

class messages.NELBroadcastParticlesMSG(in_queues: List[Any], out_queues: List[Any], particle_to_device: Dict[int, int])

Bases: MSG

Message for broadcasting particles in the Node Event Loop.

in_queues

List of input queues.

Type:

List[Queue]

out_queues

List of output queues.

Type:

List[Queue]

particle_to_device

Mapping of particle ids to devices.

Type:

Dict[int, int]

class messages.NELSaveModel(pid_fid: Tuple[int, int])

Bases: MSG

Message for saving the model in the Node Event Loop.

pid_fid

Tuple of particle id and file id.

Type:

Tuple[int, int]

class messages.NELSaveModelAckPDMSG(pid_fid: Tuple[int, int])

Bases: MSG

Acknowledgment message for saving the model in the Node Event Loop.

pid_fid

Tuple of particle id and file id.

Type:

Tuple[int, int]

class messages.NodeEvtLoopCleanupMSG

Bases: MSG

Message indicating the cleanup of the Node Event Loop.

class messages.NodeEvtLoopInitMSG

Bases: MSG

Message indicating the initialization of the Node Event Loop.

class messages.ReceiveFuncAckMSG

Bases: MSG

Acknowledgment message for receiving function calls in Particle Communication.

class messages.ReceiveFuncAckPDMSG(pid_fid: Tuple[int, int], result: Any)

Bases: MSG

Acknowledgment message for receiving function calls in Push Distribution.

pid_fid

Tuple of particle id and function id.

Type:

Tuple[int, int]

result

The result of the function call.

Type:

Any

class messages.ReceiveFuncMSG(pid_fid: Tuple[int, int], pid: int, msg_name: str, args: List[Any])

Bases: MSG

Message for receiving function calls in Particle Communication.

pid_fid

Tuple of particle id and function id.

Type:

Tuple[int, int]

pid

The particle id.

Type:

int

msg_name

The name of the message.

Type:

str

args

List of arguments for the function call.

Type:

List[Any]

class messages.ReceiveFuncPDMSG(pid_fid: Tuple[int, int], pid_to: int, msg: str, args: List[Any])

Bases: MSG

Message for receiving function calls in Push Distribution.

pid_fid

Tuple of particle id and function id.

Type:

Tuple[int, int]

pid_to

The target particle id.

Type:

int

msg

The message identifier.

Type:

str

args

List of arguments for the function call.

Type:

List[Any]

class messages.ReceiveGetAckMSG(fid: int, pid: int, params: List[Tensor], params_grad: List[Tensor])

Bases: MSG

Acknowledgment message for receiving particle data in Particle Communication.

fid

The function id.

Type:

int

pid

The particle id.

Type:

int

params

List of parameters.

Type:

List[torch.Tensor]

params_grad

List of parameter gradients.

Type:

List[torch.Tensor]

class messages.ReceiveGetMSG(pid_fid: Tuple[int, int], pid_caller: int, pid: int)

Bases: MSG

Message for receiving particle data in Particle Communication.

pid_fid

Tuple of particle id and function id.

Type:

Tuple[int, int]

pid_caller

The particle id of the caller.

Type:

int

pid

The particle id.

Type:

int

class messages.ReceiveParametersAckPDMSG(pid_fid: Tuple[int, int], params: List[Tensor])

Bases: MSG

Acknowledgment message for receiving parameters in Push Distribution.

pid_fid

Tuple of particle id and function id.

Type:

Tuple[int, int]

params

List of parameters.

Type:

List[torch.Tensor]

class messages.ReceiveParametersPDMSG(pid_fid: Tuple[int, int], pid: int)

Bases: MSG

Message for receiving parameters in Push Distribution.

pid_fid

Tuple of particle id and function id.

Type:

Tuple[int, int]

pid

The particle id.

Type:

int

class messages.ReceiveParticleInitAckPDMSG

Bases: MSG

Acknowledgment message for particle initialization in Push Distribution.

class messages.ReceiveParticleInitPDMSG(device: int, pid: int, mk_optim: Callable, mk_scheduler: Callable, prior: bool, train_key: int, receive: Callable, state: Any)

Bases: MSG

Message for initializing particle reception in Push Distribution.

device

The device id.

Type:

int

pid

The particle id.

Type:

int

mk_optim

The function to create a new optimizer.

Type:

Callable

mk_scheduler

The function to create a new scheduler.

Type:

Callable

receive

The function to receive particles.

Type:

Callable

state

State information for initialization.

Type:

Any

class messages.ReceiveRegisterAckPDMSG

Bases: MSG

Acknowledgment message for registering particle reception in Push Distribution.

class messages.ReceiveRegisterPDMSG(pid: int, msg: str, fn: Callable, state: Dict[str, Any])

Bases: MSG

Message for registering particle reception in Push Distribution.

pid

The particle id.

Type:

int

msg

The message identifier.

Type:

str

fn

The function associated with the registration.

Type:

Callable

state

State information for registration.

Type:

Dict[str, Any]
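
The request and acknowledgment classes above suggest a message-passing pattern over queues. The sketch below shows that pattern with standard-library queues and dataclasses. RequestMSG, AckMSG, CleanupMSG, event_loop and the "SUM" handler are hypothetical stand-ins shaped like the classes in this module, and the dispatch logic is an assumption rather than the PusH event loop.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class RequestMSG:  # hypothetical, shaped like ReceiveFuncPDMSG
    pid_fid: Tuple[int, int]
    pid_to: int
    msg: str
    args: List[Any]


@dataclass
class AckMSG:  # hypothetical, shaped like ReceiveFuncAckPDMSG
    pid_fid: Tuple[int, int]
    result: Any


@dataclass
class CleanupMSG:  # hypothetical, plays the role of NodeEvtLoopCleanupMSG
    pass


def event_loop(in_queue: queue.Queue, out_queue: queue.Queue) -> None:
    # Handlers keyed by message identifier (illustrative only).
    handlers = {"SUM": lambda *args: sum(args)}
    while True:
        m = in_queue.get()
        if isinstance(m, CleanupMSG):
            break
        if isinstance(m, RequestMSG):
            # Echo pid_fid back so the sender can match the reply to its request.
            out_queue.put(AckMSG(m.pid_fid, handlers[m.msg](*m.args)))


in_q, out_q = queue.Queue(), queue.Queue()
worker = threading.Thread(target=event_loop, args=(in_q, out_q))
worker.start()
in_q.put(RequestMSG(pid_fid=(0, 1), pid_to=3, msg="SUM", args=[1, 2, 3]))
ack = out_q.get(timeout=5)
in_q.put(CleanupMSG())
worker.join(timeout=5)
```

Carrying pid_fid in both the request and its acknowledgment lets a sender with several outstanding requests pair each reply with the call that produced it.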

push.lib.node_event_loop

class node_event_loop.NodeEventLoop(mk_module: Callable, args: List[any], in_queue: Queue, out_queue: Queue, rank: int, devices: int, cache_size: int, view_size: int)

Bases: Waitable

The Node Event Loop (NEL) is the main event loop for a node.

The NEL is responsible for mapping particles to devices and executing operations on particles.

Parameters:
  • mk_module (Callable) – Function that creates a module.

  • args (List[any]) – Arguments to pass to mk_module.

  • in_queue (mp.Queue) – Receiving message queue.

  • out_queue (mp.Queue) – Direct queue to PusH.

  • rank (int) – Rank of NEL.

  • devices (int) – Devices on this NEL.

  • cache_size (int) – Size of particle cache.

  • view_size (int) – Size of view cache.

forward(pid: int, x: Tensor, *args: any) PFuture

Executes the forward pass of the particle’s module.

Parameters:
  • pid (int) – Identifier of the particle.

  • x (torch.Tensor) – Input tensor for the forward pass.

  • *args – Variable length argument list for the forward pass.

Returns:

A future representing the result of the forward pass.

Return type:

PFuture

get(pid_curr: int, pid: int) PFuture

Retrieves data from another particle and returns a future.

Parameters:
  • pid_curr (int) – Identifier of the current particle.

  • pid (int) – Identifier of the target particle.

Returns:

A future representing the result of the retrieval.

Return type:

PFuture

particles() List[int]

Returns a list of particle ids (pids) on all devices.

Returns:

List of pids.

Return type:

List[int]

register_receive(pid: int, msg: str, fn: Callable, state: dict) None

Register receive functionality for particle pid.

Parameters:
  • pid (int) – Particle identifier.

  • msg (str) – Message to respond to.

  • fn (Callable) – Function to execute on msg.

  • state (dict) – User state.

scheduler_step(pid: int, *args: any) PFuture

Performs a step of the particle's scheduler.

Parameters:
  • pid (int) – Identifier of the particle.

  • *args – Variable length argument list for the scheduler step.

Returns:

A future representing the result of the scheduler step.

Return type:

PFuture

send(send_particle: Particle, pid_curr: int, pid: int, msg_name: str, *args: any) PFuture

Sends a message to another particle for execution and returns a future.

Parameters:
  • send_particle (Particle) – Particle instance sending the message.

  • pid_curr (int) – Identifier of the current particle.

  • pid (int) – Identifier of the target particle.

  • msg_name (str) – Name of the message to be executed.

  • *args – Variable length argument list for the message.

Returns:

A future representing the result of the execution.

Return type:

PFuture

step(pid: int, loss_fn: Callable, data: Tensor, label: Tensor, *args: any) PFuture

Performs a training step, including forward and backward passes.

Parameters:
  • pid (int) – Identifier of the particle.

  • loss_fn (Callable) – Loss function used in the training step.

  • data (torch.Tensor) – Input data for the training step.

  • label (torch.Tensor) – Ground truth labels for the training step.

  • *args – Variable length argument list for the training step.

Returns:

A future representing the result of the training step.

Return type:

PFuture

wait(pfutures: List[PFuture]) List[any]

Waits for a list of futures to complete and returns the results.

Parameters:

pfutures (List[PFuture]) – List of futures to wait for.

Returns:

List of results corresponding to the completed futures.

Return type:

List[any]

zero_grad(pid: int) PFuture

Clears gradients of the parameters in the particle’s module.

Parameters:

pid (int) – Identifier of the particle.

Returns:

A future representing the completion of the operation.

Return type:

PFuture

push.lib.utils

utils.detach_to_cpu(val: Dict | List | Tensor) Dict | List | Tensor

Detaches and moves a nested structure of dictionaries, lists, and torch tensors to the CPU.

Parameters:

val (Union[Dict, List, torch.Tensor]) – The input data structure.

Returns:

The data structure, detached and moved to the CPU.

Return type:

Union[Dict, List, torch.Tensor]

utils.detach_to_device(device: int, val: Dict | List | Tuple | Tensor, requires_grad=False) Dict | List | Tensor

Detaches and moves a nested structure of dictionaries, lists, tuples, and torch tensors to the specified device.

Parameters:
  • device (int) – The target device identifier.

  • val (Union[Dict, List, Tuple, torch.Tensor]) – The input data structure.

  • requires_grad (bool, optional) – If True, retains the computational graph for torch tensors. Defaults to False.

Returns:

The detached and moved data structure.

Return type:

Union[Dict, List, torch.Tensor]

utils.to_device(device: int, val: Dict | List | Tuple | Tensor) Dict | List | Tensor

Moves a nested structure of dictionaries, lists, tuples, and torch tensors to the specified device.

Parameters:
  • device (int) – The target device identifier.

  • val (Union[Dict, List, Tuple, torch.Tensor]) – The input data structure.

Returns:

The data structure with elements moved to the specified device.

Return type:

Union[Dict, List, torch.Tensor]
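
The three helpers above all walk a nested container and transform its leaves. The sketch below shows that recursion with a generic leaf function so it runs without PyTorch. map_nested is a hypothetical name and not part of push.lib.utils. With tensors, the leaf function would be something like `lambda t: t.detach().cpu()` or `lambda t: t.to(device)`.

```python
from typing import Any, Callable


def map_nested(fn: Callable[[Any], Any], val: Any) -> Any:
    """Apply fn to every leaf of a nested dict/list/tuple structure."""
    if isinstance(val, dict):
        return {k: map_nested(fn, v) for k, v in val.items()}
    if isinstance(val, list):
        return [map_nested(fn, v) for v in val]
    if isinstance(val, tuple):
        # Assumption: tuples stay tuples; the real helpers may return lists.
        return tuple(map_nested(fn, v) for v in val)
    # Anything else is treated as a leaf (a tensor in the real helpers).
    return fn(val)


batch = {"x": [1, 2], "y": (3, {"z": 4})}
moved = map_nested(lambda leaf: leaf * 10, batch)
```

The input structure is left untouched, and a new container is built at each level.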

push.lib.waitable

class waitable.Waitable

Bases: object

An abstract class representing an object with a waitable operation.

_wait(fid) Any

Wait for the completion of a specific operation identified by the future ID (fid). This method should be implemented by subclasses.

Usage:

This class can be subclassed to create objects that support waitable operations.
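
A minimal sketch of such a subclass follows. It redefines a stand-in Waitable so that it runs on its own. Future, EagerLoop and submit are hypothetical names, and the eager execution is a simplification: the real event loop dispatches work asynchronously.

```python
from typing import Any, Callable, Dict, List


class Waitable:
    """Stand-in for the abstract base described above."""

    def _wait(self, fid: int) -> Any:
        raise NotImplementedError


class Future:
    """Hypothetical future that defers to its owner's _wait."""

    def __init__(self, owner: Waitable, fid: int) -> None:
        self._owner = owner
        self._fid = fid

    def wait(self) -> Any:
        return self._owner._wait(self._fid)


class EagerLoop(Waitable):
    """Toy subclass: runs work immediately and stores results by fid."""

    def __init__(self) -> None:
        self._results: Dict[int, Any] = {}
        self._next_fid = 0

    def submit(self, fn: Callable[..., Any], *args: Any) -> Future:
        fid = self._next_fid
        self._next_fid += 1
        self._results[fid] = fn(*args)
        return Future(self, fid)

    def _wait(self, fid: int) -> Any:
        # Hand the result over once and forget it.
        return self._results.pop(fid)

    def wait(self, futures: List[Future]) -> List[Any]:
        return [f.wait() for f in futures]


loop = EagerLoop()
futures = [loop.submit(pow, 2, n) for n in range(4)]
results = loop.wait(futures)
```

Keying results by fid lets one object serve many outstanding futures, which matches the role the fid argument plays in _wait.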