push.lib
This section contains documentation for the internal library of PusH and is recommended for developers and advanced users only.
push.lib.context_switch
- class context_switch.ParticleCache(mk_module: Callable, args: List[any], cache_size: int, device: int, lock: allocate_lock)
Bases:
object
Loads particles on and off the accelerator.
- mk_module
The function to create a new module.
- Type:
Callable
- args
The arguments to pass to the mk_module function.
- Type:
List[any]
- lock
The lock for managing pinned particles.
- Type:
allocate_lock
- contains(pid)
Check if the cache contains a particle.
- Parameters:
pid – The particle id.
- Returns:
True if the cache contains the particle, False otherwise.
- Return type:
bool
- create(pid: int, mk_optim: Callable, mk_scheduler: Callable | None = None, prior=False, train_key=None) Module
Create a new module and manage the cache.
- Parameters:
pid (int) – The particle id.
mk_optim (Callable) – The function to create a new optimizer.
- Returns:
The created module.
- Return type:
nn.Module
- particles() List[int]
Returns a list of particle ids in the cache.
- Returns:
A list of particle ids.
- Return type:
List[int]
- release(pid, thread)
Release a pinned particle.
- Parameters:
pid – The particle id.
thread – The thread associated with the particle.
- Returns:
None
- save_to_disk(pid, path='./particles')
Saves the module associated with a particle to disk.
- class context_switch.ParticleCacheLRU(mk_module: Callable, args: List[any], cache_size: int, device: int)
Bases:
object
Loads particles on and off the accelerator.
- mk_module
The function to create a new module.
- Type:
Callable
- args
The arguments to pass to the mk_module function.
- Type:
List[any]
- contains(pid)
Check if the cache contains a particle.
- Parameters:
pid – The particle id.
- Returns:
True if the cache contains the particle, False otherwise.
- Return type:
bool
- create(pid: int, mk_optim: Callable) Module
Create a new module and manage the cache.
- Parameters:
pid (int) – The particle id.
mk_optim (Callable) – The function to create a new optimizer.
- Returns:
The created module.
- Return type:
nn.Module
- particles() List[int]
Get a list of particle ids in the cache.
- Returns:
A list of particle ids.
- Return type:
List[int]
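The contains / create / particles interface above can be illustrated with a small least-recently-used cache. This is only a sketch of the general technique, not the PusH implementation: the class name `TinyParticleCacheLRU`, the `read` method, and the two dictionaries standing in for accelerator and host memory are all hypothetical, and `mk_optim` is omitted to keep the example short.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict, List


class TinyParticleCacheLRU:
    """Hypothetical LRU cache keyed by particle id (not the PusH implementation)."""

    def __init__(self, mk_module: Callable[..., Any], args: List[Any], cache_size: int):
        self.mk_module = mk_module
        self.args = args
        self.cache_size = cache_size
        # Assumption: _active stands in for accelerator memory, ordered oldest first.
        self._active: "OrderedDict[int, Any]" = OrderedDict()
        # Assumption: _offloaded stands in for host memory.
        self._offloaded: Dict[int, Any] = {}

    def contains(self, pid: int) -> bool:
        return pid in self._active

    def particles(self) -> List[int]:
        return list(self._active.keys())

    def _evict_if_full(self) -> None:
        # Move the least recently used particles off the "accelerator".
        while len(self._active) >= self.cache_size:
            old_pid, old_module = self._active.popitem(last=False)
            self._offloaded[old_pid] = old_module

    def create(self, pid: int) -> Any:
        self._evict_if_full()
        module = self.mk_module(*self.args)
        self._active[pid] = module
        return module

    def read(self, pid: int) -> Any:
        # Hypothetical accessor: marks pid as most recently used,
        # swapping it back in if it had been offloaded.
        if pid in self._active:
            self._active.move_to_end(pid)
            return self._active[pid]
        module = self._offloaded.pop(pid)
        self._evict_if_full()
        self._active[pid] = module
        return module


cache = TinyParticleCacheLRU(mk_module=dict, args=[], cache_size=2)
cache.create(0)
cache.create(1)
cache.read(0)    # particle 0 becomes the most recently used
cache.create(2)  # the cache is full, so particle 1 is evicted
```

After these calls the sketch holds particles 0 and 2, and particle 1 has been moved to the stand-in host store.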
push.lib.messages
- class messages.NELBroadcastParticlesAckMSG
Bases:
MSG
Acknowledgment message for broadcasting particles in the Node Event Loop.
- class messages.NELBroadcastParticlesMSG(in_queues: List[Any], out_queues: List[Any], particle_to_device: Dict[int, int])
Bases:
MSG
Message for broadcasting particles in the Node Event Loop.
- in_queues
List of input queues.
- Type:
List[Queue]
- out_queues
List of output queues.
- Type:
List[Queue]
- class messages.NELSaveModel(pid_fid: Tuple[int, int])
Bases:
MSG
Message for saving the model in the Node Event Loop.
- class messages.NELSaveModelAckPDMSG(pid_fid: Tuple[int, int])
Bases:
MSG
Acknowledgment message for saving the model in the Node Event Loop.
- class messages.NodeEvtLoopCleanupMSG
Bases:
MSG
Message indicating the cleanup of the Node Event Loop.
- class messages.NodeEvtLoopInitMSG
Bases:
MSG
Message indicating the initialization of the Node Event Loop.
- class messages.ReceiveFuncAckMSG
Bases:
MSG
Acknowledgment message for receiving function calls in Particle Communication.
- class messages.ReceiveFuncAckPDMSG(pid_fid: Tuple[int, int], result: Any)
Bases:
MSG
Acknowledgment message for receiving function calls in Push Distribution.
- result
The result of the function call.
- Type:
Any
- class messages.ReceiveFuncMSG(pid_fid: Tuple[int, int], pid: int, msg_name: str, args: List[Any])
Bases:
MSG
Message for receiving function calls in Particle Communication.
- args
List of arguments for the function call.
- Type:
List[Any]
- class messages.ReceiveFuncPDMSG(pid_fid: Tuple[int, int], pid_to: int, msg: str, args: List[Any])
Bases:
MSG
Message for receiving function calls in Push Distribution.
- args
List of arguments for the function call.
- Type:
List[Any]
- class messages.ReceiveGetAckMSG(fid: int, pid: int, params: List[Tensor], params_grad: List[Tensor])
Bases:
MSG
Acknowledgment message for receiving particle data in Particle Communication.
- params
List of parameters.
- Type:
List[torch.Tensor]
- params_grad
List of parameter gradients.
- Type:
List[torch.Tensor]
- class messages.ReceiveGetMSG(pid_fid: Tuple[int, int], pid_caller: int, pid: int)
Bases:
MSG
Message for receiving particle data in Particle Communication.
- class messages.ReceiveParametersAckPDMSG(pid_fid: Tuple[int, int], params: List[Tensor])
Bases:
MSG
Acknowledgment message for receiving parameters in Push Distribution.
- params
List of parameters.
- Type:
List[torch.Tensor]
- class messages.ReceiveParametersPDMSG(pid_fid: Tuple[int, int], pid: int)
Bases:
MSG
Message for receiving parameters in Push Distribution.
- class messages.ReceiveParticleInitAckPDMSG
Bases:
MSG
Acknowledgment message for particle initialization in Push Distribution.
- class messages.ReceiveParticleInitPDMSG(device: int, pid: int, mk_optim: Callable, mk_scheduler: Callable, prior: bool, train_key: int, receive: Callable, state: Any)
Bases:
MSG
Message for initializing particle reception in Push Distribution.
- mk_optim
The function to create a new optimizer.
- Type:
Callable
- receive
The function to receive particles.
- Type:
Callable
- state
State information for initialization.
- Type:
Any
- class messages.ReceiveRegisterAckPDMSG
Bases:
MSG
Acknowledgment message for registering particle reception in Push Distribution.
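The message classes above come in request / acknowledgment pairs that carry a (pid, fid) tuple. A minimal sketch of that pattern follows, assuming plain dataclasses and a handler table. The names `FuncMSG`, `FuncAckMSG`, and `handle` are hypothetical stand-ins, the `MSG` base here is a local placeholder rather than the library class, and `queue.Queue` is used instead of multiprocessing queues to keep the example self-contained.

```python
from dataclasses import dataclass
from queue import Queue
from typing import Any, Callable, Dict, List, Tuple


class MSG:
    """Local placeholder for the MSG base class named above."""


@dataclass
class FuncMSG(MSG):
    """Hypothetical request: call the handler registered under msg_name."""
    pid_fid: Tuple[int, int]
    msg_name: str
    args: List[Any]


@dataclass
class FuncAckMSG(MSG):
    """Hypothetical acknowledgment carrying the handler's result."""
    pid_fid: Tuple[int, int]
    result: Any


def handle(msg: MSG, handlers: Dict[str, Callable[..., Any]]) -> MSG:
    # Dispatch on the message type and echo pid_fid back so the
    # caller can match the acknowledgment to its request.
    if isinstance(msg, FuncMSG):
        result = handlers[msg.msg_name](*msg.args)
        return FuncAckMSG(msg.pid_fid, result)
    raise TypeError(f"unhandled message type: {type(msg).__name__}")


in_queue: Queue = Queue()
out_queue: Queue = Queue()

in_queue.put(FuncMSG(pid_fid=(0, 7), msg_name="ADD", args=[2, 3]))
out_queue.put(handle(in_queue.get(), {"ADD": lambda a, b: a + b}))
ack = out_queue.get()
```

Echoing `pid_fid` in the acknowledgment is what lets a caller with several outstanding requests tell the replies apart.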
push.lib.node_event_loop
- class node_event_loop.NodeEventLoop(mk_module: Callable, args: List[any], in_queue: Queue, out_queue: Queue, rank: int, devices: int, cache_size: int, view_size: int)
Bases:
Waitable
The Node Event Loop (NEL) is the main event loop for a node.
The NEL is responsible for mapping particles to devices and executing operations on particles.
- Parameters:
mk_module (Callable) – Function that creates a module.
args (List[any]) – Arguments to pass to mk_module.
in_queue (mp.Queue) – Receiving message queue.
out_queue (mp.Queue) – Direct queue to PusH.
rank (int) – Rank of NEL.
devices (int) – Devices on this NEL.
cache_size (int) – Size of particle cache.
view_size (int) – Size of view cache.
- forward(pid: int, x: Tensor, *args: any) PFuture
Executes the forward pass of the particle’s module.
- particles() List[int]
Returns a list of the pids of the particles on all devices.
- Returns:
List of pids.
- Return type:
List[int]
- register_receive(pid: int, msg: str, fn: Callable, state: dict) None
Register receive functionality for particle pid.
- scheduler_step(pid: int, *args: any) PFuture
Performs a scheduler step for the particle.
- Parameters:
pid (int) – Identifier of the particle.
*args – Variable length argument list for the scheduler step.
- Returns:
A future representing the result of the scheduler step.
- Return type:
PFuture
- send(send_particle: Particle, pid_curr: int, pid: int, msg_name: str, *args: any) PFuture
Sends a message to another particle for execution and returns a future.
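- Parameters:
send_particle (Particle) – The particle sending the message.
pid_curr (int) – Identifier of the sending particle.
pid (int) – Identifier of the receiving particle.
msg_name (str) – Name of the message to execute on the receiving particle.
*args – Variable length argument list for the message.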
- Returns:
A future representing the result of the execution.
- Return type:
PFuture
- step(pid: int, loss_fn: Callable, data: Tensor, label: Tensor, *args: any) PFuture
Performs a training step, including forward and backward passes.
- Parameters:
pid (int) – Identifier of the particle.
loss_fn (Callable) – Loss function used in the training step.
data (torch.Tensor) – Input data for the training step.
label (torch.Tensor) – Ground truth labels for the training step.
*args – Variable length argument list for the training step.
- Returns:
A future representing the result of the training step.
- Return type:
PFuture
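The queue-based design described for NodeEventLoop (requests arrive on an input queue, results leave on an output queue, and callers hold a future id) can be sketched in a few lines. This is an illustration under stated assumptions, not the library's code: `event_loop`, `MiniFrontend`, and the `STOP` sentinel are hypothetical names, and a thread with `queue.Queue` stands in for the multiprocessing queues listed in the parameters above.

```python
import threading
from queue import Queue
from typing import Any, Callable, Dict

STOP = object()  # hypothetical sentinel standing in for a cleanup message


def event_loop(in_queue: Queue, out_queue: Queue,
               handlers: Dict[str, Callable[..., Any]]) -> None:
    # Serve requests until the stop sentinel arrives.
    while True:
        item = in_queue.get()
        if item is STOP:
            break
        fid, name, args = item
        out_queue.put((fid, handlers[name](*args)))


class MiniFrontend:
    """Hypothetical caller side: submits work and waits on future ids."""

    def __init__(self, handlers: Dict[str, Callable[..., Any]]):
        self.in_queue: Queue = Queue()
        self.out_queue: Queue = Queue()
        self._next_fid = 0
        self._results: Dict[int, Any] = {}
        self._thread = threading.Thread(
            target=event_loop,
            args=(self.in_queue, self.out_queue, handlers),
            daemon=True,
        )
        self._thread.start()

    def submit(self, name: str, *args: Any) -> int:
        fid = self._next_fid
        self._next_fid += 1
        self.in_queue.put((fid, name, args))
        return fid

    def wait(self, fid: int) -> Any:
        # Drain results until the requested one shows up, buffering
        # any results that belong to other future ids.
        while fid not in self._results:
            done_fid, result = self.out_queue.get()
            self._results[done_fid] = result
        return self._results.pop(fid)

    def close(self) -> None:
        self.in_queue.put(STOP)
        self._thread.join()


frontend = MiniFrontend({"square": lambda x: x * x, "add": lambda a, b: a + b})
f1 = frontend.submit("square", 4)
f2 = frontend.submit("add", 1, 2)
second = frontend.wait(f2)  # waiting out of order is fine; f1's result is buffered
first = frontend.wait(f1)
frontend.close()
```

Buffering results by future id in `wait` is the design choice that allows futures to be resolved in any order.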
push.lib.utils
- utils.detach_to_cpu(val: Dict | List | Tensor) Dict | List | Tensor
Detaches and moves a nested structure of dictionaries, lists, and torch tensors to the CPU.
- Parameters:
val (Union[Dict, List, torch.Tensor]) – The input data structure.
- Returns:
The data structure, detached and moved to the CPU.
- Return type:
Union[Dict, List, torch.Tensor]
- utils.detach_to_device(device: int, val: Dict | List | Tuple | Tensor, requires_grad=False) Dict | List | Tensor
Detaches and moves a nested structure of dictionaries, lists, tuples, and torch tensors to the specified device.
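- Parameters:
device (int) – The target device identifier.
val (Union[Dict, List, Tuple, torch.Tensor]) – The input data structure.
requires_grad (bool) – Whether the moved tensors should require gradients. Defaults to False.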
- Returns:
The detached and moved data structure.
- Return type:
Union[Dict, List, torch.Tensor]
- utils.to_device(device: int, val: Dict | List | Tuple | Tensor) Dict | List | Tensor
Moves a nested structure of dictionaries, lists, tuples, and torch tensors to the specified device.
- Parameters:
device (int) – The target device identifier.
val (Union[Dict, List, Tuple, torch.Tensor]) – The input data structure.
- Returns:
The data structure with elements moved to the specified device.
- Return type:
Union[Dict, List, torch.Tensor]
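All three utilities walk a nested structure and apply an operation to each leaf. The recursion can be sketched generically as below. `map_nested` is a hypothetical helper, not part of push.lib.utils, and the leaf function here only tags values with a device string so the example runs without PyTorch. With tensors, the leaf function would plausibly be something like `lambda t: t.detach().to(device)`. Whether the library preserves tuples in its return value is an assumption of this sketch.

```python
from typing import Any, Callable


def map_nested(fn: Callable[[Any], Any], val: Any) -> Any:
    """Apply fn to every leaf of a nested dict / list / tuple structure."""
    if isinstance(val, dict):
        return {key: map_nested(fn, item) for key, item in val.items()}
    if isinstance(val, list):
        return [map_nested(fn, item) for item in val]
    if isinstance(val, tuple):
        # Assumption: tuples are rebuilt as tuples.
        return tuple(map_nested(fn, item) for item in val)
    return fn(val)


nested = {"weights": [1.0, 2.0], "stats": (3.0, {"bias": 4.0})}

# Stand-in for moving a tensor: pair each leaf with a device label.
moved = map_nested(lambda leaf: ("cuda:0", leaf), nested)
```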
push.lib.waitable
- class waitable.Waitable
Bases:
object
An abstract class representing an object with a waitable operation.
- _wait(fid) Any
Wait for the completion of a specific operation identified by the future ID (fid). This method should be implemented by subclasses.
- Usage:
This class can be subclassed to create objects that support waitable operations.
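The subclassing contract described above can be sketched as follows. The `Waitable` defined here is a local stand-in built on `abc`, and `MiniFuture` and `EagerWaitable` are hypothetical names used for illustration; the library's own future type may differ.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict


class Waitable(ABC):
    """Local stand-in for the abstract class described above."""

    @abstractmethod
    def _wait(self, fid: int) -> Any:
        """Block until the operation identified by fid completes."""


class MiniFuture:
    """Hypothetical future: remembers which Waitable owns its fid."""

    def __init__(self, waitable: Waitable, fid: int):
        self._waitable = waitable
        self._fid = fid

    def wait(self) -> Any:
        return self._waitable._wait(self._fid)


class EagerWaitable(Waitable):
    """Toy subclass that computes results immediately and stores them by fid."""

    def __init__(self) -> None:
        self._results: Dict[int, Any] = {}
        self._next_fid = 0

    def run(self, fn: Callable[..., Any], *args: Any) -> MiniFuture:
        fid = self._next_fid
        self._next_fid += 1
        self._results[fid] = fn(*args)
        return MiniFuture(self, fid)

    def _wait(self, fid: int) -> Any:
        # Nothing to block on here: the result was stored by run().
        return self._results.pop(fid)


future = EagerWaitable().run(sum, [1, 2, 3])
value = future.wait()
```

Because `_wait` is abstract in this sketch, instantiating `Waitable` directly raises a `TypeError`; only subclasses that implement `_wait` can be created.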