Tuesday, September 1, 2015

Whirlwind tour of data science

I'm giving a talk to the local Data Science / Machine Learning meetup. The topic is a shallow tour of data science: the common tasks and the methods used to tackle them. This is a new group, and many of the attendees are interested in data science but don't have a background in it, so I thought a talk that hit the highlights would be useful. Not only is the tour a whirlwind, but creating the slides was a bit of a rush job in the wee hours of the night, so please excuse typos, factual errors, slander, blatant malicious lies, etc.

Presentation is: http://slides.com/steveash/datascience/

Thursday, August 6, 2015

Streaming standard deviation and replacing values

I'm working on a problem right now where I need to calculate the mean and standard deviation of a value in a streaming fashion.  That is, I receive a stream of events that generate data points, and I need to calculate the standard deviation without retaining all of the data points in memory.  This has been covered ad nauseam in the literature -- I will just briefly summarize.

Ignoring the whole sample vs. population correction, variance can be naively computed as the mean of the squares minus the square of the mean: $$\sigma^2 = \frac{\sum\nolimits_{i=1}^{n} x_{i}^2}{n} - \left( \frac{\sum\nolimits_{i=1}^{n} x_i}{n} \right)^2$$
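As a quick illustration (this snippet is mine, not from either write-up), here is that naive computation in code. When the values are large relative to their spread, the two running sums agree in nearly all of their leading digits, so the final subtraction cancels away almost all of the precision:

import java.util.Arrays;

public class NaiveVarianceDemo {

  // the naive "mean of the squares minus the square of the mean" calculation
  static double naiveVariance(double[] xs) {
    double sum = 0;
    double sumOfSquares = 0;
    for (double x : xs) {
      sum += x;
      sumOfSquares += x * x;
    }
    double mean = sum / xs.length;
    return (sumOfSquares / xs.length) - (mean * mean);
  }

  public static void main(String[] args) {
    double[] small = {0, 1, 2};
    double[] shifted = Arrays.stream(small).map(x -> x + 1e9).toArray();
    // both data sets have a true variance of 2/3, but the shifted one loses
    // nearly everything to cancellation -- the result is typically 0, negative,
    // or off by orders of magnitude
    System.out.println(naiveVariance(small));    // prints ~0.6667
    System.out.println(naiveVariance(shifted));  // prints garbage
  }
}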

But this approach is plagued with well-documented stability and overflow problems. There are two other methods that I've run across: Welford's, which is by far the most widely published, and Ross's, which appears to be an independent derivation.

Welford's method keeps track of the count, the mean, and a sum of the squared differences from the mean as it goes like this:

public static double welford(DoubleStream values) {
  PrimitiveIterator.OfDouble iter = values.iterator();
  if (!iter.hasNext()) return 0;

  double m = iter.nextDouble(); // running mean
  double s = 0;                 // running sum of squared differences from the mean
  int count = 1;
  while (iter.hasNext()) {
    double x = iter.nextDouble();
    count += 1;
    double prevM = m;
    double delta = x - prevM;
    m = prevM + (delta / count);
    s += (delta * (x - m));
  }
  return Math.sqrt(s / (count - 1));
}

This works, but what happens if you need to replace a value in the data population with another? Here's a scenario: you want to track the number of times people click on your ads. You store a counter per person and increment it every time someone clicks something. If you group those people into segments like "people from the USA" and "people from Canada", you might want to know the average number of ad clicks per day for someone from Canada, and the standard deviation of those per-day counts. The data population that you are averaging and calculating the stddev over is the number of clicks per day -- but you don't want to run big, expensive queries over all of your people records.

A solution would be: when you get an ad click for Steve, you notice that his previous counter value for today was 3 and you're about to make it four. So you previously accounted for a 3 in the group's mean/stddev calculation. Now you just need to remove the 3 and add the 4.

I didn't find a solution to this at first glance...though I did as I was writing this blog post >:| (more on that later). Here's one approach to replacing a value:

public void replace(double oldValue, double newValue) {
  if (count == 0) {
    add(newValue); // the population is empty, so just add the new value
    return;
  }
  // precisely update the mean -- we know the count, so we can back the old
  // value out of the sum and put the new one in
  double prevM = m;
  double sum = m * count;
  sum -= oldValue;
  sum += newValue;
  m = sum / count;

  s -= ((oldValue - prevM) * (oldValue - m));
  s += ((newValue - prevM) * (newValue - m));
}

Since we have the mean and the count, we can precisely update the mean. Then we back out the old value's contribution to the sum of squared differences and add in the new value's contribution. The mean at the time the old value was originally added was slightly different than it is now, so it makes sense that this method introduces a small error. To measure how much, I ran some simulations. I tried many variations on population size and modification count, and drifted the mean over the modifications to simulate drift in the underlying data. Generally the error was very low -- the approximate and exact values kept 7+ digits in agreement, which in most cases was a relative error < 10^-7.
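A sketch of that kind of measurement harness is below. This is illustrative only (the class and variable names are mine, and it is not the exact code that produced the numbers that follow), but it shows the shape of the experiment: keep the true population in an array, push every replacement through the streaming tracker, and periodically compare against a brute-force recomputation.

import java.util.Random;

public class ReplaceErrorSimulation {

  // minimal Welford-style tracker with the replace() trick described above
  static class Tracker {
    double m;   // running mean
    double s;   // running sum of squared differences from the mean
    int count;

    void add(double x) {
      count += 1;
      double delta = x - m;
      m += delta / count;
      s += delta * (x - m);
    }

    void replace(double oldValue, double newValue) {
      double prevM = m;
      m = ((m * count) - oldValue + newValue) / count;  // exact mean update
      s -= (oldValue - prevM) * (oldValue - m);
      s += (newValue - prevM) * (newValue - m);
    }

    double stddev() {
      return Math.sqrt(s / (count - 1));
    }
  }

  // brute-force recomputation over the whole population, for comparison
  static double exactStddev(double[] xs) {
    double mean = 0;
    for (double x : xs) mean += x;
    mean /= xs.length;
    double ss = 0;
    for (double x : xs) ss += (x - mean) * (x - mean);
    return Math.sqrt(ss / (xs.length - 1));
  }

  public static void main(String[] args) {
    Random rand = new Random(42);
    int size = 1000;
    double[] pop = new double[size];
    Tracker tracker = new Tracker();
    for (int i = 0; i < size; i++) {
      pop[i] = 50 + rand.nextGaussian() * 100;
      tracker.add(pop[i]);
    }
    for (int i = 1; i <= 1_000_000; i++) {
      int slot = rand.nextInt(size);
      // drift the underlying mean upward as the replacements happen
      double newValue = (i / 10.0) + rand.nextGaussian() * 100;
      tracker.replace(pop[slot], newValue);
      pop[slot] = newValue;
      if (i % 100_000 == 0) {
        double real = exactStddev(pop);
        double appx = tracker.stddev();
        System.out.println("Real " + real + " appx " + appx
            + " err " + (Math.abs(real - appx) / real));
      }
    }
  }
}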

Here is the output of a simulation with a data population of size 1000 that went through 1 million random replacements. The mean was originally 50 and drifted up to 100,000 -- a significant change in the underlying population. The final value calculated by the "replacement" approach above was 102.7715945 and the exact value was 102.7715947, which agree to 9 digits. Here is the output every 100,000 modifications; you can see that the error increases, but very slowly.

Real 101.77630883973457 appx 101.77630883877222 err 9.455489999879566E-12
Real 108.95998062305821 appx 108.95998062134508 err 1.5722586742180294E-11
Real 111.47312659988472 appx 111.47312659954882 err 3.0133000046629816E-12
Real 104.24140320017268 appx 104.2414031714818  err 2.7523494838677614E-10
Real 108.11933587379178 appx 108.11933580405407 err 6.450068193382661E-10
Real 109.42545081143268 appx 109.42545070096071 err 1.0095637905406985E-9
Real 108.65152218111096 appx 108.65152203715265 err 1.3249544449723884E-9
Real 103.34165956961682 appx 103.3416593951618  err 1.6881384078315523E-9
Real 102.77159471215656 appx 102.77159451350815 err 1.9329116036550036E-9

As I was publishing this I found another approach for removing values from a running Welford calculation. That approach doesn't update the mean precisely, and I'm guessing that's what introduces a little more error. Running the exact same data through it shows slightly more error:

Real 101.77630883973457 appx 101.77630883769623 err 2.0027587650921094E-11
Real 108.95998062305821 appx 108.95998062710102 err 3.710356226429184E-11
Real 111.47312659988472 appx 111.4731267005022  err 9.026165066977801E-10
Real 104.24140320017268 appx 104.2414035377799  err 3.2387056223459957E-9
Real 108.11933587379178 appx 108.11933681146603 err 8.672586049942834E-9
Real 109.42545081143268 appx 109.42545279287886 err 1.8107726862840628E-8
Real 108.65152218111096 appx 108.65152556310522 err 3.112698463527759E-8
Real 103.34165956961682 appx 103.34166620916582 err 6.424852307519515E-8
Real 102.77159471215656 appx 102.77160676463879 err 1.1727444988401424E-7

I put my code in a gist in case anyone wants to snag it.

Sunday, June 7, 2015

Using ELK (elasticsearch + logstash + kibana) to aggregate cassandra and spark logs

On one of my current projects we are using Cassandra and Spark Streaming to do some somewhat-close-to-real-time analytics. The good folks at Datastax have built a commercial packaging (Datastax Enterprise, aka DSE) of Cassandra and Spark that allows you to get this stack up and running with relatively few headaches. One thing the Datastax offering does not include is a way to aggregate logs across all of these components. There are quite a few processes running across the cluster, each producing log files. Additionally, Spark creates log directories for each application and driver program, each with their own logs. Between the high count of log files and the fact that the work lands on different nodes each time depending on how the data gets partitioned, it can become a huge time sink to hunt around and grep for the log messages that you care about.

Enter the ELK stack. ELK is made up of three products:

  • Elasticsearch - a distributed indexing and search platform.  It provides a REST interface on top of a fancy distributed, highly available full text and semi-structured text searching system.  It uses Lucene internally for the inverted index and searching.
  • Logstash - log aggregator and processor.  This can take log feeds from lots of different sources, filter and mutate them, and output them somewhere else (elasticsearch in this case).  Logstash is the piece that needs to know about whatever structure you have in your log messages so that it can pull the structure out and send that semi-structured data to elasticsearch. 
  • Kibana - web based interactive visualization and query tool.  You can explore the raw data and turn it into fancy aggregate charts and build dashboards.
All three of these are open-source, Apache 2 licensed projects built by Elastic, a company founded by the folks that wrote Elasticsearch and Lucene.  They have all of the training, professional services, and production support subscriptions, and a stable of products with confusing names that you aren't quite sure if you need or not...

So how does this look at a high level? Spark and Cassandra run co-located on the same boxes.  This is by design, so that your Spark jobs can use RDDs whose partitioning scheme is aware of Cassandra's ring topology.  This can minimize over-the-wire data shuffles, improving performance.  This diagram shows at a high level where each of these processes sits in this distributed environment:
This only depicts two "analytics" nodes and one ELK node, but obviously you will have more of each.  Each analytics node will be producing lots of logs.  Spark writes logs to:
  • /var/log/spark/worker
  • /var/lib/spark/worker/worker-0/app-{somelongtimestamp}/{taskslot#}
  • /var/lib/spark/worker/worker-0/driver-{somelongtimestamp}

To collect all of these logs and forward them, there is an agent process on each node called logstash-forwarder that monitors user-specified folders for new log files and ships them over TCP to the actual logstash server process running on the ELK node.  Logstash receives these incoming feeds, parses them, and sends them to elasticsearch.  Kibana responds to my interactive queries and delegates all of the search work to elasticsearch; Kibana doesn't store any results internally or have its own indexes or anything.

Others have already done a good job explaining how to set up ELK and how to use Kibana, so I won't repeat all of that here.  I will only highlight some of the differences and share the Logstash configuration files that I had to create to handle the out-of-the-box log file formats for Cassandra and Spark (as packaged in DSE 4.7, at least).

I installed elasticsearch from the repo, which already created the systemd entries to run it as a service.  Following the ELK setup link above, I created systemd entries for logstash and kibana.  I also created a systemd unit file for the logstash-forwarder running on each analytics node.  

The logstash-forwarder needs to be configured with all of the locations where spark and cassandra will put log files.  It supports glob syntax, including recursive folder searches like "whatever/**/*.log", but I ended up not using that because it was duplicating some entries due to a wonky subfolder called cassandra_logging_unconfigured that gets created under the spark driver program's log folder.  My forwarder configuration picks up all of the output logs for the workers, the applications, and the drivers and assigns a different type to each: spark-worker for the generic /var/log spark output, spark-app for the app-* log folders, and spark-driver for the driver programs (where most of the interesting logging happens).  My logstash-forwarder.conf is in the gist.

For logstash I setup a few files as a pipeline to handle incoming log feeds:
  • 00_input.conf - sets up the lumberjack (protocol the forwarder uses) port and configures certs 
  • 01_cassandra_filter.conf - parses the logback format that DSE delivers for cassandra.  I'm not sure whether vanilla open-source cassandra uses the same format by default.  There are actually two formats to handle, because sometimes there is an extra value in there -- possibly from the logback equivalent of NDC.
  • 02_spark_filter.conf - parses the logback format that DSE delivers for spark.  There are two formats here as well: sometimes with a line number and sometimes without.
  • 07_last_filter.conf - this is a multiline filter that recognizes java stacktraces and causes them to stay with the original message
  • 10_output.conf - sends everything to elasticsearch

All of my configuration files are available through the links above in this gist.  Between the linked guides above and the configuration here that should get you going if you need to monitor cassandra and spark logs with ELK!

Quick tip: While you're getting things configured and working, you might need to kill the currently indexed data and resend everything so that it can be reparsed and re-indexed. The logstash-forwarder keeps a metadata file called .logstash-forwarder in the working directory where you started the forwarder.   To wipe the indexed data and resend everything, follow these steps:
  1. Kill the logstash-forwarder:
    sudo systemctl stop logstash-forwarder
  2. Delete the logstash-forwarder metadata so it starts from scratch next time:
    sudo rm /var/lib/logstash-forwarder/.logstash-forwarder
  3. If you need to change any logstash configuration files, do that and then restart logstash:
    sudo systemctl restart logstash
  4. Delete your existing elastic search indexes (be careful with this!):
    curl -XDELETE 'http://:9200/logstash-*'
  5. Start the logstash-forwarder again:
    sudo systemctl start logstash-forwarder
Also note that by default the logstash-forwarder will only pick up files less than 24 hours old.  So if you're configuring ELK and playing with filters against stagnant data, make sure at least some files have been touched recently so it will pick them up.  Check the log file in /var/log/logstash-forwarder to see if it's skipping particular entries.  You can also run the logstash-forwarder with -verbose to see additional debug information.

Quick tip: use the wonderfully useful http://grokdebug.herokuapp.com/ to test your grok patterns and make sure they match and pull out the fields you want.

Sunday, August 24, 2014

Bushwhacker to help team members debug unit test failures

Right now I'm working on a project with a few people who have never worked in Java before. There have been times where they'll experience some problem that they can't easily figure out. They send me the stack trace and instantly I know what the problem is. I wanted a simple way to automate this.

So I created Bushwhacker, a simple library to help provide more helpful messages at development time.  Senior members of the team edit and maintain a simple XML file that describes rules to detect particular exceptions and either replace or enhance the exception message in some way.  The rules are fairly sophisticated -- allowing you to, for example, only match IllegalArgumentException if it was thrown from a stack frame in MyClassWhatever.  Since the XML rules file is read off of the classpath, you can version it along with your source code (in src/test/resources for example).  You can even maintain multiple rules files for different modules and compose them together.  So if there are common pitfalls in your corporate "commons" module, you can keep the Bushwhacker rules for those there and put your team's additional rules in your own module.

It's on the Maven Central repo, so it's easy to add to your project. Check out the usage instructions here: https://github.com/steveash/bushwhacker

Here are a few of the rules that we're using right now:

Tuesday, June 24, 2014

Single record UPSERT in MSSQL 2008+

I guess I just don't use SQL's MERGE statement enough to remember its syntax off the top of my head. When I look at the Microsoft documentation for it, I can't find just a simple example to do a single record UPSERT. When I google, I don't get any brief blog posts that show it. So here's a brief one to add to the pile:


datakey varchar(16) not null primary key,
seqno bigint not null,
last_updated datetime not null

The table just keeps track of sequence numbers for things identified by 'datakey'.  The first time we want to increment the value for a key we need to insert a new record with seqno = 1; every time after that we just want to increment the existing row.
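Here is a sketch of that single-record upsert using MERGE. The table name (seqno_counter) and the @datakey variable are placeholders of mine, so adjust the names to match the table above:

-- 'seqno_counter' and @datakey are illustrative names, not from the schema above
declare @datakey varchar(16) = 'steve';

merge into seqno_counter with (holdlock) as target
using (select @datakey as datakey) as source
on (target.datakey = source.datakey)
when matched then
    update set seqno = target.seqno + 1, last_updated = getdate()
when not matched then
    insert (datakey, seqno, last_updated)
    values (source.datakey, 1, getdate());

The with (holdlock) hint is worth keeping: without it, two concurrent upserts for a brand new key can both pass the "not matched" check, and one of them will then fail with a primary key violation.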


Sunday, March 30, 2014

Spring Test Context Caching + AspectJ @Transactional + Ehcache pain

Are you using AspectJ @Transactionals and Spring? Do you have multiple SessionFactory's maybe one for an embedded database for unit testing and one for the real database for integration testing? Are you getting one of these exceptions?
org.springframework.transaction.CannotCreateTransactionException: Could not open Hibernate Session for transaction; nested exception is org.hibernate.service.UnknownServiceException: Unknown service requested 
java.lang.NullPointerException at net.sf.ehcache.Cache.isKeyInCache(Cache.java:3068) at org.hibernate.cache.ehcache.internal.regions.EhcacheDataRegion.contains(EhcacheDataRegion.java:223)
Then you are running into a problem where multiple cached application contexts are stepping on each other. This blog post describes some strategies to deal with the problems that we have encountered.


Spring's Test Context framework by default tries to minimize the number of times the spring container has to start by caching the containers. If you are running multiple tests that all use the same configuration, then the container only has to be created once for all of those tests instead of once per test. If you have thousands of tests and the container takes 10-15 seconds to start up, this makes a real difference in build/test time.

This only works if everyone (you and all of the libraries that you use) avoids static fields (global state), and unfortunately there are places where this is difficult or impossible to avoid -- even spring violates this! A few places that have caused us problems:
  • Spring AspectJ @Transactional support
  • EhCache cache managers
Aspects are singletons by design. Spring uses this to place a reference to the BeanFactory as well as the PlatformTransactionManager inside the AnnotationTransactionAspect. If you have multiple containers, each with their "own" AnnotationTransactionAspect, they are in fact sharing the same aspect instance, and whichever container starts up last is the "winner", causing all kinds of unexpected, hard-to-debug problems.

Ehcache is also a pain here. The ehcache library maintains a static list of all of the cache managers that it has created in the VM. So if you want to use multiple containers, they will all share a reference to the same cache manager. Spring Test gives you a mechanism to indicate that a test has "dirtied" the container and that it needs to be recreated; this translates to destroying the container after the test class is finished. That is fine, but if your container holds objects that are shared with other containers, then destroying those shared objects breaks the other containers.


The easiest solution is to disable the application context caching entirely. This can be done simply by placing @DirtiesContext on every test, or (better) on a shared super class -- you probably should be using super classes ("abstract test fixtures") to organize your tests anyway. Unfortunately you also lose all of the benefit of caching, and your build times will increase.

There is no general mechanism for the spring container to "clean itself up", because this sharing of state across containers is certainly an anti-pattern. The fact that they themselves do it (AnnotationTransactionAspect, EhCacheManagerFactoryBean.setShared(true), etc.) is an indication that perhaps they should add some support. If you want to keep caching, then step 1 is making sure that you don't use any "static field" singletons in your own code. Also make sure that any external resources that you write to are separated so that multiple containers can co-exist in the same JVM.

To address the AspectJ problem, the best solution that I have found is to create a TestExecutionListener that "resets" the AnnotationTransactionAspect to point at the correct bean factory and PlatformTransactionManager before test execution. The code for such a listener is in this gist.
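The gist has the full version; the core idea is small enough to sketch here (the class name is mine, and this assumes spring-aspects is on the classpath so that the compiled aspect and its aspectOf() method are available):

import org.springframework.context.ApplicationContext;
import org.springframework.test.context.TestContext;
import org.springframework.test.context.support.AbstractTestExecutionListener;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.aspectj.AnnotationTransactionAspect;

// re-points the singleton AnnotationTransactionAspect at the bean factory and
// transaction manager of whichever application context the current test is using
public class TransactionAspectResetListener extends AbstractTestExecutionListener {

  @Override
  public void prepareTestInstance(TestContext testContext) throws Exception {
    ApplicationContext ctx = testContext.getApplicationContext();
    AnnotationTransactionAspect aspect = AnnotationTransactionAspect.aspectOf();
    aspect.setTransactionManager(ctx.getBean(PlatformTransactionManager.class));
    aspect.setBeanFactory(ctx.getAutowireCapableBeanFactory());
  }
}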

To then use the listener you put @TestExecutionListeners on your base class test fixture so that all tests run with the new listener. Note that when you use the @TestExecutionListeners annotation you then have to specify all of the execution listeners, including the standard Spring ones. There is an example in the gist.
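A base fixture using it might look something like this (the listener class name matches the sketch above; the first three listeners are the usual Spring ones that you would otherwise get by default):

import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
import org.springframework.test.context.support.DirtiesContextTestExecutionListener;
import org.springframework.test.context.transaction.TransactionalTestExecutionListener;

// subclasses inherit these listeners, so every test gets the aspect reset
@TestExecutionListeners({
    DependencyInjectionTestExecutionListener.class,
    DirtiesContextTestExecutionListener.class,
    TransactionalTestExecutionListener.class,
    TransactionAspectResetListener.class })
public abstract class AbstractSpringTest {
}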

The workaround for Ehcache is to not allow CacheManager instances to be shared between containers. To do this, you have to ensure that the cache managers all have unique names. This is actually pretty easy to configure.
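One way to do that (a sketch assuming Java config and an ehcache.xml on the classpath) is to give each context's CacheManager a generated name and avoid the shared VM-level singleton:

import java.util.UUID;

import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class CacheConfig {

  @Bean
  public EhCacheManagerFactoryBean ehCacheManager() {
    EhCacheManagerFactoryBean factory = new EhCacheManagerFactoryBean();
    factory.setConfigLocation(new ClassPathResource("ehcache.xml"));
    // a unique name per application context so containers don't share (and then
    // destroy) each other's CacheManager
    factory.setCacheManagerName("cacheManager-" + UUID.randomUUID());
    factory.setShared(false);
    return factory;
  }
}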

Related Issues

Here are some links to spring jira issues covering this problem:

Thursday, September 5, 2013

Java final fields on x86 a no-op?

I have always enjoyed digging into the details of multi-threaded programming, and I always enjoy that despite reading for years about CPU memory consistency models, wait-free and lock-free algorithms, the Java memory model, Java Concurrency in Practice, etc. etc. -- I still create multi-threaded programming bugs. It's always a wonderfully humbling experience that reminds me how complicated a problem this is.

If you've read the JMM, then you might remember that one of the areas they strengthened was the guarantee of visibility of final fields after the constructor completes. For example,
public class ClassA {
   public final String b;

   public ClassA(String b) {
      this.b = b;
   }
}

ClassA x = new ClassA("hello");

The JMM states that every thread (even threads other than the one that constructed the instance, x, of ClassA) will always observe x.b as "hello" and would never see a value of null (the default value for a reference field).

This is really great! That means that we can create immutable objects just by marking the fields as final, and any constructed instance is automatically safe to share amongst threads with no other work to guarantee memory visibility. Woot! The flip side of this is that if ClassA.b were not marked as final then you would have no such guarantee, and other threads could observe an x.b == null result (if no other "safe publication" mechanisms were employed to get visibility).
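To make that concrete, here is a small, contrived sketch (mine, not from the benchmark code later in this post) of the unsafe publication being described. ClassB is the same shape as ClassA but without final:

public class UnsafePublication {

  static class ClassB {
    String b;               // note: not final
    ClassB(String b) { this.b = b; }
  }

  static ClassB shared;     // racy: written by one thread, read by another, no synchronization

  public static void main(String[] args) {
    // writer: publish the object through the non-volatile static field
    new Thread(() -> shared = new ClassB("hello")).start();

    // reader: racy read with no synchronization
    new Thread(() -> {
      ClassB local = shared;
      if (local != null) {
        // The JMM permits "null" to be printed here because b is not final and the
        // publication above is not safe; if b were final (as in ClassA), "hello"
        // would be guaranteed. In practice on x86 you are unlikely to ever see null.
        System.out.println(local.b);
      }
    }).start();
  }
}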

Well, when they created the new JMM, everyone's favorite JCP member, Doug Lea, created a cookbook to help JVM developers implement the new memory model rules. If you read it, you will see that the "rules" state that JIT compilers should emit a StoreStore memory barrier right before the constructor returns. This StoreStore barrier is a kind of "memory fence": when emitted in the assembly instructions, it means that no memory writes (stores) after the fence can be re-ordered before memory writes that appear before the fence. Note that it doesn't say anything about reads -- they can "hop" the fence in either direction.

So what does this mean? Well, if you think about what the compiler does when you call a constructor:
ClassA x = new ClassA("hello");
  gets broken down into pseudo-code steps like:

1. pointer_to_A = allocate memory for ClassA 
    (mark word, class object pointer, one reference field for String b)
2. pointer_to_A.whatever class meta data = ...
3. pointer_to_A.b = address of "hello" string
4. emit a StoreStore memory barrier per the JMM
5. x = pointer_to_A
The StoreStore barrier at step 4 ensures that any writes (such as the class meta-data and the write to field b) are not re-ordered with the write to x in step 5. This is what makes sure that if x is visible to any other thread, that thread can't see x without also seeing x.b. Without the StoreStore memory barrier, steps 3 and 5 could be re-ordered, the write to main memory for x could show up before the write to x.b, and another CPU core could observe pointer_to_A.b to be 0 (null), which would violate the JMM.

Great news! However, if you look at that cookbook you'll see some interesting things: (1) a lot of people are writing JVMs on lots of processor architectures! (2) *all* of the memory barriers on x86 are no-ops except the StoreLoad barrier! This means that on x86 the StoreStore memory barrier above is a no-op and no assembly is emitted for it. It does nothing! This is because x86's memory model is a strong one: "total store ordering" (TSO). x86 makes sure that all memory writes are observed as if they were all made in the same order. Thus, the write in step 5 would never appear before the write in step 3 to any other thread anyway, and there is no need to emit a memory fence. Other CPU architectures have weaker memory models which do not make such guarantees, and thus the StoreStore memory fence is necessary. Note that weaker memory models, while perhaps harder or less intuitive to program against, are generally faster, as the CPU can re-order things to make more efficient use of cache writes and reduce cache coherency work.

Obviously you should continue to write correct code that follows the JMM. BUT it also means (unfortunately or fortunately) that forgetting this will not lead to bugs if you're running on x86...like I do at work.

To really drill this home and make sure that there are no other side effects that maybe aren't described in the cookbook, I ran the x86 assembly outputter as described here and captured the output of calling the constructor for ClassA (with final on the reference field) and the constructor for ClassB, which is identical to ClassA except without the final keyword on the member. The x86 assembly output is identical. So from a JIT perspective, on x86 (not Itanium, not ARM, etc.), the final keyword has no impact.

If you're curious what the assembly code looks like, here it is below. Note the lack of any locked instructions. When Oracle's 7u25 JRE emits an x86 StoreLoad memory fence, it does so by emitting lock addl $0x0,(%rsp), which just adds zero to the memory at the top of the stack -- a no-op, but since it's a locked instruction it has the effect of a full fence (which satisfies the requirements of a StoreLoad fence). There are a few different ways in x86 to get the effect of a full fence, and they are discussed on the OpenJDK mailing list; they observed that, at least on Nehalem Intel chips, the locked add of zero was the most compact/efficient.
  0x00007f152c020c60: mov    %eax,-0x14000(%rsp)
  0x00007f152c020c67: push   %rbp
  0x00007f152c020c68: sub    $0x20,%rsp         ;*synchronization entry
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@-1 (line 60)
  0x00007f152c020c6c: mov    %rdx,(%rsp)
  0x00007f152c020c70: mov    %esi,%ebp
  0x00007f152c020c72: mov    0x60(%r15),%rax
  0x00007f152c020c76: mov    %rax,%r10
  0x00007f152c020c79: add    $0x18,%r10
  0x00007f152c020c7d: cmp    0x70(%r15),%r10
  0x00007f152c020c81: jae    0x00007f152c020cd6
  0x00007f152c020c83: mov    %r10,0x60(%r15)
  0x00007f152c020c87: prefetchnta 0xc0(%r10)
  0x00007f152c020c8f: mov    $0x8356f3d0,%r11d  ;   {oop('com/argodata/match/profiling/FinalConstructorMain$ClassA')}
  0x00007f152c020c95: mov    0xb0(%r11),%r10
  0x00007f152c020c9c: mov    %r10,(%rax)
  0x00007f152c020c9f: movl   $0x8356f3d0,0x8(%rax)  ;   {oop('com/argodata/match/profiling/FinalConstructorMain$ClassA')}
  0x00007f152c020ca6: mov    %r12d,0x14(%rax)   ;*new  ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
  0x00007f152c020caa: mov    %ebp,0xc(%rax)     ;*putfield a
                                                ; - com.argodata.match.profiling.FinalConstructorMain$ClassA::@6 (line 17)
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@6 (line 60)
  0x00007f152c020cad: mov    (%rsp),%r10
  0x00007f152c020cb1: mov    %r10d,0x10(%rax)   ;*new  ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
  0x00007f152c020cb5: mov    %rax,%r10
  0x00007f152c020cb8: shr    $0x9,%r10
  0x00007f152c020cbc: mov    $0x7f152b765000,%r11
  0x00007f152c020cc6: mov    %r12b,(%r11,%r10,1)  ;*synchronization entry
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@-1 (line 60)
  0x00007f152c020cca: add    $0x20,%rsp
  0x00007f152c020cce: pop    %rbp
  0x00007f152c020ccf: test   %eax,0x9fb932b(%rip)        # 0x00007f1535fda000
                                                ;   {poll_return}
  0x00007f152c020cd5: retq   
  0x00007f152c020cd6: mov    $0x8356f3d0,%rsi   ;   {oop('com/argodata/match/profiling/FinalConstructorMain$ClassA')}
  0x00007f152c020ce0: xchg   %ax,%ax
  0x00007f152c020ce3: callq  0x00007f152bfc51e0  ; OopMap{[0]=Oop off=136}
                                                ;*new  ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
                                                ;   {runtime_call}
  0x00007f152c020ce8: jmp    0x00007f152c020caa  ;*new
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
  0x00007f152c020cea: mov    %rax,%rsi
  0x00007f152c020ced: add    $0x20,%rsp
  0x00007f152c020cf1: pop    %rbp
  0x00007f152c020cf2: jmpq   0x00007f152bfc8920  ;   {runtime_call}