



This section of the archives stores flipcode's complete Developer Toolbox collection, featuring a variety of mini-articles and source code contributions from our readers.

 

  Flexible Synchronization Objects
  Submitted by Justin Wilder



Here's code to simplify multithreading by abstracting mutex, event, semaphore, and timer functionality into classes. Each is derived from the single waitable class so that it may be used in combination with any other waitable object. I think the simplest way to cover this code is on a by-class basis, so here goes:

-- waitable - This is the interface class for all of the other waitable objects. It defines a single public function (besides the destructor), called wait, that causes the calling code to block until the object is signalled. Since it is only an interface, its constructor is protected.

-- waitable_group - This class allows several waitable objects to be grouped together and waited on as a single object. Since any waitable object can be used in a group, different sub-types may be used in conjunction, including waitable_group objects themselves. A waitable_group can be made to signal when all of its sub-objects are signalled, or when only one is signalled.



-- mutex - A mutual-exclusion object that serializes access to a segment of code. For mutexes, lock() is equivalent to a call to wait(). Mutexes are best used with sentries (one is provided) to allow automatic, exception-safe locking and unlocking.

-- event - An object that can be signalled one or more times. The signal(), pulse(), and reset() functions modify the state of the event. Note that the broadcast() and set() functions are currently unimplemented because they posed some difficulties and I'm rather lazy on occasion.

-- semaphore - Basically a mutex that can be locked more than once. For those of you unfamiliar with semaphores, they work like so: Start with an initial count. For each lock() (or wait()), decrement that count if it is positive, or block if it is zero. For each unlock, increment that count.

-- callback_timer - This class is used as a utility for the timer class, although it can also be used on its own. It lets you assign a callback function or function object to be called whenever the timer period elapses. Note that this timer isn't more precise than about ±10 milliseconds (by my estimate). If you need higher precision, you'll have to implement it using a Win32 multimedia timer.

-- timer - An object that is signalled when (or every time) a period of time has elapsed. This object uses the callback_timer class, and is thus not extremely precise.
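
The sentry idea mentioned for the mutex class can be sketched in portable C++. This is only an illustration under stated assumptions: std::mutex stands in for utility::mutex, the lock_sentry below is a hypothetical re-creation of the pattern rather than the listing's own class, and unlocked_after_throw() is a made-up helper that exists purely to show the exception-safety property.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for the article's sentry: locks the object in the
// constructor and unlocks it in the destructor.
template <typename Lock>
class lock_sentry {
public:
    explicit lock_sentry(Lock& l) : l_(l) { l_.lock(); }
    ~lock_sentry() { l_.unlock(); }
    lock_sentry(const lock_sentry&) = delete;
    lock_sentry& operator=(const lock_sentry&) = delete;
private:
    Lock& l_;
};

// Takes the lock through a sentry, throws while holding it, and then reports
// whether the mutex is free again once the exception has been handled.
bool unlocked_after_throw() {
    std::mutex m;  // assumption: std::mutex in place of utility::mutex
    try {
        lock_sentry<std::mutex> guard(m);         // locked here
        throw std::runtime_error("work failed");  // guard is destroyed during unwinding
    } catch (const std::runtime_error&) {
        // By the time we get here the sentry has already released m.
    }
    bool free_again = m.try_lock();
    if (free_again) m.unlock();
    return free_again;
}
```

Because the unlock lives in a destructor, every exit path from the guarded scope releases the lock, including early returns and exceptions.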

Also of note, the entire set is enclosed in the 'utility' namespace, since I am morbidly afraid of name conflicts. Remove it if you don't want it. I also may have omitted a standard header file or two, which you'll have to forgive (or not).
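
To make the semaphore counting rule described above concrete, here is a minimal portable sketch built on std::mutex and std::condition_variable. It is not the Win32-based class from the listing: the name counting_semaphore_sketch and the try_lock() member are assumptions added for illustration, and unlike CreateSemaphore this version does not enforce a maximum count.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical counting semaphore: lock() decrements the count if it is
// positive and blocks while it is zero; unlock() increments the count.
class counting_semaphore_sketch {
public:
    explicit counting_semaphore_sketch(unsigned initial) : count_(initial) {}

    // Blocks until a unit is available, then takes it.
    void lock() {
        std::unique_lock<std::mutex> guard(m_);
        cv_.wait(guard, [this] { return count_ > 0; });
        --count_;
    }

    // Non-blocking variant: takes a unit only if one is available right now.
    bool try_lock() {
        std::lock_guard<std::mutex> guard(m_);
        if (count_ == 0) return false;
        --count_;
        return true;
    }

    // Returns a unit and wakes one waiting thread, if any.
    void unlock() {
        {
            std::lock_guard<std::mutex> guard(m_);
            ++count_;
        }
        cv_.notify_one();
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    unsigned count_;
};
```

With an initial count of 2, two lock() calls succeed immediately, a third would block (try_lock() returns false), and a single unlock() makes one unit available again.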

Download Associated File: syncutil.txt (12,269 bytes)

/*

Editor's note: COTD Entry: Flexible Synchronization Objects by Justin Wilder [justin@bigradio.com]

*/


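#include <windows.h>
#include <process.h>    // _beginthread
#include <sys/timeb.h>  // _ftime, _timeb
#include <cassert>
#include <memory>
#include <stdexcept>
#include <vector>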

#ifdef _MT

namespace utility {

// Defines an interface for synchronization devices that can be waited on.
class waitable
{
public:
    virtual ~waitable() throw()
    {
        for (handle_vector::size_type n = 0; n < hvect.size(); ++n)
            ::CloseHandle(hvect[n]);
    }

    // Waits for this object to be signalled. Returns a non-negative integer context value,
    // or less than zero for an error or timeout. The timeout value defaults to infinite
    // (no timeout).
    int wait(unsigned int timeout = INFINITE) const throw()
    {
        if (hvect.empty())
            return -1; // nothing to wait on
        DWORD ret = ::WaitForMultipleObjects(static_cast<DWORD>(hvect.size()), &hvect[0],
                                             wait_all, timeout);
        if ((ret >= WAIT_OBJECT_0) && (ret < (WAIT_OBJECT_0 + hvect.size())))
            return ret - WAIT_OBJECT_0;
        else if ((ret >= WAIT_ABANDONED_0) && (ret < (WAIT_ABANDONED_0 + hvect.size())))
            return ret - WAIT_ABANDONED_0;
        else
            return -1;
    }

protected:
    typedef std::vector<HANDLE> handle_vector;

    handle_vector hvect;
    bool wait_all;

    waitable(handle_vector::size_type res = 0, bool wait_all = false) throw()
        : wait_all(wait_all)
    { hvect.reserve(res); }

    // Copies duplicate the underlying handles, so each copy closes only its own.
    waitable(const waitable& c) throw() : hvect(c.dup_handles()), wait_all(c.wait_all) {}

    handle_vector dup_handles() const throw()
    {
        handle_vector ret(hvect.size());
        for (handle_vector::size_type n = 0; n < hvect.size(); ++n)
            ret[n] = dup_handle(hvect[n]);
        return ret;
    }

    HANDLE dup_handle(HANDLE h) const throw()
    {
        if (!DuplicateHandle(GetCurrentProcess(), h, GetCurrentProcess(), &h,
                             0, FALSE, DUPLICATE_SAME_ACCESS))
            return NULL;
        else
            return h;
    }

    void insert(const handle_vector& hvect) throw()
    {
        for (handle_vector::size_type n = 0; n < hvect.size(); ++n)
            this->hvect.push_back(hvect[n]);
    }
    void insert(const waitable& c) throw() { insert(c.dup_handles()); }
    void insert(HANDLE h) throw() { hvect.push_back(h); }

    HANDLE get_handle(handle_vector::size_type index = 0) const throw()
    {
        assert(index < hvect.size());
        return hvect[index];
    }

    handle_vector::size_type size() const throw() { return hvect.size(); }

    friend class waitable_group;
    friend class timer;
};

// Groups multiple waitable objects into a single waitable object, that will be
// signalled when all component objects are signalled (wait_all == true), or when
// any one component object is signalled (wait_all == false, the default).
class waitable_group : public waitable
{
public:
    waitable_group(const waitable_group& c) throw() : waitable(c) {}

    explicit waitable_group(const waitable& w1, bool wait_all = false) throw()
        : waitable(w1.size(), wait_all)
    { insert(w1); }

    waitable_group(const waitable& w1, const waitable& w2, bool wait_all = false) throw()
        : waitable(w1.size() + w2.size(), wait_all)
    { insert(w1); insert(w2); }

    waitable_group(const waitable& w1, const waitable& w2, const waitable& w3,
                   bool wait_all = false) throw()
        : waitable(w1.size() + w2.size() + w3.size(), wait_all)
    { insert(w1); insert(w2); insert(w3); }

    waitable_group(const waitable& w1, const waitable& w2, const waitable& w3,
                   const waitable& w4, bool wait_all = false) throw()
        : waitable(w1.size() + w2.size() + w3.size() + w4.size(), wait_all)
    { insert(w1); insert(w2); insert(w3); insert(w4); }

    waitable_group(const waitable* wa, unsigned int len, bool wait_all = false) throw()
        : waitable(0, wait_all)
    {
        assert((len > 0) && (len < 1000));
        for (unsigned int n = 0; n < len; ++n)
            insert(wa[n]);
    }
};

// TODO: make this derive from waitable.
// A lock sentry. The object it's constructed with is lock()'ed when the sentry is created, and
// unlock()'ed when the sentry is destroyed.
template <typename _LOCK_TYPE>
class lock_sentry
{
public:
    lock_sentry(const _LOCK_TYPE& _l) : _l(_l) { _l.lock(); }
    ~lock_sentry() { _l.unlock(); }
private:
    const _LOCK_TYPE& _l;
};

// Implements mutual-exclusion object.
class mutex : public waitable
{
public:
    typedef lock_sentry<mutex> sentry;

    mutex() throw()
    {
        insert(::CreateMutex(NULL, FALSE, NULL));
        assert(get_handle() != NULL);
    }

    // Blocks until the mutex can be locked.
    void lock() const throw() { wait(); }

    // Unlocks the mutex. Every call to lock must be matched with a subsequent unlock.
    void unlock() const throw() { ::ReleaseMutex(get_handle()); }
};

// A waitable event.
class event : public waitable
{
public:
    event() throw()
    {
        insert(CreateEvent(NULL, FALSE, FALSE, NULL));
        assert(get_handle() != NULL);
    }

    // Creates a copy of this event. Copied events can be used interchangeably (i.e. a function
    // call on one is the same as a function call on another). The underlying system object is only
    // destroyed once all copies of an event have been destroyed.
    event(const event& c) : waitable(c) {}

    // Releases exactly one thread waiting on this event. If no threads are waiting at the time
    // of the signal, the event will remain signalled until a thread waits on it (and is
    // subsequently released).
    void signal() const throw() { SetEvent(get_handle()); }

    // Releases zero or one waiting thread. If no threads are waiting at the time of the pulse,
    // no threads are released.
    void pulse() const throw() { PulseEvent(get_handle()); }

    // Resets the event. If the event is currently signalled, this function sets it to unsignalled.
    void reset() const throw() { ResetEvent(get_handle()); }

    // INCOMPLETE: it's hard to implement these two with win32 events. May need to create another
    // manual-reset event for this one to work. Not implemented because it's currently unused.

    // Releases every thread currently waiting on this event.
    void broadcast() const throw()
    { assert(false && "event::broadcast() not currently supported"); }

    // Releases every thread currently waiting on this event, as well as every thread that will
    // wait on it in the future, until reset is called.
    void set() const throw()
    { assert(false && "event::set() not currently supported"); }

    friend inline unsigned int wait(const event* ea, unsigned int len) throw();
};

// Your typical semaphore. It can be waited on in conjunction with any other waitable object.
class semaphore : public waitable
{
public:
    // Initializes the semaphore with the given number of locks, and the given number initially
    // locked. 'size' must be greater than zero and 'locked' cannot be greater than 'size'.
    semaphore(unsigned int size, unsigned int locked = 0) throw()
    {
        assert((size > 0) && (locked <= size));
        insert(::CreateSemaphore(NULL, size - locked, size, NULL));
        assert(get_handle() != NULL);
    }

    // Creates a copy of this semaphore. Copied semaphores can be used interchangeably (i.e. a
    // function call on one is the same as a function call on another). The underlying system
    // object is only destroyed once all copies of a semaphore have been destroyed.
    semaphore(const semaphore& c) : waitable(c) {}

    // Takes a lock on the semaphore. Blocks if none are currently available. This function has
    // the same effect as waitable::wait(), and can be used interchangeably with it.
    void lock() const throw() { wait(); }

    // Releases a lock or a waiting thread (a thread blocking on waitable::wait()) on the
    // semaphore. Every lock (or wait) must be followed by an accompanying call to unlock.
    void unlock() const throw() { ::ReleaseSemaphore(get_handle(), 1, NULL); }
};

// A timer that calls a callback function (or function object) at a specified time.
template <typename _CALLBACK>
class callback_timer
{
public:
    typedef _CALLBACK callback_type;
    typedef unsigned int time_type;

    explicit callback_timer(const callback_type& cb = callback_type()) throw()
        : cb(cb), started(false) {}

    // Constructs the timer initially started.
    explicit callback_timer(time_type period, bool periodic = false,
                            const callback_type& cb = callback_type()) throw(std::runtime_error)
        : cb(cb), started(false)
    {
        if (!start(period, periodic))
            throw std::runtime_error("Failed to start timer.");
    }

    // Copies the callback only; the copy starts out stopped.
    callback_timer(const callback_timer& c) throw() : cb(c.cb), started(false) {}

    ~callback_timer() { stop(); }

    // Starts the timer to be signalled in period milliseconds. If periodic is false (default)
    // the timer will only be signalled once, if periodic is true, it will be signalled every
    // period milliseconds.
    bool start(time_type period, bool periodic = false) throw()
    {
        if (started == true)
            return false;
        thread_args* args = new thread_args(period, absolute_time(), periodic, cb,
                                            halt_event, going_mutex);
        if (_beginthread(thread_entry, 0, args) == -1)
        {
            delete args;
            return false;
        }
        started = true;
        return true;
    }

    // Stops the timer.
    void stop() throw()
    {
        halt_event.signal();
        going_mutex.lock(); // Wait for timer thread to cease.
        started = false;
        going_mutex.unlock();
    }

protected:
    callback_type cb;
    event halt_event;
    mutex going_mutex;
    bool started;

    struct absolute_time
    {
        absolute_time() { _ftime(&t); }
        absolute_time(const absolute_time& c) { t.time = c.t.time; t.millitm = c.t.millitm; }
        absolute_time& operator=(const absolute_time& c)
        { t.time = c.t.time; t.millitm = c.t.millitm; return *this; }

        absolute_time& operator+=(const absolute_time& c)
        {
            t.millitm += c.t.millitm;
            if (t.millitm >= 1000) { t.time += 1; t.millitm -= 1000; }
            t.time += c.t.time;
            return *this;
        }
        absolute_time& operator-=(const absolute_time& c)
        {
            // millitm is unsigned, so borrow a second before subtracting
            // instead of testing for a negative result afterwards.
            if (t.millitm < c.t.millitm) { t.time -= 1; t.millitm += 1000; }
            t.millitm -= c.t.millitm;
            t.time -= c.t.time;
            return *this;
        }
        absolute_time operator+(const absolute_time& c) const
        { absolute_time temp(*this); return temp += c; }
        absolute_time operator-(const absolute_time& c) const
        { absolute_time temp(*this); return temp -= c; }

        // Converts to milliseconds.
        operator time_type () const { return t.time * 1000 + t.millitm; }
    protected:
        _timeb t;
    };

    struct thread_args
    {
        thread_args() {}
        thread_args(time_type period, const absolute_time& start, bool periodic,
                    const callback_type& cb, const event& halt_event, const mutex& going_mutex)
            : period(period), start(start), periodic(periodic), cb(cb),
              halt_event(halt_event), going_mutex(going_mutex) {}
        thread_args(const thread_args& c)
            : period(c.period), start(c.start), periodic(c.periodic), cb(c.cb),
              halt_event(c.halt_event), going_mutex(c.going_mutex) {}

        time_type period;
        absolute_time start;
        bool periodic;
        callback_type cb;
        event halt_event;
        mutex going_mutex;
    };

    static void thread_entry(void* context)
    {
        assert(context != NULL);
        std::auto_ptr<thread_args> args((thread_args*)context);
        args->going_mutex.lock();
        while (true)
        {
            // Time remaining until the next multiple of the period, measured from the start.
            time_type elapsed = absolute_time() - args->start;
            time_type time_left = args->period - (elapsed % args->period);
            if (args->halt_event.wait(time_left) < 0)
            {
                // The wait timed out without a halt request, so the period has elapsed.
                args->cb();
                if (args->periodic)
                    continue;
            }
            break;
        }
        args->going_mutex.unlock();
    }
};

// A waitable timer. Win32 is supposed to have a waitable timer, but it only works for NT.
// So I had to implement one myself.
class timer : public waitable
{
public:
    typedef unsigned int time_type;

    // Constructs the timer initially stopped.
    timer() throw() : t(callback(signal_event))
    { insert(signal_event); } // wait() on the timer blocks on the signal event

    // Constructs the timer initially started.
    explicit timer(time_type period, bool periodic = false) throw(std::runtime_error)
        : t(period, periodic, callback(signal_event))
    { insert(signal_event); }

    timer(const timer& c) : waitable(c), signal_event(c.signal_event), t(c.t) {}

    // Starts the timer to be signalled in period milliseconds. If periodic is false (default)
    // the timer will only be signalled once, if periodic is true, it will be signalled every
    // period milliseconds.
    bool start(time_type period, bool periodic = false) throw()
    { return t.start(period, periodic); }

    // Stops the timer. This function will not change the signal state of the timer (if it was
    // signalled prior to the call, it remains so).
    void stop() throw() { t.stop(); }

protected:
    class callback
    {
    public:
        explicit callback(const event& e) : e(e) {}
        callback(const callback& c) : e(c.e) {}
        void operator()() { e.signal(); }
    protected:
        event e;
    };

    typedef callback_timer<callback> timer_type;

    // signal_event is declared before t so that it is constructed before t copies it.
    event signal_event;
    timer_type t;
};

} // namespace utility

#endif // _MT
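
A periodic timer thread that measures from a fixed start time has to work out how long to sleep before the next tick. One way to express that arithmetic is sketched below; the function name ms_until_next_tick and its parameters are hypothetical, and the helper is shown in isolation rather than as part of the listing.

```cpp
#include <cassert>

// Milliseconds remaining until the next tick of a timer that started
// `elapsed_ms` ago and fires every `period_ms`. `period_ms` must be non-zero.
// A call made exactly on a tick boundary reports a full period.
unsigned int ms_until_next_tick(unsigned int elapsed_ms, unsigned int period_ms) {
    return period_ms - (elapsed_ms % period_ms);
}
```

Computing the wait from the original start time, rather than sleeping a fixed period after each callback, keeps the time spent inside the callback from accumulating as drift.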


 

Copyright 1999-2008 (C) FLIPCODE.COM and/or the original content author(s). All rights reserved.