%include global { const int MAX_JOBS_PER_SERVER = 2; }; class Local . JobController { /*! Provides a service that distributes tasks across a load-balanced pool of servers. !*/ oid peerRegistry; int serverIdCounter; assoc serverList; array serverOID; array serverLoad; array serverForJob; array queuedWork; int firstEntryIndex; int lastEntryIndex; int maxConcurrentJobs; int jobsInProgress; } inherits from Object; JobController:create() { peerRegistry = lookupLocalService("/PeerRegistry"); if (peerRegistry == nil) { display("JobController: no /PeerRegistry\n"); send "deleteYourself" to thisObject; } send "peerRegistered"(ObjectCreator) to thisObject; send "addNotifyWhenPeerRegistered"(thisObject, 1) to peerRegistry from nil; firstEntryIndex = 1; lastEntryIndex = 1; } JobController:delete() { if (peerRegistry != nil) { send "removeNotifyWhenPeerRegistered"(thisObject) to peerRegistry from nil; } } JobController:peerRegistered(oid remotePeer) { string key; if (remotePeer == nil) exit; key = makeAsString(remotePeer); serverIdCounter += 1; serverList[key] = serverIdCounter; serverOID[serverIdCounter] = remotePeer; serverLoad[serverIdCounter] = 0; maxConcurrentJobs = elementCount(serverList) * MAX_JOBS_PER_SERVER; while (firstEntryIndex < lastEntryIndex) { if (jobsInProgress > maxConcurrentJobs) break; call "dequeueJob"(); } } JobController:peerUnregistered(oid remotePeer) { string key; int id; key = makeAsString(remotePeer); id = serverList[key]; deleteIndex(serverList, key); deleteIndex(serverOID, id); deleteIndex(serverLoad, id); maxConcurrentJobs = elementCount(serverList) * MAX_JOBS_PER_SERVER; } JobController:queueJob(oid client, any clientInfo, string className, assoc acl, set args) { /*! Queue a work unit. Result returned to client by sending it a jobComplete message and passing it clientInfo and the result of the job. !*/ queuedWork[lastEntryIndex] = argv; lastEntryIndex += 1; if (jobsInProgress < maxConcurrentJobs) { call "dequeueJob"(); } if (fromObject != nil) return (0); } JobController:dequeueJob() { array rec; string className; assoc acl; set args; oid obj, rmtObj; int server, transactionId; if (lastEntryIndex == firstEntryIndex) return (0); transactionId = firstEntryIndex; firstEntryIndex += 1; jobsInProgress += 1; server = call "selectServer"(); serverForJob[transactionId] = server; rmtObj = serverOID[server]; serverLoad[server] += 1; rec = queuedWork[transactionId]; className = rec[2]; acl = rec[3]; args = rec[4]; send "createObject"(className, acl, thisObject, transactionId, args) to rmtObj from nil; return (transactionId); } JobController:returnJobResult(int transactionId, any result) { /*! Notes the completion of a queued task. !*/ array rec; int server; oid client; any clientInfo; if (indexExists(serverForJob, transactionId) == 0) { debugDisplay(debugLogLevel3, "JobController:jobComplete no such transaction", transactionId, "\n"); exit; } jobsInProgress -= 1; server = serverForJob[transactionId]; serverLoad[server] -= 1; rec = queuedWork[transactionId]; client = rec[0]; clientInfo = rec[1]; deleteIndex(queuedWork, transactionId); deleteIndex(serverForJob, transactionId); send "jobComplete"(clientInfo, result) to client; if (jobsInProgress < maxConcurrentJobs) { call "dequeueJob"(); } } JobController:selectServer() { int minId, minLoad, i, l; minId = -1; minLoad = 0x7f0000; for(i=nextIndex(serverLoad, 0);i != 0;i=nextIndex(serverLoad,i)) { l = serverLoad[i]; if (l < minLoad) { minLoad = l; minId = i; } } return (minId); } JobController:timerExpired() { } class Local . TestJobController { oid controllerObj; int queuedJobs; int completedJobs; } inherits from Object; TestJobController:create(int count) { assoc acl; int i; set args; acl = makeDefaultACL(); controllerObj = send "createObject"("JobController", acl, "TestPool") to ObjectCreator; queuedJobs = count; args += "hi"; for(i=1;i<=count;i+=1) { send "queueJob"(thisObject, i, "TestJob", acl, emptySet + args) to controllerObj from nil; } } TestJobController:delete() { send "deleteYourself" to controllerObj; } TestJobController:jobComplete(any jobInfo, any result) { display("result from job ", jobInfo, " is ", result, "\n"); completedJobs += 1; if (completedJobs == queuedJobs) send "deleteYourself" to thisObject; } class Local . TestJob { int transactionId; oid jobController; } inherits from Object; TestJob:create(oid jobCntrl, int transId, any arg1) { string result; jobController = jobCntrl; transactionId = transId; // Do lots of work... result = makeAsString("[", transId, ":", arg1, "]"); // After lots of work completed, return result... send "returnJobResult"(transactionId, result) to jobController; send "deleteYourself" to thisObject; // all done, delete } TestJob:delete() { }