Max 5 API Reference
/*
	parallel processing api

	try to keep as simple and general as possible;
	more complicated/specific behavior can be added
	on top of the following

	example applications
	- poly~
	- jit.matrix operators
	- other parallelizable tasks

	- t_parallel_task: object which manages the overall workload and worker threads
		state:
		- maximum worker count
		- worker entry proc
		- thread priority (making use of thread pools for efficiency's sake;
		  currently 3 thread pools: audio (high), scheduler (med), queue (low))
		- task global storage, user defined
		- state related to benchmarking/load balancing
		verbs:
		- run task
		- cancel task

	- t_parallel_worker: one worker thread
		state:
		- worker id
		- worker entry proc
		- pointer to parent task object
		- state related to benchmarking
		- worker specific data?
		verbs:
		- run worker
		- some means of benchmarking

	TODO:
	- t_parallel_iterator: utility wrapper around a parallel task for iterating over a dataset;
	  could use object methods for ease of use

	- means of negotiating behavior (initial callback before spawning threads)
	- setup of initial worker state (secondary callback, once per thread)
	- shutdown of per-worker state (can clean up any data as needed)
	- shutdown of task execution handled in the function
	- notification of completion if fully asynchronous?
*/

// for use inside an external
#include "ext.h"
#include "ext_obex.h"
#include "ext_systhread.h"
#include "ext_common.h"

#include "parallel.h"


// private

t_class *_parallel_task_class=NULL;
t_class *_parallel_worker_class=NULL;

// thread pool stuff. can improve after initial pass

typedef struct _parallel_thread
{
	t_systhread			thread;
	t_systhread_mutex	mutex;		// establishes external ownership of thread
	t_systhread_cond	cond;
	t_parallel_worker	*worker;
	long				state;
	struct _parallel_thread_pool *pool;	// parent
} t_parallel_thread;

typedef struct _parallel_thread_pool
{
	t_parallel_thread	threads[PARALLEL_MAX_WORKERS];
	long				count;
	t_systhread_mutex	count_mutex;
	t_systhread_cond	count_cond;
} t_parallel_thread_pool;

static t_parallel_thread_pool _pool_low;
static t_parallel_thread_pool _pool_med;
static t_parallel_thread_pool _pool_high;

void parallel_thread_init(t_parallel_thread *x);
void parallel_threadproc(t_parallel_thread *x);
void parallel_thread_pool_init(t_parallel_thread_pool *pool);

// do we want to add support for "barriers"?
// see p. 215 in "High Performance Computing", Dowd + Severance
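/*
	A sketch of what a worker entry proc might look like: each worker takes an
	interleaved slice of a shared dataset, striding by the parent task's
	workercount. t_mydata and my_workerproc are hypothetical names used only
	for illustration here; they are not part of the api.
*/

typedef struct _mydata
{
	float	*buffer;	// shared input/output
	long	length;		// number of elements
} t_mydata;

static void my_workerproc(t_parallel_worker *w)
{
	t_mydata *d = (t_mydata *)w->data;
	long i;

	// interleaved partition: worker k processes elements k, k+n, k+2n, ...
	for (i=w->id; i<d->length; i+=w->task->workercount)
		d->buffer[i] *= 0.5f;
}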
215 in "High Performance Computing", Dowd + Severance 00098 00099 void parallel_init(void) 00100 { 00101 long i,attrflags=0; 00102 t_class *c; 00103 t_object *attr; 00104 00105 // CLASS DEFINITION: parallel_task 00106 c = class_new("parallel_task", (method)parallel_task_new, 00107 (method)parallel_task_free, sizeof(t_parallel_task), (method)NULL, A_CANT, 0L); 00108 00109 class_obexoffset_set(c, calcoffset(t_parallel_task, obex)); 00110 00111 /* TODO: 00112 attr = attr_offset_new("workercount",_sym_long,attrflags, 00113 (method)0L,(method)parallel_task_workercount, 00114 calcoffset(t_parallel_task, workercount)); 00115 class_addattr(c,attr); 00116 */ 00117 attr = attr_offset_new("benchmark",gensym("char"),attrflags, 00118 (method)0L,(method)0L, calcoffset(t_parallel_task, benchmark)); 00119 class_addattr(c,attr); 00120 attr = attr_offset_new("priority",gensym("char"),attrflags, 00121 (method)0L,(method)0L, calcoffset(t_parallel_task, priority)); 00122 class_addattr(c,attr); 00123 attr = attr_offset_new("iteration",gensym("long"),attrflags, 00124 (method)0L,(method)0L, calcoffset(t_parallel_task, iteration)); 00125 class_addattr(c,attr); 00126 attr = attr_offset_new("begintime",gensym("long"),attrflags, 00127 (method)0L,(method)0L, calcoffset(t_parallel_task, begintime)); 00128 class_addattr(c,attr); 00129 attr = attr_offset_new("endtime",gensym("long"),attrflags, 00130 (method)0L,(method)0L, calcoffset(t_parallel_task, endtime)); 00131 class_addattr(c,attr); 00132 00133 class_addmethod(c, (method)parallel_task_execute, "execute", 0L); 00134 class_addmethod(c, (method)parallel_task_cancel, "cancel", 0L); 00135 class_addmethod(c, (method)parallel_task_workerproc, "workerproc", A_CANT, 0L); 00136 class_addmethod(c, (method)parallel_task_data, "data", A_CANT, 0L); 00137 00138 _parallel_task_class = c; 00139 class_register(CLASS_NOBOX,_parallel_task_class); 00140 00141 // CLASS DEFINITION: parallel_worker 00142 c = class_new("parallel_worker", (method)parallel_worker_new, 00143 (method)parallel_worker_free, sizeof(t_parallel_worker), (method)NULL, A_CANT, 0L); 00144 00145 class_obexoffset_set(c, calcoffset(t_parallel_worker, obex)); 00146 00147 attr = attr_offset_new("id",gensym("long"),attrflags, 00148 (method)0L,(method)0L, calcoffset(t_parallel_worker, id)); 00149 class_addattr(c,attr); 00150 attr = attr_offset_new("begintime",gensym("long"),attrflags, 00151 (method)0L,(method)0L, calcoffset(t_parallel_worker, begintime)); 00152 class_addattr(c,attr); 00153 attr = attr_offset_new("endtime",gensym("long"),attrflags, 00154 (method)0L,(method)0L, calcoffset(t_parallel_worker, endtime)); 00155 class_addattr(c,attr); 00156 00157 class_addmethod(c, (method)parallel_worker_execute, "execute", 0L); 00158 00159 _parallel_worker_class = c; 00160 class_register(CLASS_NOBOX,_parallel_worker_class); 00161 00162 00163 // thread pool stuff. 
void parallel_thread_pool_init(t_parallel_thread_pool *pool)
{
	long i;

	for (i=0;i<PARALLEL_MAX_WORKERS;i++) {
		pool->threads[i].thread = NULL;
		systhread_mutex_new(&pool->threads[i].mutex,0);
		systhread_cond_new(&pool->threads[i].cond,0);
		pool->threads[i].worker = NULL;
		pool->threads[i].state = 0;
		pool->threads[i].pool = pool;
	}
	pool->count = 0;
	systhread_mutex_new(&pool->count_mutex,0);
	systhread_cond_new(&pool->count_cond,0);
}

long parallel_processorcount(void)
{
#ifdef MAC_VERSION
	return MPProcessorsScheduled();
#else
	SYSTEM_INFO info;

	GetSystemInfo(&info);
	return info.dwNumberOfProcessors;
#endif
}

t_parallel_task *parallel_task_new(void *data, method workerproc, long workercount)
{
	long i;
	t_parallel_task *x=NULL;
	t_parallel_worker *w=NULL;

	if (workerproc && (x=(t_parallel_task *)object_alloc(_parallel_task_class))) {
		x->data = data;
		x->workerproc = workerproc;
		if (workercount<=0)
			workercount = parallel_processorcount();
		x->workercount = MIN(workercount,PARALLEL_MAX_WORKERS);
		x->priority = PARALLEL_PRIORITY_DEFAULT;
		x->benchmark = 0;
		x->iteration = 0;
		x->begintime = 0;
		x->endtime = 0;
		x->cancel = 0;
		x->workers = linklist_new();
		if (!x->workers) {
			freeobject((t_object *)x);
			return NULL;	// bail out rather than dereference x below
		}
		for (i=0;i<x->workercount;i++) {
			w = parallel_worker_new(data,workerproc,x);
			if (w) {
				w->id = i;
				// set any other state
				linklist_append(x->workers,w);
			} else {
				freeobject((t_object *)x);
				return NULL;	// bail out rather than continue with a freed task
			}
		}
	}
	return x;
}
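/*
	A sketch of the client-side lifecycle, reusing the hypothetical t_mydata
	and my_workerproc from the sketch above. Passing 0 for workercount lets
	parallel_task_new default to one worker per processor (clamped to
	PARALLEL_MAX_WORKERS).
*/
static void example_run(t_mydata *d)
{
	t_parallel_task *task = parallel_task_new(d,(method)my_workerproc,0);

	if (task) {
		parallel_task_execute(task);	// blocks until every worker has finished
		parallel_task_benchprint(task);	// only meaningful if the benchmark attribute is set
		object_free(task);
	}
}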
t_max_err parallel_task_execute(t_parallel_task *x)
{
	long i;
	t_parallel_thread_pool *pool;
	t_parallel_worker *worker;

	switch (x->priority) {
	case PARALLEL_PRIORITY_HIGH:
		pool = &_pool_high;
		break;
	case PARALLEL_PRIORITY_MEDIUM:
		pool = &_pool_med;
		break;
	case PARALLEL_PRIORITY_LOW:
	default:
		pool = &_pool_low;
	}

	x->begintime = x->benchmark ? systimer_gettime() : 0.;

	if (pool->count!=0) {
		error("our pool count isn't zero (%d), must be calling from the wrong thread...living dangerously",pool->count);
		pool->count = 0;
	}

	// dispatch all but the last worker to the pool threads
	for (i=0;i<(x->workercount-1);i++) {
		// lock and increment the outstanding thread count
		systhread_mutex_lock(pool->count_mutex);
		pool->count++;
		systhread_mutex_unlock(pool->count_mutex);

		// lock worker to modify worker state. will block if worker is currently executing
		systhread_mutex_lock(pool->threads[i].mutex);
		parallel_thread_init((pool->threads)+i);	// create systhread if it doesn't exist yet
		pool->threads[i].worker = (t_parallel_worker *)linklist_getindex(x->workers,i);
		pool->threads[i].state = PARALLEL_STATE_RUN;
		systhread_mutex_unlock(pool->threads[i].mutex);
		// signal the worker
		systhread_cond_signal(pool->threads[i].cond);
	}

	// run the final worker on the calling thread rather than leaving it idle
	worker = (t_parallel_worker *)linklist_getindex(x->workers,x->workercount-1);
	parallel_worker_execute(worker);

	// wait for all threads to complete
	systhread_mutex_lock(pool->count_mutex);
	while (pool->count>0) {
		systhread_cond_wait(pool->count_cond,pool->count_mutex);
	}
	systhread_mutex_unlock(pool->count_mutex);

	x->endtime = x->benchmark ? systimer_gettime() : 0.;

	return MAX_ERR_NONE;
}

t_max_err parallel_task_cancel(t_parallel_task *x)
{
	x->cancel = 1;
	return MAX_ERR_NONE;
}
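/*
	parallel_task_cancel only raises the flag; nothing in this file polls it.
	A cooperative worker proc would check task->cancel itself, for example
	(hypothetical, again reusing t_mydata):
*/
static void my_cancelable_workerproc(t_parallel_worker *w)
{
	t_mydata *d = (t_mydata *)w->data;
	long i;

	for (i=w->id; i<d->length; i+=w->task->workercount) {
		if (w->task->cancel)
			break;	// abandon this worker's remaining elements
		d->buffer[i] *= 0.5f;
	}
}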
void parallel_task_free(t_parallel_task *x)
{
	if (x->workers)
		object_free(x->workers);
}

void parallel_task_benchprint(t_parallel_task *x)
{
	t_parallel_worker *w;
	double avg=0,tmp;
	long i;

	jit_object_post((t_object *)x,"PARALLEL BENCH");
	jit_object_post((t_object *)x,"PROCESSOR COUNT: %d", parallel_processorcount());
	jit_object_post((t_object *)x,"WORKER COUNT: %d", x->workercount);
	jit_object_post((t_object *)x,"TOTAL TASK TIME: %f", x->endtime - x->begintime);
	for (i=0;i<x->workercount;i++) {
		if (x->workers&&(w=(t_parallel_worker *)linklist_getindex(x->workers,i))) {
			tmp = w->endtime - w->begintime;
			jit_object_post((t_object *)x,"WORKER[%d] TIME: %f", i, tmp);
			avg += tmp;
		} else {
			jit_object_post((t_object *)x,"error trying to print worker[%d]",i);
		}
	}
	avg = avg/(double)x->workercount;
	jit_object_post((t_object *)x,"AVERAGE WORKER TIME: %f", avg);
}

void parallel_task_data(t_parallel_task *x, void *data)
{
	t_parallel_worker *w;
	long i;

	// propagate the new data pointer to the task and each of its workers
	x->data = data;
	for (i=0;i<x->workercount;i++) {
		if (x->workers&&(w=(t_parallel_worker *)linklist_getindex(x->workers,i))) {
			w->data = data;
		}
	}
}

void parallel_task_workerproc(t_parallel_task *x, method workerproc)
{
	t_parallel_worker *w;
	long i;

	// propagate the new entry proc to the task and each of its workers
	x->workerproc = workerproc;
	for (i=0;i<x->workercount;i++) {
		if (x->workers&&(w=(t_parallel_worker *)linklist_getindex(x->workers,i))) {
			w->workerproc = workerproc;
		}
	}
}

// TODO: parallel_task_workercount

t_parallel_worker *parallel_worker_new(void *data, method workerproc, t_parallel_task *task)
{
	t_parallel_worker *x=NULL;

	if (workerproc && task && (x=(t_parallel_worker *)object_alloc(_parallel_worker_class))) {
		x->data = data;
		x->workerproc = workerproc;
		x->task = task;
		x->begintime = 0;
		x->endtime = 0;
	}
	return x;
}

t_max_err parallel_worker_execute(t_parallel_worker *x)
{
	x->begintime = x->task->benchmark ? systimer_gettime() : 0.;
	x->workerproc(x);
	x->endtime = x->task->benchmark ? systimer_gettime() : 0.;

	return MAX_ERR_NONE;
}

void parallel_worker_free(t_parallel_worker *x)
{
	// nada for now
}

void parallel_thread_init(t_parallel_thread *x)
{
	if (!x->thread)
		systhread_create((method)parallel_threadproc,x,0,0,0,&x->thread);
}

void parallel_threadproc(t_parallel_thread *x)
{
	systhread_mutex_lock(x->mutex);
	while (TRUE) {
		// sleep until parallel_task_execute hands us a worker and signals
		while (x->state!=PARALLEL_STATE_RUN) {
			systhread_cond_wait(x->cond,x->mutex);
		}
		parallel_worker_execute(x->worker);
		x->state=PARALLEL_STATE_DONE;
		// report completion by decrementing the pool's outstanding count
		systhread_mutex_lock(x->pool->count_mutex);
		x->pool->count--;
		systhread_mutex_unlock(x->pool->count_mutex);
		systhread_cond_broadcast(x->pool->count_cond);	// could just use signal here, since only one thread is waiting
	}
	systhread_mutex_unlock(x->mutex);
}
Copyright © 2008, Cycling '74