Monthly Archives: May 2010

Cross-platform WaitForMultipleObjects like function?

Sometimes we need to simultaneously wait to get notified on one of many things. For example, in a thread we could wait to get notiofied about main program requesting shutdown, or another thread leting us known that data to be processed is available. While in windows this could be elegantly solved with the WaitForMultipleObjects() call, doing it cross-platform is another story.
While in windows, WaitForMultipleObjects() call can wait on many object types (Threads, Sockets, condition variables, events, etc), sometimes it is enough to be able to wait only on one type, or only a subset of these. So maybe we could implement it only to signal that something happent?

A crude approach would be to use threads. For each object we are waiting on, we start a thread to wait for an event or timeout, and in the function itself, after threads are spawned, we simply wait to see if either one of threads notify us about it’s object being signaled, or we timeout. One of this approaches’s flaw is that for each object we are trying to wait on, uses a thread. But if we look carefully, we notice, this entire spawning threads,listening for getting notified fits perfectly into observer pattern. The caller needs to observe if something happens with the objects we are waiting on (they got signaled or not), in a given time interval, or if not, timeouts. The objects we are waiting on should also let known their observers if something happent to them (they got signaled)

So, considering wxWidgets, we could use it’s wxCondition class, which gives us a platform-independent condition variable. We need to have a object on which we could wait, which can be signaled,and which can be observed. wxCondition fits the bill,except .. noobservers. So let’s extend it, to allow objserver management:

class Event: public wxCondition
{
	protected:
		std::list< Event* > mObservers;
		wxMutex mObserverMutex;

	public:
		void addObserver(Event* e)
		{
		        wxMutexLocker observerLock(mObserverMutex);
		        mObservers.push_back(e);
		}

		void removeObserver(Event* e)
		{
		        wxMutexLocker observerLock(mObserverMutex);
		        mObservers.remove(e);
		}
}

We also need to keep track if event is signaled or not. wxCondition does not give us this ability. Also, by reading wxCondition documentation, we see wxCondition will need a wxMutex to work, so let add these:

class Event: public wxCondition
{
	protected:
		...
		bool mSignaled;
		wxMutex mMutex;

	public:
		Event()
		:wxCondition(mMutex),
		mSignaled(false)
		{
		}

		bool Signaled()
		{
			return mSignaled;
		}
}

For things to work as they should work, we need to rewrite Signal() to notify it’s observers and set mSignaled member variable, and also we should rewrite Wait() and WaitTimeout() to clear it, and of course these should work with the own mMutex mutex:

class Event: public wxCondition
{
	public:
		...
		void Signal()
		{
		    wxMutexLocker lock(mMutex);
		    wxMutexLocker observerLock(mObserverMutex);

		    std::list< Event* >::iterator it = mObservers.begin();
		    while(it != mObservers.end())
		    {
		            (*it)->Signal();
		            it++;
		    }

		    wxCondition::Signal();
		    mSignaled = true;
		}

		wxCondError Wait()
		{
		    mMutex.Lock();
		    mSignaled = false;
		    return wxCondition::Wait();
		    mMutex.Unlock();
		}

		wxCondError WaitTimeout(unsigned long timeout)
		{
		    mMutex.Lock();
		    mSignaled = false;
		    return wxCondition::WaitTimeout(timeout);
		    mMutex.Unlock();
		}
}

At this moment we have a Event class which can be observed, and while observed, if it is signaled, will also signal it’s observer. So, how can we use this to implement WaitForMultipleObjects() function?
First, let’s define some constants, to be in concordance with Windows code:

#define WAIT_OBJECT_0 0
#define WAIT_TIMEOUT 0x00000102L
#define WAIT_ABANDONED 0x00000080L
#define WAIT_FAILED 0xFFFFFFFF

WAIT_OBJECT_0+number of signaled object is returned when one of the objects we are waiting on gets signaled.
WAIT_OBJECT_TIMEOUT is returned if none of the objects we are waiting on are signaled.

WaitForMultipeObject() gets as parameter number of objects it should wait on, an array with the objects, a boolean stating if it should wait on all objects or not, after how many milliseconds it should timeout, and if it is aleratable or not.
Since waiting on all objects to get signaled is trivial to implement (we wait on all objects one after another until either all objects get signaled or we timeout), we will skip this part of implementation.
The alertable flag we are not interested in, it is used only to specify if APC-s can be run or not while we are waiting on objects. Since we are not implementing APCs, we can simply ignore this flag.

int WaitForMultipleObjectsEx(unsigned long numObjs, Event** objs,
                             bool waitAll, unsigned long timeoutMillis, bool alertable)
{

Note that we are passing an array of our Event class,not wxCondition!

Specification also states that if timeout period is 0, we should not wait on objects get signaled, but just check if they are signaled or not, and return. So let’s check that first inside the function:

	if(timeoutMillis == 0)
	{
		for(int i(0);i < numObjs;i++)
			{
				return WAIT_OBJECT_0+i;
			}
		}

		return WAIT_TIMEOUT;
	}

If we have timeout specified and different than 0, we should start waiting until either one of objects get signaled, or we spend more than specified time in function (and then we return with timeout). So, we mark when we started to wait, create a observer event, and add it as observer to all events we are waiting on:

	wxLongLong startmsec(wxGetLocalTimeMillis());

	// create a observer event and subscribe it as observer to all events we are waiting on
	Event observerEvent;

	for(int i(0);i < numObjs;i++)
	{
		objs[i] -> addObserver(&observerEvent);
	}

Now, if one of our observed events get signaled, the observer event also gets signaled. Now let’s use the remaining time from timeout to wait until observer gets notified or timeouts on wait:

	wxLongLong usedmsec = wxGetLocalTimeMillis()-startmsec;

	// how much we should sleep?
	usedmsec = timeoutMillis-usedmsec;

	wxCondError r = observerEvent.WaitTimeout(usedmsec.ToLong());

After either one of observed events get signaled, or the required timeout milliseconds pass, WaitTimeout() function returns. We now have to check the return code. If returns timeout, we also return timeout. If returns no error, it means one of objects we were waiting on got signaled, so we have to find it’s index in object array and return WAIT_OBEJCT_0+<index of object signaled>. In both cases, before returning, we should to deregister the observer event from the list of observed objects:

	if(wxCOND_TIMEOUT == r)
	{
		// if wait returned timeout, we also return timeout
		return WAIT_TIMEOUT;
	}
	else if(wxCOND_NO_ERROR == r)
	{
		// if wait succeeded, we unregister the observer event from observed events
		// meanwhile we check to see if object got signaled or not, to find out the
		// index of the signaled object.

		bool gotSignaledObject = false;

		// we should return WAIT_OBJECT_0 + index of signaled object, so let's start
		// at WAIT_OBJECT_0
		int res = WAIT_OBJECT_0;

		for (int i(0);i < numObjs;i++)
		{
			if (!gotSignaledObject && objs[i] -> Signaled())
			{
				// got the index of signaled object.
				gotSignaledObject = true;
				res += i;
			}

			objs[i] -> removeObserver(&observerEvent);
		}

		return res;
	}

	return WAIT_FAILED;
}

This is all. So, let’s see a simple application which test the functionality:

class MyApp: public wxApp
{
	public:
		bool OnInit();
} myApp;

// the events we will wait on
Event e1, e2;

class T : public wxThread
{
	public:
		void* Entry()
		{
			printf("Thread started\n");
			usleep(10000);
			printf("Event 1 signaled\n");
			e1.Signal();
			return 0;
		}
};

bool MyApp::OnInit()
{
	Event e;
	T* t = new T();
	t->Create();
	t->Run();

	Event* evs[]={&e1,&e2};
	printf("Waiting \n");
	int res = WaitForMultipleObjectsEx(2, evs,false, 2000,false);
	printf("Completed %d\n", res);

	return true;
}

IMPLEMENT_APP(MyApp)

Note: Code provided here is for educational purposes. It cannot be used in any projects, specially commercial, without my consent.