jump to navigation

ApprovaFlow: Using the Pipe and Filter Pattern to Build a Workflow Processor May 17, 2011

Posted by ActiveEngine Sensei in .Net, ActiveEngine, Approvaflow, ASP.Net, C#, Stateless, Workflow.
Tags: , , ,
trackback

This is the third entry in a series of posts for ApprovaFlow, an alternative to Windows Workflow written in C# and JSON.Net.  Source code for this post is here.

What We’ve Accomplished Thus Far

In the last post we discussed how Stateless makes creating a lean workflow engine possible, and we saw that we were able to achieve two of our overall goals for ApprovaFlow.  Here’s what we accomplished:

Model a workflow in a clear format that is readable by both developer and business user. One set of verbiage for all parties.
•. Allow the state of a workflow to be peristed as an integer, string, etc.  Quickly fetch state of a workflow.

So we have these goals left:

•. Create pre and post processing methods that can enforce enforce rules or carry out actions when completing a workflow task.
•. Introduce new functionality while isolating the impact of the new changes. New components should not break old ones
•.Communicate to the client with a standard set of objects. In other words, your solution domain will not change how the user interface will gather data from the user.
•. Use one. aspx page to processes user input for any type of workflow.
•. Provide ability to roll your own customizations to the front end or backend of your application.

Our next goal will be Create pre and post processing methods that can enforce enforce rules or carry out actions when completing a workflow task.  We’ll use the Pipe and Filter Pattern to simplify the processing, and we’ll see that this approach not only streamlines how you handle variation in tasks, but also provides a clean method for extending our application abilities.


The advantage of breaking down the activities of a process is that you can create a series of inter-changeable actions.  There may be some cases where you want to re-order the order of operations at runtime and you can do so easily when the actions are individual components.

Before we proceed applying the Pipe and Filter pattern to our solution, we need to establish some nomenclature for our workflow processing.  The following chart lays out the vocab we’ll use for the rest of series.

Term Definition
State A stage of a workflow.
Trigger A message that tells the workflow how to change states.  If the state is “Phone Ringing” and the trigger is “Answer Phone” the new state for the phone would be “Off hook”.
StateConfig A StateConfig defines a pathway or transition from one state to another.  It is comprised of a State, the Trigger and the Target State.
Step A Step contains the workflow’s current State.  In the course of your workflow you may have many of the same type of steps differentiated by date and time.  In other words, when you workflow has looping capability, the workflow step for a state may be issued many times.
Answer The Step asks a question, waiting for the user response.  The answer the user provides is the trigger that will start the transition from one state to another.  The Answer becomes the Trigger that will change the State.
Workflow A series of Steps compromised of States, Triggers and their respective transition expressed as a series of State Configs.  Think of this as a definition of a process.
Workflow Instance The Workflow Instance is a running workflow.  The Steps of the Workflow Instance are governed by how the Steps are defined by a Workflow.

Essentially a framework for providing an extensible workflow system boils down to answering the following questions asked in this order:
• Is the user authorized to provide an Answer to trigger a change to the step’s State?
• Is a special data set required for this particular State that is not part of the Step properties?
• Is the data provided from the user sufficient / valid for triggering a transition in the Workflow Step’s State?
• Are there actions to be performed such as saving special data?
• Can the system execute custom actions based on the State’s Trigger?

This looks very similar to the Pipe and Filter pattern.  Every time a workflow processes a trigger, the questions we asked above must be answered.  Each question could be considered a filter in the pipe and filter scenario.

The five questions above become the basis for our workflow processor components.  For this post we’ll assume that all data will be simply fetched then saved with no special processing.  We’ll also assume that a Workflow Step is considered to be valid when the following elements are correctly supplied:

public bool IsValidForWorkflowTransition()
{
  return this.Enforce("Step", true)
                 .When("AnsweredBy", Janga.Validation.Compare.NotEqual, string.Empty)
                 .When("Answer", Janga.Validation.Compare.NotEqual, string.Empty)
                 .When("State", Janga.Validation.Compare.NotEqual, string.Empty)
                 .When("WorkflowInstanceId", Janga.Validation.Compare.NotEqual, string.Empty)
                 .IsValid;
}

public bool IsUserValidParticipant()
{
  return this.Enforce("Step", true)
    .When("Participants", Janga.Validation.Compare.Contains, this.AnsweredBy)
    .IsValid;
}

Our Workflow Processor will function in accordance with the Pipe and Filter pattern where no matter what type of workflow instance we wish to process, the questions that we listed above will be answered.  Later we will return to discuss points of where the workflow can execute actions respective to the workflow’s definition.

Workflow Processor Code In Depth

Well, how do we configure a Workflow Processor?  In other words, we want to process an actual workflow, but how will we know the workflow type and what to do?  Some of configuration steps were previewed in Simple Workflows With ApprovaFlow and Stateless and the same principles apply here with the Configure method.  Collect the States, the Triggers and the StateConfigs, load them into Stateless along with the current state and you are ready to accept or reject the Trigger for the next State.  The Workflow Processor will conduct these steps and here is the code:

public WorkflowProcessor ConfigureStateMachine()
        {
            Enforce.That(string.IsNullOrEmpty(this.step.State) == false,
                            "WorkflowProcessor.Confgiure - step.State can not be empty");

            this.stateMachine = new StateMachine(this.step.State);

            //  Get a distinct list of states with a trigger from state configuration
            //  "State => Trigger => TargetState
            var states = this.workflow.StateConfigs.AsQueryable()
                                    .Select(x => x.State)
                                    .Distinct()
                                    .Select(x => x)
                                    .ToList();

            //  Assing triggers to states
            states.ForEach(state =>
            {
                var triggers = this.workflow.StateConfigs.AsQueryable()
                                   .Where(config => config.State == state)
                                   .Select(config => new { Trigger = config.Trigger, TargeState = config.TargetState })
                                   .ToList();

                triggers.ForEach(trig =>
                {
                    this.stateMachine.Configure(state).Permit(trig.Trigger, trig.TargeState);
                });
            });

            return this;
        }

The Workflow Processor will need to know the current state of a workflow instance, the answer supplied, who supplied the answer, as well as any parameters that the filters will need fetching special data. This will be contained in the class Step.cs:

#region Properties

        public string WorkflowInstanceId { get; set; }
        public string WorkflowId { get; set; }
        public string StepId { get; set; }
        public string State { get; set; }
        public string PreviousState { get; set; }
        public string Answer { get; set; }
        public DateTime Created { get; set; }
        public string AnsweredBy { get; set; }
        public string Participants { get; set; }

        public List ErrorList;
        public bool CanProcess { get; set; }
        public IDictionary Parameters { get; set; }

        #endregion

        #region Constructors

        public Step()
            : this(string.Empty, string.Empty, string.Empty, string.Empty,
                   string.Empty, new DateTime(), string.Empty, string.Empty,
                    new Dictionary())
        { }

        public Step(string workflowInstanceId, string stepId, string state, string previousState,
                        string answer, DateTime created, string answeredBy, string participants,
                        Dictionary parameters)
        {
            this.WorkflowInstanceId = workflowInstanceId;
            this.StepId = stepId;
            this.State = state;
            this.PreviousState = previousState;
            this.Answer = answer;
            this.Created = created;
            this.AnsweredBy = answeredBy;
            this.Participants = participants;

            this.ErrorList = new List();
            this.Parameters = parameters;
        }

        #endregion

Our goal with the Workflow Processor is to accept the users answer, process actions, and create the next Step base on the new State all in one pass.  We will create a pipeline of actions that will always be invoked.  Each action or “filter” will be a component that performs and individual task, such as determining if the step is answered by the correct user.  Each filter will point to the subsequent filter in the pipeline, and the succession of the filters can change easily if we see fit.  All that is needed is to add the filters to the pipeline in the order we want.  Here is the class schema for the Pipe and Filter processing:

We’ll quickly find that the information regarding whether the result of an action or the condition of a Step will need to be accessible to each of the filters.  The class Step is the natural place to store this information, so we will include a property CanProcess to indicate that a filter should be invoked, as well a List<string> to act as an error log.  This log can be passed back to the client to communicate any errors to the user.  Note that the Step class has the Dictionary property named “Parameters” that allows a filter to pass data on to next filter in the sequence.

Setting Up the Pipeline

The sequence of filter execution is controlled by the order that the filters are registered.  The class Pipeline is responsible for registering and executing the chain of filters.  Here is the method Register that accepts a filter and retains it for future processing:

We also record the name of the filter so that we may interrogate the pipeline should we want to know if a filter has already been registered.

Pipeline.Register returns a reference to itself, so we can chain together commands fluently:

pipeline.Register(new ValidParticipantFilter())
                      .Register(new SaveDataFilter());

The class FilterBase is the foundation of our filter components.  As stated earlier, each component will point the subsequent filter in the filter chain.  You’ll note that the class also has a Register method.  This takes on the task of point the current filter to the next, and this method is called by the Pipeline as it registers all of the filters. Here is FilterBase:

public abstract class FilterBase : IFilter
{
  private IFilter next;

  protected abstract T Process(T input);

  public T Execute(T input)
  {
    T val = Process(input);

     if (this.next != null)
    {
      val = this.next.Execute(val);
    }

    return val;
  }

  public void Register(IFilter filter)
  {
    if (this.next == null)
    {
      this.next = filter;
    }
    else
    {
      this.next.Register(filter);
    }
  }
}

The method Execute accepts input of type T, and in the Workflow Processor instance this will Step.  Basically the Execute method is a wrapper, as we call the abstract method Process.  Process will be overridden in each filter, and this will contain the logic specific to the tasks that will be performed.  The code for a filter is quite simple:

public class ValidParticipantFilter : FilterBase
{
  protected override Step Process(Step input)
  {
    if (input.CanProcess)
    {
      input.Parameters["ValidFired"] = true;
      input.Parameters["FilterOrder"] += "ValidParticipantFilter;";

      input.CanProcess = input.IsUserValidParticipant();

    if(input.CanProcess == false)
    {
      input.ErrorList.Add("Invalid Pariticipant - " + input.AnsweredBy);
    }
  }

  return input;
}

Here we check to see if we can process, then perform specific actions if appropriate.  Given that the filters have no knowledge of each other, we can see that they can be executed in any order.  In other words you could have a Pipeline that had filters Step1, Step2, Step3 and you could configure a different pipeline to execute Step3, Step1, and Step2.

FilterRegistry Organizes Your Filters

Because we want to be able to use our filters in different successions we’ll need to keep a registry of what is available to use and provide the ability to look up or query different filters depending on our processing needs.  This registry will be created on application start up and will contain all objects of type FilterBase.  Later we’ll add the ability for the registry to load assemblies from a share, so that you can add other filters as simple plugins.  Information about each filter retained in a class FilterDefinition, and the FilterRegistry is merely a glorified List of the FilterDefintions. When we want to create a pipeline of filters we will want to instantiate new copies. Using Expressions we can create Functions that will be stored with with our definition for each filter type.  Here is FilterDefinition:

public class FilterDefinition
{
  public string Name { get; set; }
  public string FilterCategory { get; set; }
  public Type FilterType { get; set; }
  public Func> Filter{get; set;}

  public FilterDefinition() { }
}

We’ll invoke the compiled delegate at runtime to create our filter.  The method AddCreateFilter handles this:

private Func> AddCreateFilter(FilterDefinition filterDef)
        {
            var body = Expression.MemberInit(Expression.New(filterDef.FilterType));
            return Expression.Lambda>>(body, null).Compile();
        }

FilterRegistry is meant to be run once at start up so that all filters are registered and ready to use. You can imagine how slow it could become if every time you process a Workflow Step that you must interrogate all the assemblies.

Once you FilterRegistry has all assemblies registered you can query and create new combinations with the method GetFilters:

public IEnumerable> GetFilters(string filterNames)
        {
            Enforce.That(string.IsNullOrEmpty(filterNames) == false,
                            "FilterRegistry.GetFilters - filterNames can not be null");

            var returnFilters = new List>();
            var names = filterNames.Split(';').ToList();

            names.ForEach(name =>
            {
                var filter = this.filters.Where(x => x.Name == name)
                                           .SingleOrDefault();

                if (filter != null)
                {
                    returnFilters.Add(filter.Filter.Invoke());
                }
            });

            return returnFilters;
        }

Pipeline can accept a list of filters along with the string that represents the order of execution.  The method RegisterFrom accepts a reference to the FilterRegistry along with the names of the filters you want to use.

In the case of the Workflow Processor, we need to divide our filters into pre-trigger and post-trigger activities. Referring back to our 5 questions that our processor asks, question 1 – 3 must be answered before we attempt to transition the Workflow State, while steps 4-5 must be answered after the transition has succeeded. The method ConfigurePipeline in WorkflowProcessor.cs accomplishes this task:

public WorkflowProcessor ConfigurePipeline(string preProcessFilterNames, string postProcessFilterNames)
        {
            Enforce.That(string.IsNullOrEmpty(preProcessFilterNames) == false,
                            "WorkflowProcessor.Configure - preProcessFilterNames can not be null");

            Enforce.That(string.IsNullOrEmpty(postProcessFilterNames) == false,
                            "WorkflowProcessor.Configure - postProcessFilterNames can not be null");

            var actionWrapper = new ActionWrapperFilter(this.ExecuteTriggerFilter);

            this.pipeline.RegisterFromList(preProcessFilterNames, this.filterRegistry)
                            .Register(actionWrapper)
                            .RegisterFromList(postProcessFilterNames, this.filterRegistry);

            return this;
        }

Putting It all Together

A lot of talk and theory, so how does this all fit together?  The test class WorkflowScenarioTests illustrates how our processor works.  We are creating a workflow that implements the process for a Red Shirt requesting a promotion off a landing party.  You may recall that the dude wearing the red shirt usually got killed with in the first few minutes of Star Trek, so this workflow will help those poor saps get off the death list.  The configuration for the Workflow is contained within the file RedShirtPromotion.json.  There are a few simple rules that we want to enforce with the Workflow.  For one, Spock must review the Red Shirt request, but Kirk will have the final say.

Here is a sample from the class WorkflowScenarioTests.cs:

  string source = @"F:\vs10dev\ApprovaFlowSimpleWorkflowProcessor\TestSuite\TestData\RedShirtPromotion.json";
            string preFilterNames = "FetchDataFilter;ValidParticipantFilter;";
            string postFilterNames = "SaveDataFilter";

            var workflow = DeserializeWorkflow(source);

            var parameters = new Dictionary();
            parameters.Add("FilterOrder", string.Empty);
            parameters.Add("FetchDataFired", false);
            parameters.Add("SaveDataFired", false);
            parameters.Add("ValidFired", false);

            var step = new Step("13", "12", "RequestPromotionForm", "",
                                    "Complete", DateTime.Now, "RedShirtGuy", "Data;RedShirtGuy",
                                    parameters);
            step.CanProcess = true;

            var filterRegistry = new FilterRegistry();

            var processor = new WorkflowProcessor(step, filterRegistry, workflow);
            string newState = processor.ConfigurePipeline(preFilterNames, postFilterNames)
                        .ConfigureStateMachine()
                        .ProcessAnswer()
                        .GetCurrentState();

            Assert.AreEqual("FirstOfficerReview", newState);

Study the tests.  We’ve covered a lot together and admittedly there is a lot swallow in this post.  In our next episode we’ll look at how to the Pipe and Filter pattern can help us with extending our workflow processor’s capability without causing us a lot of pain.  Here’s the source code.  Enjoy and check back soon for our next installment.  Sensei will let you take it on out with this groovy theme (click play).

Comments»

No comments yet — be the first.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: