Saturday, December 19, 2015

Some neat Spring4 and Spring Boot features

Here's a few slides on some of the Spring4 and Spring Boot features that I used on a recent project that are interesting. This is not really an introduction to Spring Boot -- but just highlighting a few features. This talk was an internal talk for our team - thus it is informal and brief. But in case anyone else has any interest:

Tuesday, September 1, 2015

Whirlwind tour of data science

I'm giving a talk to the local Data Science / Machine Learning meetup. The topic is a shallow tour of data science topics including tasks and methods. This is a new group and many of the attendees do not have a data science background but are interested. Therefore, I thought a talk that went through the highlights would be useful. Not only is the tour a whirlwind, but creating the slides was a bit of a rush job in the wee hours of the night, so please excuse typos, factual errors, slander, blatent malicious lies, etc.

Presentation is: http://slides.com/steveash/datascience/

Thursday, August 6, 2015

Streaming standard deviation and replacing values

I'm working on a problem right now where I need to calculate the mean and standard deviation of a value in a streaming fashion.  That is I receive a stream of events that generate data points and I need to calculate standard deviation without retaining all of the data points in memory.  This has been covered ad nauseam in the literature -- I will just briefly summarize.

Ignoring the whole sample vs population correction, variance can be naively computed as the mean of the squares minus the square of the mean: $$\sigma^2 = \frac{\sum\nolimits_{i=0}^n x_{i}^2}{n} - \left( \frac{\sum\nolimits_{i=0}^n x_i}{n} \right)^2$$

But this approach is plagued with well-documented stability and overflow problems. There are two other methods that I've run across: Welford's which is by far the most published and Ross's which appears to be an independent derivation.

Welford's method keeps track of the count, the mean, and a sum of the squared differences from the mean as it goes like this:

public static double welford(DoubleStream values) {
  OfDouble iter = values.iterator();
  if (!iter.hasNext()) return 0;
  double m = iter.nextDouble();
  double s = 0;
  int count = 1;
  while (iter.hasNext()) {
    double x = iter.nextDouble();
    count += 1;
    double prevM = m;
    double delta = x - prevM;
    m = prevM + (delta / count);
    s += (delta * (x - m));
  }
  return Math.sqrt(s / (count - 1));
}

This works, but what happens if you need to replace a value in the data population with another? Here's a scenario: you want to track the number of times people click on your ads. You store some counter per person and increment it every time someone clicks something. If you group those people into segments like "people from USA", "people from Canada" you might want to keep track of what is the average number of ad clicks per day for someone from Canada? and what is the standard deviation for this average? The data population that you are averaging and calculating the stddev for is the number of clicks per day -- but you dont want to run big expensive queries over all of your people records.

A solution would be: when you get an ad click for Steve, you notice that his previous counter value for today was 3 and you're about to make it four. So you previously accounted for a 3 in the group's mean/stddev calculation. Now you just need to remove the 3 and add the 4.

I didn't find a solution to this at first glance...though I did as I was writing this blog post >:| (more on that later). Here's one approach to replacing a value:

public void replace(double oldValue, double newValue) {
  if (count == 0) {
    add(newValue); // just add it new
    return;
  }
  // precisely update the mean
  double prevM = m;
  double sum = m * count;
  sum -= oldValue;
  sum += newValue;
  m = sum / count;

  s -= ((oldValue - prevM) * (oldValue - m));
  s += ((newValue - prevM) * (newValue - m));
}

Since we have the mean and count we can precisely update the mean. Now we just back out the previous contribution to the sum of delta variance and add in the new. The means at the point of removal will be slightly different so it makes sense that this method would introduce a slight error. To measure the amount of error, I ran some simulations. I tried many variations on population size, modification count, drifting the mean over the modifications to simulate underlying data drift. Generally the error was very low -- keeping 7+ digits in agreement, which in most cases was an error < 10^-\7

Here is the output of a simulation with a data population size 1000 that went through 1 million random replacements. The mean was originally 50 and drifted up to 100,000 - so significant change in the underlying population. The final value calculated by the above "replacement" was 102.7715945 and the exact actual value was 102.7715947, which agrees to 9 digits. Here is output every 100,000 modifications. You can see that the error increases but very slowly.

Real 101.77630883973457 appx 101.77630883877222 err 9.455489999879566E-12
Real 108.95998062305821 appx 108.95998062134508 err 1.5722586742180294E-11
Real 111.47312659988472 appx 111.47312659954882 err 3.0133000046629816E-12
Real 104.24140320017268 appx 104.2414031714818  err 2.7523494838677614E-10
Real 108.11933587379178 appx 108.11933580405407 err 6.450068193382661E-10
Real 109.42545081143268 appx 109.42545070096071 err 1.0095637905406985E-9
Real 108.65152218111096 appx 108.65152203715265 err 1.3249544449723884E-9
Real 103.34165956961682 appx 103.3416593951618  err 1.6881384078315523E-9
Real 102.77159471215656 appx 102.77159451350815 err 1.9329116036550036E-9

As I was publishing this I found another approach for deleting values from Welford's standard deviation. This doesn't update the mean precisely, and I'm guessing that that introduces just a little more error. Running the exact same data through his approach shows just slightly more error:

Real 101.77630883973457 appx 101.77630883769623 err 2.0027587650921094E-11
Real 108.95998062305821 appx 108.95998062710102 err 3.710356226429184E-11
Real 111.47312659988472 appx 111.4731267005022  err 9.026165066977801E-10
Real 104.24140320017268 appx 104.2414035377799  err 3.2387056223459957E-9
Real 108.11933587379178 appx 108.11933681146603 err 8.672586049942834E-9
Real 109.42545081143268 appx 109.42545279287886 err 1.8107726862840628E-8
Real 108.65152218111096 appx 108.65152556310522 err 3.112698463527759E-8
Real 103.34165956961682 appx 103.34166620916582 err 6.424852307519515E-8
Real 102.77159471215656 appx 102.77160676463879 err 1.1727444988401424E-7

I put my code in a gist in case anyone wants to snag it.

Sunday, June 7, 2015

Using ELK (elasticsearch + logstash + kibana) to aggregate cassandra and spark logs

On one of my current projects we are using Cassandra and Spark Streaming to do some somewhat-close-to-real time analytics. The good folks at Datastax have built a commercial packaging (Datastax Enterprise, aka DSE) of Cassandra and Spark that allow you to get this stack up and running with relatively few headaches. One thing the Datastax offering does not include is a way to aggregate logs across all of these components. There are quite a few processes running across the cluster, each producing log files. Additionally, spark creates log directories for each application and driver program, each with their own logs. Between the high count of log files and the fact that work happens on all the nodes different each time depending on how the data gets partitioned- it can become a huge time sink to hunt around and grep for the log messages that you care about.

Enter the ELK stack. ELK is made up of three products:

  • Elasticsearch - a distributed indexing and search platform.  It provides a REST interface on top of a fancy distributed, highly available full text and semi-structured text searching system.  It uses Lucene internally for the inverted index and searching.
  • Logstash - log aggregator and processor.  This can take log feeds from lots of different sources, filter and mutate them, and output them somewhere else (elasticsearch in this case).  Logstash is the piece that needs to know about whatever structure you have in your log messages so that it can pull the structure out and send that semi-structured data to elasticsearch. 
  • Kibana - web based interactive visualization and query tool.  You can explore the raw data and turn it into fancy aggregate charts and build dashboards.
All three of these are open-source, Apache 2 licensed projects built by Elastic, a company founded by the folks that wrote Elasticsearch and Lucene.  They have all of the training, professional services, and production support subscriptions, and a stable of products with confusing names that you aren't quite sure if you need or not...

So how does this look at a high level? Spark and Cassandra run co-located on the same boxes.  This is by design so that your Spark jobs can use RDDs that use a partitioning scheme that is aware of Cassandra's ring topology.  This can minimize over-the-wire data shuffles, improving performance.  This diagram shows at a high level where each of these processes sit in this distributed environment:
This only depicts two "analytics" nodes and one ELK node, but obviously you will have more of each.  Each analytics node will be producing lots of logs.  Spark writes logs to:
  • /var/log/spark/worker
  • /var/lib/spark/worker/worker-0/app-{somelongtimestamp}/{taskslot#}
  • /var/lib/spark/worker/worker-0/driver-{somelongtimestamp}

To collect all of these logs and forward, there is an agent process on each node called Logstash-Forwarder that is monitoring user specified folders for new log files and is shipping them over via TCP to the actual logstash server process running on the ELK node.  Logstash receives these incoming feeds, parses them and sends them to elasticsearch.  Kibana responds to my interactive queries and delegates all of the search work to elasticsearch.  Kibana doesn't store any results internally or have its own indexes or anything.

Others have already done a good job explaining how to setup ELK and how to use Kibana, and therefore I won't repeat all of that here.  I will only highlight some of the differences, and share my Logstash configuration files that I had to create to handle the out-of-the-box log file formats for Cassandra and Spark (as packaged in DSE 4.7 at least).

I installed elasticsearch from the repo, which already created the systemd entries to run it as a service.  Following the ELK setup link above, I created systemd entries for logstash and kibana.  I also created a systemd unit file for the logstash-forwarder running on each analytics node.  

The logstash-forwarder needs to be configured with all of the locations that spark and cassandra will put logfiles.  It supports glob syntax, including recursive folder searches like "whatever/**/*.log", but I ended up not using that because it was duplicating some of the entries due to a wonky subfolder being created under the spark driver program log folder called cassandra_logging_unconfigured.  My forwarder configuration picks up all of the output logs for the workers, the applications, and the driver and creates a different type for each: spark-worker for generic /var/log spark output, spark-app for app-* log folder, spark-driver for the driver programs (where most of the interesting logging happens).  My logstash-forwarder.conf is in the gist.

For logstash I setup a few files as a pipeline to handle incoming log feeds:
  • 00_input.conf - sets up the lumberjack (protocol the forwarder uses) port and configures certs 
  • 01_cassandra_filter.conf - parses the logback formats that DSE delivers for cassandra.  Im not sure if vanilla open-source cassandra uses the same by defualt or not.  There are two formats between sometimes there is an extra value in here -- possibly from the logback equivalent of NDC.
  • 02_spark_filter.conf - parses the logback formats that DSE delivers for spark.  I see there are two formats I get here as well.  Sometimes with a line number, sometimes without.
  • 07_last_filter.conf - this is a multiline filter that recognizes java stacktraces and causes them to stay with the original message
  • 10_output.conf - sends everything to elasticsearch

All of my configuration files are available through the links above in this gist.  Between the linked guides above and the configuration here that should get you going if you need to monitor cassandra and spark logs with ELK!

Quick tip: While you're getting things configured and working, you might need to kill the currently indexed data and resend everything (so that it can reparse and re-index). The logstash-forwarder keeps a metadata file called .logstash-forwarder in the working directory where you started the forwarder.   If you want to kill all of the indexed data and resend everything, follow these steps:
  1. Kill the logstash-forwarder:
    sudo systemctl stop logstash-forwarder
  2. Delete the logstash-forwarder metadata so it starts from scratch next time:
    sudo rm /var/lib/logstash-forwarder/.logstash-forwarder
  3. If you need to change any logstash configuration files, do that and then restart logstash:
    sudo systemctl restart logstash
  4. Delete your existing elastic search indexes (be careful with this!):
    curl -XDELETE 'http://:9200/logstash-*'
  5. Start the logstash-forwarder again:
    sudo systemctl start logstash-forwarder
Also note that by default the logstash-forwarder will only pickup files less than 24 hours old.  So if you're configuring ELK and playing with filters on stagnant data, make sure at least some files are touched recently so it will pick them up.  Check out the log file in /var/log/logstash-forwarder to see if it's skipping particular entries.  You can also run the logstash-forwarder with -verbose to see additional debug information.

Quick tip: use the wonderfully useful http://grokdebug.herokuapp.com/ to test out your grow regex patterns to make sure they match and pull out the fields you want.