A Load Balancing SPMD Application
This is a Manager-Workers style distributed Single Program
Multiple Data (SPMD) application. Task
T1 assumes the Manager's role, while the other
tasks assume a Worker's role. The Manager task
uses the QoS API to send the largest messages
over the links with the highest throughput.
The Configuration File for this Application
BEGIN CONFIGURATION
BEGIN DEFINITIONS
DEFINE APPLICATION "thradapt"
DEFINE MACHINE M1="gold" MASTER
DEFINE MACHINE M2="fluorine"
DEFINE MACHINE M3="copper"
DEFINE TASK T1="spmd" NUMOFPORTS=2
DEFINE TASK T2="spmd" NUMOFPORTS=2
DEFINE TASK T3="spmd" NUMOFPORTS=2
END DEFINITIONS
BEGIN ALLOCATIONS
ALLOCATE T1 M1
ALLOCATE T2 M2
ALLOCATE T3 M3
END ALLOCATIONS
BEGIN CONNECTIONS
CONNECT T1.P[0] T2.P[0]
CONNECT T1.P[1] T3.P[0]
CONNECT T2.P[1] T3.P[1]
END CONNECTIONS
END CONFIGURATION
NOTE: The above configuration file corresponds to an
application built to execute on a local platform.
To run this application on another platform, you will
need to replace the machine names (e.g. "gold", "copper")
with the names of the machines on the target cluster.
Specifying only the machine name (rather than a full
domain name) should suffice. All machines used to run
an application should belong to the same
domain.
The QoS Setup File
RunQoSSystem=true
MeasLinkData=true
MinMsgSize=40
MaxMsgSize=32000
NumOfPastPointsToRetain=10
NumOfProbesPerPoint=3
LocalQoSDataUpdatePeriod=30
GlobalQoSDataUpdatePeriod=60
NOTE: The above setup file (QoSSetup.txt) contains
the default parameters used by the QoS middleware. Make sure that
the RunQoSSystem and MeasLinkData variables are set to "true"
to turn QoS monitoring on while the application is running.
Completed Java Code for the Monitor Template
package thradapt;
import SCLASS.*;
import Lib.*;
import java.*;
import java.lang.*;
import java.util.*;
import java.rmi.*;
import java.rmi.server.*;
import java.rmi.registry.*;
import java.io.Serializable;
import java.rmi.server.UnicastRemoteObject;
/**
 * Task body of the "thradapt" Manager-Workers SPMD example.
 *
 * <p>Every task runs this same class. The task whose variable name is
 * {@code "T1"} acts as the Manager: it obtains its port views sorted by link
 * throughput, sends one {@code int[]} per port (largest array first), and then
 * sends an exit signal on every port. Every other task acts as a Worker: it
 * polls its port 0 for an array and for the exit signal and stops once the
 * exit signal has been received.
 */
public class spmd extends Thread {
    private Port[] port_;
    private QoSService qosservice_;
    private String AppName_;
    private String TaskVarName_;

    // User class variables

    /** Task variable name of the task that plays the Manager role. */
    private static final String MANAGER_TASK = "T1";

    /** Payload a Worker interprets as "stop". */
    private static final String EXIT_SIGNAL = "EXIT";

    /**
     * Second argument passed to SyncWrite/AsyncRead for array messages.
     * NOTE(review): presumably a message key/tag of the port API - confirm.
     */
    private static final int ARRAY_MSG = 0;

    /** Second argument passed to SyncWrite/AsyncRead for the exit signal. */
    private static final int EXIT_MSG = 1;

    /**
     * Creates the task thread.
     *
     * @param AppName     application name, handed to the port manager and the QoS service
     * @param TaskVarName task variable name (e.g. "T1"), handed to the same services
     */
    public spmd(String AppName, String TaskVarName)
    {
        super();
        AppName_ = AppName;
        TaskVarName_ = TaskVarName;
    }

    /**
     * Configures the ports and the QoS service, runs the Manager or Worker
     * role, and releases both services afterwards. Any failure is printed and
     * terminates the JVM with exit status -1.
     */
    @Override
    public synchronized void run ()
    {
        try {
            PortManager portmanager = new PortManager();
            port_ = portmanager.configure(AppName_, TaskVarName_);
            qosservice_ = new QoSService(AppName_, TaskVarName_);

            // User Application code
            // initialize the size of the arrays to be sent (largest first)
            int [] arraySize = {1000, 100};
            String tvn = qosservice_.GetTaskVarName();

            // Task T1 is the Manager
            if (MANAGER_TASK.equals(tvn)) {
                System.out.println("Manager " + tvn + ": is UP ...");

                // get and print the Manager's TaskView
                TaskView tv = qosservice_.GetTaskView();
                System.out.println("Manager " + tvn + ": Here is my TaskView");
                tv.PrintTaskView();

                // get the Manager's port views
                PortView [] portViews = tv.GetPortViews();

                // sort the port views in descending order by link Throughput
                PortView [] sortedViews = qosservice_.SortPortViews(portViews,
                                                                    ResourceAttribute.THROUGHPUT,
                                                                    false, 1);

                // send the largest arrays over the links with highest Throughput.
                // Only as many arrays as there are configured sizes are sent, so a
                // Manager with more ports than entries in arraySize no longer
                // overruns the array; the remaining Workers just get the exit signal.
                int sendCount = Math.min(sortedViews.length, arraySize.length);
                for (int p = 0; p < sendCount; p++) {
                    int portIndex = sortedViews[p].GetIndex();
                    int [] array = new int[arraySize[p]];
                    port_[portIndex].SyncWrite(array, ARRAY_MSG);
                }

                // send exit signals to Workers
                for (int p = 0; p < port_.length; p++) {
                    port_[p].SyncWrite(EXIT_SIGNAL, EXIT_MSG);
                }
                System.out.println("Manager " + tvn + ": Completed ...");
            }
            else { // All other tasks are Workers
                System.out.println("Worker " + tvn + ": is UP ...");
                while (true) {
                    // get an array from the Manager task (null is treated as "nothing received")
                    int [] myArray = (int [])port_[0].AsyncRead(ARRAY_MSG);
                    // get an exit signal from the Manager task
                    String str = (String)port_[0].AsyncRead(EXIT_MSG);

                    if (myArray != null) {
                        System.out.println("Worker " + tvn + ": my vector size = " + myArray.length);
                    }
                    // equals() on the constant is null-safe, so no separate null check is needed
                    if (EXIT_SIGNAL.equals(str)) {
                        System.out.println("Worker " + tvn + ": Completed ...");
                        break;
                    }
                }
            }

            qosservice_.release();
            portmanager.release(qosservice_);
        }
        catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Entry point: starts one task thread and waits for it to finish.
     *
     * @param args {@code args[0]} is the application name and {@code args[1]}
     *             the task variable name; both are required
     */
    public static void main (String args[])
    {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
        // when the launcher did not pass both required arguments.
        if (args == null || args.length < 2) {
            System.err.println("Usage: java thradapt.spmd <AppName> <TaskVarName>");
            System.exit(-1);
        }

        spmd spmdThread = new spmd(args[0], args[1]);
        spmdThread.setPriority(Thread.MIN_PRIORITY);
        spmdThread.start();
        try {
            spmdThread.join();
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        System.exit(0);
    }
}
* NOTE: The code in blue is automatically generated by the JavaPorts Application Configuration Toolset (JPACT) based on
the parsed application configuration file, while the code in black is the user code that was added to the original task template.
|