JavaPorts
JavaPorts: A Component-based Framework for Network Computing

Fault Tolerant Manager-Workers Style Distributed Application

This is a fault-tolerant Manager-Workers style distributed application. The Manager and Worker tasks use the fault-tolerance API methods to detect the state of their corresponding peers before communicating with them. A message-passing operation is not invoked on a Port object if the peer-port task is in the dead or completed state, i.e. state -1 or 2, respectively.

The Configuration File for this Application

BEGIN CONFIGURATION
BEGIN DEFINITIONS
DEFINE APPLICATION "faultol"
DEFINE MACHINE M1="gold" MASTER
DEFINE MACHINE M2="zinc"
DEFINE MACHINE M3="copper" 
DEFINE TASK T1="Manager" NUMOFPORTS=2 
DEFINE TASK T2="Worker" NUMOFPORTS=1 
DEFINE TASK T3="Worker" NUMOFPORTS=1
END DEFINITIONS
BEGIN ALLOCATIONS
ALLOCATE T1 M1
ALLOCATE T2 M2
ALLOCATE T3 M3
END ALLOCATIONS
BEGIN CONNECTIONS
CONNECT T1.P[0] T2.P[0]
CONNECT T1.P[1] T3.P[0]
END CONNECTIONS
END CONFIGURATION

NOTE: The above configuration file corresponds to an application built to execute on a local platform. To run this application on another platform, you will need to change the machine names (e.g. "gold", "zinc" and "copper") to the names of machines on the target cluster. The bare machine name should suffice; a fully qualified domain name should not be required. All machines used to run an application should belong to the same domain.

The QoS Setup File

RunQoSSystem=true
MeasLinkData=true
MinMsgSize=40
MaxMsgSize=32000
NumOfPastPointsToRetain=10
NumOfProbesPerPoint=3
LocalQoSDataUpdatePeriod=30
GlobalQoSDataUpdatePeriod=60

NOTE: The above setup file (QoSSetup.txt) contains the default parameters used by the QoS middleware. Make sure that the RunQoSSystem and MeasLinkData variables are set to "true" to turn QoS monitoring on while the application is running.

Completed Java Code for the Manager Template


package faultol;

import SCLASS.*;
import Lib.*;

import java.*;
import java.io.*;
import java.lang.*;
import java.util.*;
import java.rmi.*;
import java.rmi.server.*;
import java.rmi.registry.*;
import java.io.Serializable;
import java.rmi.server.UnicastRemoteObject;

/**
 * Manager task of the fault-tolerant Manager-Workers application.
 *
 * <p>The Manager hands out jobs (vectors of doubles) to the Worker tasks
 * connected to its ports, collects one result per job, and records the number
 * of finished jobs in the file {@code m_state.txt}. Before every message
 * operation it consults the peer task state through the QoS service: states
 * 0 and 1 are treated as "worker is up"; any other state (the accompanying
 * text names -1 = dead and 2 = completed) is treated as "worker is down".
 *
 * <p>When a worker goes down while holding a job, that exact job is queued
 * for re-dispatch to another worker.
 */
public class Manager extends Thread {
  private Port[] port_;
  private String AppName_;
  private String TaskVarName_;
  private QoSService qosservice_ = null;

  // User class variables
  private int numOfJobs = 0; // set by getAllJobs()


  /**
   * Creates a Manager for the given application and task variable name.
   *
   * @param AppName     application name passed to the port manager and QoS service
   * @param TaskVarName task variable name passed to the port manager and QoS service
   */
  public Manager(String AppName, String TaskVarName)
  {
    super();
    AppName_ = AppName;
    TaskVarName_ = TaskVarName;
  }

  /** Creates a Manager with no application or task name set. */
  public Manager(){
  }


  // User methods

  /**
   * Overwrites {@code m_state.txt} with the number of completed jobs.
   * Failures are reported on standard output and otherwise ignored, so a
   * problem with the state file never stops the job loop.
   *
   * @param doneJobs number of jobs completed so far
   */
  public void updateState(int doneJobs){
    PrintStream p = null;
    try{
      p = new PrintStream(new FileOutputStream("m_state.txt"));
      p.println(doneJobs);

      // PrintStream never throws on write errors; it only sets an internal
      // flag, so the flag has to be checked explicitly.
      if(p.checkError()){
        System.out.println("error writing to m_state.txt");
      }
    }catch(Exception e){
       System.out.println("error writing to m_state.txt");
    }finally{
      if(p != null){
        p.close();
      }
    }
  }

  /**
   * Builds the list of jobs and records its length in {@code numOfJobs}.
   * Each entry is used by {@link #run()} as the length of the vector that is
   * sent to a worker; the entries are 10, 20, 30 and 40.
   *
   * @return a new array holding one size per job
   */
  public int [] getAllJobs(){
    numOfJobs = 4;
    int [] jobs = new int[numOfJobs];

    for(int i = 0; i < jobs.length; i++){
      jobs[i] = (i+1)*10;
    }

    return jobs;
  }

  /**
   * Creates the worker-availability table with one entry per port, all
   * initially {@code true} (ready). Must be called after the ports have
   * been configured.
   *
   * @return a new array of length {@code port_.length} filled with {@code true}
   */
  public boolean [] initWorkersQueue(){
    boolean [] workersQueue = new boolean[port_.length];
    Arrays.fill(workersQueue, true);
    return workersQueue;
  }

  /**
   * Finds the first port whose worker is marked ready and whose peer task
   * state is 0 or 1.
   *
   * @param workersQueue per-port ready flags; may be {@code null}
   * @param portViews    per-port views used to query the peer task state
   * @return the index of the first usable port, or -1 if there is none
   */
  public int getReadyPort(boolean [] workersQueue, PortView [] portViews){ 
    if(workersQueue == null){
      return -1;
    }

    for(int q = 0; q < workersQueue.length; q++){
      // only workers that are not currently holding a job are candidates
      if(workersQueue[q]){
        int peerTaskState = portViews[q].GetPeerTaskAttributeValue(
                                           ResourceAttribute.TASKSTATE);

        if(peerTaskState == 0 || peerTaskState == 1){
          return q;
        }
      }
    }

    return -1;
  }



  /**
   * Configures the ports and QoS service, distributes all jobs, waits until
   * a result has been received for every job, then sends each worker that is
   * still up an exit flag on key 1 and releases the resources. Any
   * {@code Throwable} is printed and terminates the JVM with status -1.
   */
  public synchronized void run () 
  {
    try {
      PortManager portmanager = new PortManager();      
      port_ = portmanager.configure(AppName_, TaskVarName_);
      qosservice_ = new QoSService(AppName_, TaskVarName_);

      // User application code


      double [] vector = null;
      double [] result = null;
      int [] jobs = getAllJobs();
      int curJob = 0;      // index of the next never-dispatched job
      int doneJobs = 0;
      int key = 0;         // message key used for jobs and results
      int msgSize = 0;
      int p = 0;
      int w = 0;
      boolean [] workersQueue = initWorkersQueue(); 
      int [] portStates = new int[port_.length]; // 0 until a worker is seen down
      int numOfWorkers = port_.length;
      TaskView tv = qosservice_.GetTaskView();
      PortView [] portViews = tv.GetPortViews();
      int state = 0;

      // Job currently held by the worker on each port (-1 = none), so that a
      // crash re-queues the job that was actually lost.
      int [] assignedJob = new int[port_.length];
      Arrays.fill(assignedJob, -1);
      // Stack of job indices waiting to be re-dispatched. A job is in at most
      // one place at a time, so jobs.length slots are always enough.
      int [] retryJobs = new int[jobs.length];
      int retryCount = 0;
      int job = 0;

      // wait for all workers to finish
      // NOTE(review): this loop polls without pausing and does not terminate
      // if every worker is down; left as-is because the peer state reported
      // during worker start-up is not visible here -- confirm before changing.
      while(doneJobs < numOfJobs){
        for(p = 0; p < port_.length; p++){
          // only look at workers that hold a job and have not been seen down
          if(!workersQueue[p] && portStates[p] == 0){
            state = portViews[p].GetPeerTaskAttributeValue(
                                 ResourceAttribute.TASKSTATE);

            // if the Worker is up
            if(state == 0 || state == 1){
              // get the result from the Worker if it is available
              if((result = (double [])port_[p].AsyncRead(key)) != null){
                workersQueue[p] = true; // mark this worker as ready
                assignedJob[p] = -1;
                doneJobs++;
                updateState(doneJobs);
                System.out.println("jobs done = " + doneJobs);
                System.out.println("Manager got result with size " + 
                                 result.length + " from worker " 
                                 + (p+1) + "\n");
              }
            }
            // A worker crashed before it returned its results, mark it as dead
            else{
              portStates[p] = state;

              // re-queue the job this worker was holding so that another
              // worker receives exactly that job
              int lostJob = assignedJob[p];
              assignedJob[p] = -1;
              if(lostJob != -1){
                retryJobs[retryCount++] = lostJob;
              }

              // "curJob" reports the index of the next job to be sent
              System.out.println("!!!!!!! Worker " + (p+1) + " is down!!!!!!!!\n" + "curJob = " + (lostJob != -1 ? lostJob : curJob));
            }
          }
        }

        // hand out work: re-queued jobs first, then jobs never sent before
        for(w = 0; w < numOfWorkers && (retryCount > 0 || curJob < numOfJobs); w++){
            p = getReadyPort(workersQueue, portViews);

            // if the ready worker is up, then send a vector (job) to it
            if(p > -1){
              job = (retryCount > 0) ? retryJobs[--retryCount] : curJob++;
              msgSize = jobs[job];
              workersQueue[p] = false;
              assignedJob[p] = job;
              vector = new double[msgSize]; 
              port_[p].AsyncWrite(vector, key);
              System.out.println("Manager sent job with size " + msgSize + " to worker " + (p+1));
            }
        }
      }// wait for all workers to finish

      // kill the workers
      for(p = 0; p < port_.length; p++){
        state = portViews[p].GetPeerTaskAttributeValue(
                               ResourceAttribute.TASKSTATE);

        // only write to workers that are neither dead (-1) nor completed (2)
        if(state != -1 && state != 2){
          int [] flag = {1}; 
          port_[p].SyncWrite(flag, 1);
        }
      }



      qosservice_.release();
      portmanager.release(qosservice_);
    }
    catch(Throwable e) {
      e.printStackTrace();
      System.exit(-1);
    }
  }

  /**
   * Starts the Manager thread at minimum priority, waits for it to finish
   * and exits the JVM.
   *
   * @param args {@code args[0]} is the application name, {@code args[1]} the
   *             task variable name
   */
  public static void main (String args[]) 
  {
    if(args == null || args.length < 2){
      System.err.println("usage: Manager <AppName> <TaskVarName>");
      System.exit(-1);
    }

    Manager ManagerThread = new Manager(args[0], args[1]);
    ManagerThread.setPriority(Thread.MIN_PRIORITY);
    ManagerThread.start();
    try {
      ManagerThread.join();
    } catch(Throwable e) {
      e.printStackTrace();
      System.exit(-1);
    }
    System.exit(0);
  }
}

Completed Java Code for the Worker Template


package faultol;

import SCLASS.*;
import Lib.*;

import java.*;
import java.lang.*;
import java.util.*;
import java.rmi.*;
import java.rmi.server.*;
import java.rmi.registry.*;
import java.io.Serializable;
import java.rmi.server.UnicastRemoteObject;

/**
 * Worker task of the fault-tolerant Manager-Workers application.
 *
 * <p>The Worker polls its single port for jobs (key 0) and for an exit
 * signal (key 1) from the Manager. Every received vector is sent straight
 * back as the result. Before communicating, the Worker checks the Manager's
 * task state through the QoS service and stops as soon as that state is
 * neither 0 nor 1.
 */
public class Worker extends Thread {
  private Port[] port_;
  private String AppName_;
  private String TaskVarName_;
  private QoSService qosservice_ = null;

  // User class variables

  /**
   * Creates a Worker for the given application and task variable name.
   *
   * @param AppName     application name passed to the port manager and QoS service
   * @param TaskVarName task variable name passed to the port manager and QoS service
   */
  public Worker(String AppName, String TaskVarName)
  {
    super();
    AppName_ = AppName;
    TaskVarName_ = TaskVarName;
  }

  // User methods

  /**
   * Returns the index of the port connected to the Manager if the Manager is
   * up (peer task state 0 or 1).
   *
   * @param portView view of the port connected to the Manager
   * @return 0 if the Manager is up, -1 otherwise
   */
  public int getManagerPort(PortView portView){
    int state = portView.GetPeerTaskAttributeValue(
                         ResourceAttribute.TASKSTATE);

    if(state == 0 || state == 1){ 
       return 0;
    }

    return -1;
  }


  /**
   * Configures the port and QoS service, waits five seconds, then serves
   * jobs until the Manager sends an exit signal or is found to be down, and
   * finally releases the resources. Any {@code Throwable} is printed and
   * terminates the JVM with status -1.
   */
  public synchronized void run () 
  {
    try{
      PortManager portmanager = new PortManager();      
      port_ = portmanager.configure(AppName_, TaskVarName_);
      qosservice_ = new QoSService(AppName_, TaskVarName_);

      // User application code


      Thread.sleep(5000);

      double [] vector = null;
      PortView portView = qosservice_.GetPortView(0);

      int p = getManagerPort(portView);

      // if the Manager is up and no exit signal is received (from the Manager) stay in the while loop
      while(p != -1 && port_[p].AsyncRead(1) == null){
        // get a vector from the Manager
        if((vector = (double [])port_[p].AsyncRead(0)) != null){
          System.out.println("Task " + TaskVarName_ + " got job with size " + vector.length);

          // re-check the Manager before writing: it may have gone down
          // since the job was read
          p = getManagerPort(portView);
          if(p == -1){
             break;
          }

          // return result back to the Manager
          port_[p].AsyncWrite(vector, 0);
          System.out.println("Task " + TaskVarName_ + " finished a job");
        }

        p = getManagerPort(portView);

        // if the Manager is dead, then exit
        if(p == -1){
          break; 
        }
      }
      System.out.println("Task " + TaskVarName_ + " exiting");


      qosservice_.release();
      portmanager.release(qosservice_);
    }
    catch(Throwable e) {
      e.printStackTrace();
      System.exit(-1);
    }
  }

  /**
   * Starts the Worker thread at minimum priority, waits for it to finish
   * and exits the JVM.
   *
   * @param args {@code args[0]} is the application name, {@code args[1]} the
   *             task variable name
   */
  public static void main (String args[]) 
  {
    if(args == null || args.length < 2){
      System.err.println("usage: Worker <AppName> <TaskVarName>");
      System.exit(-1);
    }

    Worker WorkerThread = new Worker(args[0], args[1]);
    WorkerThread.setPriority(Thread.MIN_PRIORITY);
    WorkerThread.start();
    try {
      WorkerThread.join();
    } catch(Throwable e) {
      e.printStackTrace();
      System.exit(-1);
    }
    System.exit(0);
  }
}

* NOTE: The code in blue is automatically generated by the JavaPorts Application Configuration Toolset (JPACT) based on the parsed application configuration file, while the code in black is the user code that was added to the original task templates.

:: Last update: 03/14/2010 ::