JavaPorts
JavaPorts: A Component-based Framework for Network Computing

A Load Balancing SPMD Application

This is a Manager-Workers style distributed Single Program Multiple Data (SPMD) application. Task T1 assumes the Manager's role, while all other tasks assume the Worker's role. The Manager task uses the QoS API to send the largest messages over the links with the highest throughput.

The Configuration File for this Application

BEGIN CONFIGURATION
BEGIN DEFINITIONS
DEFINE APPLICATION "thradapt"
DEFINE MACHINE M1="gold" MASTER
DEFINE MACHINE M2="fluorine"
DEFINE MACHINE M3="copper" 
DEFINE TASK T1="spmd" NUMOFPORTS=2 
DEFINE TASK T2="spmd" NUMOFPORTS=2 
DEFINE TASK T3="spmd" NUMOFPORTS=2 
END DEFINITIONS
BEGIN ALLOCATIONS
ALLOCATE T1 M1
ALLOCATE T2 M2
ALLOCATE T3 M3
END ALLOCATIONS
BEGIN CONNECTIONS
CONNECT T1.P[0] T2.P[0]
CONNECT T1.P[1] T3.P[0]
CONNECT T2.P[1] T3.P[1]
END CONNECTIONS
END CONFIGURATION

NOTE: The above configuration file corresponds to an application built to execute on a local platform. To run this application on another platform, you will need to change the machine names (e.g. "gold", "copper") to the names of the machines on the target cluster. The machine name alone should suffice; the full domain name is not required. All machines used to run an application should belong to the same domain.

The QoS Setup File

RunQoSSystem=true
MeasLinkData=true
MinMsgSize=40
MaxMsgSize=32000
NumOfPastPointsToRetain=10
NumOfProbesPerPoint=3
LocalQoSDataUpdatePeriod=30
GlobalQoSDataUpdatePeriod=60

NOTE: The above setup file (QoSSetup.txt) contains the default parameters used by the QoS middleware. Make sure that the RunQoSSystem and MeasLinkData variables are set to "true" to turn QoS monitoring on while the application is running.

Completed Java Code for the Monitor Template


package thradapt;

import SCLASS.*;
import Lib.*;

import java.*;
import java.lang.*;
import java.util.*;
import java.rmi.*;
import java.rmi.server.*;
import java.rmi.registry.*;
import java.io.Serializable;
import java.rmi.server.UnicastRemoteObject;

/**
 * Manager-Workers SPMD task.
 *
 * Every task runs this same class. The task whose variable name (as reported
 * by the QoS service) is "T1" acts as the Manager; every other task acts as a
 * Worker. The Manager sorts its port views by link throughput and sends the
 * largest array over the first port in that order, then sends an exit signal
 * on every port. Each Worker polls its port 0 for an array and for the exit
 * signal, and stops once the exit signal is seen.
 */
public class spmd extends Thread {
  private Port[] port_;
  private QoSService qosservice_;
  private String AppName_;
  private String TaskVarName_;

  // User class variables

  /** Task variable name of the task that assumes the Manager's role. */
  private static final String MANAGER_TASK = "T1";

  /** Payload that tells a Worker to stop; written by the Manager, compared by the Worker. */
  private static final String EXIT_SIGNAL = "EXIT";

  // Second argument of SyncWrite/AsyncRead. The Manager writes arrays with 0
  // and exit signals with 1, and the Worker reads with the same two values.
  // NOTE(review): presumably a message key/tag - confirm against the Port API.
  private static final int ARRAY_KEY = 0;
  private static final int EXIT_KEY = 1;

  /** Array sizes to send, largest first (paired with ports sorted by throughput). */
  private static final int[] ARRAY_SIZES = {1000, 100};

  /**
   * Creates the task thread.
   *
   * @param AppName     application name passed to the port manager and QoS service
   * @param TaskVarName task variable name passed to the port manager and QoS service
   */
  public spmd(String AppName, String TaskVarName)
  {
    super();
    AppName_ = AppName;
    TaskVarName_ = TaskVarName;
  }

  /**
   * Configures the ports and the QoS service, runs the Manager or Worker role
   * depending on the task variable name, then releases the QoS service and
   * the port manager. Any Throwable is printed and terminates the JVM with
   * exit status -1.
   */
  public synchronized void run ()
  {
    try {
      PortManager portmanager = new PortManager();
      port_ = portmanager.configure(AppName_, TaskVarName_);
      qosservice_ = new QoSService(AppName_, TaskVarName_);

      // User Application code
      String tvn = qosservice_.GetTaskVarName();

      // Task T1 is the Manager; all other tasks are Workers.
      // equals() on the constant is null-safe, so a null name selects the Worker role.
      if (MANAGER_TASK.equals(tvn)) {
        runManager(tvn);
      }
      else {
        runWorker(tvn);
      }

      qosservice_.release();
      portmanager.release(qosservice_);
    }
    catch(Throwable e) {
      e.printStackTrace();
      System.exit(-1);
    }
  }

  /**
   * Manager role: prints the task view, sends one array per sorted port view
   * (largest array first), then sends the exit signal on every port.
   *
   * @param tvn this task's variable name, used only in log output
   * @throws Exception if any port or QoS call fails; handled by run()
   */
  private void runManager(String tvn) throws Exception
  {
    System.out.println("Manager " + tvn + ": is UP ...");

    // get and print the Manager's TaskView
    TaskView tv = qosservice_.GetTaskView();
    System.out.println("Manager " + tvn + ": Here is my TaskView");
    tv.PrintTaskView();

    // get the Manager's port views
    PortView [] portViews = tv.GetPortViews();

    // sort the port views in descending order by link Throughput
    // NOTE(review): meaning of the trailing (false, 1) arguments is not visible
    // here - confirm against the QoSService API.
    PortView [] sortedViews = qosservice_.SortPortViews(portViews,
                                            ResourceAttribute.THROUGHPUT,
                                            false, 1);

    // send the largest arrays over the links with highest Throughput
    // The loop is bounded by the array it actually indexes. When there are
    // more ports than configured sizes, the remaining ports reuse the last
    // (smallest) size instead of running off the end of ARRAY_SIZES.
    for (int p = 0; p < sortedViews.length; p++) {
      int portIndex = sortedViews[p].GetIndex();
      int sizeIndex = Math.min(p, ARRAY_SIZES.length - 1);
      int [] array = new int[ARRAY_SIZES[sizeIndex]];
      port_[portIndex].SyncWrite(array, ARRAY_KEY);
    }

    // send exit signals to Workers
    for (int p = 0; p < port_.length; p++) {
      port_[p].SyncWrite(EXIT_SIGNAL, EXIT_KEY);
    }

    System.out.println("Manager " + tvn + ": Completed ...");
  }

  /**
   * Worker role: repeatedly polls port 0 for an array and for the exit
   * signal, printing the array length when one is received, and returns once
   * the exit signal is received.
   *
   * @param tvn this task's variable name, used only in log output
   * @throws Exception if any port call fails; handled by run()
   */
  private void runWorker(String tvn) throws Exception
  {
    System.out.println("Worker " + tvn + ": is UP ...");

    // The null checks below imply AsyncRead may return null.
    // NOTE(review): presumably that means "nothing available yet", which
    // makes this a polling loop - confirm against the Port API.
    while (true) {
      // get an array from the Manager task
      int [] myArray = (int [])port_[0].AsyncRead(ARRAY_KEY);

      // get an exit signal from the Manager task
      String str = (String)port_[0].AsyncRead(EXIT_KEY);

      if (myArray != null) {
        System.out.println("Worker " + tvn + ": my vector size = " + myArray.length);
      }

      if (EXIT_SIGNAL.equals(str)) {
        System.out.println("Worker " + tvn + ": Completed ...");
        break;
      }
    }
  }

  /**
   * Starts the task thread at minimum priority, waits for it to finish and
   * exits the JVM.
   *
   * @param args args[0] is the application name, args[1] the task variable name
   */
  public static void main (String args[])
  {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 2) {
      System.err.println("Usage: spmd <AppName> <TaskVarName>");
      System.exit(-1);
    }

    spmd spmdThread = new spmd(args[0], args[1]);
    spmdThread.setPriority(Thread.MIN_PRIORITY);
    spmdThread.start();
    try {
      spmdThread.join();
    } catch(Throwable e) {
      e.printStackTrace();
      System.exit(-1);
    }
    System.exit(0);
  }
}

* NOTE: The code in blue is automatically generated by the JavaPorts Application Configuration Toolset (JPACT) based on the parsed application configuration file, while the code in black is the user code that was added to the original task templates.

:: Last update: 03/14/2010 ::