diff -Naur --exclude=_MTN base/src/cml/src/cml.cm final/src/cml/src/cml.cm --- base/src/cml/src/cml.cm 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/cml.cm 2006-10-20 13:42:25.000000000 -0700 @@ -8,6 +8,8 @@ signature CML structure CML + structure SimTimeOut + signature SYNC_VAR structure SyncVar signature MAILBOX @@ -16,6 +18,7 @@ structure RunCML structure Debug + structure SchedulerGlueControl signature CML_STREAM_IO signature CML_TEXT_STREAM_IO diff -Naur --exclude=_MTN base/src/cml/src/cml-internal.cm final/src/cml/src/cml-internal.cm --- base/src/cml/src/cml-internal.cm 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/cml-internal.cm 2006-10-20 13:42:25.000000000 -0700 @@ -15,10 +15,12 @@ structure SyncVar signature MAILBOX structure Mailbox + structure SimTimeOut structure RunCML structure Debug + structure SchedulerGlueControl signature OS_PROCESS signature OS_IO diff -Naur --exclude=_MTN base/src/cml/src/core-cml/channel.sml final/src/cml/src/core-cml/channel.sml --- base/src/cml/src/core-cml/channel.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/channel.sml 2006-10-20 13:42:25.000000000 -0700 @@ -23,7 +23,7 @@ end = struct - structure T = Thread + (* structure T = Thread *) structure S = Scheduler structure R = RepTypes diff -Naur --exclude=_MTN base/src/cml/src/core-cml/cml-util.cm final/src/cml/src/core-cml/cml-util.cm --- base/src/cml/src/core-cml/cml-util.cm 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/cml-util.cm 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,11 @@ +(* Things needed both by the scheduler component and the rest of CML *) +Group + structure RepTypes + structure Q + structure Debug +is + $/basis.cm + rep-types.sml + queue.sml + debug.sml + $/smlnj-lib.cm diff -Naur --exclude=_MTN base/src/cml/src/core-cml/debug.sml final/src/cml/src/core-cml/debug.sml --- base/src/cml/src/core-cml/debug.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/debug.sml 2006-10-20 13:42:25.000000000 -0700 @@ -21,7 +21,7 @@ val getCurThread : unit -> RepTypes.thread_id = Unsafe.getVar - fun sayDebugId msg = sayDebug(concat[ + fun sayDebugId msg = sayDebug(concat["[", Time.fmt 3 (Time.now()), "] ", RepTypes.tidToString(getCurThread()), " ", msg ]) diff -Naur --exclude=_MTN base/src/cml/src/core-cml/event.sml final/src/cml/src/core-cml/event.sml --- base/src/cml/src/core-cml/event.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/event.sml 2006-10-20 13:42:25.000000000 -0700 @@ -85,16 +85,16 @@ fun atomicCVarSet (R.CVAR state) = ( case !state of (R.CVAR_unset waiting) => let - val R.Q{rear, ...} = S.rdyQ1 - fun add [] = !rear + fun add [] = () | add ({transId=ref R.CANCEL, ...}::r) = add r | add ({transId as ref(R.TRANS tid), cleanUp, kont}::r) = ( transId := R.CANCEL; cleanUp(); - (tid, kont) :: (add r)) + S.simpleEnqueue (tid, kont); + add r) in state := R.CVAR_set 1; - rear := add waiting + add waiting end | _ => error "cvar already set" (* end case *)) diff -Naur --exclude=_MTN base/src/cml/src/core-cml/io-manager.sml final/src/cml/src/core-cml/io-manager.sml --- base/src/cml/src/core-cml/io-manager.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/io-manager.sml 2006-10-20 13:42:25.000000000 -0700 @@ -96,7 +96,7 @@ in tid := R.CANCEL; cleanUp(); - S.enqueueThread (id, uk) + S.markAndEnqueue (id, uk) end | enqueue ({tid=ref R.CANCEL, ...}, _) = () diff -Naur --exclude=_MTN base/src/cml/src/core-cml/rep-types.sml final/src/cml/src/core-cml/rep-types.sml --- base/src/cml/src/core-cml/rep-types.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/rep-types.sml 2006-10-20 13:42:25.000000000 -0700 @@ -16,9 +16,15 @@ rear : 'a list ref } + datatype Priority = NUM of int | SIM (* extend with addition choices as needed *) + type priority = Priority option + (** thread IDs --- see threads.sml **) datatype thread_id = TID of { (* thread ids *) id : int, (* an unique ID *) + (* call enqFn with #1 true if preempted *) + enqFn : enqueueFn ref, + priority : priority, alert : bool ref, (* true, if there is a pending alert on this *) (* thread *) done_comm : bool ref, (* set this whenever this thread does some *) @@ -28,6 +34,15 @@ dead : cvar (* the cvar that becomes set when the thread *) (* dies *) } + + (* !important! a thread's priority is determined principally by + * the qForPriority field, not the priority field. Changing the + * priority field alone will do nothing except confuse the reader. + * It should be safe for a + * thread to change its own qForPriority field since it won't be + * on the queue at the time. I haven't analyzed whether it is safe + * for a different thread to change the qForPriority field. + *) (* transaction IDs are used to mark blocked threads in the various waiting * queues. They are "cancelled" when some other event is selected. @@ -41,7 +56,7 @@ * internal synchronization conditions (e.g., nack events, I/O * synchronization, and thread termination). *) - and cvar = CVAR of cvar_state ref + and cvar_state = CVAR_unset of { transId : trans_id ref, @@ -49,6 +64,11 @@ kont : unit SMLofNJ.Cont.cont } list | CVAR_set of int + and cvar = CVAR of cvar_state ref + + withtype enqueueFn = (bool*(thread_id*unit SMLofNJ.Cont.cont) -> unit) + + type sched_element = (thread_id*unit SMLofNJ.Cont.cont) (** events --- see events.sml **) datatype 'a event_status diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/heap.cm final/src/cml/src/core-cml/scheduler/heap.cm --- base/src/cml/src/core-cml/scheduler/heap.cm 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/heap.cm 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,7 @@ +Group + structure PriorityHeap +is + ../../../../MLRISC/library/heap.sml (private) + ../../../../MLRISC/library/priQueue.sig (private) + $/basis.cm + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/README-PS final/src/cml/src/core-cml/scheduler/README-PS --- base/src/cml/src/core-cml/scheduler/README-PS 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/README-PS 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,108 @@ +This directory contains a reorganized scheduler component for CML. +The goal of the reorganization was to allow schedulers implementing +different scheduling disciplines to be used with CML. The currently +available schedulers implement John Reppy's original CML dynamic +priority scheme -- see John's Concurrent Programming in ML book, +pp. 244-246 -- and conventional, strict-priority scheduling for +which two implementations are provided. + + +Organization + +As in John's original implementation, scheduling functionality is +provided to the rest of CML by scheduler.sml. Almost all of the +changes are confined below the interface it provides. In a few places +I had to change files in the parent directory, e.g. thread.sml, +event.sml, etc., because they took advantage of knowledge of the +queueing implementation used by the scheduler. The interface is now +implementation-independent. The thread interface had to be widened to +accomodate priorities, of course. + +To make it possible to change scheduler policies and implementations a +"glue" layer was introduced between scheduler.sml and the queueing +implementation(s). The glue layer has two interfaces, one for use by +the scheduler itself (signature SCHEDULER_GLUE) and the other for use +by CML application programs (signature SCHEDULER_GLUE_CONTROL). Only +the latter is exposed upwards by the .cm files for CML. + +Below the glue layer sit the queue implementations matching signature +SCHED_QUEUES. As noted above there are currently three +implementations: John's dynamic priority scheme, and two +implementations of the strict priority scheme, one using leftist heaps +and the other using the array-based priority queue implementation +from the MLRISC library. + +Files at the SCHED_QUEUES layer: +sched-queues-sig.sml +sched-queues-array.sml +sched-queues-leftist.sml +sched-queues-default.sml + +My original glue layer implementation required the application to +choose the queueing mechanism at run-time before calling +RunCML.doit(). But SCHED_QUEUES is very nearly a combination of +SCHEDULER_GLUE and SCHEDULER_GLUE_CONTROL so it proved rather simple +to provide a way to choose at compile time. Compilation manager +symbol definitions control the choice and a couple of functors map the +implementations of SCHED_QUEUES into an implementation of +SCHEDULER_GLUE_CONTROL. + +The choice of which scheme is used in any given compilation is governed +by defining CM symbols on the command line used to invoke SML (-Dsym). + +-DCML_SCHED_RT defers the choice to run-time. If not changed using +SchedulerGlueControl.useXXXQueues() then the default is the +dynamic-priority scheduler. + +-DCML_SCHED_STATIC binds the array-based static-priority scheduler at +compile time. + +If neither switch is given the dynamic-priority scheduler is bound at +compile time. + +Glue-layer signatures and types: +scheduler-glue-sig.sml +scheduler-glue-control-sig.sml +scheduler-glue-types.sml + +Run-time choice implementation: +scheduler-glue-dynamic.sml -- implementing structure +scheduler-glue.sml -- glue view of scheduler-glue-dynamic +scheduler-glue-control.sml -- glue_control view of scheduler-glue-dynamic + +Compile-time choice implementation: +scheduler-glue-static-fn.sml +scheduler-glue-control-fn.sml +scheduler-glue-static-xxx.sml (applications of the functors) + +The scheduler component is now complicated enough that I wanted to +separate it from the rest of core-cml. The RepTypes, Q, and Debug +structures are needed in both core-cml.cm and scheduler.cm so I moved +them to a separate cml-util CM Group (since they don't rightly belong +to scheduler). + +About Strict Priority Scheduling + +The intention in strict priority scheduling is that the CPU should +always be executing one of the highest-priority ready-to-run +threads. This property should not be relied upon for mutual exclusion +or synchronization! (Consider what happens when your code is moved to +a multi-processor). + +For some reason "high priority" is traditionally associated with "low +numerical priority." (... speculation about searching an array from +the beginning to find a ready task ...) That tradition is followed by +these SCHED_QUEUE implementations. Supported priorities range from 0 +to 999. The new function Thread.spawn_prio has an int parameter for +the priority. Threads created by Thread.spawn are given priority 5. + +In the lower layers, thread priorities are passed as int lists. This +is a little unusual! The idea was to have a flexible representation to +accomodate future extensions such as priorities within classes, +fractional-share priorities, etc. The current static-priority +implementations look only at the first element of the list. The +dynamic-priority implementation ignores the priority parameter +entirely. + + +-- Carl Hauser, hauser@eecs.wsu.edu, 5 Aug 2005 \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/sched-queues-array.sml final/src/cml/src/core-cml/scheduler/sched-queues-array.sml --- base/src/cml/src/core-cml/scheduler/sched-queues-array.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/sched-queues-array.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,126 @@ +structure SchedQueuesArray = struct + + structure R = RepTypes + + type priority = int + + exception emptyQueueExn + + val maxPrio = 999 + val defaultPriority = 5 + + type schedElement = (R.thread_id*unit SMLofNJ.Cont.cont) + type queue = schedElement R.queue + type heapElement = queue*priority + + structure Heap = PriorityHeap + + val queueArray : ((R.thread_id * + unit SMLofNJ.Cont.cont) R.queue) option Array.array + = Array.array (maxPrio+1, NONE) + + (* assert: compare is never called with NONE *) + fun compare (e1, e2) = let + val (_, p1) = valOf(e1) + val (_, p2) = valOf(e2) + in + p1 < p2 + end + + val qHeap : heapElement option Heap.priority_queue = + Heap.createN(compare, maxPrio+1, NONE) + + fun reset (running:bool) = ( + Array.modify (fn _ => NONE) queueArray; + Heap.clear(qHeap) + (* if running then Debug.sayDebug "Using Array implementation\n" else ()*) + ) + + fun queueForPriority prio = + case Array.sub(queueArray,prio) + of SOME(q) => q + | NONE => + let + val q = R.Q{front=ref [], rear=ref []} + in + Array.update (queueArray,prio,SOME(q)); + q + end + + fun emptyQueue (R.Q{front=ref [],rear=ref []}) = true + | emptyQueue _ = false + + fun empty () = Heap.isEmpty qHeap + + fun markTid _ = () + + local + fun getEnqFn p = let + val q as R.Q{front,rear} = queueForPriority p + fun enqFn (preempted, element) = (* ignores preemption *) + ( + (* maintain invariant: q is in qHeap iff q is not empty *) + (* if it was empty, it is now non-empty so has to be added to qHeap *) + (case emptyQueue q + of true => Heap.insert qHeap (SOME (q, p)) + | false => () + (*esac*)); + rear := element :: !rear + ) + in + enqFn + end + in + fun getEnqueueFn NONE = getEnqFn defaultPriority + | getEnqueueFn (SOME (R.NUM p)) = getEnqFn p + end + + fun getPriority p = SOME (R.NUM p) + + fun reverse' (x, [], rl) = (x, rl) + | reverse' (x, y::rest, rl) = reverse'(y, rest, x :: rl) + + fun dequeue (R.Q{front = ref [], rear = ref []}) = raise emptyQueueExn + | dequeue (R.Q{front as (ref []), rear as (ref (x::r))}) = let + val (x',r') = reverse'(x,r,[]) + in + front := r'; rear := []; x' + end + | dequeue (R.Q{front as (ref(x::r)), ...}) = (front := r; x) + + fun debugInfo NONE = "NONE" + | debugInfo (SOME (tid,_)) = RepTypes.tidToString tid + fun getNextThread () = let + (* rather than suffer the exception raised by Heap.min, + * check first to see if the heap is empty + *) + val minQ = if Heap.isEmpty qHeap then NONE else Heap.min(qHeap) + val t = + (case minQ + of NONE => NONE + | SOME(q,p) => + SOME(dequeue(q) before + ( + (* maintain invariant that heap holds non-empty queues *) + if emptyQueue q + then ignore (Heap.deleteMin qHeap) + else ()) + ) + (*esac*)) + in + t + end + + val schedName = "Static Priority - array based" + fun getSchedName () = schedName + + val queueingProcs = SchedulerGlueTypes.QP{queuesEmpty=empty, + getEnqueueFn=getEnqueueFn, + getPriority=getPriority, + getNextThread=getNextThread, + reset=reset, + markTid=markTid, + getSchedName=getSchedName + } + +end (* SchedQueues *) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/sched-queues-default.sml final/src/cml/src/core-cml/scheduler/sched-queues-default.sml --- base/src/cml/src/core-cml/scheduler/sched-queues-default.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/sched-queues-default.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,113 @@ +structure SchedQueuesDefault = struct + (* A scheduler using John Reppy's dynamic-priority algorithm; + * should behave essentially identically to original CML + *) + + structure R = RepTypes + + exception emptyQueueExn + + type schedElement = (R.thread_id*unit SMLofNJ.Cont.cont) + type queue = schedElement R.queue + + val rdyQ1 : queue = Q.queue() + val rdyQ2 : queue = Q.queue() + + fun reset (running:bool) = ( + Q.reset rdyQ1; + Q.reset rdyQ2 + (* if running then Debug.sayDebug "Using Reppy's implementation\n" else ()*) + ) + + fun emptyQueue (R.Q{front=ref [],rear=ref []}) = true + | emptyQueue _ = false + + fun empty () = Q.isEmpty rdyQ1 andalso Q.isEmpty rdyQ2 + + + fun enqueue element = let + val q as R.Q{front,rear} = rdyQ1 + in + rear := element :: !rear + end + + fun promote () = (case (Q.next rdyQ2) + of (SOME x) => enqueue x + | NONE => () + (* end case *)) + + local + fun getEnqFn doneComm = let + val q1 as R.Q{front=front1,rear=rear1} = rdyQ1 + val q2 as R.Q{front=front2,rear=rear2} = rdyQ2 + fun debugIt (preempted, q, element as (tid,_)) = () + (*Debug.sayDebugId (String.concat["Enqueued ", (RepTypes.tidToString tid), + " preempted ", Bool.toString preempted, + " queue ", Int.toString q, " \n"])*) + fun enqFnNotDoneComm (false, element) = (rear1 := element :: !rear1 ; debugIt(false, 1, element)) + | enqFnNotDoneComm (true, element) = (rear2 := element :: !rear2; debugIt(true, 2, element)) + fun enqFnDoneComm (false, element) = (rear1 := element :: ! rear1; debugIt(false, 1, element)) + | enqFnDoneComm (true, element as (R.TID{enqFn,...},_)) = + ( + enqFn := enqFnNotDoneComm; + promote(); + rear1 := element :: ! rear1; + debugIt(true, 1, element) + ) + in + if doneComm then enqFnDoneComm else enqFnNotDoneComm + end + in + val enqFnNotDoneComm = getEnqFn false + val enqFnDoneComm = getEnqFn true + end + + fun getEnqueueFn (_ : RepTypes.priority) = enqFnNotDoneComm + + fun getPriority _ = NONE + + fun markTid (R.TID{enqFn,...}) = enqFn := enqFnDoneComm + + fun reverse' (x, [], rl) = (x, rl) + | reverse' (x, y::rest, rl) = reverse'(y, rest, x :: rl) + + fun dequeue (R.Q{front = ref [], rear = ref []}) = raise emptyQueueExn + | dequeue (R.Q{front as (ref []), rear as (ref (x::r))}) = let + val (x',r') = reverse'(x,r,[]) + in + front := r'; rear := []; x' + end + | dequeue (R.Q{front as (ref(x::r)), ...}) = (front := r; x) + + fun getNextThread () = let + val minQ = if not (emptyQueue rdyQ1) + then SOME rdyQ1 + else if not (emptyQueue rdyQ2) + then SOME rdyQ2 + else NONE + in + (case minQ + of NONE => NONE + (* NONE => (NONE before Debug.sayDebugId "Heap was empty\n") *) + | SOME q => SOME (dequeue q) + (* let val qe (* as (tid,_)*) = dequeue q + in + (* Debug.sayDebugId ("dequeued task " ^ (RepTypes.tidToString tid) + ^ " \n"); *) + SOME(qe) + end *) + (*esac*)) + end + + val schedName = "Original dynamic-priority scheduler" + fun getSchedName () = schedName + + val queueingProcs = SchedulerGlueTypes.QP {queuesEmpty=empty, + getEnqueueFn=getEnqueueFn, + getPriority=getPriority, + getNextThread=getNextThread, + reset=reset, + markTid=markTid, + getSchedName=getSchedName + } +end (* SchedQueues *) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/sched-queues-leftist.sml final/src/cml/src/core-cml/scheduler/sched-queues-leftist.sml --- base/src/cml/src/core-cml/scheduler/sched-queues-leftist.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/sched-queues-leftist.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,125 @@ +structure SchedQueuesLeftist = struct + + structure R = RepTypes + + type priority = int + + exception emptyQueueExn + + val maxPrio = 999 + val defaultPriority = 5 + + type schedElement = (R.thread_id*unit SMLofNJ.Cont.cont) + type queue = schedElement R.queue + type heapElement = queue*priority + + val queueArray : ((R.thread_id * + unit SMLofNJ.Cont.cont) R.queue) option Array.array + = Array.array (maxPrio+1, NONE) + + fun compare ((_,p1), (_,p2)) = Int.compare(p1,p2) + + structure Priority = struct + type priority = int + fun compare (p1, p2) = Int.compare (p2,p1) + type item = heapElement + fun priority (_,p) = p + end + + structure Heap = LeftPriorityQFn(Priority) + + val qHeap = ref Heap.empty + + fun reset (running:bool) = ( + Array.modify (fn _ => NONE) queueArray; + qHeap := Heap.empty + (* if running then Debug.sayDebug "Using leftist heap\n" else () *) + ) + + fun queueForPriority prio = + case Array.sub(queueArray,prio) + of SOME(q) => q + | NONE => + let + val q = R.Q{front=ref [], rear=ref []} + in + Array.update (queueArray,prio,SOME(q)); + q + end + + fun emptyQueue (R.Q{front=ref [],rear=ref []}) = true + | emptyQueue _ = false + + fun empty () = Heap.isEmpty (!qHeap); + + fun markTid _ = () + + local + fun getEnqFn p = let + val q as R.Q{front,rear} = queueForPriority p + fun enqFn (_, element) = (* ignores preemption *) + ( + (* maintain invariant: q is in qHeap iff q is not empty *) + (* if it was empty, it is now non-empty so has to be added to qHeap *) + (case emptyQueue q + of true => qHeap := Heap.insert ((q, p),(!qHeap)) + | false => () + (*esac*)); + rear := element :: !rear + (* Debug.sayDebugId ("enqueued priority "^ + (Int.toString priority) ^ " task\n")*) + ) + in + enqFn + end + in + fun getEnqueueFn NONE = getEnqFn defaultPriority + | getEnqueueFn (SOME (R.NUM p)) = getEnqFn p + end + + fun getPriority p = SOME (R.NUM p) + + fun reverse' (x, [], rl) = (x, rl) + | reverse' (x, y::rest, rl) = reverse'(y, rest, x :: rl) + + fun dequeue (R.Q{front = ref [], rear = ref []}) = raise emptyQueueExn + | dequeue (R.Q{front as (ref []), rear as (ref (x::r))}) = let + val (x',r') = reverse'(x,r,[]) + in + front := r'; rear := []; x' + end + | dequeue (R.Q{front as (ref(x::r)), ...}) = (front := r; x) + + fun getNextThread () = let + val next = Heap.next(!qHeap) (* functional! doesn't change !qHeap *) + in + (case next + of NONE => NONE + (* of NONE => (NONE before Debug.sayDebugId "Heap was empty\n") *) + | SOME((q,p),newHeap) => let (* ignore the new heap *) + val next as (nextTid,_) = dequeue(q) + in + (*Debug.sayDebugId ("dequeued task priority " ^ (Int.toString p) ^ + R.tidToString(nextTid) ^ "\n"); *) + (* maintain invariant that heap holds non-empty queues *) + if emptyQueue q + then qHeap := newHeap + else (); + SOME(next) + end + (*esac*)) + end + + val schedName = "Static Priority - Leftist heap based" + fun getSchedName () = schedName + + val queueingProcs = SchedulerGlueTypes.QP{queuesEmpty=empty, + getEnqueueFn=getEnqueueFn, + getPriority=getPriority, + getNextThread=getNextThread, + reset=reset, + markTid=markTid, + getSchedName=getSchedName + } + +end (* SchedQueues *) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/sched-queues-sig.sml final/src/cml/src/core-cml/scheduler/sched-queues-sig.sml --- base/src/cml/src/core-cml/scheduler/sched-queues-sig.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/sched-queues-sig.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,9 @@ +signature SCHED_QUEUES = sig + val empty : unit -> bool + val markTid : RepTypes.thread_id -> unit + val getEnqueueFn : RepTypes.priority -> ((bool*SchedulerGlueTypes.schedElement) -> unit) + val getPriority : int -> RepTypes.priority + val getNextThread : unit -> SchedulerGlueTypes.schedElement option + val reset : bool -> unit + val getSchedName : unit -> string +end \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/sched-queues-sim.sml final/src/cml/src/core-cml/scheduler/sched-queues-sim.sml --- base/src/cml/src/core-cml/scheduler/sched-queues-sim.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/sched-queues-sim.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,136 @@ +structure SchedQueuesSim = struct + (* A scheduler using John Reppy's dynamic-priority algorithm; + * should behave essentially identically to original CML + *) + + structure R = RepTypes + + exception emptyQueueExn + + type schedElement = (R.thread_id*unit SMLofNJ.Cont.cont) + type queue = schedElement R.queue + + val rdyQ1 : queue = Q.queue() + val rdyQ2 : queue = Q.queue() + val rdyQsim : queue = Q.queue() + + fun reset (running:bool) = ( + Q.reset rdyQ1; + Q.reset rdyQ2; + Q.reset rdyQsim + (* if running then Debug.sayDebug "Using Reppy's implementation\n" else ()*) + ) + + fun emptyQueue (R.Q{front=ref [],rear=ref []}) = true + | emptyQueue _ = false + + fun allSimsBlocked() = emptyQueue rdyQsim; + fun empty () = Q.isEmpty rdyQ1 andalso Q.isEmpty rdyQ2 andalso Q.isEmpty rdyQsim + + + fun enqueue element = let + val q as R.Q{front,rear} = rdyQ1 + in + rear := element :: !rear + end + + fun promote () = (case (Q.next rdyQ2) + of (SOME x) => enqueue x + | NONE => () + (* end case *)) + + local + fun getEnqFn doneComm = let + val q1 as R.Q{front=front1,rear=rear1} = rdyQ1 + val q2 as R.Q{front=front2,rear=rear2} = rdyQ2 + fun debugIt (preempted, q, element as (tid as R.TID{priority,...},_)) = () + (* if (priority=(SOME SIM)) + then Debug.sayDebugId "Sim thread on wrong queue\n" + else Debug.sayDebugId "Non-sim thread on correct queue\n" *) + (* Debug.sayDebugId (String.concat["Enqueued ", (RepTypes.tidToString tid), + " preempted ", Bool.toString preempted, + " queue ", Int.toString q, " \n"])*) + + fun enqFnNotDoneComm (false, element) = (rear1 := element :: !rear1 ; debugIt(false, 1, element)) + | enqFnNotDoneComm (true, element) = (rear2 := element :: !rear2; debugIt(true, 2, element)) + fun enqFnDoneComm (false, element) = (rear1 := element :: ! rear1; debugIt(false, 1, element)) + | enqFnDoneComm (true, element as (R.TID{enqFn,...},_)) = + ( + enqFn := enqFnNotDoneComm; + promote(); + rear1 := element :: ! rear1; + debugIt(true, 1, element) + ) + in + if doneComm then enqFnDoneComm else enqFnNotDoneComm + end + in + val enqFnNotDoneComm = getEnqFn false + val enqFnDoneComm = getEnqFn true + end + + fun enqFnSim (preempted,element) = let + val R.Q{front, rear} = rdyQsim + in + (* front := element :: !front *) + (* rear := element :: !rear *) + if preempted then front := element :: !front else rear := element :: !rear + end + + + + fun getEnqueueFn (SOME R.SIM) = enqFnSim + | getEnqueueFn (_) = enqFnNotDoneComm + + (* externally, 0 indicates sim threads, anything else normal thread *) + fun getPriority 0 = (SOME R.SIM) + | getPriority _ = NONE + + (* ignore marking for sim threads *) + fun markTid (R.TID{enqFn,priority,...}) = if (priority=(SOME R.SIM)) then () else enqFn := enqFnDoneComm + + fun reverse' (x, [], rl) = (x, rl) + | reverse' (x, y::rest, rl) = reverse'(y, rest, x :: rl) + + fun dequeue (R.Q{front = ref [], rear = ref []}) = raise emptyQueueExn + | dequeue (R.Q{front as (ref []), rear as (ref (x::r))}) = let + val (x',r') = reverse'(x,r,[]) + in + front := r'; rear := []; x' + end + | dequeue (R.Q{front as (ref(x::r)), ...}) = (front := r; x) + + fun getNextThread () = let + val minQ = if not (emptyQueue rdyQ1) + then SOME rdyQ1 + else if not (emptyQueue rdyQ2) + then SOME rdyQ2 + else if not (emptyQueue rdyQsim) + then SOME rdyQsim + else NONE + in + (case minQ + of NONE => NONE + (* NONE => (NONE before Debug.sayDebugId "Heap was empty\n") *) + | SOME q => SOME (dequeue q) + (* let val qe (* as (tid,_)*) = dequeue q + in + (* Debug.sayDebugId ("dequeued task " ^ (RepTypes.tidToString tid) + ^ " \n"); *) + SOME(qe) + end *) + (*esac*)) + end + + val schedName = "Simulated time scheduler" + fun getSchedName () = schedName + + val queueingProcs = SchedulerGlueTypes.QP {queuesEmpty=empty, + getEnqueueFn=getEnqueueFn, + getPriority=getPriority, + getNextThread=getNextThread, + reset=reset, + markTid=markTid, + getSchedName=getSchedName + } +end (* SchedQueues *) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler.cm final/src/cml/src/core-cml/scheduler/scheduler.cm --- base/src/cml/src/core-cml/scheduler/scheduler.cm 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler.cm 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,30 @@ +(* + * Simulation scheduler for CML + *) + +Group + structure Scheduler + structure SchedulerGlueControl + structure SchedQueuesSim + structure RepTypes + structure Q + structure Debug +is + $/basis.cm + ../cml-util.cm + + (* always needed *) + scheduler.sml + sched-queues-sig.sml + scheduler-glue-control-sig.sml + scheduler-glue-types.sml + scheduler-glue-sig.sml + + + (* always needed for scheduler configured at compile-time *) + scheduler-glue-static-fn.sml + scheduler-glue-control-static-fn.sml + (* Statically configured simulation scheduler *) + scheduler-glue-static-sim.sml + sched-queues-sim.sml + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-control-sig.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-control-sig.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-control-sig.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-control-sig.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,10 @@ +signature SCHEDULER_GLUE_CONTROL = sig + + (* Bind to a different set of queueing primitives *) + type queueingProcs + val changeScheduler : queueingProcs -> unit + val useReppyQueues : unit -> unit + val usePriorityQueues : unit -> unit + val useLeftistQueues : unit -> unit + val getSchedName : unit -> string +end \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-control.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-control.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-control.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-control.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,20 @@ +(* + * SchedulerGlueControl is the interface used by applications to + * change which scheduler is in use. ONLY DO THIS WHEN CML IS NOT RUNNING! + * Currently, the available schedulers are listed and clients choose + * between them; this should be changed so all the appropriate stuff is + * visible outside CML so clients can impose arbitrary schedulers. + *) + +structure SchedulerGlueControl : SCHEDULER_GLUE_CONTROL = struct + type queueingProcs = SchedulerGlueTypes.queueingProcs + val changeScheduler = SchedulerGlueDynamic.changeScheduler + structure RQ = SchedQueuesDefault + structure AQ = SchedQueuesArray + structure LQ = SchedQueuesLeftist + fun useReppyQueues () = changeScheduler RQ.queueingProcs + fun usePriorityQueues () = changeScheduler AQ.queueingProcs + fun useLeftistQueues () = changeScheduler LQ.queueingProcs + val getSchedName = SchedulerGlueDynamic.getSchedName + + end diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-control-static-fn.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-control-static-fn.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-control-static-fn.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-control-static-fn.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,14 @@ +(* + * SchedulerGlueControl stub for use with statically configured scheduler + *) + +functor SchedulerGlueControlStaticFn (SQ: SCHED_QUEUES) : SCHEDULER_GLUE_CONTROL = struct + type queueingProcs = SchedulerGlueTypes.queueingProcs + exception NotDynamicallyConfigured + fun changeScheduler _ = raise NotDynamicallyConfigured + fun useReppyQueues () = raise NotDynamicallyConfigured + fun usePriorityQueues () = raise NotDynamicallyConfigured + fun useLeftistQueues () = raise NotDynamicallyConfigured + val getSchedName = SQ.getSchedName + + end diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-dynamic.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-dynamic.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-dynamic.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-dynamic.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,48 @@ +structure SchedulerGlueDynamic = struct + structure SQ = SchedQueuesDefault (* the default strict priority scheduler *) + + val currentQueuesEmpty = ref SQ.empty + fun queuesEmpty () = !currentQueuesEmpty () + + val currentMarkTid = ref SQ.markTid + fun markTid t = !currentMarkTid t + + (* enqueue a ready thread *) + val currentGetEnqueueFn = ref SQ.getEnqueueFn + fun getEnqueueFn (p: RepTypes.priority) = !currentGetEnqueueFn p + + val currentGetPriority : (int -> RepTypes.priority) ref = ref SQ.getPriority + fun getPriority p = !currentGetPriority p + + fun enqueue (preempted, element as (RepTypes.TID{enqFn,...},_)) = + (!enqFn) (preempted, element) + + (* get thread next to run according to current policy *) + val currentGetNextThread = ref SQ.getNextThread + + fun getNextThread () = !currentGetNextThread () + + (* reset the queue states *) + val currentReset = ref SQ.reset + fun reset running = (!currentReset) running + + (* name of the current scheduler *) + val currentGetSchedName = ref SQ.getSchedName + fun getSchedName () = ! currentGetSchedName () + + fun changeScheduler (SchedulerGlueTypes.QP{ + queuesEmpty, getEnqueueFn, + getNextThread, getPriority, + reset, markTid, getSchedName}) = + ( + currentQueuesEmpty := queuesEmpty; + currentGetEnqueueFn := getEnqueueFn; + currentGetNextThread := getNextThread; + currentGetPriority := getPriority; + currentReset := reset; + currentMarkTid := markTid; + currentGetSchedName := getSchedName + ) +end + + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-sig.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-sig.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-sig.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-sig.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,31 @@ +signature SCHEDULER_GLUE = sig + + (* queuesEmpty -- true if no ready threads *) + val queuesEmpty : unit -> bool + + (* idea: an enqueue function is specialized for the priority information + * contained in the int list according to rules defined by the + * scheduler in use. The returned function is stored in the + * TID record; this can be used to pre-bind the thread to + * a queue at a particular priority, for example. + *) + val getEnqueueFn : RepTypes.priority -> RepTypes.enqueueFn + + (* enqueue : the first parameter, if true, indicates that the thread was preempted; + * idea is that this will ultimately call the enqueue function in the schedElement + *) + + val getPriority : int -> RepTypes.priority + val enqueue : (bool*SchedulerGlueTypes.schedElement) -> unit + + (* Get next thread to run, according to current policy *) + val getNextThread : unit -> SchedulerGlueTypes.schedElement option + + val reset : bool -> unit + + (* Marking is used in the Reppy scheduler in determining whether + * or not a thread is "interactive" + *) + val markTid : RepTypes.thread_id -> unit + +end \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue.sml final/src/cml/src/core-cml/scheduler/scheduler-glue.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,9 @@ +(* + * SchedulerGlue is the interface used by the scheduler to invoke lower-level + * operations. It does not include the changeScheduler function + *) +structure SchedulerGlue : SCHEDULER_GLUE = SchedulerGlueDynamic + + + + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-static-array.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-static-array.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-static-array.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-static-array.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,2 @@ +structure SchedulerGlue : SCHEDULER_GLUE = SchedulerGlueStaticFn(SchedQueuesArray) +structure SchedulerGlueControl : SCHEDULER_GLUE_CONTROL = SchedulerGlueControlStaticFn(SchedQueuesArray) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-static-default.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-static-default.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-static-default.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-static-default.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,2 @@ +structure SchedulerGlue : SCHEDULER_GLUE = SchedulerGlueStaticFn(SchedQueuesDefault) +structure SchedulerGlueControl : SCHEDULER_GLUE_CONTROL = SchedulerGlueControlStaticFn(SchedQueuesDefault) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-static-fn.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-static-fn.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-static-fn.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-static-fn.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,25 @@ +functor SchedulerGlueStaticFn(SQ : SCHED_QUEUES) : SCHEDULER_GLUE = struct + + fun queuesEmpty () = SQ.empty () + + fun markTid t = SQ.markTid t + + (* enqueue a ready thread *) + fun getEnqueueFn l = SQ.getEnqueueFn l + + fun getPriority p = SQ.getPriority p + + fun enqueue (preempted, element as (RepTypes.TID{enqFn,...},_)) = + (!enqFn) (preempted, element) + + (* get thread next to run according to current policy *) + fun getNextThread () = SQ.getNextThread () + + (* reset the queue states *) + fun reset running = SQ.reset running + + (* name of the current scheduler *) + fun getSchedName () = SQ.getSchedName +end + + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-static-leftist.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-static-leftist.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-static-leftist.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-static-leftist.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,2 @@ +structure SchedulerGlue : SCHEDULER_GLUE = SchedulerGlueStaticFn(SchedQueuesLeftist) +structure SchedulerGlueControl : SCHEDULER_GLUE_CONTROL = SchedulerGlueControlStaticFn(SchedQueuesLeftist) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-static-sim.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-static-sim.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-static-sim.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-static-sim.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,2 @@ +structure SchedulerGlue : SCHEDULER_GLUE = SchedulerGlueStaticFn(SchedQueuesSim) +structure SchedulerGlueControl : SCHEDULER_GLUE_CONTROL = SchedulerGlueControlStaticFn(SchedQueuesSim) \ No newline at end of file diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler-glue-types.sml final/src/cml/src/core-cml/scheduler/scheduler-glue-types.sml --- base/src/cml/src/core-cml/scheduler/scheduler-glue-types.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler-glue-types.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,12 @@ +structure SchedulerGlueTypes = struct + type schedElement = RepTypes.sched_element + datatype queueingProcs = QP of { queuesEmpty: unit->bool, + getEnqueueFn: RepTypes.priority -> ((bool*schedElement) -> unit), + getPriority: int -> RepTypes.priority, + getNextThread: unit -> schedElement option, + reset: bool->unit, + markTid : RepTypes.thread_id -> unit, + getSchedName : unit -> string + } +end + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler/scheduler.sml final/src/cml/src/core-cml/scheduler/scheduler.sml --- base/src/cml/src/core-cml/scheduler/scheduler.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/scheduler/scheduler.sml 2006-10-20 13:42:25.000000000 -0700 @@ -0,0 +1,358 @@ +(* scheduler.sml + * + * COPYRIGHT (c) 1995 AT&T Bell Laboratories. + * COPYRIGHT (c) 1989-1991 John H. Reppy + * + * This module implements the scheduling queues and preemption + * mechanisms. + * This version has priority scheduling changes + * by Carl Hauser, hauser@eecs.wsu.edu + *) + +structure Scheduler : sig + + type thread_id + type 'a cont = 'a SMLofNJ.Cont.cont + + val getCurThread : unit -> thread_id + val setCurThread : thread_id -> unit + + (* markAndEnqueue marks the thread as having voluntarily ceded the CPU *) + val markAndEnqueue : (thread_id * unit cont) -> unit + + (* simpleEnqueue does not mark the thread *) + val simpleEnqueue : (thread_id * unit cont) -> unit + + (* readyThreads -- true if there are any ready threads *) + val readyThreads : unit -> bool + + val getEnqueueFn : RepTypes.priority -> RepTypes.enqueueFn + + val getPriority : int ->RepTypes.priority + + val enqueueAndSwitchCurThread : (unit cont * thread_id) -> unit + (* enqueue the given continuation with the current thread ID, and make + * the given thread ID be the current one. + *) + + val enqueueTmpThread : (unit -> unit) -> unit + (* create a temporary thread (with dummy ID) to run the given + * function and then exit. The thread is placed on the front + * of the scheduling queue. + *) + + datatype atomic_state = NonAtomic | Atomic | SignalPending + + val atomicState : atomic_state ref + + val atomicBegin : unit -> unit + val atomicEnd : unit -> unit + (* enter/leave an atomic region; note that these do not nest *) + + val atomicDispatch : unit -> 'a + (* leave the atomic region and dispatch the next thread *) + + val dispatch : unit -> 'a + (* dispatch the next thread; this should NOT be called while in + * an atomic region. Use atomicDispatch() for that case. + *) + + val atomicSwitchTo : (thread_id * 'a cont * 'a) -> unit + (* switch to the given thread, while leaving the atomic region *) + + val atomicYield : unit cont -> 'a + (* Yield control to the next thread, while leaving the atomic + * region. + *) + + val schedulerHook : unit cont ref + (* this hook points to a continuation that gets dispatched when + * a preemption is received, or when a thread exits an atomic + * region and there is a signal pending. It is invoked after + * leaving the atomic region. + *) + + val pauseHook : unit cont ref + (* this hook points to a continuation that gets invoked when + * when the scheduler has nothing else to do. + *) + + val shutdownHook : (bool * OS.Process.status) cont ref + (* this hook points to a continuation that gets invoked when + * the system is otherwise deadlocked. It takes two arguments: + * the first is a boolean flag that says weather to do clean-up, + * and the second is the exit status. + *) + + val getTime : unit -> Time.time + (* returns an approximation of the current time of day (this is at + * least as accurate as the time quantum. + *) + + val reset : bool -> unit + + (* control over the preemptive timer *) + val startTimer : Time.time -> unit + val stopTimer : unit -> unit + val restartTimer : unit -> unit + + end = struct + + structure R = RepTypes + structure Sig = Signals + structure SG = SchedulerGlue + + type 'a cont = 'a SMLofNJ.Cont.cont + val callcc = SMLofNJ.Cont.callcc + val throw = SMLofNJ.Cont.throw + + type thread_id = R.thread_id + + (* the current thread is represented using the "var" register *) + val getCurThread : unit -> thread_id = Unsafe.getVar + val setCurThread : thread_id -> unit = Unsafe.setVar + + + + (* The scheduler defines three continuation "hooks": + * schedulerHook -- this points to a continuation that gets dispatched + * when a thread attempts to exit an atomic region and + * there is a signal pending. It is invoked after + * leaving the atomic region. + * pauseHook -- this points to a continuation that gets invoked when + * there is nothing else to do. + * shutdownHook -- this points to a continuation that gets invoked when + * the system is deadlocked, or when RunCML.shutdown + * is called. It takes two arguments: the first is a + * boolean flag that says weather to do clean-up, and + * the second is the exit status. + * The hooks are initialized during CML startup in an OS-specific way. + * When CML is not running they are set to a bogus hook that should + * never be called. + *) + fun bogus _ = (Debug.sayDebugId "Bo..o..oo..gus!\n"; raise Fail "should never see this ") + val bogusHook : unit cont = SMLofNJ.Cont.isolate bogus + val bogusShutdownHook : (bool * OS.Process.status) cont = + SMLofNJ.Cont.isolate bogus + val schedulerHook = ref bogusHook + val pauseHook = ref bogusHook + val shutdownHook = ref bogusShutdownHook + + (* Priorities run from 0..maxPrio. Threads with smaller numerical priorities + * run in preference to those with larger numerical priorities. + *) + val maxPrio = 999 + + val getEnqueueFn = SG.getEnqueueFn + + val getPriority = SG.getPriority + + (* the dummy thread Id; this is used when an ID is needed to get + * the types right + *) + val dummyTid as (R.TID{enqFn=dummyEnqFn,...}) = R.TID{ + id = ~1, alert = ref false, done_comm = ref false, + exnHandler = ref(fn _ => ()), + priority = NONE, + props = ref[], + dead = R.CVAR(ref(R.CVAR_unset[])), + enqFn = ref (getEnqueueFn NONE) + } + (* the error thread. This thread is used to trap attempts to run CML + * without proper initialization (i.e., via RunCML). This thread is + * enqueued by reset. + *) + val errorTid as (R.TID{enqFn=errorEnqFn,...}) = R.TID{ + id = ~2, alert = ref false, done_comm = ref false, + exnHandler = ref(fn _ => ()), + priority=NONE, + props = ref[], + dead = R.CVAR(ref(R.CVAR_unset[])), + enqFn = ref (getEnqueueFn NONE) + } + val errorCont : unit cont = SMLofNJ.Cont.isolate (fn _ => ( + Debug.sayDebug "**** Use RunCML.doit to run CML ****\n"; + raise Fail "CML not initialized")) + + (* thread id marking: marking is used by the original CML scheduler to + * implement a dynamic, non-strict priority scheme that favors + * interactive threads. *) + fun markTid t = SG.markTid t + + fun readyThreads () = not (SG.queuesEmpty()) + + + + (* enqueue ready threads *) + fun markAndEnqueue (p as (id, _)) = + ( + markTid id; + SG.enqueue (false, p) + ) + + fun simpleEnqueue t = SG.enqueue (false, t) + + (* enqueue the current thread, and make the given thread ID be the current + * one. + *) + fun enqueueAndSwitchCurThread (resume, tid) = ( + markAndEnqueue(getCurThread(), resume); + setCurThread tid) + + (* dequeue a thread from the heap *) + fun dequeue1 () = (case (SG.getNextThread ()) + of NONE => (dummyTid, !pauseHook) + | SOME(t) => t + ) + + + (* global flag for implementing atomic operations *) + datatype atomic_state = NonAtomic | Atomic | SignalPending + val atomicState = ref NonAtomic + + (* Note, the first thing the scheduler hook does is an atomicBegin, so we don't + * need to clear the atomic state here. + *) + fun dispatchSchedulerHook () = throw (!schedulerHook) () + + fun atomicBegin () = atomicState := Atomic + + (* leave an atomic region. + * NOTE: there is a race condition between the test of the atomicState + * flag and the setting of it to NonAtomic, but this is not a problem in + * practice, because there are no GC tests between these (and thus no + * preemption). + *) + fun atomicEnd () = (case !atomicState + of SignalPending => callcc (fn k => ( + SG.enqueue(false, (getCurThread(), k)); + dispatchSchedulerHook())) + | _ => atomicState := NonAtomic + (* end case *)) + + fun atomicDispatch () = (case !atomicState + of SignalPending => dispatchSchedulerHook() + | _ => let + val ((id, kont)) = dequeue1() + in + setCurThread id; + atomicState := NonAtomic; + throw kont () + end + (* end case *)) + + fun dispatch () = (atomicBegin(); atomicDispatch ()) + + fun atomicSwitchTo (tid, k, x) = + callcc (fn curK => ( + case !atomicState + of SignalPending => + callcc (fn k' => ( + SG.enqueue(false,(tid, k')); + SG.enqueue(false, (getCurThread(), curK)); + dispatchSchedulerHook())) + | _ => ( + enqueueAndSwitchCurThread (curK, tid); + atomicState := NonAtomic) + (* end case *); + throw k x)) + + (* Yield control to the next thread, while leaving the atomic region. *) + fun atomicYield k = ( + markAndEnqueue(getCurThread(), k); + atomicDispatch ()) + + (* create a temporary thread (with dummy ID) to run the given + * function and then exit. The thread is placed on the front + * of the scheduling queue. + *) + + fun enqueueTmpThread f = let +(** this should be, but the overhead is too high right now. ** + val kont = SMLofNJ.Cont.isolate f +**) + val kont = callcc (fn k => ( + callcc (fn k' => throw k k'); + f () handle _ => (); + dispatch ())) + in + ( + (*Debug.sayDebugId "enqueueing a temp thread\n";*) + SG.enqueue (false,(dummyTid, kont)) + ) + end + + val defaultHook : unit cont = SMLofNJ.Cont.isolate dispatch + + (* this holds an approximation of the current time of day. It is + * cleared at each pre-emption, and initialized on demand (by getTime). + *) + val clock = ref(SOME Time.zeroTime) + + (* returns an approximation of the current time of day (this is at + * least as accurate as the time quantum). + *) + fun getTime () = (case !clock + of NONE => let val t = Time.now() + in + clock := SOME t; t + end + | (SOME t) => t + (* end case *)) + + (* preempt the current thread (with continuation k). *) + (* strict priority scheduler loses the notion of promotion *) + fun preempt k = let + val curTid = getCurThread() + val curP = (curTid, k) + in + SG.enqueue (true, curP) + end + + (* the preemption handler *) + fun alrmHandler (_, _, k) = ( + clock := NONE; + case !atomicState + of NonAtomic => (preempt k; !schedulerHook) + | Atomic => (atomicState := SignalPending; k) + | _ => k + (* end case *)) + + (* time quanta *) + val defaultTimeQ = Time.fromMilliseconds 20 + val timeQ = ref defaultTimeQ + + structure IT = SMLofNJ.IntervalTimer + + fun startTimer tq = let + val tq = if Time.<(Time.zeroTime, tq) then tq else defaultTimeQ + in + timeQ := tq; + ignore (Sig.setHandler (Sig.sigALRM, Sig.HANDLER alrmHandler)); + ignore (IT.setIntTimer (SOME tq)) + end + + fun stopTimer () = ( + ignore (IT.setIntTimer NONE); + ignore (Sig.setHandler (Sig.sigALRM, Sig.IGNORE))) + + fun restartTimer () = startTimer (!timeQ) + + (* reset various pieces of state *) + fun reset running = ( + setCurThread dummyTid; + pauseHook := bogusHook; + shutdownHook := bogusShutdownHook; + schedulerHook := defaultHook; + clock := NONE; + SG.reset running; + (* change the enqFns of the static threads to match the + current scheduler *) + errorEnqFn := getEnqueueFn NONE; + dummyEnqFn := getEnqueueFn NONE; + if (not running) then markAndEnqueue(errorTid, errorCont) else ()) + + val _ = reset false + + end + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/scheduler.sml final/src/cml/src/core-cml/scheduler.sml --- base/src/cml/src/core-cml/scheduler.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/scheduler.sml 1969-12-31 16:00:00.000000000 -0800 @@ -1,359 +0,0 @@ -(* scheduler.sml - * - * COPYRIGHT (c) 1995 AT&T Bell Laboratories. - * COPYRIGHT (c) 1989-1991 John H. Reppy - * - * This module implements the scheduling queues and preemption - * mechanisms. - *) - -structure Scheduler : sig - - type thread_id - type 'a cont = 'a SMLofNJ.Cont.cont - - val rdyQ1 : (thread_id * unit cont) RepTypes.queue - - val getCurThread : unit -> thread_id - val setCurThread : thread_id -> unit - - val enqueueThread : (thread_id * unit cont) -> unit - - val enqueueAndSwitchCurThread : (unit cont * thread_id) -> unit - (* enqueue the given continuation with the current thread ID, and make - * the given thread ID be the current one. - *) - - val enqueueTmpThread : (unit -> unit) -> unit - (* create a temporary thread (with dummy ID) to run the given - * function and then exit. The thread is placed on the front - * of the scheduling queue. - *) - - datatype atomic_state = NonAtomic | Atomic | SignalPending - - val atomicState : atomic_state ref - - val atomicBegin : unit -> unit - val atomicEnd : unit -> unit - (* enter/leave an atomic region; note that these do not nest *) - - val atomicDispatch : unit -> 'a - (* leave the atomic region and dispatch the next thread *) - - val dispatch : unit -> 'a - (* dispatch the next thread; this should NOT be called while in - * an atomic region. Use atomicDispatch() for that case. - *) - - val atomicSwitchTo : (thread_id * 'a cont * 'a) -> unit - (* switch to the given thread, while leaving the atomic region *) - - val atomicYield : unit cont -> 'a - (* Yield control to the next thread, while leaving the atomic - * region. - *) - - val schedulerHook : unit cont ref - (* this hook points to a continuation that gets dispatched when - * a preemption is received, or when a thread exits an atomic - * region and there is a signal pending. It is invoked after - * leaving the atomic region. - *) - - val pauseHook : unit cont ref - (* this hook points to a continuation that gets invoked when - * when the scheduler has nothing else to do. - *) - - val shutdownHook : (bool * OS.Process.status) cont ref - (* this hook points to a continuation that gets invoked when - * the system is otherwise deadlocked. It takes two arguments: - * the first is a boolean flag that says weather to do clean-up, - * and the second is the exit status. - *) - - val getTime : unit -> Time.time - (* returns an approximation of the current time of day (this is at - * least as accurate as the time quantum. - *) - - val reset : bool -> unit - - (* control over the preemptive timer *) - val startTimer : Time.time -> unit - val stopTimer : unit -> unit - val restartTimer : unit -> unit - - end = struct - - structure R = RepTypes - structure Sig = Signals - - type 'a cont = 'a SMLofNJ.Cont.cont - val callcc = SMLofNJ.Cont.callcc - val throw = SMLofNJ.Cont.throw - - (* some utility functions that should be inlined *) - fun reverse ([], rl) = rl - | reverse (x :: rest, rl) = reverse(rest, x :: rl) - - type thread_id = R.thread_id - - (* the current thread is represented using the "var" register *) - val getCurThread : unit -> thread_id = Unsafe.getVar - val setCurThread : thread_id -> unit = Unsafe.setVar - - (* The scheduler defines three continuation "hooks": - * schedulerHook -- this points to a continuation that gets dispatched - * when a thread attempts to exit an atomic region and - * there is a signal pending. It is invoked after - * leaving the atomic region. - * pauseHook -- this points to a continuation that gets invoked when - * there is nothing else to do. - * shutdownHook -- this points to a continuation that gets invoked when - * the system is deadlocked, or when RunCML.shutdown - * is called. It takes two arguments: the first is a - * boolean flag that says weather to do clean-up, and - * the second is the exit status. - *) - fun bogus _ = raise Fail "should never see this " - val bogusHook : unit cont = SMLofNJ.Cont.isolate bogus - val bogusShutdownHook : (bool * OS.Process.status) cont = - SMLofNJ.Cont.isolate bogus - val schedulerHook = ref bogusHook - val pauseHook = ref bogusHook - val shutdownHook = ref bogusShutdownHook - - (* the dummy thread Id; this is used when an ID is needed to get - * the types right - *) - val dummyTid = R.TID{ - id = ~1, alert = ref false, done_comm = ref false, - exnHandler = ref(fn _ => ()), - props = ref[], - dead = R.CVAR(ref(R.CVAR_unset[])) - } - (* the error thread. This thread is used to trap attempts to run CML - * without proper initialization (i.e., via RunCML). This thread is - * enqueued by reset. - *) - val errorTid = R.TID{ - id = ~2, alert = ref false, done_comm = ref false, - exnHandler = ref(fn _ => ()), - props = ref[], - dead = R.CVAR(ref(R.CVAR_unset[])) - } - val errorCont : unit cont = SMLofNJ.Cont.isolate (fn _ => ( - Debug.sayDebug "**** Use RunCML.doit to run CML ****\n"; - raise Fail "CML not initialized")) - - (* thread id marking *) - fun markTid (R.TID{done_comm, ...}) = done_comm := true - fun unmarkTid (R.TID{done_comm, ...}) = done_comm := false - fun isMarked (R.TID{done_comm, ...}) = !done_comm - - (* The thread ready queues: - * rdyQ1 is the primary queue and rdyQ2 is the secondary queue. - *) - val (rdyQ1 as R.Q{rear=rear1, ...}) : (R.thread_id * unit cont) Q.queue = - Q.queue() - val rdyQ2 : (R.thread_id * unit cont) Q.queue = Q.queue() - - (* enqueue a ready thread *) - fun enqueue p = (rear1 := p :: !rear1) - fun markAndEnqueue (p as (id, _)) = (markTid id; rear1 := p :: !rear1) - - val enqueueThread = markAndEnqueue - - (* enqueue the current thread, and make the given thread ID be the current - * one. - *) - fun enqueueAndSwitchCurThread (resume, tid) = ( - markAndEnqueue(getCurThread(), resume); - setCurThread tid) - - (* dequeue a thread from the primary queue *) - fun dequeue1 () = (case rdyQ1 - of (R.Q{front = ref [], rear = ref []}) => dequeue2() - | (R.Q{front as (ref []), rear as (ref l)}) => let - val (x::r) = reverse(l, []) - in - front := r; rear := []; x - end - | (R.Q{front as (ref(x::r)), ...}) => (front := r; x) - (* end case *)) - - (* remove a thread from the secondary queue (assuming that the - * primary queue is empty. - *) - and dequeue2 () = (case rdyQ2 - of (R.Q{front = ref [], rear = ref []}) => (dummyTid, !pauseHook) - | (R.Q{front as ref [], rear as ref l}) => ( - rear := []; front := reverse(l, []); dequeue2()) - | (R.Q{front as ref(item::r), ...}) => (front := r; item) - (* end case *)) - - (* promote a thread from the secondary queue to the primary queue *) - fun promote () = (case (Q.next rdyQ2) - of (SOME x) => enqueue x - | NONE => () - (* end case *)) - - (* global flag for implementing atomic operations *) - datatype atomic_state = NonAtomic | Atomic | SignalPending - val atomicState = ref NonAtomic - - (* Note, the first thing the scheduler hook does is a atomicBegin, so we don't - * need to clear the atomic state here. - *) - fun dispatchSchedulerHook () = throw (!schedulerHook) () - -(* - fun enqueueSchedulerHook () = let - val kont = callcc (fn k => ( - callcc (fn k' => throw k k'); - dispatchSchedulerHook ())) - val R.Q{front, ...} = rdyQ1 - in - front := (dummyTid, kont) :: !front - end -*) - - fun atomicBegin () = atomicState := Atomic - - (* leave an atomic region. - * NOTE: there is a race condition between the test of the atomicState - * flag and the setting of it to NonAtomic, but this is not a problem in - * practice, because there are no GC tests between these (and thus no - * preemption). - *) - fun atomicEnd () = (case !atomicState - of SignalPending => callcc (fn k => ( - enqueue(getCurThread(), k); - dispatchSchedulerHook())) - | _ => atomicState := NonAtomic - (* end case *)) - - fun atomicDispatch () = (case !atomicState - of SignalPending => dispatchSchedulerHook() - | _ => let - val (id, kont) = dequeue1() - in - setCurThread id; - atomicState := NonAtomic; - throw kont () - end - (* end case *)) - - fun dispatch () = (atomicBegin(); atomicDispatch ()) - - fun atomicSwitchTo (tid, k, x) = - callcc (fn curK => ( - case !atomicState - of SignalPending => - callcc (fn k' => ( - enqueue(tid, k'); - enqueue(getCurThread(), curK); - dispatchSchedulerHook())) - | _ => ( - enqueueAndSwitchCurThread (curK, tid); - atomicState := NonAtomic) - (* end case *); - throw k x)) - - (* Yield control to the next thread, while leaving the atomic region. *) - fun atomicYield k = ( - markAndEnqueue(getCurThread(), k); - atomicDispatch ()) - - (* create a temporary thread (with dummy ID) to run the given - * function and then exit. The thread is placed on the front - * of the scheduling queue. - *) - fun enqueueTmpThread f = let -(** this should be, but the overhead is too high right now. ** - val kont = SMLofNJ.Cont.isolate f -**) - val kont = callcc (fn k => ( - callcc (fn k' => throw k k'); - f () handle _ => (); - dispatch ())) - val R.Q{front, ...} = rdyQ1 - in - front := (dummyTid, kont) :: !front - end - - val defaultHook : unit cont = SMLofNJ.Cont.isolate dispatch - - (* this holds an approximation of the current time of day. It is - * cleared at each pre-emption, and initialized on demand (by getTime). - *) - val clock = ref(SOME Time.zeroTime) - - (* returns an approximation of the current time of day (this is at - * least as accurate as the time quantum). - *) - fun getTime () = (case !clock - of NONE => let val t = Time.now() - in - clock := SOME t; t - end - | (SOME t) => t - (* end case *)) - - (* preempt the current thread (with continuation k). *) - fun preempt k = let - val curTid = getCurThread() - val curP = (curTid, k) - in - if (isMarked curTid) - then ( - unmarkTid curTid; - promote (); - enqueue curP) - else Q.enqueue(rdyQ2, curP) - end - - (* the preemption handler *) - fun alrmHandler (_, _, k) = ( - clock := NONE; - case !atomicState - of NonAtomic => (preempt k; !schedulerHook) - | Atomic => (atomicState := SignalPending; k) - | _ => k - (* end case *)) - - val defaultTimeQ = Time.fromMilliseconds 20 - val timeQ = ref defaultTimeQ - - structure IT = SMLofNJ.IntervalTimer - - fun startTimer tq = let - val tq = if Time.<(Time.zeroTime, tq) then tq else defaultTimeQ - in - timeQ := tq; - ignore (Sig.setHandler (Sig.sigALRM, Sig.HANDLER alrmHandler)); - ignore (IT.setIntTimer (SOME tq)) - end - - fun stopTimer () = ( - ignore (IT.setIntTimer NONE); - ignore (Sig.setHandler (Sig.sigALRM, Sig.IGNORE))) - - fun restartTimer () = startTimer (!timeQ) - - (* reset various pieces of state *) - fun reset running = ( - setCurThread dummyTid; - pauseHook := bogusHook; - shutdownHook := bogusShutdownHook; - schedulerHook := defaultHook; - clock := NONE; - Q.reset rdyQ1; Q.reset rdyQ2; - if (not running) then enqueueThread(errorTid, errorCont) else ()) - - val _ = reset false - - end - diff -Naur --exclude=_MTN base/src/cml/src/core-cml/simtimeoutdummy.sml final/src/cml/src/core-cml/simtimeoutdummy.sml --- base/src/cml/src/core-cml/simtimeoutdummy.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/simtimeoutdummy.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,23 @@ +(* timeout.sml + * + * COPYRIGHT (c) 1995 AT&T Bell Laboratories. + * COPYRIGHT (c) 1989-1991 John H. Reppy + * + * Events for synchronizing on timeouts. + *) + +structure SimTimeOutDummy: sig + + val reset : unit -> unit + val pollTime : unit -> unit + val anyWaiting : unit -> Time.time option + + end = struct + fun reset () = () + fun pollTime () = () + fun anyWaiting () = NONE +end; + +structure SimTimeOut = SimTimeOutDummy + + diff -Naur --exclude=_MTN base/src/cml/src/core-cml/simtimeout-leftist.sml final/src/cml/src/core-cml/simtimeout-leftist.sml --- base/src/cml/src/core-cml/simtimeout-leftist.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/simtimeout-leftist.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,135 @@ +(* sim-timeout.sml + * + * COPYRIGHT (c) 1995 AT&T Bell Laboratories. + * COPYRIGHT (c) 1989-1991 John H. Reppy + * + * Events for synchronizing on simulated timeouts. + * The list-based implementation was too slow + * when there were thousands of processes sleeping on + * timeouts; this impl is much faster but + * faster yet would be better. + *) + +structure SimTimeOut : sig + + type 'a event + + include TIME_OUT + + val reset : unit -> unit + val pollTime : unit -> unit + val anyWaiting : unit -> Time.time option + val now : unit -> Time.time + + end = struct + + structure R = RepTypes + structure S = Scheduler + + type 'a event = 'a Event.event + + (* The list of threads waiting for timeouts. It is sorted in increasing order + * of time value. + * NOTE: we may want to use some sort of balanced search structure in the + * future. + *) + + structure Item = struct + type item = (Time.time * (unit -> unit) * R.trans_id ref * unit S.cont) + type priority = Time.time + fun compare (t1, t2) = Time.compare (t2, t1) + fun priority (t,_,_,_) = t + end + structure Heap = LeftPriorityQFn(Item) + type item = Item.item + val timeQ = ref Heap.empty + val simNow = ref Time.zeroTime + fun now () = !simNow + + fun timeWait (t, f, id, k) = let + (* the list-based version of timeWait cleaned up CANCEL'd timeouts *) + (* for times less than that of the inserted item; this one doesn't do *) + (* that; is it a problem ? *) + in + timeQ := Heap.insert((t, f, id, k), !timeQ) + end + + fun checkQ q = let + val item = Heap.next q + in + case item of + NONE => Heap.empty + | SOME ((t,f,transId,k),newHeap) => let + val currTid as RepTypes.TID{priority,...} = Scheduler.getCurThread() + val now = (if SchedQueuesSim.allSimsBlocked() andalso (priority <> (SOME R.SIM)) + then simNow := t + else (); + !simNow) + in + case transId of + ref R.CANCEL => checkQ newHeap + | ref (R.TRANS tid) => + if (Time.<=(t, now)) + then ( + S.markAndEnqueue(tid, k); + f(); + checkQ newHeap + ) + else + q + end + end + + fun anyWaiting () = case Heap.next (!timeQ) + of NONE => NONE + | SOME ((_,_,ref R.CANCEL,_),newHeap) => (timeQ := newHeap; anyWaiting()) + | SOME ((t,_,_,_),_) => let + val now = !simNow + in + if (Time.<=(t, now)) + then SOME(Time.zeroTime) + else SOME(Time.-(t, now)) + end + + fun pollTime () = + if Heap.isEmpty (!timeQ) + then () + else timeQ := checkQ (!timeQ) + + fun reset () = (timeQ := Heap.empty; simNow := Time.zeroTime) + + (** NOTE: unlike for most base events, the block functions of time-out + ** events do not have to exit the atomic region or execute the clean-up + ** operation. This is done when they are removed from the waiting queue. + **) + fun timeOutEvt t = let + val t0 = !simNow + fun blockFn {transId, cleanUp, next} = let + val _ = !simNow (* silly syntax *) + in + SMLofNJ.Cont.callcc (fn k => ( + timeWait (Time.+(t, t0), cleanUp, transId, k); + next())); + S.atomicEnd() + end + fun pollFn () = if (t = Time.zeroTime) + then R.ENABLED{prio= ~1, doFn=S.atomicEnd} + else R.BLOCKED blockFn + in + R.BEVT[pollFn] + end + + fun atTimeEvt t = let + fun blockFn {transId, cleanUp, next} = ( + SMLofNJ.Cont.callcc (fn k => ( + timeWait (t, cleanUp, transId, k); + next())); + S.atomicEnd()) + fun pollFn () = if Time.<=(t, !simNow) + then R.ENABLED{prio= ~1, doFn=S.atomicEnd} + else R.BLOCKED blockFn + in + R.BEVT[pollFn] + end + + end; diff -Naur --exclude=_MTN base/src/cml/src/core-cml/simtimeout-list.sml final/src/cml/src/core-cml/simtimeout-list.sml --- base/src/cml/src/core-cml/simtimeout-list.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/simtimeout-list.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,120 @@ +(* timeout.sml + * + * COPYRIGHT (c) 1995 AT&T Bell Laboratories. + * COPYRIGHT (c) 1989-1991 John H. Reppy + * + * Events for synchronizing on simulated timeouts. + * Too slow; see the new impl using leftist heap priority queues. + *) + +structure SimTimeOut : sig + + type 'a event + + include TIME_OUT + + val reset : unit -> unit + val pollTime : unit -> unit + val anyWaiting : unit -> Time.time option + val now : unit -> Time.time + + end = struct + + structure R = RepTypes + structure S = Scheduler + + type 'a event = 'a Event.event + + (* The list of threads waiting for timouts. It is sorted in increasing order + * of time value. + * NOTE: we may want to use some sort of balanced search structure in the + * future. + *) + type item = (Time.time * (unit -> unit) * R.trans_id ref * unit S.cont) + val timeQ = ref ([] : item list) + val simNow = ref Time.zeroTime + fun now () = !simNow + + fun timeWait (t, f, id, k) = let + fun ins [] = [(t, f, id, k)] + | ins ((_, _, ref R.CANCEL, _) :: r) = ins r + | ins (l as ((item as (t', _, _, _)) :: r)) = if (Time.<(t', t)) + then item :: ins r + else (t, f, id, k) :: l + in + timeQ := ins (! timeQ) + end + + fun clean [] = [] + | clean ((_, _, ref R.CANCEL, _) :: r) = clean r + | clean (item :: r) = item :: clean r + + fun checkQ (q as ((item as (t, _, _, _)) :: _)) = let + val currTid as RepTypes.TID{alert,...} = Scheduler.getCurThread() + val now = (if SchedQueuesSim.allSimsBlocked() andalso (not (!alert)) then simNow := t else (); !simNow) + fun chk [] = [] + | chk ((_, _, ref R.CANCEL, _) :: r) = chk r + | chk (l as ((item as (t', f, transId as ref(R.TRANS tid), k)) :: r)) = + if (Time.<=(t', now)) + then ( + S.markAndEnqueue(tid, k); + f(); (* cleanup function *) + chk r) + else clean l + in + chk q + end + + fun anyWaiting () = (case clean(!timeQ) + of [] => NONE + | (q as ((t, _, _, _)::_)) => let + val now = !simNow + in + if (Time.<=(t, now)) + then SOME(Time.zeroTime) + else SOME(Time.-(t, now)) + end + (* end case *)) + + fun pollTime () = (case !timeQ + of [] => () + | q => timeQ := checkQ q + (* end case *)) + + fun reset () = (timeQ := []; simNow := Time.zeroTime) + + (** NOTE: unlike for most base events, the block functions of time-out + ** events do not have to exit the atomic region or execute the clean-up + ** operation. This is done when they are removed from the waiting queue. + **) + fun timeOutEvt t = let + val t0 = !simNow + fun blockFn {transId, cleanUp, next} = let + val _ = !simNow (* silly syntax *) + in + SMLofNJ.Cont.callcc (fn k => ( + timeWait (Time.+(t, t0), cleanUp, transId, k); + next())); + S.atomicEnd() + end + fun pollFn () = if (t = Time.zeroTime) + then R.ENABLED{prio= ~1, doFn=S.atomicEnd} + else R.BLOCKED blockFn + in + R.BEVT[pollFn] + end + + fun atTimeEvt t = let + fun blockFn {transId, cleanUp, next} = ( + SMLofNJ.Cont.callcc (fn k => ( + timeWait (t, cleanUp, transId, k); + next())); + S.atomicEnd()) + fun pollFn () = if Time.<=(t, !simNow) + then R.ENABLED{prio= ~1, doFn=S.atomicEnd} + else R.BLOCKED blockFn + in + R.BEVT[pollFn] + end + + end; diff -Naur --exclude=_MTN base/src/cml/src/core-cml/simtimeout.sml final/src/cml/src/core-cml/simtimeout.sml --- base/src/cml/src/core-cml/simtimeout.sml 1969-12-31 16:00:00.000000000 -0800 +++ final/src/cml/src/core-cml/simtimeout.sml 2006-10-20 13:42:29.000000000 -0700 @@ -0,0 +1,140 @@ +(* sim-timeout.sml + * + * COPYRIGHT (c) 1995 AT&T Bell Laboratories. + * COPYRIGHT (c) 1989-1991 John H. Reppy + * + * Events for synchronizing on simulated timeouts. + * The list-based implementation was too slow + * when there were thousands of processes sleeping on + * timeouts; this impl is much faster but + * faster yet would be better. + *) + +structure SimTimeOut : sig + + type 'a event + + include TIME_OUT + + val reset : unit -> unit + val pollTime : unit -> unit + val anyWaiting : unit -> Time.time option + val now : unit -> Time.time + + end = struct + + structure R = RepTypes + structure S = Scheduler + + type 'a event = 'a Event.event + + (* The list of threads waiting for timeouts. It is sorted in increasing order + * of time value. + * NOTE: we may want to use some sort of balanced search structure in the + * future. + *) + + structure Item = struct + type item = (Time.time * (unit -> unit) * R.trans_id ref * unit S.cont) + type priority = Time.time + fun compare (t1, t2) = Time.compare (t2, t1) + fun less (SOME ((t1,_,_,_):item), SOME ((t2,_,_,_):item)) = Time.<(t1,t2) + fun priority (t,_,_,_) = t + end + structure Heap = PriorityHeap + type item = Item.item + (* The Heap implementation, since it doesn't grow, limits max number of waiting timeouts to 110,000 *) + val timeQ = Heap.createN(Item.less, 110000, NONE : Item.item option) + val simNow = ref Time.zeroTime + fun now () = !simNow + + fun timeWait (t, f, id, k) = let + (* the list-based version of timeWait cleaned up CANCEL'd timeouts *) + (* for times less than that of the inserted item; this one doesn't do *) + (* that; is it a problem ? *) + in + Heap.insert timeQ (SOME (t, f, id, k)) + end + + fun checkQ q = let + in + case Heap.isEmpty q of + true => () + | false => let + val (t,f,transId,k) = valOf (Heap.min q) + val currTid as RepTypes.TID{priority,...} = Scheduler.getCurThread() + val now = (if SchedQueuesSim.allSimsBlocked() andalso (priority <> (SOME R.SIM)) + then simNow := t + else (); + !simNow) + in + case transId of + ref R.CANCEL => (Heap.deleteMin q; checkQ q) + | ref (R.TRANS tid) => + if (Time.<=(t, now)) + then ( + S.markAndEnqueue(tid, k); + f(); + Heap.deleteMin q; + checkQ q + ) + else + () + end + end + + fun anyWaiting () = case Heap.isEmpty timeQ + of true => NONE + | false => case valOf (Heap.min timeQ) + of (_,_,ref R.CANCEL,_) => + (Heap.deleteMin timeQ; anyWaiting()) + | (t,_,_,_) => let + val now = !simNow + in + if (Time.<=(t, now)) + then SOME(Time.zeroTime) + else SOME(Time.-(t, now)) + end + + fun pollTime () = + if Heap.isEmpty (timeQ) + then () + else checkQ timeQ + + fun reset () = (Heap.clear timeQ; simNow := Time.zeroTime) + + (** NOTE: unlike for most base events, the block functions of time-out + ** events do not have to exit the atomic region or execute the clean-up + ** operation. This is done when they are removed from the waiting queue. + **) + fun timeOutEvt t = let + + fun blockFn {transId, cleanUp, next} = let + val t0 = !simNow + in + SMLofNJ.Cont.callcc (fn k => ( + timeWait (Time.+(t, t0), cleanUp, transId, k); + next())); + S.atomicEnd() + end + fun pollFn () = if (t = Time.zeroTime) + then R.ENABLED{prio= ~1, doFn=S.atomicEnd} + else R.BLOCKED blockFn + in + R.BEVT[pollFn] + end + + fun atTimeEvt t = let + fun blockFn {transId, cleanUp, next} = ( + SMLofNJ.Cont.callcc (fn k => ( + timeWait (t, cleanUp, transId, k); + next())); + S.atomicEnd()) + fun pollFn () = if Time.<=(t, !simNow) + then R.ENABLED{prio= ~1, doFn=S.atomicEnd} + else R.BLOCKED blockFn + in + R.BEVT[pollFn] + end + + end; diff -Naur --exclude=_MTN base/src/cml/src/core-cml/thread-sig.sml final/src/cml/src/core-cml/thread-sig.sml --- base/src/cml/src/core-cml/thread-sig.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/thread-sig.sml 2006-10-20 13:42:25.000000000 -0700 @@ -19,6 +19,11 @@ val spawnc : ('a -> unit) -> 'a -> thread_id val spawn : (unit -> unit) -> thread_id + +(* spawn function for priority queue*) + val spawnc_prio : int -> ('a -> unit) -> 'a -> thread_id + val spawn_prio : int -> (unit -> unit) -> thread_id + val exit : unit -> 'a diff -Naur --exclude=_MTN base/src/cml/src/core-cml/thread.sml final/src/cml/src/core-cml/thread.sml --- base/src/cml/src/core-cml/thread.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/thread.sml 2006-10-20 13:42:25.000000000 -0700 @@ -28,15 +28,19 @@ tidCount := 0; S.reset running) - fun exnHandler (exn : exn) = () + fun exnHandler (exn : exn) = Debug.sayDebugId "Uncaught exception in thread" (* () *) val defaultExnHandler = ref exnHandler - fun newTId () = let val n = !tidCount + fun newTId ( prio ) = let + val n = !tidCount + val priority = S.getPriority prio in tidCount := n+1; TID{ id = n, + enqFn = ref (S.getEnqueueFn priority), + priority = priority, alert = ref false, done_comm = ref false, exnHandler = ref(! defaultExnHandler), @@ -60,34 +64,28 @@ fun doHandler (TID{exnHandler, ...}, exn) = ((!exnHandler) exn) handle _ => () -(** Eventually, this should be: - fun spawnc f x = let - val _ = S.atomicBegin() - val id = newTId() - fun thread () = ( - (f x) handle ex => doHandler(id, ex); - notifyAndDispatch id) - in - SMLofNJ.Cont.callcc (fn parentK => ( - S.enqueueAndSwitchCurThread(parentK, id); - S.atomicEnd(); - SMLofNJ.Cont.throw (SMLofNJ.Cont.isolate thread) ())); - id - end - **) - fun spawnc f x = let - val _ = S.atomicBegin() - val id = newTId() - in - SMLofNJ.Cont.callcc (fn parentK => ( - S.enqueueAndSwitchCurThread(parentK, id); - S.atomicEnd(); - (f x) handle ex => doHandler(id, ex); - notifyAndDispatch id)); - id - end + fun yield () = SMLofNJ.Cont.callcc (fn k => ( + S.atomicBegin(); + S.atomicYield k)) - fun spawn f = spawnc f () + fun spawnc_prio prio f x = let + val _ = S.atomicBegin() + val id = newTId( prio ) + in + SMLofNJ.Cont.callcc (fn parentK => ( + S.enqueueAndSwitchCurThread(parentK, id); + S.atomicEnd(); + yield(); (* new: to make sure right one of parent and child continues first *) + (f x) handle ex => doHandler(id, ex); + notifyAndDispatch id)); + id + end + + fun spawn_prio prio f = spawnc_prio prio f () + + fun spawnc f x = spawnc_prio 5 f x + + fun spawn f = spawnc f () fun joinEvt (TID{dead, ...}) = Event.cvarGetEvt dead @@ -100,10 +98,6 @@ notifyAndDispatch tid end - fun yield () = SMLofNJ.Cont.callcc (fn k => ( - S.atomicBegin(); - S.atomicYield k)) - (* thread-local data *) local fun mkProp () = let diff -Naur --exclude=_MTN base/src/cml/src/core-cml/timeout.sml final/src/cml/src/core-cml/timeout.sml --- base/src/cml/src/core-cml/timeout.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml/timeout.sml 2006-10-20 13:42:25.000000000 -0700 @@ -50,7 +50,7 @@ | chk (l as ((item as (t', f, transId as ref(R.TRANS tid), k)) :: r)) = if (Time.<=(t', now)) then ( - S.enqueueThread (tid, k); + S.markAndEnqueue(tid, k); f(); (* cleanup function *) chk r) else clean l diff -Naur --exclude=_MTN base/src/cml/src/core-cml.cm final/src/cml/src/core-cml.cm --- base/src/cml/src/core-cml.cm 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/core-cml.cm 2006-10-20 13:42:25.000000000 -0700 @@ -8,6 +8,8 @@ signature SYNC_VAR signature MAILBOX signature CML_CLEANUP + structure SchedulerGlueControl + structure Debug structure CML structure Event structure Q @@ -16,18 +18,17 @@ structure SyncVar structure Mailbox structure TimeOut + structure SimTimeOut structure IOManager structure Running structure CleanUp - structure Debug is #if defined (NEW_CM) $/basis.cm #endif + core-cml/cml-util.cm core-cml/version.sml - core-cml/rep-types.sml - core-cml/queue.sml - core-cml/scheduler.sml + core-cml/event-sig.sml core-cml/event.sml core-cml/thread-sig.sml @@ -36,9 +37,14 @@ core-cml/channel.sml core-cml/timeout-sig.sml core-cml/timeout.sml + core-cml/simtimeout.sml + $/smlnj-lib.cm + ../../MLRISC/library/heap.sml (private) + ../../MLRISC/library/priQueue.sig (private) core-cml/io-manager.sml core-cml/cml-sig.sml core-cml/cml.sml + core-cml/sync-var-sig.sml core-cml/sync-var.sml @@ -49,4 +55,5 @@ core-cml/cml-cleanup-sig.sml core-cml/cleanup.sml - core-cml/debug.sml + core-cml/scheduler/scheduler.cm + diff -Naur --exclude=_MTN base/src/cml/src/glue/export-fn-fn.sml final/src/cml/src/glue/export-fn-fn.sml --- base/src/cml/src/glue/export-fn-fn.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/glue/export-fn-fn.sml 2006-10-20 13:42:25.000000000 -0700 @@ -32,7 +32,7 @@ (* first, we poll the OS to schedule any ready threads *) G.pollOS(); (* check for ready threads orelse pause *) - if (not (Q.isEmpty S.rdyQ1) orelse G.pause()) + if (S.readyThreads() orelse G.pause()) then S.atomicDispatch() else ( S.atomicEnd(); diff -Naur --exclude=_MTN base/src/cml/src/tests/test.sml final/src/cml/src/tests/test.sml --- base/src/cml/src/tests/test.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/tests/test.sml 2006-10-20 13:42:25.000000000 -0700 @@ -7,9 +7,10 @@ val proc = Unix.execute(prog, []) val (fin,fout) = Unix.streamsOf proc fun echo () = (case TextIO.inputLine fin - of "" => () - | s => (TextIO.output(TextIO.stdOut, s); echo()) - (* end case *)) + of NONE => () + | SOME "" => () + | SOME s => (TextIO.output(TextIO.stdOut, s); echo()) + (* end case *)) in TextIO.closeOut fout; echo (); diff -Naur --exclude=_MTN base/src/cml/src/Unix/unix-glue.sml final/src/cml/src/Unix/unix-glue.sml --- base/src/cml/src/Unix/unix-glue.sml 2006-10-20 13:42:14.000000000 -0700 +++ final/src/cml/src/Unix/unix-glue.sml 2006-10-20 13:42:25.000000000 -0700 @@ -9,12 +9,13 @@ structure UnixGlue : OS_GLUE = struct - fun init () = TimeOut.reset() + fun init () = (TimeOut.reset(); SimTimeOut.reset()) fun pollOS () = ( TimeOut.pollTime(); IOManager.pollIO(); - ProcManager.pollProcs()) + ProcManager.pollProcs(); + SimTimeOut.pollTime()) fun pause () = (case TimeOut.anyWaiting() of NONE => if (IOManager.anyWaiting() orelse ProcManager.anyWaiting()) @@ -24,7 +25,7 @@ | (SOME t) => (Signals.pause(); true) (* end case *)) - fun shutdown () = TimeOut.reset() + fun shutdown () = (TimeOut.reset(); SimTimeOut.reset()) end; diff -Naur --exclude=_MTN base/src/eXene/examples/nbody/display.sml final/src/eXene/examples/nbody/display.sml --- base/src/eXene/examples/nbody/display.sml 2006-10-20 13:42:16.000000000 -0700 +++ final/src/eXene/examples/nbody/display.sml 2006-10-20 13:42:26.000000000 -0700 @@ -126,13 +126,13 @@ val panEvt = CML.recvEvt panCh fun mouseThread () = let - fun idle () = + fun idle () = ( case I.msgBodyOf (CML.sync m) of I.MOUSE_FirstDown { but = I.MButton 1, pt, ... } => pan pt | I.MOUSE_FirstDown { but = I.MButton 3, ... } => (quit (); idle ()) - | _ => idle () + | _ => idle ()) and pan (pt' as G.PT { x = x', y = y' }) = case I.msgBodyOf (CML.sync m) of @@ -179,12 +179,12 @@ fun death cl = (print "Simulation has died!\n"; quit (); loop cl) - and loop cl = CML.select + and loop cl = (CML.select [CML.wrap (simDeathEvt, fn () => death cl), CML.wrap (timer, fn () => (loop (update cl))), CML.wrap (ci, fn x => handleCI (cl, I.msgBodyOf x)), CML.wrap (panEvt, fn p => handlePan (cl, p)), - CML.wrap (zoomEvt, fn z => handleZoom (cl, z))] + CML.wrap (zoomEvt, fn z => handleZoom (cl, z))]) and handleCI (cl, I.CI_Resize (G.RECT r)) = let diff -Naur --exclude=_MTN base/src/eXene/examples/nbody/main.sml final/src/eXene/examples/nbody/main.sml --- base/src/eXene/examples/nbody/main.sml 2006-10-20 13:42:16.000000000 -0700 +++ final/src/eXene/examples/nbody/main.sml 2006-10-20 13:42:26.000000000 -0700 @@ -19,4 +19,17 @@ val bodyData = bodyData) val run = AnimateSim.doit + + fun runSched sched = + ( + (case sched of + "Reppy" => SchedulerGlueControl.useReppyQueues() + | "Leftist" => SchedulerGlueControl.useLeftistQueues() + | _ => SchedulerGlueControl.usePriorityQueues()); + run() + ) + + end + + diff -Naur --exclude=_MTN base/src/eXene/examples/nbody/simul.sml final/src/eXene/examples/nbody/simul.sml --- base/src/eXene/examples/nbody/simul.sml 2006-10-20 13:42:16.000000000 -0700 +++ final/src/eXene/examples/nbody/simul.sml 2006-10-20 13:42:26.000000000 -0700 @@ -77,6 +77,6 @@ | QUERY c => (CML.send (c, bl); loop (bl, dt, n)) | STOP => ()) in - CML.spawn (fn () => loop (bodies, dt, n)) + CML.spawn_prio 10 (fn () => loop (bodies, dt, n)) end end \ No newline at end of file diff -Naur --exclude=_MTN base/src/eXene/widgets/util/run-exene.sml final/src/eXene/widgets/util/run-exene.sml --- base/src/eXene/widgets/util/run-exene.sml 2006-10-20 13:42:16.000000000 -0700 +++ final/src/eXene/widgets/util/run-exene.sml 2006-10-20 13:42:27.000000000 -0700 @@ -35,7 +35,7 @@ end (* the default time quantum *) - val defaultTimeQ = Time.fromMilliseconds 20 (* ms *) + val defaultTimeQ = Time.fromMilliseconds 10 (* ms *) fun run doit = let fun runIt () = doit (mkRoot NONE)