Monthly Archives: February 2013

Projections 6: A use case of indexing

As we saw in Projections 5: Indexing, the linkTo() function is capable of emitting pointers to another stream. This allows you to break apart streams in order to change their partitioning and to allow fast indexing. In this post we will look at a use case for this functionality.

I have seen no fewer than ten custom auditing systems for NServiceBus. Everyone seems to want to build their own. Most involve writing the messages out to a database table and then querying that table to show results for, say, a given correlationId or a certain user. Projections and the Event Store can handle this workload quite easily with two small indexing projections.

Let’s assume we have hooked the audit queue in NServiceBus and we are writing all the messages into the Event Store. When writing, we attach metadata containing the correlationId and the username:

{
  "correlationId" : "guid",
  "username" : "greg"
}

We would then write our two indexing projections (one for correlationId and one for username).

fromAll().when({$any : function(s,e) { linkTo(e.metadata.correlationId, e); }});

fromAll().when({$any : function(s,e) { linkTo(e.metadata.username, e); }});

This will create a stream for every correlationId and a stream for every username. Once those are running you can just go to http://node:port/streams/greg and you will see all the messages I am producing in the system (the UI even updates on its own as I watch it). This is a very common and simple usage. It should also be noted that while here I am basically promoting a piece of metadata, the code is in JavaScript and you could do pretty much anything there (maybe partition by the modulus, to 42, of the combined string of four properties of the event?)
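As a sketch of that "anything goes" idea, here is a plain JavaScript function that derives a target stream name from event metadata. This is not the Event Store API; the property names and the modulo-42 hashing scheme are invented purely for illustration.

```javascript
// Hypothetical partition function: combine metadata fields, hash the
// characters, and bucket into one of 42 streams.
function partitionStream(metadata) {
    var combined = metadata.correlationId + metadata.username;
    var hash = 0;
    for (var i = 0; i < combined.length; i++) {
        hash = (hash + combined.charCodeAt(i)) % 42;
    }
    return "partition-" + hash;
}

// e.g. partitionStream({ correlationId: "abc", username: "greg" })
// always lands in one of partition-0 .. partition-41
```

The body of the $any handler could call something like this and pass the result as the first argument to linkTo().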

Projections vs rxjs vs etc

There was a pretty good question this morning when I checked comments on posts.

“What is the difference between projections and say rxjs?” We could just as easily include any of the functional reactive libraries out there (bacon.js, flapjax, elm, rx, etc). We could also go way back and look at predecessors like CLM.

Let’s start with the similarities. Both are JavaScript. Both are functional reactive APIs (this is why they look rather similar). If you go way back, they trace their lineages to the same places. All have concepts like “streams”, “events”, and “behaviours” as these are underlying concepts. There are however some differences as well.

In particular, the tools differ in how they handle CEP (complex event processing).

The largest difference is that most of the tools listed are only capable of running from now forward (for some you could write persistence adapters; we had experimented with one for rx). They are small libraries for dealing with things like events in a JavaScript UI. You could build up everything around something like rxjs as well (e.g. we could switch out our definitions for ones matching theirs).

When we talk about projections, the streams are persistent (and distributed). We host v8 internally and use this API as our query language over persistent streams of events. A projection is not just for running from this point forward but for running against previous history as well. This is a very powerful idea (and not a new one).

Projections are the query language of the Event Store. When you write a projection you can specify whether you want it to run over history and stop, or run over history and then continue. The Event Store will also handle this in a durable, consistent, and distributed way (over terabytes of indexed data). A projection will, for example, survive a power outage if configured to do so.

I hope this helps to clear up some of the similarities and differences between the options.

Projections 5: Indexing

Now we can start getting to some of the interesting things in Projections. It was quite odd: as I was leading up to this post last night, we got into a very long discussion on twitter about indexing inside the Event Store. Mike Brown noted that it would be really useful if we built Lucene-like indexing into the system so he could use the Event Store as an audit log that was easily searchable by things like correlationId and username. While the indexing is very different from something like Lucene, it is still quite possible.

In order to get into indexing we will need to introduce a new basic operation: linkTo. linkTo works very similarly to emit, except that when you linkTo("stream", event) it does not actually write the event to the stream. Instead it emits a pointer to the event into the stream. When you resolve "stream" you will see the event as if it were part of that stream, but that is due to the resolution of the pointer (with the TCP API you can say whether or not you want links resolved). Let's try an example.


fromAll().when({$any : function(s,e) { linkTo(e.metadata.username, e); }});

What this code does is listen to all events in the system. As they happen, it produces links into streams based upon the user (provided username is in the metadata). If we were to have:

Chat1 -> Greg Says hi
Chat2 -> John Says yo
Chat1 -> John Says Hey Greg
Chat2 -> Jill Says donuts!
Chat3 -> Jill Says anyone there?
Chat3 -> Greg Says sure

With this projection running we would be creating indexes. To start with there are three streams here (Chat1, Chat2, and Chat3). They look like:

Stream Chat1
———
Chat1 -> Greg Says hi
Chat1 -> John Says Hey Greg

Stream Chat2
———
Chat2 -> John Says yo
Chat2 -> Jill Says donuts!

Stream Chat3
———
Chat3 -> Jill Says anyone there?
Chat3 -> Greg Says sure

After our index is built we will have six streams (you can build an index at any time). The projection creates three additional streams: Greg, John, and Jill. They would look as follows.

Stream Greg
———
Chat1 -> Greg Says hi
Chat3 -> Greg Says sure

Stream John
———
Chat1 -> John Says Hey Greg
Chat2 -> John Says yo

Stream Jill
———
Chat2 -> Jill Says donuts!
Chat3 -> Jill Says anyone there?

If I were to point my browser at mydomain.com/streams/Jill I would see all of Jill's chat messages. This is generally how indexes get built up using the Event Store. One nice thing about this methodology is that as the result is a stream, you can listen to that stream like any other stream in the system to get updates when new things are happening.
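The index-building above can be sketched outside the Event Store as a plain left-fold over the events. This is only an illustration of the semantics, not the real implementation; the event shape is assumed.

```javascript
// The six chat events from the example, flattened into one ordered list.
var events = [
    { stream: "Chat1", username: "Greg", text: "hi" },
    { stream: "Chat2", username: "John", text: "yo" },
    { stream: "Chat1", username: "John", text: "Hey Greg" },
    { stream: "Chat2", username: "Jill", text: "donuts!" },
    { stream: "Chat3", username: "Jill", text: "anyone there?" },
    { stream: "Chat3", username: "Greg", text: "sure" }
];

// Fold: for each event, append a pointer into a per-username "stream"
// (mirroring what linkTo does with real link events).
var indexes = events.reduce(function(streams, e) {
    (streams[e.username] = streams[e.username] || []).push(e);
    return streams;
}, {});

// indexes now holds the three index streams: Greg, John, Jill
```

Note that each entry still carries its original stream name, just as a link resolves back to the original event.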

You will also notice if you look at an Atom feed that the URI of the original event does not change. As such, if you are working over Atom you likely will not pull the event down twice (it will be in cache). In our next post we will look at a quick use case for applying linkTo.

Projections 4: Event Matching

In the “intermission” post we jumped ahead quite a bit in terms of the complexity of the projection we were building. Let’s jump back into our progression of learning bits.

The projections we have used so far have used a method called when(). This method allows you to match functions to types of events. Up until now that has been a single match, but you can also use more than one.

fromStream('test').when({
                           Event1: f1,
                           Event2: f2
                        });

This defines that every time an event of type Event1 is seen the function f1 should be called with that event, and f2 for events of type Event2. This is a very useful construct when trying to build out projections that need to handle many different types of events.
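The dispatch described above can be sketched as a plain fold: look up the handler by event type and thread the state through each call. This is just the underlying idea, not the Event Store's internals; the event/handler names are made up.

```javascript
// Minimal when()-style dispatcher: unmatched events pass state through.
function runWhen(handlers, state, events) {
    return events.reduce(function(s, e) {
        var handler = handlers[e.type];
        return handler ? handler(s, e) : s;
    }, state);
}

var result = runWhen({
    Event1: function(s, e) { return s + 1; },  // f1
    Event2: function(s, e) { return s + 10; }  // f2
}, 0, [{ type: "Event1" }, { type: "Event2" }, { type: "Event1" }]);
// result is 12: f1 ran twice (+1 each), f2 once (+10)
```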

There are also some special matches defined.

$any: $any will match all events to your function. This is useful, for example, when you want to build an index over all events. We will get into how this works later, but you can imagine that if I wanted to build an index based upon the user that created the event (a stream per user), the function would need to see all events in the system.

It is important to remember that, as of now, $any cannot be used in conjunction with other filters.

$init: $init gets called before any other handler. The job of $init is to return the initial state that will be passed to the rest of your functions. In the intermission post this handler was used to set up initial state so the other handlers did not have to. The usage can also be seen in the projection from Projections 3:

fromStream('$stats-127.0.0.1:2113').
    when({
        "$stats-collected" : function(s,e) {
              var currentCpu = e.body["sys-cpu"];
              if(currentCpu > 40) {
                   if(!s.count) s.count = 0;
                   s.count += 1;
                   if(s.count >= 3)
                        emit("heavycpu", "heavyCpuFound", {"level" : currentCpu,
                                                           "count" : s.count});
              }
              else
                   s.count = 0;
         }
    });

In this projection the line if(!s.count) s.count = 0; is being used to initialize the state if it's the first time into the function. This could also be implemented as:

fromStream('$stats-127.0.0.1:2113').
    when({
        "$init" : function() { return {"count":0}; },
        "$stats-collected" : function(s,e) {
              var currentCpu = e.body["sys-cpu"];
              if(currentCpu > 40) {
                   s.count += 1;
                   if(s.count >= 3)
                        emit("heavycpu", "heavyCpuFound", {"level" : currentCpu,
                                                           "count" : s.count});
              }
              else
                   s.count = 0;
         }
    });

The two will work in the same way. In our next post we will start looking at how indexing works in the Event Store.

Spring->Summer

Wow it feels weird to have a schedule that is pretty much completely locked in between now and August. I guess things change when you don’t only have your own schedule to worry about. A friend tells me that soon I will be planning nine months in advance.

I have updated most of my schedule from now through the summer on the "where am I" page. I have not included everything but most of it is up there.

One big change we made is that we are going to spend a month in Australia. She seems to have this crazy idea of driving through the outback with kangaroos jumping everywhere (might be the movie!). While there I will stop in Sydney for a CQRS + ES class.

But the next six months look pretty crazy! Hopefully we can make the best of it.

Projections intermission

Yesterday I was meeting with a company. We were going through some of
their problems and looking at whether the Event Store and in
particular stream querying might be a viable solution to any of their
problems. It turned out one of the problems was a perfect example of
where projections can make a big project tiny.

The Problem

People put “Bid Strategies” in through a web application. These
strategies are then used for blind live bidding of prices of some
commodity. Sometimes a bid is won. When a bid is won, sometimes
“something good” happens later. Users want real-time feedback to their
web browser on the bidding process as they are changing it. Thousands
of bids happen per second.

This is a very stereotypical problem in many systems. There is some
process that’s happening very quickly and we want to summarize that
process and give real time feedback to a user. This is a typical
example of where projections can really shine.

The Solution

Let’s start by defining the events that will be happening in this system

StrategyStarted { strategyId }
StrategyEnded { strategyId }
BidPlaced { bidId } //probably some bid information as well
BidWon { bidId }
SomethingGoodHappened { bidId }
IntervalOccurred { time }

We will integrate with the current system by writing these events into
the event store. There will be a stream for each strategy. As they are
not interested in a long period of time we will also set $maxAge on
the streams to one day (all data older than one day can be deleted).

Now we have the data coming into the Event Store. It can easily handle
a few thousand transactions per second. But how do we get summarized
real time data out to the user? This is where projections come
into play. This projection uses a lot that has not yet been discussed in
the projections series. Don't let that scare you; we will explain what
it's doing.

fromCategory('strategy').
    foreachStream().
    when({
        $init : function() {
            return {
                "id" : 0,
                "bidsPlaced" : 0,
                "bidsWon" : 0,
                "goodthings" : 0
            };
        },
        StrategyStarted : function(s,e) { s.id = e.body.strategyId; },
        BidPlaced : function(s,e) { s.bidsPlaced += 1; },
        BidWon : function(s,e) { s.bidsWon += 1; },
        SomethingGoodHappened : function(s,e) {
            s.goodthings += 1;
            linkTo('goodthings-' + s.id, e);
        },
        IntervalOccurred : function(s,e) {
            emit('liveresults-' + s.id, 'liveResults', {
                "strategyId" : s.id,
                "goodthings" : s.goodthings,
                "bidsPlaced" : s.bidsPlaced,
                "bidsWon" : s.bidsWon,
                "time" : e.time
            });
            s.bidsPlaced = 0;
            s.bidsWon = 0;
            s.goodthings = 0;
        }
    });

Wow now that’s a little more complicated than the other projections we
have been looking at! Let’s get into a bit of what it does.

fromCategory() -> selects all streams that are in a category
foreachStream() -> says to run the projection against each stream in the
category independently (in parallel!)

The when() is basically what we have looked at before, but we have now
said we want it to run independently against every strategy stream in
the system (each has its own state variable). It then calculates
the counts of bids/bidsWon/goodthings that happen within the interval
of time, which is overall pretty basic logic.
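The per-stream state idea can be sketched in plain JavaScript: keep one independent state object per stream and fold each event into its own stream's state. This is only a sketch of what foreachStream() means; the event shape and field names are assumptions, not the Event Store API.

```javascript
// Fold a flat list of events into independent per-stream states.
function foldPerStream(events) {
    var states = {};
    events.forEach(function(e) {
        var s = states[e.stream] || (states[e.stream] = { bidsPlaced: 0, bidsWon: 0 });
        if (e.type === "BidPlaced") s.bidsPlaced += 1;
        if (e.type === "BidWon") s.bidsWon += 1;
    });
    return states;
}

var states = foldPerStream([
    { stream: "strategy-1", type: "BidPlaced" },
    { stream: "strategy-2", type: "BidPlaced" },
    { stream: "strategy-1", type: "BidWon" }
]);
// states["strategy-1"] is { bidsPlaced: 1, bidsWon: 1 }
```

Because each stream's state is independent, the real system can process the streams in parallel.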

At the end of the interval it puts a new message out to
liveresults-{strategy} with the results from the interval. For good
measure it also puts out all "good things" to a separate stream as
links. Basically the whole backend of the system is done now. It will
scale, is highly available, and is fully durable.

Connecting the client

To hook up a client we just need to access the streams and
update the UI for the user somehow. Luckily there is an easy way to do
this: every stream is an Atom feed. Instead of coming up with some
super fancy custom way to get the data to the client, we can just use
something like jFeed. The client then receives all the events via
JavaScript and just needs to draw the results on the screen. How long
does it take to draw pretty results? Well, that's up to the person doing it.

Summary

As you can see, projections can be very useful for certain categories of
problems. Using them here, a highly available, scalable system
processing thousands of messages/second can be built and put into
production in less than an afternoon using nothing but JavaScript.

Projections 3: Using State

In Projections 2 we looked at creating a very simple projection that would analyze our statistics inside of the Event Store. The projection was:


fromStream('$stats-127.0.0.1:2113').
    when({
        "$stats-collected" : function(s,e) {
              var currentCpu = e.body["sys-cpu"];
              if(currentCpu > 40) {
                   emit("heavycpu", "heavyCpuFound", {"level" : currentCpu})
              }
         }
    });

This is a very common type of scenario we will find in event based systems. We can describe this as

“When this event happens and this information is on the event, trigger a new event to a different stream.”

Very often, however, it's not just one event that causes something to trigger. This is why the state variable exists. Very often we want to handle a question that is more akin to:

“When this event happens, then this event happens, then this event happens trigger an event to a different stream”.

Let’s try to change our problem from Projections 2 into one like this: I am only interested in high-CPU scenarios where the CPU is over 40% for more than 3 samplings in a row. A single one could just be a fluke. In order to do this type of query we will have to use our state variable to tie together multiple function calls.


fromStream('$stats-127.0.0.1:2113').
    when({
        "$stats-collected" : function(s,e) {
              var currentCpu = e.body["sys-cpu"];
              if(currentCpu > 40) {
                   if(!s.count) s.count = 0;
                   s.count += 1;
                   if(s.count >= 3)
                        emit("heavycpu", "heavyCpuFound", {"level" : currentCpu,
                                                           "count" : s.count});
              }
              else
                   s.count = 0;
         }
    });

Note: if you are trying this at home you may want to change how often statistics are sampled. You can set this with --stats-period-sec=SECONDS.

Now we use the state that gets passed from call to call to correlate multiple events together. If we get three or more samples in a row with CPU usage greater than 40%, we will produce a message to the heavycpu stream that looks like:

{
  "eventStreamId": "heavycpu",
  "eventNumber": 3,
  "eventType": "heavyCpuFound",
  "data": {
    "level": 41.896265,
    "count": 6
  },
  "metadata": {
    "streams": {
      "$stats-127.0.0.1:2113": 8
    }
  }
}

This is a very powerful paradigm, as the state variable allows me to bring state from one call to the next, letting me correlate multiple events together. Another example of this might be looking for users on twitter who said the words "coffee" and "happy" within 5 minutes of mentioning "starbucks". That query would be implemented in the same way as the one we just tried.
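The 3-in-a-row logic from the projection can be run standalone as a fold over CPU samples: reset the counter whenever a sample drops to 40% or below, and record an alert for each sample once the run reaches three. This is a plain JavaScript sketch of the handler's logic, not the projection API.

```javascript
// Fold over samples, threading the consecutive-heavy count as state.
function detectHeavyCpu(samples) {
    var alerts = [];
    samples.reduce(function(count, cpu) {
        if (cpu <= 40) return 0;          // not heavy: reset the run
        count += 1;
        if (count >= 3) alerts.push({ level: cpu, count: count }); // emit()
        return count;
    }, 0);
    return alerts;
}

var alerts = detectHeavyCpu([45, 50, 10, 41, 42, 43, 44]);
// two alerts: the run restarts after the 10, then 41,42,43 completes a
// run of three (alert at 43) and 44 extends it (alert at 44)
```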

As food for thought: could I now write another projection off of "heavycpu" that looks for items with 5 measurements > 80 and counts > 10? You probably wouldn't do this in practice, as you could put that logic in the first projection, but you can compose projections as well!

In our next post we will look at having multiple types of events.

Projections 2: A simple SEP projection

In the first post on projections we talked a bit about the theory behind projections. In this post we are going to try to create a very simple projection and talk about how it actually works.

To start with, there is a very special stream inside of the Event Store. This stream represents statistics measurements that are happening internally; you can control how often they are taken via config. To find this stream in your system (assuming you are bringing up a brand new node), look at the "streams" tab when going to whatever port you set for http.

Hint: as projections are experimental as of the last release, you need to enable them on the command line or in configuration when bringing up the Event Store. The command line option is --run-projections.

For me the (default) stream for statistics is $stats-127.0.0.1:2113. If you want to see statistics data you can point your browser at 127.0.0.1:2113/streams/$stats-127.0.0.1:2113 and view the data in the stream. You should see something that looks like this:

 

[Screenshot: the statistics stream viewed in the web UI]

If you click on one of the events you should be able to see the actual data from a statistics event entry. If you want to save some time you can see it on my gist. This is a json encoding of what the statistics measurement looks like. We are going to write a basic projection against that stream.


fromStream('$stats-127.0.0.1:2113').
    when({
        "$stats-collected" : function(s,e) {
              var currentCpu = e.body["sys-cpu"];
              if(currentCpu > 40) {
                   emit("heavycpu", "heavyCpuFound", {"level" : currentCpu})
              }
         }
    });

If you want to test this projection, go to "new projection" and paste it in. Give it a name, select "emit enabled", and for mode choose "continuous". We will discuss in a later post what these things mean. The UI around this is currently being changed as well; we see it's not the most intuitive.

This is a very simple projection. It's not very interesting; we will get to doing more interesting ones shortly. What it does is listen to your statistics stream. This is set up where it says fromStream, which says "listen to all events in this stream". It then defines a function that will be passed all $stats-collected events, which happen to be the ones we saw in the statistics stream above.

The function declared checks the "sys-cpu" field of the event. If the CPU is higher than 40% it emits a new event out to another stream called "heavycpu". If you are running the projection you can bring up your CPU usage and then try navigating to the stream 127.0.0.1:2113/streams/heavycpu. You will see an event there of the form:

EventStreamId: heavycpu, EventNumber: 2, EventType: heavyCpuFound, Data: {
  "level": 40.9823952
}, Metadata: {
  "streams": {
    "$stats-127.0.0.1:2113": 49
  }
}

This is a very basic projection that emits a new event based on some criteria that is found. This is a very common pattern in event based systems (simple event processing). In the next post we will introduce state into our projection to look at how we can alert not just off a single event but off a group of correlated events, which is another very common pattern in projections (complex event processing).

Refactoring and the Ubiquitous Language

This morning I was reading an interesting post on the domain driven design list. I began replying on the list, but as my answer started to become more than one paragraph I figured it might be better to drop the answer on the blog.

The basic scenario as described is:

What we used to call an 'Actor' on an Order is now going to be called a
'Collaborator' on a Form in the UI. Collaborator is not a bad word; it's quite good.
Should I now do a search and replace in our code in order to use the new word?

This is a very common scenario when working with a domain model over a period of time: what something is called changes. This is especially prevalent when working on a domain model where we start out with a naive viewpoint, as is often the case in non-formalized models. In these cases the business experts are very often "learning as we are doing".

The asker goes on to mention:

My Product Owner (PO) and team members would think I had been smoking crack or
worse if I came up with a suggestion like that.
Anyway, we have used the old word Actor in a webservice contract, so that is not
easily changed. (I know I could introduce 'Collaborator' and over time make Actor
obsolete.) I know common sense would suggest not to get religious and simply accept the
old word. However I do feel tempted to use the new word. Eric Evans uses a term like
"hammering the ubiquitous language", which I understand as "it's really really
important to use the language/concepts of the Product Owner/Users".

A few people have jumped in with responses. Many suggest that doing the refactor is ok.

Dan Haywood mentions:

I have to say that we change names all the time. But then, I work with naked objects-pattern [1] frameworks [2], [3] where there are very few DRY violations… so it's no great hardship.

There is however a subtlety in the question that seems to have been missed by many, and it is actually a huge problem when dealing with the Ubiquitous Language. Going through and renaming Actors to Collaborators inside of the model is not really a big deal and should be done. However, the asker mentions a webservice contract as well!

The webservice contract is a wire protocol. Generally we do not just "refactor" wire protocols, because doing so is a breaking change for clients. If we broke the wire protocol we would, for example, need a big bang release of all consumers of the protocol as well as ourselves. This causes quite a dilemma when talking about refactoring for the purpose of the ubiquitous language. Wire protocols are harder to change and require more planning than internal models. This is an issue that is too often overlooked.

The first question that needs to be asked is: by default, will we be upgrading all clients concurrently with the backend anyway? If so then it is no big deal. I know of many frameworks that use this model. This does however necessitate a big bang release. If you have many disparate clients or are risk averse about deployments, this may not be possible. I believe the naked objects framework mentioned by Dan takes this approach, though after seeing some presentations they may be on the third option today.

Note that very often people think incorrectly about this question. If my domain is used in a web server, for example, and I blue/green deploy, it would seem I would be ok (the client is the web server). But what if someone received a page from the first and then posted to the second? This may be an entirely acceptable business risk, but remember that it exists.

The next question is: do we want to version our wire protocol? I could always take our old method that includes actor and create a new method that uses collaborator. Old clients could still use the old one; new clients would use the new one. We could even deprecate the old method after some period of time. You can imagine, however, that with non-matching UIs and various versions around, this can get confusing both for the dev teams and for the client developers. We have to be very careful when dealing with messaging contracts.

A third option exists, though it is not something we could do at the point in time of the question; we would have had to make the decision much earlier in the development process. What if, instead of using webservice contracts, we had decided to use a HATEOAS RESTful API? With such an API I could make changes like this without affecting any of my clients.

It is important to remember that data on the inside is often different from data on the outside. A HATEOAS based API will often help isolate you from these types of changes in the future. There are however very few people using HATEOAS based APIs over domain models. Instead, largely due to the tooling being given to us, "restful" tends to mean "expose my objects as resources". The question above is the exact situation that a HATEOAS based API can serve well. Keep these kinds of scenarios in mind early when choosing an API over your model!

Projections 1: The Theory

Cross Posted from geteventstore.com/blog

Over at geteventstore.com we have 1.0'ed the Event Store as an event store (e.g. storing/retrieving events, multinode version, etc), but in the process we did not release projections. They are still marked as experimental. Projections are however getting close to ready, so it's time to write a bit about them as the official documentation takes shape.

Through the series we will get fairly deep into their usage, but for many the best way to understand them is to understand the underlying theory, so let's start there.

Functions

When we talk about a projection off of an event stream, what we are describing is basically running a series of functions over the stream. The simplest possible projection runs through all of the events in the system (in order), passing current state from one function call to the next. The simplest of these could be to count how many events there are.

var count = function(state, event) {
    return state + 1;
}

Then to run it across all of the events (let’s imagine we had three) in our system we would end up with a pattern like:

var result = count(count(count(0, event1), event2), event3);

This operation is known in the functional world as a left-fold, a higher order function. It is a very powerful construct that is useful in solving many problems. When we talk about Event Sourcing, current state is a left-fold of previous behaviours.
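In fact, JavaScript ships a left-fold: Array.prototype.reduce. The count function above can be run over a list of events with it, threading the running state through each call exactly as in the nested form.

```javascript
// The counting left-fold from above.
var count = function(state, event) {
    return state + 1;
};

// Stand-ins for real events; reduce(count, 0) is the left-fold
// count(count(count(0, event1), event2), event3).
var events = ["event1", "event2", "event3"];
var result = events.reduce(count, 0);
// result is 3
```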

Projections at their heart allow for the specializing of a generalized function. Their underlying model is that of a left-fold. Looking at the above left fold there are a couple of pieces that we could possibly change. The generic version of this function would look something like:

var result = transform(f(f(f(initial(), e1), e2), e3));

Let’s discuss briefly what the three main parts of this function are.

f(state, event) => state – is the function that is run over the series of events.

transform(state) => result – is a function that can transform the state to the form of result you want to receive

initial() => state – returns the initial state you want passed into your left-fold.

Taking one example (don't worry, we will go through more in later posts!): f(state, event) => state gets specialized through a pattern match:

var f1 = function(state,event) {}

when({
$any : f1 //runs for all
})

or

var f1 = function(state,event) {}
var f2 = function(state,event) {}

when({
Something : f1, //match all Something
SomethingElse : f2 //match all SomethingElse
})

To see the differences between the two, let’s imagine that we had a stream of events.

Something
SomethingElse
Something

The first would call as:

f1(f1(f1(nil, Something), SomethingElse),Something)

The second would call as

f1(f2(f1(nil, Something), SomethingElse),Something)

If we were to change the stream to:

Something
SomethingElse
SomethingElse

The second would end up as

f2(f2(f1(nil, Something), SomethingElse),SomethingElse)

Again, don't worry too much about the details; we will have a whole post on when() and pattern matching.
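The call sequences above can be reproduced with a small sketch: dispatch each event to the matching function and record which one ran, threading state through as a left-fold. This is illustration only, not the projections runtime.

```javascript
// Record "f1"/"f2" as each handler fires, threading the list as state.
var f1 = function(state, event) { return state.concat("f1"); };
var f2 = function(state, event) { return state.concat("f2"); };
var handlers = { Something: f1, SomethingElse: f2 };

function fold(events) {
    return events.reduce(function(s, e) { return handlers[e](s); }, []);
}

var calls = fold(["Something", "SomethingElse", "Something"]);
// calls is ["f1", "f2", "f1"], matching f1(f2(f1(nil, ...), ...), ...)
```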

Event Selection

The second part of how projections work theoretically is controlling which events get passed to which definition. In our first example we passed all events in the system through a single left-fold. While many projections do in fact work exactly this way, many are only interested in certain events, not all events in the entire event store.

fromAll() reads all of the events in the entire event store. fromStream('stream') tells projections to take whatever definition you have and run it against only the events within the stream 'stream'. Whereas fromStreams('s1', 's2', 's3') joins the events from the three streams and then runs the left-fold over the resulting stream.
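The selection step can be sketched as a filter in front of the fold. This toy version merges the named streams in the order events appear and ignores timestamps, which is a simplification; it only illustrates the idea of selecting events before folding.

```javascript
// A flat, ordered event log with a stream tag on each event.
var all = [
    { stream: "s1", n: 1 },
    { stream: "s2", n: 2 },
    { stream: "s3", n: 3 },
    { stream: "s1", n: 4 }
];

// Toy fromStreams: keep only events belonging to the named streams.
function fromStreams(events, names) {
    return events.filter(function(e) { return names.indexOf(e.stream) >= 0; });
}

var selected = fromStreams(all, ["s1", "s3"]);
var total = selected.reduce(function(s, e) { return s + e.n; }, 0);
// total is 8: the fold only ever sees events 1, 3, and 4
```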

Don't worry if you don't completely "get" projections after this post; there is a lot to them and we will be delving into a lot more detail and use cases. Overall, however, projections have a relatively simple underlying theory. Hopefully understanding it will make the rest of the posts in the series easier to follow.