Advertisements
jump to navigation

ApprovaFlow: Using the Pipe and Filter Pattern to Build a Workflow Processor May 17, 2011

Posted by ActiveEngine Sensei in .Net, ActiveEngine, Approvaflow, ASP.Net, C#, Stateless, Workflow.
Tags: , , ,
add a comment

This is the third entry in a series of posts for ApprovaFlow, an alternative to Windows Workflow written in C# and JSON.Net.  Source code for this post is here.

What We’ve Accomplished Thus Far

In the last post we discussed how Stateless makes creating a lean workflow engine possible, and we saw that we were able to achieve two of our overall goals for ApprovaFlow.  Here’s what we accomplished:

Model a workflow in a clear format that is readable by both developer and business user. One set of verbiage for all parties.
•. Allow the state of a workflow to be peristed as an integer, string, etc.  Quickly fetch state of a workflow.

So we have these goals left:

•. Create pre and post processing methods that can enforce enforce rules or carry out actions when completing a workflow task.
•. Introduce new functionality while isolating the impact of the new changes. New components should not break old ones
•.Communicate to the client with a standard set of objects. In other words, your solution domain will not change how the user interface will gather data from the user.
•. Use one. aspx page to processes user input for any type of workflow.
•. Provide ability to roll your own customizations to the front end or backend of your application.

Our next goal will be Create pre and post processing methods that can enforce enforce rules or carry out actions when completing a workflow task.  We’ll use the Pipe and Filter Pattern to simplify the processing, and we’ll see that this approach not only streamlines how you handle variation in tasks, but also provides a clean method for extending our application abilities.


The advantage of breaking down the activities of a process is that you can create a series of inter-changeable actions.  There may be some cases where you want to re-order the order of operations at runtime and you can do so easily when the actions are individual components.

Before we proceed applying the Pipe and Filter pattern to our solution, we need to establish some nomenclature for our workflow processing.  The following chart lays out the vocab we’ll use for the rest of series.

Term Definition
State A stage of a workflow.
Trigger A message that tells the workflow how to change states.  If the state is “Phone Ringing” and the trigger is “Answer Phone” the new state for the phone would be “Off hook”.
StateConfig A StateConfig defines a pathway or transition from one state to another.  It is comprised of a State, the Trigger and the Target State.
Step A Step contains the workflow’s current State.  In the course of your workflow you may have many of the same type of steps differentiated by date and time.  In other words, when you workflow has looping capability, the workflow step for a state may be issued many times.
Answer The Step asks a question, waiting for the user response.  The answer the user provides is the trigger that will start the transition from one state to another.  The Answer becomes the Trigger that will change the State.
Workflow A series of Steps compromised of States, Triggers and their respective transition expressed as a series of State Configs.  Think of this as a definition of a process.
Workflow Instance The Workflow Instance is a running workflow.  The Steps of the Workflow Instance are governed by how the Steps are defined by a Workflow.

Essentially a framework for providing an extensible workflow system boils down to answering the following questions asked in this order:
• Is the user authorized to provide an Answer to trigger a change to the step’s State?
• Is a special data set required for this particular State that is not part of the Step properties?
• Is the data provided from the user sufficient / valid for triggering a transition in the Workflow Step’s State?
• Are there actions to be performed such as saving special data?
• Can the system execute custom actions based on the State’s Trigger?

This looks very similar to the Pipe and Filter pattern.  Every time a workflow processes a trigger, the questions we asked above must be answered.  Each question could be considered a filter in the pipe and filter scenario.

The five questions above become the basis for our workflow processor components.  For this post we’ll assume that all data will be simply fetched then saved with no special processing.  We’ll also assume that a Workflow Step is considered to be valid when the following elements are correctly supplied:

public bool IsValidForWorkflowTransition()
{
  return this.Enforce("Step", true)
                 .When("AnsweredBy", Janga.Validation.Compare.NotEqual, string.Empty)
                 .When("Answer", Janga.Validation.Compare.NotEqual, string.Empty)
                 .When("State", Janga.Validation.Compare.NotEqual, string.Empty)
                 .When("WorkflowInstanceId", Janga.Validation.Compare.NotEqual, string.Empty)
                 .IsValid;
}

public bool IsUserValidParticipant()
{
  return this.Enforce("Step", true)
    .When("Participants", Janga.Validation.Compare.Contains, this.AnsweredBy)
    .IsValid;
}

Our Workflow Processor will function in accordance with the Pipe and Filter pattern where no matter what type of workflow instance we wish to process, the questions that we listed above will be answered.  Later we will return to discuss points of where the workflow can execute actions respective to the workflow’s definition.

Workflow Processor Code In Depth

Well, how do we configure a Workflow Processor?  In other words, we want to process an actual workflow, but how will we know the workflow type and what to do?  Some of configuration steps were previewed in Simple Workflows With ApprovaFlow and Stateless and the same principles apply here with the Configure method.  Collect the States, the Triggers and the StateConfigs, load them into Stateless along with the current state and you are ready to accept or reject the Trigger for the next State.  The Workflow Processor will conduct these steps and here is the code:

public WorkflowProcessor ConfigureStateMachine()
        {
            Enforce.That(string.IsNullOrEmpty(this.step.State) == false,
                            "WorkflowProcessor.Confgiure - step.State can not be empty");

            this.stateMachine = new StateMachine(this.step.State);

            //  Get a distinct list of states with a trigger from state configuration
            //  "State => Trigger => TargetState
            var states = this.workflow.StateConfigs.AsQueryable()
                                    .Select(x => x.State)
                                    .Distinct()
                                    .Select(x => x)
                                    .ToList();

            //  Assing triggers to states
            states.ForEach(state =>
            {
                var triggers = this.workflow.StateConfigs.AsQueryable()
                                   .Where(config => config.State == state)
                                   .Select(config => new { Trigger = config.Trigger, TargeState = config.TargetState })
                                   .ToList();

                triggers.ForEach(trig =>
                {
                    this.stateMachine.Configure(state).Permit(trig.Trigger, trig.TargeState);
                });
            });

            return this;
        }

The Workflow Processor will need to know the current state of a workflow instance, the answer supplied, who supplied the answer, as well as any parameters that the filters will need fetching special data. This will be contained in the class Step.cs:

#region Properties

        public string WorkflowInstanceId { get; set; }
        public string WorkflowId { get; set; }
        public string StepId { get; set; }
        public string State { get; set; }
        public string PreviousState { get; set; }
        public string Answer { get; set; }
        public DateTime Created { get; set; }
        public string AnsweredBy { get; set; }
        public string Participants { get; set; }

        public List ErrorList;
        public bool CanProcess { get; set; }
        public IDictionary Parameters { get; set; }

        #endregion

        #region Constructors

        public Step()
            : this(string.Empty, string.Empty, string.Empty, string.Empty,
                   string.Empty, new DateTime(), string.Empty, string.Empty,
                    new Dictionary())
        { }

        public Step(string workflowInstanceId, string stepId, string state, string previousState,
                        string answer, DateTime created, string answeredBy, string participants,
                        Dictionary parameters)
        {
            this.WorkflowInstanceId = workflowInstanceId;
            this.StepId = stepId;
            this.State = state;
            this.PreviousState = previousState;
            this.Answer = answer;
            this.Created = created;
            this.AnsweredBy = answeredBy;
            this.Participants = participants;

            this.ErrorList = new List();
            this.Parameters = parameters;
        }

        #endregion

Our goal with the Workflow Processor is to accept the users answer, process actions, and create the next Step base on the new State all in one pass.  We will create a pipeline of actions that will always be invoked.  Each action or “filter” will be a component that performs and individual task, such as determining if the step is answered by the correct user.  Each filter will point to the subsequent filter in the pipeline, and the succession of the filters can change easily if we see fit.  All that is needed is to add the filters to the pipeline in the order we want.  Here is the class schema for the Pipe and Filter processing:

We’ll quickly find that the information regarding whether the result of an action or the condition of a Step will need to be accessible to each of the filters.  The class Step is the natural place to store this information, so we will include a property CanProcess to indicate that a filter should be invoked, as well a List<string> to act as an error log.  This log can be passed back to the client to communicate any errors to the user.  Note that the Step class has the Dictionary property named “Parameters” that allows a filter to pass data on to next filter in the sequence.

Setting Up the Pipeline

The sequence of filter execution is controlled by the order that the filters are registered.  The class Pipeline is responsible for registering and executing the chain of filters.  Here is the method Register that accepts a filter and retains it for future processing:

We also record the name of the filter so that we may interrogate the pipeline should we want to know if a filter has already been registered.

Pipeline.Register returns a reference to itself, so we can chain together commands fluently:

pipeline.Register(new ValidParticipantFilter())
                      .Register(new SaveDataFilter());

The class FilterBase is the foundation of our filter components.  As stated earlier, each component will point the subsequent filter in the filter chain.  You’ll note that the class also has a Register method.  This takes on the task of point the current filter to the next, and this method is called by the Pipeline as it registers all of the filters. Here is FilterBase:

public abstract class FilterBase : IFilter
{
  private IFilter next;

  protected abstract T Process(T input);

  public T Execute(T input)
  {
    T val = Process(input);

     if (this.next != null)
    {
      val = this.next.Execute(val);
    }

    return val;
  }

  public void Register(IFilter filter)
  {
    if (this.next == null)
    {
      this.next = filter;
    }
    else
    {
      this.next.Register(filter);
    }
  }
}

The method Execute accepts input of type T, and in the Workflow Processor instance this will Step.  Basically the Execute method is a wrapper, as we call the abstract method Process.  Process will be overridden in each filter, and this will contain the logic specific to the tasks that will be performed.  The code for a filter is quite simple:

public class ValidParticipantFilter : FilterBase
{
  protected override Step Process(Step input)
  {
    if (input.CanProcess)
    {
      input.Parameters["ValidFired"] = true;
      input.Parameters["FilterOrder"] += "ValidParticipantFilter;";

      input.CanProcess = input.IsUserValidParticipant();

    if(input.CanProcess == false)
    {
      input.ErrorList.Add("Invalid Pariticipant - " + input.AnsweredBy);
    }
  }

  return input;
}

Here we check to see if we can process, then perform specific actions if appropriate.  Given that the filters have no knowledge of each other, we can see that they can be executed in any order.  In other words you could have a Pipeline that had filters Step1, Step2, Step3 and you could configure a different pipeline to execute Step3, Step1, and Step2.

FilterRegistry Organizes Your Filters

Because we want to be able to use our filters in different successions we’ll need to keep a registry of what is available to use and provide the ability to look up or query different filters depending on our processing needs.  This registry will be created on application start up and will contain all objects of type FilterBase.  Later we’ll add the ability for the registry to load assemblies from a share, so that you can add other filters as simple plugins.  Information about each filter retained in a class FilterDefinition, and the FilterRegistry is merely a glorified List of the FilterDefintions. When we want to create a pipeline of filters we will want to instantiate new copies. Using Expressions we can create Functions that will be stored with with our definition for each filter type.  Here is FilterDefinition:

public class FilterDefinition
{
  public string Name { get; set; }
  public string FilterCategory { get; set; }
  public Type FilterType { get; set; }
  public Func> Filter{get; set;}

  public FilterDefinition() { }
}

We’ll invoke the compiled delegate at runtime to create our filter.  The method AddCreateFilter handles this:

private Func> AddCreateFilter(FilterDefinition filterDef)
        {
            var body = Expression.MemberInit(Expression.New(filterDef.FilterType));
            return Expression.Lambda>>(body, null).Compile();
        }

FilterRegistry is meant to be run once at start up so that all filters are registered and ready to use. You can imagine how slow it could become if every time you process a Workflow Step that you must interrogate all the assemblies.

Once you FilterRegistry has all assemblies registered you can query and create new combinations with the method GetFilters:

public IEnumerable> GetFilters(string filterNames)
        {
            Enforce.That(string.IsNullOrEmpty(filterNames) == false,
                            "FilterRegistry.GetFilters - filterNames can not be null");

            var returnFilters = new List>();
            var names = filterNames.Split(';').ToList();

            names.ForEach(name =>
            {
                var filter = this.filters.Where(x => x.Name == name)
                                           .SingleOrDefault();

                if (filter != null)
                {
                    returnFilters.Add(filter.Filter.Invoke());
                }
            });

            return returnFilters;
        }

Pipeline can accept a list of filters along with the string that represents the order of execution.  The method RegisterFrom accepts a reference to the FilterRegistry along with the names of the filters you want to use.

In the case of the Workflow Processor, we need to divide our filters into pre-trigger and post-trigger activities. Referring back to our 5 questions that our processor asks, question 1 – 3 must be answered before we attempt to transition the Workflow State, while steps 4-5 must be answered after the transition has succeeded. The method ConfigurePipeline in WorkflowProcessor.cs accomplishes this task:

public WorkflowProcessor ConfigurePipeline(string preProcessFilterNames, string postProcessFilterNames)
        {
            Enforce.That(string.IsNullOrEmpty(preProcessFilterNames) == false,
                            "WorkflowProcessor.Configure - preProcessFilterNames can not be null");

            Enforce.That(string.IsNullOrEmpty(postProcessFilterNames) == false,
                            "WorkflowProcessor.Configure - postProcessFilterNames can not be null");

            var actionWrapper = new ActionWrapperFilter(this.ExecuteTriggerFilter);

            this.pipeline.RegisterFromList(preProcessFilterNames, this.filterRegistry)
                            .Register(actionWrapper)
                            .RegisterFromList(postProcessFilterNames, this.filterRegistry);

            return this;
        }

Putting It all Together

A lot of talk and theory, so how does this all fit together?  The test class WorkflowScenarioTests illustrates how our processor works.  We are creating a workflow that implements the process for a Red Shirt requesting a promotion off a landing party.  You may recall that the dude wearing the red shirt usually got killed with in the first few minutes of Star Trek, so this workflow will help those poor saps get off the death list.  The configuration for the Workflow is contained within the file RedShirtPromotion.json.  There are a few simple rules that we want to enforce with the Workflow.  For one, Spock must review the Red Shirt request, but Kirk will have the final say.

Here is a sample from the class WorkflowScenarioTests.cs:

  string source = @"F:\vs10dev\ApprovaFlowSimpleWorkflowProcessor\TestSuite\TestData\RedShirtPromotion.json";
            string preFilterNames = "FetchDataFilter;ValidParticipantFilter;";
            string postFilterNames = "SaveDataFilter";

            var workflow = DeserializeWorkflow(source);

            var parameters = new Dictionary();
            parameters.Add("FilterOrder", string.Empty);
            parameters.Add("FetchDataFired", false);
            parameters.Add("SaveDataFired", false);
            parameters.Add("ValidFired", false);

            var step = new Step("13", "12", "RequestPromotionForm", "",
                                    "Complete", DateTime.Now, "RedShirtGuy", "Data;RedShirtGuy",
                                    parameters);
            step.CanProcess = true;

            var filterRegistry = new FilterRegistry();

            var processor = new WorkflowProcessor(step, filterRegistry, workflow);
            string newState = processor.ConfigurePipeline(preFilterNames, postFilterNames)
                        .ConfigureStateMachine()
                        .ProcessAnswer()
                        .GetCurrentState();

            Assert.AreEqual("FirstOfficerReview", newState);

Study the tests.  We’ve covered a lot together and admittedly there is a lot swallow in this post.  In our next episode we’ll look at how to the Pipe and Filter pattern can help us with extending our workflow processor’s capability without causing us a lot of pain.  Here’s the source code.  Enjoy and check back soon for our next installment.  Sensei will let you take it on out with this groovy theme (click play).

Advertisements

Simple Workflows With ApprovaFlow and Stateless April 2, 2011

Posted by ActiveEngine Sensei in .Net, ActiveEngine, Approvaflow, ASP.Net, C#, JSON.Net, New Techniques, Stateless.
Tags: , , , , ,
add a comment

This is the second in a series of posts for ApprovaFlow, an alternative to Windows Workflow written in C# and JSON.Net. Source code for this post is here.

Last time we laid out out goals for a simple workflow engine, ApprovaFlow, with the following objectives:
• Model a workflow in a clear format that is readable by both developer and business user. One set of verbiage for all parties.
•. Allow the state of a workflow to be peristed as an integer, string. Quicky fetch state of a workflow.
•. Create pre and post nprocessing methods that can enforce enforce rules or carry out actions when completing a workflow task.
•. Introduce new functionality while isolating the impact of the new changes. New components should not break old ones
•.Communicate to the client with a standard set of objects. In other words, your solution domain will not change how the user interface will gather data from the user.
•. Use one. aspx page to processes user input for any type of workflow.
•. Provide ability to roll your own customizations to the front end or backend of your application.

The fulcrum point of all we have set out to do with ApprovaFlow is a state machine that will present a state and accept answers supplied by the users. One of Sensei’s misgivings about Windows Workflow is that it is such a behemoth when all you want to implement is a state machine.
Stateless, created Nicholas Blumhardt, is a shining example of adhering to the rule of “necessary and sufficient”. By using Generics Stateless allows you to create a state machine where the State and Trigger can be represented by an integer, string double, enum – say this sounds like it fulfills our goal:

•. Allow the state of a workflow to be persisted as an integer, string. Quicky fetch state of a workflow.
Stateless constructs a state machine with the following syntax:

var statemachine =
       new StateMachine(TState currentState);

For our discussion we will create a state machine that will process a request for promotion workflow. We’ll use:

var statemachine =
       new StateMachine(string currentstate);

This could very easily take the form of

<int, int>

and will depend on your preferences. Regardless of your choice, if the current state is represent by a primitive like int or string, you can just fetch that from a database or a repository and now your state machine is loaded with the current state. Contrast that with WF where you have multiple projects and confusing nomenclature to learn. Stateless just stays out of our way.
Let’s lay out our request for promotion workflow. Here is our state machine represented in English:

Step: Request Promotion Form
  Answer => Complete
  Next Step => Manager Review

Step: Manager Review
  Answer => Deny
  Next Step => Promotion Denied
  Answer => Request Info
  Next Step => Request Promotion Form
  Answer => Approve
  Next Step => Vice President Approve

Step: Vice President Approve
  Answer => Deny
  Next Step => Promotion Denied
  Answer => Manager Justify
  Next Step => Manager Review
  Answer => Approve
  Next Step => Promoted

Step: Promotion Denied
Step: Promoted

Remember the goal Model a workflow in a clear format that is readable by both developer and business user. One set of verbiage for all parties? We are very close to achieving that goal. If we substitute “Step” with “State” and “Answer” with “Trigger”, then we have a model that matches how Stateless configures a state machine:

var statemachine = new StateMachine(startState);

//  Request Promo form states
statemachine.Configure("RequestPromotionForm")
               .Permit("Complete", "ManagerReview");

//  Manager Review states
statemachine.Configure("ManagerReview")
               .Permit("RequestInfo", "RequestPromotionForm")
               .Permit("Deny", "PromotionDenied")
               .Permit("Approve", "VicePresidentApprove");

Clearly you will not show the code to your business partners or end users, but a simple chart like this should not make anyone’s eyes glaze over:

State: Request Promotion Form
  Trigger => Complete
  Target State => Manager Review

Before we move on you may want to study the test in the file SimpleStateless.cs. Here configuring the state machine and advancing from state to state is laid out for you:

//  Request Promo form states
statemachine.Configure("RequestPromotionForm")
                    .Permit("Complete", "ManagerReview");

//  Manager Review states
statemachine.Configure("ManagerReview")
                     .Permit("RequestInfo", "RequestPromotionForm")
                     .Permit("Deny", "PromotionDenied")
                     .Permit("Approve", "VicePresidentApprove");

//  Vice President state configuration
statemachine.Configure("VicePresidentApprove")
                      .Permit("ManagerJustify", "ManagerReview")
                      .Permit("Deny", "PromotionDenied")
                      .Permit("Approve", "Promoted");

//  Tests
Assert.AreEqual(startState, statemachine.State);

//  Move to next state
statemachine.Fire("Complete");
Assert.IsTrue(statemachine.IsInState("ManagerReview"));

statemachine.Fire("Deny");
Assert.IsTrue(statemachine.IsInState("PromotionDenied"));

The next question that comes to mind is how to represent the various States, Triggers and State configurations as data. Our mission on this project is to adhere to simplicity. One way to represent a Stateless state machine is with JSON:

{WorkflowType : "RequestPromotion",
  States : [{Name : "RequestPromotionForm" ; DisplayName : "Request Promotion Form"}
    {Name : "ManagerReview", DisplayName : "Manager Review"},
    {Name : "VicePresidentApprove", DisplayName : "Vice President Approve"},
    {Name : "PromotionDenied", DisplayName : "Promotion Denied"},
    {Name : "Promoted", DisplayName : "Promoted"}
    ],
  Triggers : [{Name : "Complete", DisplayName : "Complete"},
     {Name : "Approve", DisplayName : "Approve"},
     {Name : "RequestInfo", DisplayName : "Request Info"},
     {Name : "ManagerJustify", DisplayName : "Manager Justify"},
     {Name : "Deny", DisplayName : "Deny"}
  ],
StateConfigs : [{State : "RequestPromotionForm", Trigger : "Complete", TargetState : "ManagerReview"},
     {State : "ManagerReview", Trigger : "RequestInfo", TargetState : "RequestPromotionForm"},
     {State : "ManagerReview", Trigger : "Deny", TargetState : "PromotionDenied"},
     {State : "ManagerReview", Trigger : "Approve", TargetState : "VicePresidentApprove"},
     {State : "VicePresidentApprove", Trigger : "ManagerJustify", TargetState : "ManagerApprove"},
     {State : "VicePresidentApprove", Trigger : "Deny", TargetState : "PromotionDenied"},
     {State : "VicePresidentApprove", Trigger : "Approve", TargetState : "Promoted"}
  ]
}

As you can see we are storing all States and all Triggers with their display names. This will allow you some flexibility with UI screens and reports. Each rule for transitioning a state to another is stored in the StateConfigs node. Here we are simply representing our chart that we created above as JSON.

Since we have a standard way of representing a workflow with JSON de-serializing this definition to objects is straight forward. Here are the corresponding classes that define a state machine:

public class WorkflowDefinition
{
        public string WorkflowType { get; set; }
        public List States { get; set; }
        public List Triggers { get; set; }
        public List StateConfigs { get; set; }

        public WorkflowDefinition() { }
}

public class State
{
        public string Name { get; set; }
        public string DisplayName { get; set; }
}

public class Trigger
{
        public string Name { get; set; }
        public string DisplayName { get; set; }

        public Trigger() { }
}
public class StateConfig
{
        public string State { get; set; }
        public string Trigger { get; set; }
        public string TargetState { get; set; }

        public StateConfig() { }
}

We’ll close out this post with an example that will de-serialize our state machine definition and allow us to respond to the triggers that we supply. Basically it will be a rudimentary workflow. RequestionPromotion.cs will be the workflow processor. The method Configure is where we will perform the de-serialization, and the process is quite straight forward:

  1. Deserialize the States
  2. Deserialize the Triggers
  3. Deserialize the StateConfigs that contain the transitions from state to state
  4. For every StateConfig, configure the state machine.

Here’s the code:

public void Configure()
{
    Enforce.That((string.IsNullOrEmpty(source) == false),
                            "RequestPromotion.Configure - source is null");

    string json = GetJson(source);

    var workflowDefintion = JsonConvert.DeserializeObject(json);

    Enforce.That((string.IsNullOrEmpty(startState) == false),
                            "RequestPromotion.Configure - startStep is null");

    this.stateMachine = new StateMachine(startState);

    //  Get a distinct list of states with a trigger from state configuration
    //  "State => Trigger => TargetState
    var states = workflowDefintion.StateConfigs.AsQueryable()
                                    .Select(x => x.State)
                                    .Distinct()
                                    .Select(x => x)
                                    .ToList();

    //  Assing triggers to states
    states.ForEach(state =>
    {
        var triggers = workflowDefintion.StateConfigs.AsQueryable()
                                   .Where(config => config.State == state)
                                   .Select(config => new { Trigger = config.Trigger, TargeState = config.TargetState })
                                   .ToList();

        triggers.ForEach(trig =>
        {
            this.stateMachine.Configure(state).Permit(trig.Trigger, trig.TargeState);
        });
    });
}

And we advance the workflow with this method:

public void ProgressToNextState(string trigger)
{
Enforce.That((string.IsNullOrEmpty(trigger) == false),
"RequestPromotion.ProgressToNextState – trigger is null");

this.stateMachine.Fire(trigger);
}

The class RequestPromotionTests.cs illustrates how this works.

We we have seen how we can fulfill the objectives laid out for ApprovaFlow and have covered a significant part of the functionality that Stateless will provide for our workflow engine.   Here is the source code.

%d bloggers like this: