Fault Tolerant Manager-Workers Style Distributed Application
This is a fault tolerant Manager-Workers style distributed
application. The Manager and Worker tasks use the
fault-tolerance API methods to detect the state of their
corresponding peers before communicating with them.
A message-passing operation is not invoked on a Port
object if the peer task on that port is in the dead (-1)
or completed (2) state.
The Configuration File for this Application
BEGIN CONFIGURATION
BEGIN DEFINITIONS
DEFINE APPLICATION "faultol"
DEFINE MACHINE M1="gold" MASTER
DEFINE MACHINE M2="zinc"
DEFINE MACHINE M3="copper"
DEFINE TASK T1="Manager" NUMOFPORTS=2
DEFINE TASK T2="Worker" NUMOFPORTS=1
DEFINE TASK T3="Worker" NUMOFPORTS=1
END DEFINITIONS
BEGIN ALLOCATIONS
ALLOCATE T1 M1
ALLOCATE T2 M2
ALLOCATE T3 M3
END ALLOCATIONS
BEGIN CONNECTIONS
CONNECT T1.P[0] T2.P[0]
CONNECT T1.P[1] T3.P[0]
END CONNECTIONS
END CONFIGURATION
NOTE: The above configuration file corresponds to an
application built to execute on a local platform.
To run this application on another platform, change the
machine names (e.g. "gold", "zinc" and "copper") to the
names of machines on the target cluster. The machine name
alone (without the domain part) should suffice. All
machines used to run an application should belong to the
same domain.
The QoS Setup File
RunQoSSystem=true
MeasLinkData=true
MinMsgSize=40
MaxMsgSize=32000
NumOfPastPointsToRetain=10
NumOfProbesPerPoint=3
LocalQoSDataUpdatePeriod=30
GlobalQoSDataUpdatePeriod=60
NOTE: The above setup file (QoSSetup.txt) contains
the default parameters used by the QoS middleware. Make sure that
the RunQoSSystem and MeasLinkData variables are set to "true"
to turn QoS monitoring on while the application is running.
Completed Java Code for the Manager Template
package faultol;
import SCLASS.*;
import Lib.*;
import java.*;
import java.io.*;
import java.lang.*;
import java.util.*;
import java.rmi.*;
import java.rmi.server.*;
import java.rmi.registry.*;
import java.io.Serializable;
import java.rmi.server.UnicastRemoteObject;
/**
 * Manager task of the fault-tolerant Manager-Workers application.
 *
 * The Manager hands out jobs (vectors of doubles) to the Worker tasks
 * connected to its ports and collects one result per job. Before every
 * message-passing operation it queries the peer task state through the QoS
 * port views; only peers in state 0 or 1 are treated as "up" (-1 is dead and
 * 2 is completed). A job held by a worker that went down is re-queued and
 * handed to another worker.
 *
 * Message key 0 carries jobs and results; message key 1 carries the exit flag.
 */
public class Manager extends Thread {
    private Port[] port_;
    private String AppName_;
    private String TaskVarName_;
    private QoSService qosservice_ = null;
    // User class variables
    private int numOfJobs = 0;

    /**
     * Creates a Manager for the given application and task variable names.
     *
     * @param AppName     application name passed to the port manager and QoS service
     * @param TaskVarName task variable name passed to the port manager and QoS service
     */
    public Manager(String AppName, String TaskVarName)
    {
        super();
        AppName_ = AppName;
        TaskVarName_ = TaskVarName;
    }

    /** Creates a Manager with no application or task name set. */
    public Manager(){
    }

    // User methods

    /**
     * Records the number of completed jobs in the file "m_state.txt",
     * overwriting any previous content. Failures are reported on standard
     * output and otherwise ignored (best effort).
     *
     * @param doneJobs number of jobs completed so far
     */
    public void updateState(int doneJobs){
        PrintStream p = null;
        try{
            p = new PrintStream(new FileOutputStream("m_state.txt"));
            p.println(doneJobs);
            // PrintStream swallows I/O errors; checkError() flushes and reports them.
            if(p.checkError()){
                System.out.println("error writing to m_state.txt");
            }
        }catch(Exception e){
            System.out.println("error writing to m_state.txt: " + e);
        }finally{
            // Always release the file handle, also on an exceptional exit.
            if(p != null){
                p.close();
            }
        }
    }

    /**
     * Builds the list of jobs and sets {@code numOfJobs} as a side effect.
     *
     * @return an array of four job sizes: 10, 20, 30, 40
     */
    public int [] getAllJobs(){
        numOfJobs = 4;
        int [] jobs = new int[numOfJobs];
        for(int i = 0; i < jobs.length; i++){
            jobs[i] = (i+1)*10;
        }
        return jobs;
    }

    /**
     * Creates the ready-flags array, one entry per port, all initially true
     * (every worker is considered ready for a job).
     *
     * @return the ready-flags array
     */
    public boolean [] initWorkersQueue(){
        boolean [] workersQueue = new boolean[port_.length];
        for(int i = 0; i < workersQueue.length; i++){
            workersQueue[i] = true;
        }
        return workersQueue;
    }

    /**
     * Finds the first port whose worker is marked ready and whose peer task
     * state is 0 or 1.
     *
     * @param workersQueue ready flag per port; may be null
     * @param portViews    QoS port views, indexed like {@code workersQueue}
     * @return the index of the first usable port, or -1 if there is none
     */
    public int getReadyPort(boolean [] workersQueue, PortView [] portViews){
        int readyPort = -1;
        int peerTaskState = 0;
        if(workersQueue != null){
            for(int q = 0; q < workersQueue.length; q++){
                // if worker is ready
                if(workersQueue[q]){
                    peerTaskState = portViews[q].GetPeerTaskAttributeValue(
                        ResourceAttribute.TASKSTATE);
                    if(peerTaskState == 0 || peerTaskState == 1){
                        readyPort = q;
                        break;
                    }
                }
            }
        }
        return readyPort;
    }

    /**
     * Main loop of the Manager: alternates between collecting results from
     * busy workers and dispatching pending jobs to ready workers until every
     * job has produced a result, then sends the exit flag to the workers that
     * are still up and releases the QoS service and ports.
     *
     * The loop also stops when every worker has been marked down while busy:
     * such ports are never polled or dispatched to again, so no further
     * progress would be possible.
     */
    public synchronized void run ()
    {
        try {
            PortManager portmanager = new PortManager();
            port_ = portmanager.configure(AppName_, TaskVarName_);
            qosservice_ = new QoSService(AppName_, TaskVarName_);
            // User application code
            double [] vector = null;
            double [] result = null;
            int [] jobs = getAllJobs();
            int curJob = 0;      // next job index that has never been dispatched
            int doneJobs = 0;
            int key = 0;
            int msgSize = 0;
            int p = 0;
            int w = 0;
            boolean [] workersQueue = initWorkersQueue();
            int [] portStates = new int[port_.length];  // non-zero: worker marked down while busy
            int numOfWorkers = port_.length;
            int [] assignedJob = new int[port_.length]; // job index currently held by each port
            // Jobs lost with a crashed worker, waiting to be re-sent. A job is
            // held by at most one worker at a time, so jobs.length slots suffice.
            int [] retryJobs = new int[jobs.length];
            int retryCount = 0;
            int lostWorkers = 0;  // number of ports with a non-zero portStates entry
            TaskView tv = qosservice_.GetTaskView();
            PortView [] portViews = tv.GetPortViews();
            int state = 0;
            // wait for all workers to finish
            while(doneJobs < numOfJobs){
                for(p = 0; p < port_.length; p++){
                    // only look at workers that hold a job and are not marked down
                    if(!workersQueue[p] && portStates[p] == 0){
                        state = portViews[p].GetPeerTaskAttributeValue(
                            ResourceAttribute.TASKSTATE);
                        // if the Worker is up
                        if(state == 0 || state == 1){
                            // get the result from the Worker if its available
                            if((result = (double [])port_[p].AsyncRead(key)) != null){
                                workersQueue[p] = true; // mark this worker as ready
                                doneJobs++;
                                updateState(doneJobs);
                                System.out.println("jobs done = " + doneJobs);
                                System.out.println("Manager got result with size " +
                                    result.length + " from worker "
                                    + (p+1) + "\n");
                            }
                        }
                        // A worker crashed before it returned its results, mark it as dead
                        else {
                            // Re-queue exactly the job this worker was holding, so that
                            // it (and not some other job) is sent to another worker.
                            retryJobs[retryCount++] = assignedJob[p];
                            portStates[p] = state;
                            lostWorkers++;
                            System.out.println("!!!!!!! Worker " + (p+1) + " is down!!!!!!!!\n" + "curJob = " + assignedJob[p]);
                        }
                    }
                }
                // Every port is permanently excluded (or there are no ports):
                // the remaining jobs can never complete, so stop instead of spinning.
                if(lostWorkers == numOfWorkers){
                    System.out.println("Manager: no workers left, " + (numOfJobs - doneJobs)
                        + " job(s) not completed");
                    break;
                }
                for(w = 0; w < numOfWorkers && (retryCount > 0 || curJob < numOfJobs); w++){
                    p = getReadyPort(workersQueue, portViews);
                    // if the ready worker is up, then send a vector (job) to it
                    if(p > -1){
                        // re-queued jobs take precedence over jobs never dispatched
                        int job;
                        if(retryCount > 0){
                            job = retryJobs[--retryCount];
                        } else {
                            job = curJob++;
                        }
                        msgSize = jobs[job];
                        workersQueue[p] = false;
                        assignedJob[p] = job;
                        vector = new double[msgSize];
                        port_[p].AsyncWrite(vector, key);
                        System.out.println("Manager sent job with size " + msgSize + " to worker " + (p+1));
                    }
                }
            }// wait for all workers to finish
            // kill the workers
            for(p = 0; p < port_.length; p++){
                state = portViews[p].GetPeerTaskAttributeValue(
                    ResourceAttribute.TASKSTATE);
                // skip peers that are dead (-1) or completed (2)
                if(state != -1 && state != 2){
                    int [] flag = {1};
                    port_[p].SyncWrite(flag, 1);
                }
            }
            qosservice_.release();
            portmanager.release(qosservice_);
        }
        catch(Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Starts the Manager thread and waits for it to finish.
     *
     * @param args args[0] is the application name, args[1] the task variable name
     */
    public static void main (String args[])
    {
        if(args == null || args.length < 2){
            System.err.println("usage: java faultol.Manager <AppName> <TaskVarName>");
            System.exit(-1);
        }
        Manager ManagerThread = new Manager(args[0], args[1]);
        ManagerThread.setPriority(Thread.MIN_PRIORITY);
        ManagerThread.start();
        try {
            ManagerThread.join();
        } catch(Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        System.exit(0);
    }
}
Completed Java Code for the Worker Template
package faultol;
import SCLASS.*;
import Lib.*;
import java.*;
import java.lang.*;
import java.util.*;
import java.rmi.*;
import java.rmi.server.*;
import java.rmi.registry.*;
import java.io.Serializable;
import java.rmi.server.UnicastRemoteObject;
/**
 * Worker task of the fault-tolerant Manager-Workers application.
 *
 * The Worker receives jobs (vectors of doubles) from the Manager on port 0
 * with message key 0 and sends each vector straight back as its result. It
 * leaves its loop when a message with key 1 (the exit flag) arrives or when
 * the Manager's task state is no longer 0 or 1.
 */
public class Worker extends Thread {
    private Port[] port_;
    private String AppName_;
    private String TaskVarName_;
    private QoSService qosservice_ = null;
    // User class variables

    /**
     * Creates a Worker for the given application and task variable names.
     *
     * @param AppName     application name passed to the port manager and QoS service
     * @param TaskVarName task variable name passed to the port manager and QoS service
     */
    public Worker(String AppName, String TaskVarName)
    {
        super();
        AppName_ = AppName;
        TaskVarName_ = TaskVarName;
    }

    // User methods

    /**
     * Checks the state of the peer (Manager) task behind the given port view.
     *
     * @param portView QoS view of the port connected to the Manager
     * @return 0 (the Manager port index) if the peer state is 0 or 1, otherwise -1
     */
    public int getManagerPort(PortView portView){
        int p = -1;
        int state = portView.GetPeerTaskAttributeValue(
            ResourceAttribute.TASKSTATE);
        if(state == 0 || state == 1){
            p = 0;
        }
        return p;
    }

    /**
     * Main loop of the Worker: configures ports and the QoS service, waits
     * five seconds, then repeatedly polls for a job and returns it to the
     * Manager until the exit flag arrives or the Manager is down. Finally
     * releases the QoS service and ports.
     */
    public synchronized void run ()
    {
        try{
            PortManager portmanager = new PortManager();
            port_ = portmanager.configure(AppName_, TaskVarName_);
            qosservice_ = new QoSService(AppName_, TaskVarName_);
            // User application code
            Thread.sleep(5000);
            double [] vector = null;
            PortView portView = qosservice_.GetPortView(0);
            int p = getManagerPort(portView);
            // if the Manager is up and no exit signal is received (from the Manager) stay in the while loop
            while(p != -1 && port_[p].AsyncRead(1) == null){
                // get a vector from the Manager
                if((vector = (double [])port_[p].AsyncRead(0)) != null){
                    System.out.println("Task " + TaskVarName_ + " got job with size " + vector.length);
                    // re-check the Manager before writing the result back
                    p = getManagerPort(portView);
                    if(p == -1)
                        break;
                    // return result back to the Manager
                    port_[p].AsyncWrite(vector, 0);
                    System.out.println("Task " + TaskVarName_ + " finished a job");
                }
                p = getManagerPort(portView);
                // if the Manager is dead, then exit
                if(p == -1)
                    break;
            }
            System.out.println("Task " + TaskVarName_ + " exiting");
            qosservice_.release();
            portmanager.release(qosservice_);
        }
        catch(Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Starts the Worker thread and waits for it to finish.
     *
     * @param args args[0] is the application name, args[1] the task variable name
     */
    public static void main (String args[])
    {
        // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
        if(args == null || args.length < 2){
            System.err.println("usage: java faultol.Worker <AppName> <TaskVarName>");
            System.exit(-1);
        }
        Worker WorkerThread = new Worker(args[0], args[1]);
        WorkerThread.setPriority(Thread.MIN_PRIORITY);
        WorkerThread.start();
        try {
            WorkerThread.join();
        } catch(Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }
        System.exit(0);
    }
}
* NOTE: The code in blue is automatically generated by the JavaPorts Application Configuration Toolset (JPACT) based on
the parsed application configuration file, while the code in black is the user code that was added to the original task templates.
|