Monday, June 3, 2013

Pipelines as a generic framework for multi-threading

In this post I'm going to talk about a new abstraction (to the C2C codebase, that is) available in the form of some framework classes, intended to provide a fairly generic mechanism for a lot of our multi-threading needs.

Fundamentally, turn processing in C2C consists of doing a once-per-turn action for each game entity.  In some cases this may involve doing the same for 'owned subservient entities'.  So, deconstructing from the highest level down (taking a 10,000 foot view), it looks something like this:

For each plot
    process plot
For each player
    for each city
        process city
    for each unit
        process unit

Each entity processed (a city say) will normally have a number of aspects to be updated in its processing, so for example the line 'process city' in the above breaks down to something a bit like this:

process invasions
process damage against attacking adjacent units
process growth
process culture
process build queue
process religion spread
...

Most (though not all) of the actions involved in processing a city (or any entity) primarily modify that entity, but not its peers (other cities in this case).  That means that in principle we should be able to process different cities at the same time on multiple threads.

This is where pipelines come in.  The processing of an entity can be thought of as a set of independent (but serialized) steps.  Some of these steps have to be performed in set order even relative to other (peer) entities, whereas others are entirely independent and have no constraints on how they are ordered.  For example, cities can process their culture spread independently - it makes no difference if city A performs its culture spread before or after city B.  On the other hand this does not (fully) apply to production - suppose city A and city B both want to choose what to build - if they were to choose a Wonder then that choice would no longer be valid for the other city, so if the order of processing of city A and city B were non-deterministic the results would also be (which one got to build the Wonder might vary).  This means that some aspects of the processing have to synchronize in some way to ensure a deterministic order of processing (or else multiplayer out-of-sync errors will result).

Imagine a production line, where each station performs one of the actions involved in processing an entity.  Taking the city example and its associated actions as listed above, station one processes invasions, station two processes damage against attacking units, and so on.  The city entity passes down this production line, being updated at each stage.

Now in a traditional, physical production line, we would speed things up by keeping all the stations working in parallel, feeding new city entities into the line as each one moves along a stage.  This is one model that can be used to deploy multiple threads (one thread per production line stage), but it doesn't work very well for us, because to work efficiently it requires all stages to move at the same 'speed'.  If one slows down it slows the entire production line.  In our case some actions (e.g. - choosing what to build in a city) are likely to take much longer than others, and also to be potentially very variable in their execution time.

So another way to split the work up is to have lots of production lines operating in parallel, with a single entity progressing down each of them at its own rate.  For stages of the production line that don't need to synchronize, this allows each to keep working independently.  However, as pointed out above, some actions need synchronization (the building of a Wonder being one example).  In effect this introduces 'gates' on the production line such that no entity can progress past a 'gate' until ALL entities (on the parallel production lines) have caught up.

At first sight this requirement to synchronize means idle workers!  But we get round this by reducing our headcount!  Instead of having one employee run each production line we have a ticketing system like you sometimes see in grocery stores or post offices.  Each time an entity progresses to a new stage of our production line, it posts a ticket saying 'hey guys - I'm waiting for one of you to come and work on me'.  Workers take tickets in a priority order (earliest production line stage first, within that a fixed ordering of production lines) and man whatever station the ticket refers to, performing whatever changes are needed to move the entity through that step.  This means that (provided we have significantly more production lines than workers) most of the time when one line is waiting for others to catch up there will still be work to do on those others, and our workers won't spend all their time taking tea breaks.

The class framework

Ok, enough hokey analogies.  The framework (new source files, CvPipeline.cpp and CvPipeline.h) implements a generic pipeline model as below (public and overridable API only shown). Note - there will be a concrete example later!

Work items

CvPipelineWorkItem is an abstract class, representing an entity to be worked on (e.g. - a city in a city processing pipeline).  Its role is to hold state information about the entity being worked on as it progresses through the pipeline.  Here is the API:
 
typedef enum
{
 WORKITEM_STATE_NONE = -1,
 WORKITEM_STATE_QUEUED,
 WORKITEM_STATE_PROCESSING,
} PipelineWorkItemState;
 
// A pipeline work item - this records the state of the work item as
// it flows through the pipeline.  Appropriate subclasses should be
// defined for concrete items, sufficient to record their state
// changes as they flow through the pipeline stages
class CvPipelineWorkItem
{
public:
 CvPipelineWorkItem();
 virtual ~CvPipelineWorkItem();

 // Get the priority of this item (determines dequeue order
 // at each stage and may return different values at
 // different stages if necessary).  Must be deterministic
 // based on item state however to avoid OOS issues.
 // In cases where requeuing may result in different states
 // the priority used should be related to the item
 // identifier (e.g. - city id) to ensure deterministic
 // processing order in synchronous stages
 virtual int GetPriority(void) const = 0;

 // Requeue the workitem to the last non-synchronous stage
 // not yet completed
 // This is intended for use with optimistic locking systems
 // to handle a lock failure down-stream of the pipeline
 // stage that took the lock
 void Requeue(void);

 // Abandon a work item
 void Abandon(void);

 // Note that processing has completed (for the current
 // pipeline stage) on this work item
 void Complete(int iKey);

 // Get a queuing key - this should be queried before
 // processing and passed to Complete() on completion
 int GetKey(void) const;

 // Initialize the seed to use in the item's sync rand
 void SetRandSeed(unsigned long ulSeed);

 // A synchronous random number, which is OOS safe even in
 // multi-threaded pipeline usage.  Always use this in
 // pipeline stage processing rather than the regular
 // synced random number generator
 int GetSyncRand(int iNum, const char* pszLog);

 // Get the current workflow state of this item
 PipelineWorkItemState GetState(void) const;
}; 

You just need to override GetPriority() to return some deterministic ordering between entities (e.g. - city id).  You may also need to add member variables to record whatever state is important for your particular usage.
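For illustration, here is a rough sketch of what a city work item might look like.  The real CvCityTurnPipelineWorkItem used in the concrete example below carries more state (the production choice made during the asynchronous stage, for instance), so treat this as a minimal outline only:

// Minimal sketch only - the real CvCityTurnPipelineWorkItem records more
// state as the city flows through the pipeline stages
class CvCityTurnPipelineWorkItem : public CvPipelineWorkItem
{
public:
    explicit CvCityTurnPipelineWorkItem(CvCity* pCity) : m_pCity(pCity)
    {
    }
    virtual ~CvCityTurnPipelineWorkItem()
    {
    }

    // Deterministic ordering - city id is the same on all machines, so
    // synchronous stages dequeue cities in the same order everywhere
    virtual int GetPriority(void) const
    {
        return m_pCity->getID();
    }

    // The entity this work item wraps
    CvCity* getCity(void) const
    {
        return m_pCity;
    }

private:
    CvCity* m_pCity;
};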

Pipeline Stages

The entities represented by the work items will flow through a number of pipeline stages (the stations on our production line).  Each stage is represented by an instance of the CvPipelineStage class (or more specifically one derived from it for a particular usage):

// Class representing a processing stage in a pipeline.  A stage
// may be synchronous or asynchronous.  A synchronous stage will
// not begin executing until all workflow items have been
// completely processed up to the end of the previous stage,
// whereas an asynchronous one will process items as soon as
// they are queued
// A stage may have multiple threads allocated to its processing
// if desired
// Define appropriate subclasses of this class to provide the
// processing
class CvPipelineStage
{
public:
 CvPipelineStage(const char* name,
                 bool bSynchronous,
                 int iMaxThreads);

 virtual ~CvPipelineStage();

 // Queue a work item into the stage
 void EnqueueWorkItem(CvPipelineWorkItem* item);

 // Process a work item
 virtual void ProcessWorkItem(CvPipelineWorkItem* item) = 0;
};

In this case you just need to override ProcessWorkItem() to provide code to actually do whatever work this stage is supposed to be responsible for.

Pipelines

A pipeline consists of a number of stages, so the API basically just lets you string stages together and then run the resulting pipeline.

// A workflow pipeline that processes work items through
// multiple pipeline processing stages
class CvPipeline
{
public:
 CvPipeline();
 virtual ~CvPipeline();

 // Add a stage to the pipeline.  Must occur before the
 // pipeline's Begin() is called
 // Note - a stage is assumed to be owned by the pipeline
 // once added, and will be deleted by it when it is finished
 // with it.
 // FUTURE - encapsulate better within a stage factory
 void AddStage(CvPipelineStage* stage);

 // Queue a work item to be processed by the pipeline
 void EnqueueWorkItem(CvPipelineWorkItem* item);

 // Commence processing of queued work items
 void Begin(void);

 // Complete processing - will synchronously wait for all
 // work items to complete
 void End(void);
};

Concrete Example

As an initial use, I decided to multi-thread enable the choice of what to produce in cities, since this is a reasonably substantial block of time that is largely independent between cities.  The fact that it both needs to consider multi-player synchronization issues and is not entirely independent between cities (e.g. - the Wonder example cited earlier) makes it a good proof-of-concept to implement first.

Processing a city can be split into three conceptual stages as a first cut:
  1. Do some stuff before considering production
  2. Do the production
  3. Do the stuff that comes after production
This is just a way to slice up the existing code in CvCity::doTurn() into three parts so that we can attempt to parallelize the middle one.  So to begin with we'll need pipeline stages for the 'stuff' in (1) and (3) - this is completely mechanical - we can literally just cut&paste the code from the original monolithic CvCity::doTurn into three separate methods:
  • CvCity::doTurnPreProduction
  • CvCity::doTurnProduction
  • CvCity::doTurnPostProduction
The first and last of these we don't care about the contents of - they are just whatever CvCity::doTurn used to do on each side of the production processing.  Each becomes a pipeline stage, wherein the override of CvPipelineStage::ProcessWorkItem() is a call to one of the methods above.  Because we are not (currently) trying to parallelize those pieces, they will just be synchronous single-threaded stages.  For instance:

class CvCityTurnPreProductionPipelineStage : public CvPipelineStage
{
public:
    // Synchronous, single-threaded stage (bSynchronous = true, iMaxThreads = 1)
    CvCityTurnPreProductionPipelineStage(const char* name) :
        CvPipelineStage(name, true, 1)
    {
    }
    virtual ~CvCityTurnPreProductionPipelineStage()
    {
    }

    virtual void ProcessWorkItem(CvPipelineWorkItem* item)
    {
        CvCityTurnPipelineWorkItem* cityItem = (CvCityTurnPipelineWorkItem*)item;
        int iKey = cityItem->GetKey();

        // Do whatever CvCity::doTurn used to do before production, then
        // mark this stage as complete for the item
        cityItem->getCity()->doTurnPreProduction();
        cityItem->Complete(iKey);
    }
};

The middle part is what we want to run across several threads, so clearly it has to involve an asynchronous stage, and since it's the choice of what to produce we are seeking to split across threads, the asynchronous stage must involve that.  However, actually making a choice (and adding it to the build queue) affects what is a valid choice in other cities (national units, wonders, etc.).  Furthermore the act of building something will influence the desirability of other things (potentially in other cities too), so we cannot actually enact the builds until everyone has made their initial choices.  This means we need to split production up into several sub-stages.  The actual list of sub-stages and their purpose is as follows:
  1. Enact current production - process the existing production queue and build whatever is at its head if we have enough production to do so.
  2. Current production sync point - this is a pseudo stage that is actually a no-op, but because it is tagged as synchronous no cities can get past this point until every city has completed the previous (current production) stage.  This ensures that the basis for choices (the current state of the empire) is consistent regardless of processing order.
  3. Choose production - this is the key asynchronous stage, which means all cities partake of it at once.  The choice they make is stored in the work item state for each city - it is NOT enacted in this stage, because to do so would impact other cities' choices, and introduce non-determinism that would result in multi-player OOS errors
  4. Enact production - this is a single-threaded synchronous stage, which means cities process through it one at a time in priority order, only after every city has made its initial choice.  At this point we may be in a situation where cities have made mutually incompatible choices (two have decided to go for the same Wonder say).  It is in this stage that we detect and handle this.  Because this stage processes cities one at a time strictly in priority order, it is deterministic which of the cities trying to build the hypothetical Wonder will process first (and succeed).  Assuming the choice is legal (we can push it onto the build queue) this stage then continues by processing the head item of the build queue - if it has sufficient production to complete it then the order is popped from the build queue and production adjusted.  Note that the popping of the order from the build queue merely adds whatever is to be built to a pending completion list for the city - it doesn't actually process its effect yet.  I'll come back to why this is necessary shortly.
    If we could not add the choice we had made to the build queue (second city trying to build the Wonder in our canonical example) then we call CvPipelineWorkItem::Requeue() - this causes what is known as a 'pipeline stall', which means that once processing of the current stage finishes, the overall pipeline processing will go back to the last asynchronous stage rather than forward to the next stage.  It also requeues (hence the name) the work item for the city that could not enact its build back on the previous asynchronous stage (choosing what to produce) so that it can choose again and get its second preference.  Because the choice stage is asynchronous, work on this will commence immediately, even while the remainder of the cities are still progressing through the next (enact) stage - it is this parallelism that means the enact stage cannot actually process the effects of the build (that would introduce non-determinism into the requeued choice processing).  A minimal sketch of this choose/enact/requeue pattern appears just after this list.
    The enact stage also handles multiple production.  With multiple production enabled, if a build completes we also requeue back to the choice stage unless the build queue already contains further items (in which case we drain it first).  This is exactly the same requeue mechanism that is triggered by inability to enact a choice - it's just a way of getting a new choice made, whether because we exhausted the build queue in multiple production, or because a higher priority city pre-empted our choice before we got to enact it.
  5. Complete production - currently a synchronous single-threaded stage, but can probably be split into two and the expensive part made asynchronous in future - this stage actually processes the effects of the production that occurred (the building effects, instantiate units, etc.)
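To make stages 3 and 4 a little more concrete, here is a minimal sketch of the choose/enact/requeue pattern.  The stage class names match those used in the pipeline construction code below, but the helper names (setChoice, getChoice, evaluateBestBuild, tryPushChoice) are purely illustrative, the choose stage's thread count is hard-coded here rather than read from the global define, and the real enact stage also drains the build queue and handles multiple production:

// Stage 3 (sketch): asynchronous - many cities run this at once.  It only
// computes and records a choice on the work item; it must not change any
// persistent game state
class CvCityTurnChooseProductionPipelineStage : public CvPipelineStage
{
public:
    CvCityTurnChooseProductionPipelineStage(const char* name) :
        CvPipelineStage(name, false, 2)  // thread count: see Global Defines below
    {
    }

    virtual void ProcessWorkItem(CvPipelineWorkItem* item)
    {
        CvCityTurnPipelineWorkItem* cityItem = (CvCityTurnPipelineWorkItem*)item;
        int iKey = cityItem->GetKey();

        // Hypothetical helpers - evaluate what to build and remember the
        // choice on the work item; do NOT push it onto the build queue here
        cityItem->setChoice(evaluateBestBuild(cityItem->getCity(), cityItem));
        cityItem->Complete(iKey);
    }
};

// Stage 4 (sketch): synchronous and single-threaded - cities pass through
// strictly in priority order, so conflicting choices resolve deterministically
class CvCityTurnEnactNewProductionPipelineStage : public CvPipelineStage
{
public:
    CvCityTurnEnactNewProductionPipelineStage(const char* name) :
        CvPipelineStage(name, true, 1)
    {
    }

    virtual void ProcessWorkItem(CvPipelineWorkItem* item)
    {
        CvCityTurnPipelineWorkItem* cityItem = (CvCityTurnPipelineWorkItem*)item;
        int iKey = cityItem->GetKey();

        if (!tryPushChoice(cityItem->getCity(), cityItem->getChoice()))
        {
            // Choice no longer valid (e.g. - another city took the Wonder) -
            // stall the pipeline and send this city back to the choose stage
            // (I'm assuming Requeue() takes the place of Complete() here)
            cityItem->Requeue();
        }
        else
        {
            // The real code then processes the head of the build queue,
            // handles multiple production, etc. - omitted in this sketch
            cityItem->Complete(iKey);
        }
    }
};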
The code that used to invoke CvCity::doTurn was this simple loop:

 for (pLoopCity = firstCity(&iLoop); pLoopCity != NULL; pLoopCity = nextCity(&iLoop))
 {
  pLoopCity->doTurn();
 }

Turning this into a partially parallelized pipeline yields this code (still fairly simple, if a little more verbose):

 CvPipeline cityProcessingPipeline;
 CvPipelineStage* preProductionStage = new CvCityTurnPreProductionPipelineStage("PreProduction");
 CvPipelineStage* productionEnactCurrentStage = new CvCityTurnEnactCurrentProductionPipelineStage("EnactCurrentProduction");
 CvPipelineSyncPointStage* currentProductionSyncPoint = new CvPipelineSyncPointStage("CurrentProductionSyncPoint");
 CvPipelineStage* productionChooseStage = new CvCityTurnChooseProductionPipelineStage("ChooseProduction");
 CvPipelineStage* productionEnactNewStage = new CvCityTurnEnactNewProductionPipelineStage("EnactNewProduction");
 CvPipelineStage* productionCompletionStage = new CvCityTurnCompleteProductionPipelineStage("CompleteProduction");
 CvPipelineStage* postProductionStage = new CvCityTurnPostProductionPipelineStage("PostProduction");

 cityProcessingPipeline.AddStage(preProductionStage);
 cityProcessingPipeline.AddStage(productionEnactCurrentStage);
 cityProcessingPipeline.AddStage(currentProductionSyncPoint);
 cityProcessingPipeline.AddStage(productionChooseStage);
 cityProcessingPipeline.AddStage(productionEnactNewStage);
 cityProcessingPipeline.AddStage(productionCompletionStage);
 cityProcessingPipeline.AddStage(postProductionStage);

 cityProcessingPipeline.Begin();

 for (pLoopCity = firstCity(&iLoop); pLoopCity != NULL; pLoopCity = nextCity(&iLoop))
 {
  cityProcessingPipeline.EnqueueWorkItem(new CvCityTurnPipelineWorkItem(pLoopCity));
 }

 cityProcessingPipeline.End();

Multi-Player Synchronization

IP-connected multi-player mode in Civ IV relies on the AIs on all machines making the same decisions when the AI turns are processed.  Any difference in the decisions taken (by AIs, automation, or governors) will result in OOS errors.  Since many decisions are made with a pseudo-random element (in order to provide varying experiences) this means:
  • The same set of random numbers must be used for the same decision points on all machines
  • The same state must exist at the point a decision is taken in regard to all factors that impact that decision
These constraints make the non-deterministic processing order between threads problematic.  The CvPipeline framework tackles this in two ways:
  1. By defining certain stages to be synchronous we can set up 'gates' at which all entities synchronize.  In the city example the split of pipeline stages ensures that the state present when build choices are made is always consistent, regardless of the processing order between threads in the asynchronous stages.  This is due to:
    1. The use of a synchronous stage between enactment of current production and new production choice (ensures the state in which all current production is complete is present before any choices are made)
    2. The separation of processing the build queue from processing the effects of actually building the resulting buildings/units into two synchronous stages (ensures that requeued choices never see the results of other enacted choices that might be processing at the same time)
  2. By defining a random number API against a work item, rather than using a global one.  This ensures that each city (in this case) generates its own deterministic random number stream, and hence is not impacted by random number rolls in the processing of other cities that may be happening in parallel (see the sketch below).
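To illustrate the second point, here is a rough sketch of how the per-item random stream would be used.  Only SetRandSeed(), GetSyncRand() and EnqueueWorkItem() come from the framework API; the post doesn't show how the seed is actually generated in the real code, so the getSorenRandNum() call below is just one deterministic possibility:

 // Seed each work item deterministically before Begin() - because this loop
 // is single-threaded and runs identically on every machine, every city gets
 // the same seed everywhere
 CvCityTurnPipelineWorkItem* pItem = new CvCityTurnPipelineWorkItem(pLoopCity);
 pItem->SetRandSeed(GC.getGameINLINE().getSorenRandNum(MAX_INT, "City pipeline seed"));
 cityProcessingPipeline.EnqueueWorkItem(pItem);

 // ...and inside an asynchronous stage's ProcessWorkItem(), always roll
 // against the ITEM's stream, never the global synced RNG
 int iRoll = cityItem->GetSyncRand(100, "Production choice tie-break");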

Miscellaneous Notes for modders and adventurous users

Global Defines

The number of threads the choice stage of the city processing pipeline uses is defined by a global define in 'A_New_Dawn_GlobalDefines.xml' called 'NUM_CITY_PIPELINE_THREADS'.  This is currently set rather conservatively, to 2.  Setting this to 1 will ensure single-threaded operation.  I'm currently running with it set to 4.
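The post doesn't show how the define reaches the pipeline, but presumably the choice stage's constructor feeds it into the base class's iMaxThreads argument via the usual define lookup (or the value is read by the calling code and passed in) - something like:

 // Assumed wiring only - inside the choice stage's constructor the global
 // define becomes the iMaxThreads argument of the CvPipelineStage base class
 CvCityTurnChooseProductionPipelineStage(const char* name) :
     CvPipelineStage(name, false, GC.getDefineINT("NUM_CITY_PIPELINE_THREADS"))
 {
 }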

Helper classes

In addition to the 3 framework classes, CvPipeline.h also defines a concrete helper class called 'CvPipelineSyncPointStage'.  This is just a no-op synchronous single-threaded stage that can be inserted to provide synchronization checkpoints.
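Its implementation amounts to little more than a synchronous, single-threaded stage whose processing does nothing but complete the item - roughly like this (sketch only; the real class is in CvPipeline.h):

// Roughly what a no-op synchronization checkpoint amounts to: synchronous
// and single-threaded, so nothing gets past it until every item has reached it
class CvPipelineSyncPointStage : public CvPipelineStage
{
public:
    CvPipelineSyncPointStage(const char* name) : CvPipelineStage(name, true, 1)
    {
    }

    virtual void ProcessWorkItem(CvPipelineWorkItem* item)
    {
        // Nothing to do - just mark the item as having passed the gate
        item->Complete(item->GetKey());
    }
};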

Cautions

The big gotchas in using this framework are:
  • Designing the necessary synchronization points so that the asynchronous bits can be OOS safe - this is probably what needs the most up-front thought in any new usage.  The best advice is probably to ensure that asynchronous stages themselves do not change the persistent state of anything - mostly they should perform expensive calculations necessary to decide what changes to make, and leave the actual making of those changes to down-stream synchronous stages.  That, coupled with appropriate assignment of work item priority and pipeline stalling via requeueing when necessary, should enable the result to be OOS safe.
  • Making sure that the stage processing function for any asynchronous stages is really thread-safe!  That includes everything it calls.  You'll probably find yourself having to dot critical sections around as you go (see the sketch after this list), but the main piece of advice I can give here is to keep the asynchronous work limited to fairly isolated code that ideally only operates on the entity represented by the work item it is processing (anything else it modifies will require locks as well as consideration of potential OOS effects).
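As a purely illustrative example of the kind of guard that tends to be needed, any shared structure touched from an asynchronous stage must be protected - here with a plain Win32 critical section; the shared cache and all the names in this fragment are hypothetical:

#include <windows.h>
#include <map>

// Hypothetical shared cache touched from an asynchronous stage - every
// access must be serialized or two worker threads can corrupt it
static CRITICAL_SECTION g_buildValueCacheSection;
static std::map<int, int> g_buildValueCache;

void InitBuildValueCache()
{
    InitializeCriticalSection(&g_buildValueCacheSection);
}

int GetCachedBuildValue(int iBuildingId)
{
    EnterCriticalSection(&g_buildValueCacheSection);
    std::map<int, int>::const_iterator itr = g_buildValueCache.find(iBuildingId);
    int iResult = (itr == g_buildValueCache.end()) ? -1 : itr->second;
    LeaveCriticalSection(&g_buildValueCacheSection);
    return iResult;
}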

Future development

I'm planning to extend the use of asynchronous stages to other elements of the city production pipeline, such as:
  • Processing added/removed buildings (after all decisions about WHAT is going to be built have been made, actually performing those changes in a non-deterministic order should be fine)
  • Worked tile choice
  • Maybe other things (like culture spread), if any look expensive enough on profiles to justify it
I'm also planning to make use of CvPipeline in some unit AI processing.  Many of the unit AI routines are of the form:

for each plot within a certain range
  evaluate a cost function for performing an action at the plot
  if the action has some value evaluate a path to it for the unit
    if it is possible to path there
      normalize the cost by how long it will take to get there
      if the normalized cost is the best so far
        note this plot as the best target found
if we found a best target plot at all
  queue up orders to move there and enact a mission

In a few cases both the cost function evaluation and the pathing are expensive operations.  In such cases recoding this as a two-stage pipeline (cost and path) with the costing stage running asynchronously could be beneficial.  Profiles suggest CvUnitAI::AI_attackCityMove() might be a good candidate.

Another potential use is in tender finalization, which basically asks every city to evaluate a cost function for each unit tender request, in order to decide where it is best to get them built.  City tender processing can almost certainly be done as an asynchronous stage in this.
