[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Commit-gnuradio] r7020 - gnuradio/branches/developers/eb/gcell/src/lib
From: |
eb |
Subject: |
[Commit-gnuradio] r7020 - gnuradio/branches/developers/eb/gcell/src/lib |
Date: |
Sat, 24 Nov 2007 19:48:29 -0700 (MST) |
Author: eb
Date: 2007-11-24 19:48:29 -0700 (Sat, 24 Nov 2007)
New Revision: 7020
Modified:
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
Log:
work-in-progress on cell job manager.
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
2007-11-24 09:29:11 UTC (rev 7019)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.cc
2007-11-25 02:48:29 UTC (rev 7020)
@@ -102,8 +102,6 @@
// clamp nspes
d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
-
- // FIXME when we get that next gen Cell
nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
//
@@ -253,7 +251,7 @@
d_shutdown_requested = true; // set flag for event handler thread
// should only happens during early QA code
- if (d_eh_state == EHS_INIT)
+ if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
return false;
while (d_eh_state != EHS_DEAD) // wait for it to finish
@@ -286,6 +284,9 @@
bool
gc_job_manager_impl::submit_job(gc_job_desc *jd)
{
+ if (d_shutdown_requested)
+ return false;
+
return false; // FIXME
}
@@ -295,7 +296,52 @@
return false; // FIXME
}
-extern "C" static void *
+static void
+pthread_create_failure_msg(int r, const char *which)
+{
+ char buf[256];
+ char *s = 0;
+
+ switch (r){
+ case EAGAIN: s = "EAGAIN"; break;
+ case EINVAL: s = "EINVAL"; break;
+ case EPERM: s = "EPERM"; break;
+ default:
+ snprintf(buf, sizeof(buf), "Unknown error %d", r);
+ s = buf;
+ break;
+ }
+ fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
+}
+
+
+static bool
+start_thread(pthread_t *thread,
+ void *(*start_routine)(void *), void *arg,
+ const char *msg)
+{
+ pthread_attr_t attr;
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+ // FIXME save sigprocmask
+ // FIXME set sigprocmask
+
+ int r = pthread_create(thread, &attr, start_routine, arg);
+
+ // FIXME restore sigprocmask
+
+ if (r != 0){
+ pthread_create_failure_msg(r, msg);
+ return false;
+ }
+ return true;
+}
+
+
+
+//extern "C"
+static void *
start_event_handler(void *arg)
{
gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
@@ -303,6 +349,24 @@
return 0;
}
+//extern "C"
+static void *
+worker_loop(void *arg)
+{
+ // FIXME need to pass additional info.
+
+ worker_ctx *w = (worker_ctx *) arg;
+
+ w->state = WS_RUNNING;
+
+ printf("worker running\n");
+
+ // FIXME spe_context_run
+
+ w->state = WS_DEAD;
+ return 0;
+}
+
void
gc_job_manager_impl::create_event_handler()
{
@@ -329,8 +393,32 @@
// create our event handling thread
-
+ if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
+ throw std::runtime_error("pthread_create");
+ }
+ // create the SPE worker threads
+
+ bool ok = true;
+ for (unsigned int i = 0; ok && i < d_options.nspes; i++){
+ char name[256];
+ snprintf(name, sizeof(name), "worker[%d]", i);
+ ok &= start_thread(&d_worker[i].thread, worker_loop,
+ &d_worker[i], name);
+ }
+
+ if (!ok){
+ //
+ // FIXME Clean up the mess. Need to terminate event handler and all
workers.
+ //
+ // this should cause the workers to exit, unless they're really broken
+ send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
+
+ shutdown();
+ sleep(1); // FIXME remove when shutdown waits for workers
+
+ throw std::runtime_error("pthread_create");
+ }
}
bool
@@ -347,7 +435,17 @@
bool
gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
{
- return false;
+ if (spe >= d_options.nspes)
+ return false;
+
+ int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
+ SPE_MBOX_ALL_BLOCKING);
+ if (r < 0){
+ perror("spe_in_mbox_write");
+ return false;
+ }
+
+ return r == 1;
}
void
@@ -379,6 +477,100 @@
}
void
+gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
+{
+ print_event(evt);
+
+ int spe_num = evt->data.u32;
+
+ // we assume that only a single event type can be signaled at once
+
+ if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
+ static const int NMSGS = 8;
+ unsigned int msg[NMSGS];
+ int n = spe_out_mbox_read(evt->spe, msg, NMSGS);
+ if (n < 0){
+ perror("spe_out_mbox_read");
+ }
+ else {
+ for (int i = 0; i < n; i++){
+ switch(MBOX_MSG_OP(msg[i])){
+ case OP_JOB_DONE:
+ printf("job_done (0x%08x) from spe[%d]\n", msg[i], spe_num);
+ break;
+
+ case OP_PING_REPLY:
+ printf("ping_reply (0x%08x) from spe[%d]\n", msg[i], spe_num);
+ break;
+
+ case OP_EXIT:
+ case OP_PING:
+ default:
+ printf("Unexpected msg (0x%08x) from spe[%d]\n", msg[i], spe_num);
+ break;
+ }
+ }
+ }
+ }
+ else if (evt->events == SPE_EVENT_IN_MBOX){ // there's room to write to SPE
+ // spe_in_mbox_write (ignore)
+ }
+ else if (evt->events == SPE_EVENT_TAG_GROUP){ // our DMA completed
+ // spe_mfcio_tag_status_read
+ }
+ else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
+ spe_stop_info_t si;
+ int r = spe_stop_info_read(evt->spe, &si);
+ if (r < 0){
+ perror("spe_stop_info_read");
+ }
+ else {
+ switch (si.stop_reason){
+ case SPE_EXIT:
+ printf("spe[%d] SPE_EXIT w/ exit_code = %d\n",
+ spe_num, si.result.spe_exit_code);
+ break;
+ case SPE_STOP_AND_SIGNAL:
+ printf("spe[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
+ spe_num, si.result.spe_signal_code);
+ break;
+ case SPE_RUNTIME_ERROR:
+ printf("spe[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
+ spe_num, si.result.spe_runtime_error);
+ break;
+ case SPE_RUNTIME_EXCEPTION:
+ printf("spe[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception =
0x%x\n",
+ spe_num, si.result.spe_runtime_exception);
+ break;
+ case SPE_RUNTIME_FATAL:
+ printf("spe[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
+ spe_num, si.result.spe_runtime_fatal);
+ break;
+ case SPE_CALLBACK_ERROR:
+ printf("spe[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
+ spe_num, si.result.spe_callback_error);
+ break;
+ case SPE_ISOLATION_ERROR:
+ printf("spe[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
+ spe_num, si.result.spe_isolation_error);
+ break;
+ default:
+ printf("spe[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
+ spe_num, si.stop_reason, si.spu_status);
+ break;
+ }
+ }
+ }
+ else {
+ fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n",
evt->events);
+ return;
+ }
+}
+
+//
+// This is the "main program" of the event handling thread
+//
+void
gc_job_manager_impl::event_handler_loop()
{
static const int MAX_EVENTS = 16;
@@ -386,6 +578,8 @@
spe_event_unit_t events[MAX_EVENTS];
+ printf("event_handler_loop: starting\n");
+
set_eh_state(EHS_RUNNING);
while (1){
@@ -394,18 +588,25 @@
case EHS_RUNNING: // normal stuff
if (d_shutdown_requested) {
set_eh_state(EHS_SHUTTING_DOWN);
- send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
}
break;
case EHS_SHUTTING_DOWN:
- break;
+ // FIXME wait until job queue is empty, then tell them to exit
+ send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
- case EHS_DEAD:
+ // FIXME when all workers have died, we return.
+ // Not sure yet if we see the SPE_EXIT events.
+ // If we do, we can keep track here.
+ // In the meanwhile...
+
+ set_eh_state(EHS_DEAD); // FIXME
+ printf("event_handler_loop: exiting\n");
return;
default:
set_eh_state(EHS_DEAD);
+ printf("event_handler_loop: exiting\n");
return;
}
@@ -417,8 +618,7 @@
// FIXME bail?
}
for (int i = 0; i < nevents; i++){
- print_event(&events[i]);
- // FIXME do the real work
+ handle_event(&events[i]);
}
}
}
Modified: gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
===================================================================
--- gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
2007-11-24 09:29:11 UTC (rev 7019)
+++ gnuradio/branches/developers/eb/gcell/src/lib/gc_job_manager_impl.h
2007-11-25 02:48:29 UTC (rev 7020)
@@ -41,7 +41,7 @@
};
struct worker_ctx {
- worker_state state;
+ volatile worker_state state;
spe_context_ptr_t spe_ctx;
pthread_t thread;
@@ -135,6 +135,7 @@
bool send_all_spes(uint32_t msg);
bool send_spe(unsigned int spe, uint32_t msg);
void print_event(spe_event_unit_t *evt);
+ void handle_event(spe_event_unit_t *evt);
friend gc_job_manager *gc_make_job_manager(const gc_jm_options *options);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [Commit-gnuradio] r7020 - gnuradio/branches/developers/eb/gcell/src/lib,
eb <=