Monthly Archives: March 2013

Projections 8: Internal Indexing

In the last post we introduced the new concept of fromStreams([]) that will join multiple streams into a single stream for your fold to be run against. We also introduced options and two options that can be used to control the re-ordering behaviour in a distributed environment. In this post we are going to look at how this concept is used internally in the Event Store in order to provide indexing of queries.

Let’s propose a scenario where you have 80,000,000 events in the Event Store. You wish to write a query that looks like:

fromAll().when({
        Foo: function(s,e) { ... },
        Bar: function(s,e) { ... },
        Baz: function(s,e) { ... }
})

In the store there are a total of 500 Foo events, 100 Bar events, and 1000 Baz events. This query would be very expensive to run, as it would need to look through 80,000,000 events in order to run on the 1600 events that you are interested in (the equivalent of a table scan in SQL). This query, though, can be (and is) indexed internally!

If you look in options there is another option called “useEventIndexes” that defaults to true. With this option enabled, this query will only look at 1600 events and ignore the other 80,000,000!

This works using the same principles we have been learning in this blog post series. There is a special projection in the Event Store called $by_event_type. At this point the projection is implemented internally, but if it were written in JavaScript it would look something like:

        $any : function(s,e) { linkTo("$et-" + e.type, e); }

In other words the projection will create a stream per event type named $et-{eventtype} that contains links to all events of that type. This standard projection can then be used in conjunction with other projections to provide indexing. Consider our original projection:

fromAll().when({
        Foo: function(s,e) { ... },
        Bar: function(s,e) { ... },
        Baz: function(s,e) { ... }
})

This can now be converted into an indexed projection using fromStreams(). We can convert it into:

fromStreams(["$et-Foo", "$et-Bar", "$et-Baz"]).when({
        Foo: function(s,e) { ... },
        Bar: function(s,e) { ... },
        Baz: function(s,e) { ... }
})

This is done by default. The projection will then read only from the three joined streams listed, not from all of the events in the system. In our example it will only see 1600 events, not the 80,000,000 that exist. This is how you can combine some of the ideas we have been looking at to provide indexing in various forms for your projections.
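To make the idea concrete, here is a minimal in-memory sketch of what such an index buys you. It is not Event Store code: buildEventTypeIndex, readIndexed and the shape of the log are hypothetical names made up for illustration, and the real $by_event_type projection is implemented internally.

```javascript
// Hypothetical in-memory model; none of these names are Event Store APIs.

// Build one index stream per event type, holding links (positions) rather than copies.
function buildEventTypeIndex(allEvents) {
  const index = new Map(); // "$et-<type>" -> array of positions in allEvents
  allEvents.forEach((event, position) => {
    const streamName = "$et-" + event.type;
    if (!index.has(streamName)) index.set(streamName, []);
    index.get(streamName).push(position);
  });
  return index;
}

// Read only the index streams for the requested types, merged back into log order.
function readIndexed(allEvents, index, eventTypes) {
  const positions = [];
  for (const type of eventTypes) {
    positions.push(...(index.get("$et-" + type) || []));
  }
  positions.sort((a, b) => a - b);
  return positions.map((p) => allEvents[p]);
}

// Tiny log: 3 interesting events hidden among 7 others.
const log = [];
for (let i = 0; i < 10; i++) {
  const type = i === 2 ? "Foo" : i === 5 ? "Bar" : i === 8 ? "Baz" : "Other";
  log.push({ type, data: { n: i } });
}

const etIndex = buildEventTypeIndex(log);
const visited = readIndexed(log, etIndex, ["Foo", "Bar", "Baz"]);
console.log(visited.length); // 3
```

The fold only ever touches the three linked events; the seven "Other" events are never read.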

Projections 7: Multiple Streams

Up until this point we have only used two event selection methods for our projections. We have used fromStream('stream'), which selects all of the events in a stream, and fromAll(), which selects all events in the system. There is another quite useful selection that will move us from SEP (Simple Event Processing) to CEP (Complex Event Processing): the ability to select from multiple streams.

To select from multiple streams we use fromStreams(['stream1', 'stream2', 'stream3']). This brings together the events from multiple streams, which can also be called a join operation: it takes the three streams (stream1, stream2, stream3) and produces a single output stream, containing events from all three, that the projection will be run against.

This operation while seemingly simple is actually quite difficult. Generally the partition point of the system is streams. If you are running in a single node group (not partitioned, either the replicated group or single node) then this projection will have assurances that the events will come in perfect order (even when being processed in real time). But what happens if this is distributed?

You can imagine stream1 lives on one machine and stream2 lives on another machine. This could cause problems with ordering in situations where the machines are partitioned from each other. Luckily projections allow you to solve this problem with some options that you can set. In particular you can add the options:

reorderEvents: true,
processingLag: 500 //time in ms

This will tell projections to introduce a delay to allow for the reordering of events from different partitions. This allows a much better handling for fromStreams in a distributed scenario. A buffer of processingLag milliseconds will be introduced to allow for the reordering of events before they are run through the projection.
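The buffering idea can be sketched in a few lines. This is only an illustration under assumptions: createReorderBuffer, the timestamp field and the explicit flush(now) call are hypothetical, and the post does not show how the Event Store actually implements the delay.

```javascript
// Hypothetical reordering buffer; the real implementation is not shown in the post.
function createReorderBuffer(processingLagMs, handler) {
  const pending = []; // events waiting out the lag window, kept sorted by timestamp
  return {
    // Called when an event arrives from any partition, possibly out of order.
    push(event) {
      pending.push(event);
      pending.sort((a, b) => a.timestamp - b.timestamp);
    },
    // Called with the current time; releases events older than the lag window.
    flush(nowMs) {
      while (pending.length > 0 && pending[0].timestamp <= nowMs - processingLagMs) {
        handler(pending.shift());
      }
    },
  };
}

const seen = [];
const buffer = createReorderBuffer(500, (e) => seen.push(e.id));

buffer.push({ id: "b", timestamp: 1100 }); // from stream2, arrives first
buffer.push({ id: "a", timestamp: 1000 }); // from stream1, arrives late
buffer.flush(1200); // nothing is old enough yet
buffer.flush(1700); // both are now past the 500 ms window
console.log(seen.join(",")); // a,b
```

The trade-off is visible in the sketch: a larger processingLag tolerates later arrivals, but every event is delayed by at least that long before the projection sees it.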


Startups and TDD

Yesterday Uncle Bob put up a post on using TDD in startup environments, “The Startup Trap”. It’s a good read. Check it out.

Nate soon after posted:


I wanted to write a few comments about TDD in startups. Good code is the least of the risks in a startup. Sorry, but worrying about technical debt making us go slower when we have a two month runway and will likely pivot four times? To quote Bob:

Captain Sulu when the Klingon power moon of Praxis exploded and a young Lieutenant asked whether they should notify Star-Fleet: “Are you kidding?” ARE YOU KIDDING?

One of the biggest mistakes in my career was building something appropriate…

It was just after Hurricane Katrina. I was living in a hotel. An acquaintance asked me if we could hack together this business idea they had for a trading system. He had the knowledge but not the know how. I said sure, hell I was living in a hotel!

In less than two weeks we had an algorithmic trading system. It was a monstrosity of a source base. It was literally a winforms app connected directly to the stock market. UI interactions happened off events directly from the feed! Everything was in code behinds (including the algos!) Due to the nature of the protocol if anything failed during the day and crashed the app (say bad parsing of a string?) the day for the trader was over as they could not restart.

But after two weeks we put it in front of a trader who started using it. We made about $70-80k the first month. We had blundered into the pit of success. A few months later I moved up with the company. We decided that we were going to “do things right”, while keeping the original version running and limping along, as stable as we could keep it, and adding just a few features.

We ended up with a redundant multi-user architecture nine months or so later. It was really quite a beautiful system. If a client/server crashed, no big deal, just sign it back on. Multiple clients? No problem. We moved from a third party provider to a direct exchange link (faster and more information!). We had > 95% code coverage on our core stuff, and integration suites including a fake stock exchange that actually sent packets over UDP so we could force various problems with retries, reconnects, errors, etc. We were very stable and had a proper clean architecture.

In fact you could say that we were dealing with what Bob describes in:

As time passes your estimates will grow. You’ll find it harder and harder to add new features. You will find more and more bugs accumulating. You’ll start to parse the bugs into critical and acceptable (as if any bug is acceptable!) You’ll create modules that are so fragile you won’t trust yourself, or anyone else, to modify them; so you’ll work around them. You’ll build a festering pile of code that, with every passing week, requires more and more effort just to keep running. Forward progress will slow and falter. It may even reverse as each release becomes buggier and buggier, and less and less stable. Catastrophes will become more and more common as errors, that should never have happened, create corruptions and damage that take huge traunches of time to repair.

We had built a production prototype and were suffering all the pain described by Bob. We were paying down our debt in an “intelligent” way much the way many companies that start with production prototypes do.

However this is still a naive viewpoint. What really mattered was that after our nine months of beautiful architecture and coding work we were making approximately 10k/month more than what our stupid production prototype made for all of its shortcomings.

We would have been better off making 30 new production prototypes of different strategies and “throwing shit at the wall” to see what worked than spending any time beyond a bit of stabilization of the first. How many new business opportunities would we have found?

There are some lessons here.

1) If we had started with a nine month project it never would have been done.

2) A Production Prototype is common as a Minimum Viable Product. Yes, testing, engineering, or properly architecting will likely slow you down on a production prototype.

3) Even if you succeed you are often better to stabilize your Production Prototype than to “build it right”. Be very careful about taking the “build it right” point of view.

4) Context is important!

Never underestimate the value of working software.

Why not Pluralsight

I have had a lot of people ask me why I am not putting my online classes into Pluralsight. The answer is quite simple: their business model does not align well with the type of content being discussed. I have had these discussions with them before.

The Pluralsight default royalty model is set up as a fractional payment system based upon the percentage of views during a period. This works wonderfully for commoditized trainings such as “learn C# quickly”, especially if it’s primarily screencasts. It makes next to no sense for putting up 4 days’ worth of content.

Think about your viewing habits. Are you going to watch one episode at a time over the course of 12 months, or are you going to spend a weekend/week during gym time etc. and watch all of the content? So let’s just imagine that the way it worked was based upon viewing % during the month: the royalties would end up being roughly $10/viewer on average. Factor in 15k-20k EUR in order to film/edit 20-25 hours of content and you are saying that maybe you hope to break even at 2500 people viewing.

For a short commoditized class such as “Introduction to iOS programming” things would make total sense: wide audience, low production cost, high relevance. For something like a 3 day class + extra material it makes almost no sense.

That said, they do now say that “things are open for negotiation”, so maybe it’s worth talking to them.

Ensuring Writes – Multi-Node Replication

We are very often asked how the multi-node version works with replication. After typing it up about five times in email I figured it might be valuable to do a longer write-up that we can suck into our documentation (and a blog post).

One could quite easily put the Event Store Open Source version running on a virtual machine with data storage on a SAN. If the first machine goes wrong for some reason, the VM is spun back up on a different host with a connection to the same SAN. This is a perfectly valid way of handling a failover. There are some issues that come up with it, however. The largest is the amount of downtime in the middle, with a close second being a whole slew of Byzantine problems that can occur (network reachability issues as an example).

This is not by any means a “bad” system. The Event Store however uses a different model for its replication that comes with its own strengths and weaknesses.

The Event Store uses a quorum based model for replication. As of now all replication is fully consistent though we have talked about other models in the future (more discussion further). When you use a quorum based model you almost always want to have an odd number of servers.

When a server is brought up it will use the DNS given to it to find other nodes in its replication group. There is a config point for the predetermined quorum size of the group. It will begin gossiping with these other servers over HTTP until it can get some information about the other nodes that are there.

If there is no quorum yet a Paxos Election will then occur to determine which of the nodes is to be the leader of the replication group (for right now). The Paxos Election basically says that all nodes will agree upon a leader, or none of them will think that they have agreed upon a leader. Once the leader is picked, the leader can begin accepting write transactions (other nodes forward writes to the leader, clients do not need to be aware of the leader).

Every write, even in the single node open source version, has a small manager that controls its lifecycle. It will write a prepare followed by a commit. This is done for various reasons, not the least of which is that transactional writes are supported. It is also used in replication.

The manager will first send out a Prepare of the write. This will go to its local storage writer which will reply with an ACK once the item has been fsynced to disk. The item is also asynchronously dispatched to the other nodes. The manager will not send its commit until a quorum of nodes has acknowledged the message as being fsynced to disk.
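The rule the manager follows can be sketched as a simple ack counter. This is a hypothetical illustration only: createWriteManager and ackPrepare are made-up names, and networking, fsync and timeouts are all left out.

```javascript
// Hypothetical write manager; names and shapes are illustrative only.
function createWriteManager(clusterSize) {
  const quorum = Math.floor(clusterSize / 2) + 1; // majority of the group
  const acks = new Set(); // node ids that have fsynced the prepare
  let committed = false;
  return {
    quorum,
    // Record that a node (the local writer or a replica) has fsynced the prepare.
    ackPrepare(nodeId) {
      acks.add(nodeId);
      if (!committed && acks.size >= quorum) {
        committed = true; // only now may the commit be sent
      }
      return committed;
    },
    isCommitted: () => committed,
  };
}

const manager = createWriteManager(3);
manager.ackPrepare("A"); // leader's local storage writer
console.log(manager.isCommitted()); // false
manager.ackPrepare("A"); // a duplicate ack does not count twice
manager.ackPrepare("C"); // second node, quorum reached
console.log(manager.isCommitted()); // true
```

Using a set of node ids rather than a plain counter means a retransmitted ack from the same node cannot fake a quorum.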

If a failure happens during the process of a prepare, the commit will never happen and the client will be told that the transaction timed out (or perhaps that the connection was lost, if the leader was the one that died). The client’s responsibility in all cases is to retry; the Event Store is idempotent. The C# client will do this automatically for you. This client retrying also handles the case where the manager sent a commit and then died (without sending the client a response). Again the client retries in this case. This also holds true in a multi-node scenario.
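Why retrying is safe can be shown with a small sketch. It assumes de-duplication keyed on a client-supplied event id, which is an assumption made here for illustration; the post does not say how idempotency is implemented, and createStore and writeWithRetry are hypothetical names.

```javascript
// Hypothetical store that deduplicates on a client-supplied event id (an assumption).
function createStore() {
  const stream = [];
  const seenIds = new Set();
  return {
    stream,
    append(event) {
      if (seenIds.has(event.id)) return "already-written"; // a retry is a no-op
      seenIds.add(event.id);
      stream.push(event);
      return "written";
    },
  };
}

// Retry until an attempt gets a response; `send` may throw to simulate a timeout.
function writeWithRetry(send, event, maxAttempts) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return send(event);
    } catch (err) {
      if (attempt === maxAttempts) throw err; // give up after the last attempt
    }
  }
}

const store = createStore();
let calls = 0;
// On the first call the write lands but the response is lost.
const flakySend = (event) => {
  calls++;
  const result = store.append(event);
  if (calls === 1) throw new Error("timed out");
  return result;
};

const outcome = writeWithRetry(flakySend, { id: "evt-1", type: "Foo" }, 3);
console.log(outcome, store.stream.length); // already-written 1
```

The client never learns whether its first attempt landed, and it does not need to: the second attempt cannot write the event twice.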

The interesting case that comes up here is when a commit has been written in the leader but it dies before distributing it to the other nodes and getting acks. The old leader will actually truncate in the process of coming back into the cluster (the client was never told the transaction was committed and thus will retry with the new leader who will know whether or not it was committed).

The key to this style of replication is the two quorums being used. Since a transaction is only considered written when a majority of nodes have it, when an election happens (which also requires a quorum) you are assured that at least one node in the election will have the item, or the election will fail to build a quorum. To illustrate, let’s try an example in which we are dealing with an incrementing sequence number.

Node A Leader -> 5555
Node B -> 5553
Node C -> 5554

Node A sends commit on 5555 and moves to 5556
Node C acks 5555

Node A Leader -> 5556
Node B -> 5553
Node C -> 5555

Now the leader dies.

Node A dead -> 5556
Node B -> 5553
Node C -> 5555

Nodes B/C have an election. They decide that C is the winner as it has more information than B; thus C is the new leader and sends B 5554 and 5555. The client on 5556 will receive a dropped connection and will retry 5556 with C. If both nodes A and B were dead, C would not be capable of creating a quorum. As such C would enter into read-only mode.
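The outcome of the example can be reproduced with a tiny sketch. Note that this only models the result of the election (the live node with the most information wins, provided a majority is reachable), not the Paxos protocol itself; electLeader and the node objects are hypothetical.

```javascript
// Hypothetical election rule: pick the live node with the highest sequence number,
// but only if a majority of the group is reachable. This is not Paxos itself.
function electLeader(nodes) {
  const quorum = Math.floor(nodes.length / 2) + 1;
  const alive = nodes.filter((n) => n.alive);
  if (alive.length < quorum) return null; // no quorum: the group stays read-only
  return alive.reduce((best, n) => (n.sequence > best.sequence ? n : best));
}

const group = [
  { name: "A", sequence: 5556, alive: false }, // the old leader, now dead
  { name: "B", sequence: 5553, alive: true },
  { name: "C", sequence: 5555, alive: true },
];

const leader = electLeader(group);
console.log(leader.name); // C

group[1].alive = false; // B dies as well
console.log(electLeader(group)); // null
```

Because 5555 was acknowledged by a majority (A and C), any majority that can hold an election must include a node that has it, which is why C wins here.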

This replication model ensures consistency throughout the replica group and is a well known replication model. It has some strengths and weaknesses operationally over other models.

A main strength is that it’s fully consistent. There is no possibility of conflicting data on different nodes (e.g. A accepted a write without seeing the write to C). For most business systems this is a huge gain, as dealing with these rare problems often gets overlooked when using models that allow them.

Another strength is that failovers happen very quickly with minimal impact on clients. This is especially true when you consider that the nodes internally route so the client does not need to in most cases know who the leader is. Along with this, for a group of three nodes, so long as any two are up and communicating the system is considered running and consistent.

The main weakness of the model largely stems from the fact that it’s consistent. If a quorum is unable to be built, the system will no longer accept writes and will be read-only. This weakness also brings us to a strength of event sourcing. Keeping events locally in the application and synchronizing them later is actually a pretty easy process if you want to have eventually consistent behaviour with the Event Store.