#include <assert.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include "pvm3.h"
#include "pgpvm2.h"

/* Message tag main() attaches to each unsorted chunk it sends to a task. */
#define TAG_UNSORTED                     1
/* Message tag main() waits for when collecting the chunks sent back. */
#define TAG_SORTED                       2

/* Multiplied by the host count to get the default number of tasks when
   no task count is given on the command line. */
#define NB_TASKS_PER_PROCESSOR           1
/* Size of the per-task bookkeeping arrays in main(), and therefore the
   upper bound on the number of tasks. */
#define MAX_NB_TASKS                     1024
/* Executable name handed to pvm_spawn() for every task. */
#define QSORT_TASK_NAME                  "dis_qsort_slave"
/* The first command-line argument is multiplied by this factor to obtain
   the number of elements in the array. */
#define INPUT_FACTOR                     1000

/* Parse TEXT as a base-10 integer in the range [1, max] and store it in *out.
   Returns 1 on success, 0 when TEXT is not entirely a number or is out of
   range (strtol saturates on overflow, so the range test also rejects that). */
static int parse_count(const char *text, long max, int *out)
{ char *end;
  long value;

  value=strtol(text, &end, 10);
  if (end==text || *end!='\0' || value<1 || value>max) return 0;
  *out=(int)value;
  return 1;
}

/* Master of the distributed sort.
 *
 * Usage: <program> <size> [nbtasks]
 *   size     array length in units of INPUT_FACTOR elements (one byte each)
 *   nbtasks  number of tasks to spawn; defaults to
 *            NB_TASKS_PER_PROCESSOR times the number of hosts in the
 *            virtual machine
 *
 * Steps: fill an array with rand() bytes, spawn QSORT_TASK_NAME tasks over
 * the configured hosts, send each task one contiguous chunk (TAG_UNSORTED),
 * receive the chunks back (TAG_SORTED) and k-way merge them into sorteddata.
 *
 * The pg_* calls are declared by pgpvm2.h; their semantics are not visible
 * in this file, so their order relative to the PVM calls is kept unchanged.
 *
 * Returns EXIT_SUCCESS when the merge completed, EXIT_FAILURE on bad
 * arguments, allocation failure or a failed PVM call.
 */
int main(int argc, char *argv[])
{ int i, j, rank, nb, nbdata, minipos;
  int perhost, requested, spawned;
  char *data=NULL, *sorteddata=NULL, minival;
  int tids[MAX_NB_TASKS], nbtids=0;
  int startsubarray[MAX_NB_TASKS], indicessubarray[MAX_NB_TASKS],
      maxpossubarray[MAX_NB_TASKS];
  int nhost, narch;
  int enrolled=0, status=EXIT_FAILURE;
  struct pvmhostinfo *hostp;

  /* Explicit checks instead of assert(): they must survive -DNDEBUG. */
  if (argc!=2 && argc!=3)
  { fprintf(stderr, "usage: %s <size in units of %d elements> [nbtasks]\n",
            argc>0 ? argv[0] : "program", INPUT_FACTOR);
    goto done;
  }
  /* Bounding by INT_MAX/INPUT_FACTOR keeps the multiplication below from
     overflowing. */
  if (!parse_count(argv[1], (long)(INT_MAX/INPUT_FACTOR), &nb))
  { fprintf(stderr, "invalid size '%s' (expected 1..%d)\n",
            argv[1], INT_MAX/INPUT_FACTOR);
    goto done;
  }
  nb*=INPUT_FACTOR;

  /* A task count of zero would divide by zero when the array is split. */
  if (argc==3 && !parse_count(argv[2], (long)MAX_NB_TASKS, &nbtids))
  { fprintf(stderr, "invalid number of tasks '%s' (expected 1..%d)\n",
            argv[2], MAX_NB_TASKS);
    goto done;
  }

  data=(char *)malloc(sizeof(char)*nb);
  sorteddata=(char *)malloc(sizeof(char)*nb);
  if (data==NULL || sorteddata==NULL)
  { fprintf(stderr, "out of memory (%d elements)\n", nb);
    goto done;
  }

  if (pvm_mytid()<0)
  { fprintf(stderr, "pvm_mytid failed\n");
    goto done;
  }
  enrolled=1;

  (void)pvm_setopt(PvmRoute, PvmRouteDirect);

  if (pvm_config(&nhost, &narch, &hostp)<0 || nhost<1)
  { fprintf(stderr, "pvm_config failed or reported no host\n");
    goto done;
  }

  if (argc!=3)
  { nbtids=NB_TASKS_PER_PROCESSOR*nhost;
    if (nbtids>MAX_NB_TASKS)
    { fprintf(stderr, "too many tasks (%d, maximum %d)\n",
              nbtids, MAX_NB_TASKS);
      goto done;
    }
  }

  /* random initialization of the array to sort */
  for (i=0;i<nb;i++) data[i]=(char)rand();

  pg_startadmin("", "", nbtids+1);
  pg_tids("y");

  /* Every host but the last runs nbtids/nhost tasks; the last host also
     takes the remainder.  A short spawn would leave part of tids[]
     uninitialised, so it is treated as fatal.  Tasks already started are
     not killed here. */
  perhost=nbtids/nhost;
  for (i=0;i<nhost-1;++i)
  { if (perhost)
    { spawned=pvm_spawn(QSORT_TASK_NAME, (char **)NULL, PvmTaskHost,
                        hostp[i].hi_name, perhost,
                        tids+i*perhost);
      if (spawned!=perhost)
      { fprintf(stderr, "pvm_spawn on %s started %d of %d tasks\n",
                hostp[i].hi_name, spawned, perhost);
        goto done;
      }
    }
  }
  requested=perhost+nbtids%nhost;
  spawned=pvm_spawn(QSORT_TASK_NAME, (char **)NULL, PvmTaskHost,
                    hostp[i].hi_name, requested,
                    tids+i*perhost);
  if (spawned!=requested)
  { fprintf(stderr, "pvm_spawn on %s started %d of %d tasks\n",
            hostp[i].hi_name, spawned, requested);
    goto done;
  }

  pg_beglab(1);

  /* Split [0, nb) into nbtids contiguous chunks: the first nb%nbtids chunks
     hold nbdata+1 elements, the remaining ones nbdata.  maxpossubarray is
     the inclusive last index of a chunk. */
  nbdata=nb/nbtids;
  for (i=0;i<nb%nbtids;++i)
  { startsubarray[i]=indicessubarray[i]=i*(nbdata+1);
    maxpossubarray[i]=startsubarray[i]+nbdata;
  }
  for (j=0;j<nbtids-nb%nbtids;++j)
  { startsubarray[i+j]=indicessubarray[i+j]=(nb%nbtids)*(nbdata+1)+j*(nbdata);
    maxpossubarray[i+j]=startsubarray[i+j]+nbdata-1;
  }

  /* Each message carries: chunk rank, chunk length, chunk bytes. */
  for (i=0;i<nbtids;++i)
  { pvm_initsend(PvmDataRaw);
    pvm_pkint(&i, 1, 1);
    nbdata=maxpossubarray[i]-startsubarray[i]+1;
    pvm_pkint(&nbdata, 1, 1);
    printf("[%d]", nbdata);
    pvm_pkbyte(data+startsubarray[i], nbdata, 1);
    pvm_send(tids[i], TAG_UNSORTED);
  }
  printf(" distributed...\n"); fflush(stdout);
  pg_endlab(1);

  /* receiving sorted arrays */
  pg_beglab(2);
  for (i=0;i<nbtids;++i)
  { if (pvm_recv(-1, TAG_SORTED)<0)
    { fprintf(stderr, "pvm_recv failed\n");
      goto done;
    }
    pvm_upkint(&rank, 1, 1);
    /* rank comes off the wire and indexes the arrays below */
    if (rank<0 || rank>=nbtids)
    { fprintf(stderr, "received invalid chunk rank %d\n", rank);
      goto done;
    }
    pvm_upkbyte(data+startsubarray[rank],
                maxpossubarray[rank]-startsubarray[rank]+1, 1);
  }
  pg_endlab(2);

  /* merging sorted arrays */
  pg_beglab(3);
  for (i=0;i<nb;++i)
  { /* Start from the first chunk that still has elements; one always
       exists because fewer than nb elements have been consumed. */
    for (minipos=0;minipos<nbtids;++minipos)
      if (indicessubarray[minipos]<=maxpossubarray[minipos]) break;
    minival=data[indicessubarray[minipos]];
    /* Pick the smallest head among the non-exhausted chunks. */
    for (j=0;j<nbtids;++j)
      if (indicessubarray[j] <= maxpossubarray[j] &&
          data[indicessubarray[j]]<minival)
      { minival=data[indicessubarray[j]];
        minipos=j;
      }
    sorteddata[i]=minival;
    indicessubarray[minipos]++;
  }
  pg_endlab(3);

  status=EXIT_SUCCESS;

done:
  /* Single exit path: release the buffers and leave PVM if we enrolled. */
  free(data); free(sorteddata);
  if (enrolled) pvm_exit();
  return status;
}
