Testing the Reactor pattern

By: on September 30, 2013

A good while ago I wrote a SIP stack. Like many network things, a SIP stack needs to keep track of multiple tasks – reading from or writing to sockets, asking the user to respond to events, and so on. And so I naïvely added a bunch of threads. And then I spent a few weeks trying to fix nasty bugs. And then I got fed up and rewrote all the logic. It turns out that I accidentally stumbled upon the Reactor pattern… but with a twist that I’ve not seen elsewhere.

At the heart of the SIP stack is the unwieldily named TIdTimerQueue. (“T” because class names in (Object) Pascal traditionally start with “T”, and “Id” because at one point I’d aimed to work within the Indy Project namespace, and never bothered to remove/change the prefix.) With the non-essentials removed, the basic interface of a TIdTimerQueue looks like this:

TIdTimerQueue = class(TIdRegisteredObject)
  procedure AddEvent(MillisecsWait: Cardinal;
                     Event: TIdWait); virtual;
  procedure AddListener(Listener: IIdTimerQueueListener);
  procedure RemoveListener(Listener: IIdTimerQueueListener);

where a TIdWait is

TIdWait = class(TIdRegisteredObject)
  function  Due: Boolean;
  procedure Trigger; virtual;

Essentially a TIdWait is a closure, representing some chunk of work we want to perform at some point in the future.
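
To make the "closure" idea concrete, here is a minimal sketch, assuming Free Pascal in objfpc mode. TWait and TAddWait are hypothetical stand-ins invented for illustration, not the stack's real classes: the object captures the data it needs when created, and does nothing until something calls Trigger.

```pascal
program WaitSketch;
{$mode objfpc}{$H+}
{$ASSERTIONS ON}

type
  // Hypothetical stand-in for TIdWait: a chunk of deferred work.
  TWait = class
    procedure Trigger; virtual; abstract;
  end;

  // "Closes over" a target variable and an amount to add to it.
  TAddWait = class(TWait)
  private
    fTarget: PInteger;
    fAmount: Integer;
  public
    constructor Create(Target: PInteger; Amount: Integer);
    procedure Trigger; override;
  end;

constructor TAddWait.Create(Target: PInteger; Amount: Integer);
begin
  inherited Create;
  fTarget := Target;
  fAmount := Amount;
end;

procedure TAddWait.Trigger;
begin
  Inc(fTarget^, fAmount);
end;

var
  Total: Integer;
  W: TWait;
begin
  Total := 0;
  W := TAddWait.Create(@Total, 5);
  try
    Assert(Total = 0); // creating the wait runs nothing
    W.Trigger;         // the work happens only when triggered
    Assert(Total = 5);
  finally
    W.Free;
  end;
end.
```

Whoever holds the wait decides when (or whether) it runs, which is what lets a queue schedule it for later.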

Normally when I see a framework that uses the Reactor pattern – like eventmachine – the clock is hidden behind the curtain. You schedule something, and it just runs. Only, how do you test this? Writing tests that pause makes for seriously slow tests, while virtually speeding up the clock still leaves you with fragile tests. Since I was in the first flush of love with test-driven development, not being able to test the heartbeat of the whole stack worried me a great deal. And then I thought: if time makes testing things hard, why not turn the whole construct inside out? Separate the queueing/scheduling of events from the running of events. And thus were born the brothers TIdDebugTimerQueue and TIdThreadedTimerQueue.

A TIdDebugTimerQueue exposes a bunch of introspective tools that let tests check whether code correctly scheduled certain events, examine the order of scheduled events, and so on:

TIdDebugTimerQueue = class(TIdTimerQueue)
    procedure AddEvent(MillisecsWait: Cardinal;
                       Event: TIdWait); override;
    function  EventAt(Index: Integer): TIdWait; override;
    function  EventCount: Integer;
    function  EventCountFor(WaitType: TIdWaitClass; CountSubclasses: Boolean = false): Integer;
    function  FirstEventScheduledFor(Event: Pointer): TIdWait;
    function  LastEventScheduled: TIdWait; overload;
    function  LastEventScheduled(WaitType: TIdWaitClass): TIdWait; overload;
    function  LastEventScheduledFor(Event: Pointer): TIdWait;
    procedure RemoveAllEvents;
    function  ScheduledEvent(Event: TIdWait): Boolean; overload;
    function  SecondLastEventScheduled: TIdWait;
    procedure Terminate; override;
    procedure TriggerAllEventsOfType(WaitType: TIdWaitClass);
    procedure TriggerAllEventsUpToFirst(WaitType: TIdWaitClass);
    procedure TriggerEarliestEvent; override;

    property TriggerImmediateEvents: Boolean read fTriggerImmediateEvents write fTriggerImmediateEvents;
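
A test built on this idea might look like the following. This is a minimal sketch, assuming Free Pascal in objfpc mode; TDebugQueue, TWait and TCountingWait are hypothetical, much-reduced analogues written for this example, and only the method names AddEvent, EventCount and TriggerEarliestEvent are borrowed from the interface above. The sketch also assumes the queue frees each wait after triggering it, which the real class may or may not do.

```pascal
program DebugQueueSketch;
{$mode objfpc}{$H+}
{$ASSERTIONS ON}

type
  // Hypothetical stand-in for TIdWait.
  TWait = class
    DelayMs: Cardinal; // set by the queue when the wait is scheduled
    procedure Trigger; virtual; abstract;
  end;

  // Adds Amount to Counter^ when triggered, so the test can see what ran.
  TCountingWait = class(TWait)
    Counter: PInteger;
    Amount: Integer;
    constructor Create(ACounter: PInteger; AnAmount: Integer);
    procedure Trigger; override;
  end;

  // Hypothetical debug queue: it records waits and never consults a clock.
  // Assumption: it frees each wait after triggering it.
  TDebugQueue = class
  private
    fEvents: array of TWait;
  public
    procedure AddEvent(MillisecsWait: Cardinal; Event: TWait);
    function EventCount: Integer;
    procedure TriggerEarliestEvent;
  end;

constructor TCountingWait.Create(ACounter: PInteger; AnAmount: Integer);
begin
  inherited Create;
  Counter := ACounter;
  Amount := AnAmount;
end;

procedure TCountingWait.Trigger;
begin
  Inc(Counter^, Amount);
end;

procedure TDebugQueue.AddEvent(MillisecsWait: Cardinal; Event: TWait);
begin
  Event.DelayMs := MillisecsWait;
  SetLength(fEvents, Length(fEvents) + 1);
  fEvents[High(fEvents)] := Event;
end;

function TDebugQueue.EventCount: Integer;
begin
  Result := Length(fEvents);
end;

procedure TDebugQueue.TriggerEarliestEvent;
var
  I, Earliest: Integer;
  Event: TWait;
begin
  if Length(fEvents) = 0 then Exit;
  // Find the wait with the shortest delay (first one wins on a tie).
  Earliest := 0;
  for I := 1 to High(fEvents) do
    if fEvents[I].DelayMs < fEvents[Earliest].DelayMs then
      Earliest := I;
  Event := fEvents[Earliest];
  // Close the gap left by the removed wait.
  for I := Earliest to High(fEvents) - 1 do
    fEvents[I] := fEvents[I + 1];
  SetLength(fEvents, Length(fEvents) - 1);
  Event.Trigger;
  Event.Free;
end;

var
  Queue: TDebugQueue;
  Fired: Integer;
begin
  Fired := 0;
  Queue := TDebugQueue.Create;
  try
    Queue.AddEvent(5000, TCountingWait.Create(@Fired, 10));
    Queue.AddEvent(100, TCountingWait.Create(@Fired, 1));
    Assert(Queue.EventCount = 2);
    Assert(Fired = 0);          // scheduling alone runs nothing
    Queue.TriggerEarliestEvent; // the 100 ms wait, with no sleeping
    Assert(Fired = 1);
    Queue.TriggerEarliestEvent; // then the 5000 ms wait
    Assert(Fired = 11);
    Assert(Queue.EventCount = 0);
  finally
    Queue.Free;
  end;
end.
```

The test never sleeps: a five-second timer is "fired" the instant the test asks for it, and the order of firing is still checked.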

TIdThreadedTimerQueue is a TIdTimerQueue brought to life by a thread:

// The name comes from Smalltalk - a block is a chunk of code, possibly
// together with some variables. A closure, in other words. Well, this is as
// close as we get in Delphi.
TIdBlockRunnerThread = class(TIdBaseThread)
  Block: TIdThreadProc;
  procedure Run; override;
  constructor Create(Block: TIdThreadProc;
                     CreateSuspended: Boolean = True); reintroduce;

TIdTimerEmptyProc = procedure(Sender: TIdTimerQueue) of object;

// I provide a thread in which to execute my events. Obviously, all
// TIdWaits execute in BlockRunner's context.
TIdThreadedTimerQueue = class(TIdTimerQueue)
  BlockRunner:     TIdBlockRunnerThread;
  fOnEmpty:        TIdTimerEmptyProc;
  TerminatedEvent: TEvent;

  procedure NotifyOfTermination;
  procedure PossiblyNotifyOfEmpty;
  procedure Run;
  constructor Create(CreateSuspended: Boolean); override;

  procedure Resume; override;
  procedure Terminate; override;
  procedure TerminateAndWaitFor(WaitEvent: TEvent);

  property OnEmpty: TIdTimerEmptyProc read fOnEmpty write fOnEmpty;

procedure TIdBlockRunnerThread.Run;
  if Assigned(Self.Block) then

Nice and simple… but where’d that Block come from?

constructor TIdThreadedTimerQueue.Create(CreateSuspended: Boolean);
  Self.TerminatedEvent := nil;

  inherited Create(CreateSuspended);

procedure TIdThreadedTimerQueue.Resume;
  inherited Resume;

  if not Assigned(Self.BlockRunner) then
    Self.BlockRunner := TIdBlockRunnerThread.Create(Self.Run, Self.CreateSuspended);


And finally the core of actually running the events, the event loop:

procedure TIdThreadedTimerQueue.Run;
    while not Self.Terminated do begin

      if not Self.Terminated then
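
The listing above is only a fragment, so the following is a guess at the general shape of such a loop rather than the stack's actual code. It is a single-threaded sketch, assuming Free Pascal in objfpc mode; TLoopQueue, TWait and TLabelWait are hypothetical names. Unlike a real threaded queue, which would presumably block until new work arrives, this loop simply returns once the queue is empty.

```pascal
program EventLoopSketch;
{$mode objfpc}{$H+}
{$ASSERTIONS ON}

uses
  SysUtils; // GetTickCount64, Sleep

var
  Fired: string = ''; // records the order in which waits ran

type
  // Hypothetical stand-in for TIdWait.
  TWait = class
    DueAt: QWord; // absolute due time, in GetTickCount64 milliseconds
    procedure Trigger; virtual; abstract;
  end;

  // Appends its tag to Fired when triggered.
  TLabelWait = class(TWait)
    Tag: string;
    constructor Create(const ATag: string);
    procedure Trigger; override;
  end;

  // Hypothetical queue that runs its own waits against the real clock.
  TLoopQueue = class
  private
    fEvents: array of TWait;
    function TakeEarliest: TWait;
  public
    Terminated: Boolean;
    procedure AddEvent(MillisecsWait: Cardinal; Event: TWait);
    procedure Run;
  end;

constructor TLabelWait.Create(const ATag: string);
begin
  inherited Create;
  Tag := ATag;
end;

procedure TLabelWait.Trigger;
begin
  Fired := Fired + Tag + ',';
end;

procedure TLoopQueue.AddEvent(MillisecsWait: Cardinal; Event: TWait);
begin
  Event.DueAt := GetTickCount64 + MillisecsWait;
  SetLength(fEvents, Length(fEvents) + 1);
  fEvents[High(fEvents)] := Event;
end;

// Removes and returns the wait that is due soonest. Caller checks non-empty.
function TLoopQueue.TakeEarliest: TWait;
var
  I, Earliest: Integer;
begin
  Earliest := 0;
  for I := 1 to High(fEvents) do
    if fEvents[I].DueAt < fEvents[Earliest].DueAt then
      Earliest := I;
  Result := fEvents[Earliest];
  for I := Earliest to High(fEvents) - 1 do
    fEvents[I] := fEvents[I + 1];
  SetLength(fEvents, Length(fEvents) - 1);
end;

procedure TLoopQueue.Run;
var
  Event: TWait;
  Current: QWord;
begin
  while (not Terminated) and (Length(fEvents) > 0) do begin
    Event := TakeEarliest;
    Current := GetTickCount64;
    if Event.DueAt > Current then
      Sleep(Cardinal(Event.DueAt - Current)); // wait until the event is due
    if not Terminated then
      Event.Trigger;
    Event.Free;
  end;
end;

var
  Queue: TLoopQueue;
begin
  Queue := TLoopQueue.Create;
  try
    Queue.AddEvent(30, TLabelWait.Create('late'));
    Queue.AddEvent(10, TLabelWait.Create('early'));
    Queue.Run; // blocks for roughly 30 ms in total
    Assert(Fired = 'early,late,');
  finally
    Queue.Free;
  end;
end.
```

Even this tiny check has to wait on the real clock, which is exactly the cost that the debug queue avoids.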



And there you have it: separating the concerns of scheduling events from the event loop allows you to easily test code that produces the events.

