Discussion of software development pragmatics in the space of Java enterprise development, the confluence of errors that inevitably plague such projects, and the many cups of coffee required to make it through the day.
Saturday, December 19, 2015
Some neat Spring4 and Spring Boot features
Tuesday, September 1, 2015
Whirlwind tour of data science
The presentation is here: http://slides.com/steveash/datascience/
Thursday, August 6, 2015
Streaming standard deviation and replacing values
I'm working on a problem right now where I need to calculate the mean and standard deviation of a value in a streaming fashion. That is, I receive a stream of events that generate data points, and I need to calculate the standard deviation without retaining all of the data points in memory. This has been covered ad nauseam in the literature, so I will just briefly summarize.
Ignoring the whole sample vs population correction, variance can be naively computed as the mean of the squares minus the square of the mean: $$\sigma^2 = \frac{\sum\nolimits_{i=1}^n x_{i}^2}{n} - \left( \frac{\sum\nolimits_{i=1}^n x_i}{n} \right)^2$$
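To make the instability concrete, here is a minimal sketch of the naive approach (my own illustration, not from any particular reference). When the mean is large relative to the variance, sumSq / n and mean * mean are two huge, nearly equal numbers, and subtracting them destroys most of the significant digits:

public static double naiveStdDev(double[] xs) {
    double sum = 0;
    double sumSq = 0;
    for (double x : xs) {
        sum += x;
        sumSq += x * x;
    }
    double mean = sum / xs.length;
    double variance = sumSq / xs.length - mean * mean; // catastrophic cancellation happens here
    return Math.sqrt(variance); // NaN if cancellation drives variance negative
}

Try it on {1e8, 1e8 + 1, 1e8 + 2}: the true population standard deviation is about 0.816, but at that magnitude the squares only carry around 16 significant digits, so the subtraction above can return garbage or NaN.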
But this approach is plagued with well-documented stability and overflow problems. There are two other methods that I've run across: Welford's, which is by far the most published, and Ross's, which appears to be an independent derivation.
Welford's method keeps track of the count, the mean, and a sum of the squared differences from the mean as it goes like this:
import java.util.PrimitiveIterator.OfDouble;
import java.util.stream.DoubleStream;

public static double welford(DoubleStream values) {
    OfDouble iter = values.iterator();
    if (!iter.hasNext()) return 0;
    double m = iter.nextDouble(); // running mean, seeded with the first value
    double s = 0;                 // running sum of squared differences from the mean
    int count = 1;
    while (iter.hasNext()) {
        double x = iter.nextDouble();
        count += 1;
        double prevM = m;
        double delta = x - prevM;
        m = prevM + (delta / count);
        s += (delta * (x - m)); // uses the delta from the old mean and the new mean
    }
    if (count < 2) return 0; // a single sample has no sample deviation (avoids 0/0)
    return Math.sqrt(s / (count - 1)); // sample standard deviation
}
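A quick sanity check (my own example, using a small data set where the sample standard deviation works out to sqrt(32/7) ≈ 2.138):

double sd = welford(DoubleStream.of(2, 4, 4, 4, 5, 5, 7, 9));
System.out.println(sd); // prints approximately 2.1381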
This works, but what happens if you need to replace a value in the data population with another? Here's a scenario: you want to track the number of times people click on your ads. You store a counter per person and increment it every time someone clicks something. If you group those people into segments like "people from USA" and "people from Canada", you might want to answer: what is the average number of ad clicks per day for someone from Canada, and what is the standard deviation around that average? The data population you are averaging and calculating the stddev over is the number of clicks per day -- but you don't want to run big, expensive queries over all of your people records.
A solution would be: when you get an ad click for Steve, you notice that his previous counter value for today was 3 and you're about to make it 4. You previously accounted for a 3 in the group's mean/stddev calculation, so now you just need to remove the 3 and add the 4.
I didn't find a solution to this at first glance...though I did as I was writing this blog post >:| (more on that later). Here's one approach to replacing a value:
public void replace(double oldValue, double newValue) {
    if (count == 0) {
        add(newValue); // just add it new
        return;
    }
    // precisely update the mean
    double prevM = m;
    double sum = m * count;
    sum -= oldValue;
    sum += newValue;
    m = sum / count;
    // back out the old value's contribution to s, then add the new value's
    s -= ((oldValue - prevM) * (oldValue - m));
    s += ((newValue - prevM) * (newValue - m));
}
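For context, replace is an instance method: count, m, and s are the same running state from Welford's method, maintained across calls. A minimal sketch of the kind of class it lives in (the class name and method shapes here are my framing, not lifted from the gist):

public class StreamingStats {
    private int count = 0;
    private double m = 0; // running mean
    private double s = 0; // running sum of squared differences from the mean

    public void add(double x) {
        count += 1;
        double delta = x - m;
        m += delta / count;   // standard Welford update
        s += delta * (x - m);
    }

    public double stdDev() {
        // sample standard deviation; zero when there aren't enough points
        return count > 1 ? Math.sqrt(s / (count - 1)) : 0;
    }

    // the replace(oldValue, newValue) method shown above goes here
}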
Since we have the mean and count, we can precisely update the mean. Then we just back out the old value's contribution to the sum of squared deltas and add in the new value's contribution. The mean at the point of removal will be slightly different from the mean when the old value was originally added, so it makes sense that this method introduces a slight error. To measure the amount of error, I ran some simulations. I tried many variations on population size and modification count, and drifted the mean over the modifications to simulate underlying data drift. Generally the error was very low -- keeping 7+ digits in agreement, which in most cases was an error < 10^-7.
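The simulation harness was essentially the following loop (a sketch using the StreamingStats class above; exactStdDev is a hypothetical helper that recomputes the standard deviation from scratch to give a reference value):

import java.util.Random;

public static void simulate() {
    Random rand = new Random();
    double[] pop = new double[1000];
    StreamingStats stats = new StreamingStats();
    for (int i = 0; i < pop.length; i++) {
        pop[i] = 50 + rand.nextGaussian() * 10; // population starts around mean 50
        stats.add(pop[i]);
    }
    for (int mod = 1; mod <= 1_000_000; mod++) {
        int j = rand.nextInt(pop.length);
        double drift = mod / 10.0; // drifts the mean up to ~100,000 by the end
        double newValue = 50 + drift + rand.nextGaussian() * 10;
        stats.replace(pop[j], newValue);
        pop[j] = newValue;
        if (mod % 100_000 == 0) {
            double real = exactStdDev(pop); // hypothetical helper: recompute exactly
            double appx = stats.stdDev();
            System.out.println("Real " + real + " appx " + appx
                    + " err " + (Math.abs(real - appx) / real));
        }
    }
}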
Here is the output of a simulation with a data population of size 1000 that went through 1 million random replacements. The mean was originally 50 and drifted up to 100,000 -- a significant change in the underlying population. The final value calculated by the above "replacement" approach was 102.7715945 and the exact value was 102.7715947, which agree to 9 digits. Here is the output every 100,000 modifications. You can see that the error increases, but very slowly.
Real 101.77630883973457 appx 101.77630883877222 err 9.455489999879566E-12
Real 108.95998062305821 appx 108.95998062134508 err 1.5722586742180294E-11
Real 111.47312659988472 appx 111.47312659954882 err 3.0133000046629816E-12
Real 104.24140320017268 appx 104.2414031714818 err 2.7523494838677614E-10
Real 108.11933587379178 appx 108.11933580405407 err 6.450068193382661E-10
Real 109.42545081143268 appx 109.42545070096071 err 1.0095637905406985E-9
Real 108.65152218111096 appx 108.65152203715265 err 1.3249544449723884E-9
Real 103.34165956961682 appx 103.3416593951618 err 1.6881384078315523E-9
Real 102.77159471215656 appx 102.77159451350815 err 1.9329116036550036E-9
As I was publishing this, I found another approach for deleting values from Welford's standard deviation. That approach doesn't update the mean precisely, and I'm guessing that introduces just a little more error. Running the exact same data through it shows slightly more error:
Real 101.77630883973457 appx 101.77630883769623 err 2.0027587650921094E-11
Real 108.95998062305821 appx 108.95998062710102 err 3.710356226429184E-11
Real 111.47312659988472 appx 111.4731267005022 err 9.026165066977801E-10
Real 104.24140320017268 appx 104.2414035377799 err 3.2387056223459957E-9
Real 108.11933587379178 appx 108.11933681146603 err 8.672586049942834E-9
Real 109.42545081143268 appx 109.42545279287886 err 1.8107726862840628E-8
Real 108.65152218111096 appx 108.65152556310522 err 3.112698463527759E-8
Real 103.34165956961682 appx 103.34166620916582 err 6.424852307519515E-8
Real 102.77159471215656 appx 102.77160676463879 err 1.1727444988401424E-7
I put my code in a gist in case anyone wants to snag it.
Sunday, June 7, 2015
Using ELK (elasticsearch + logstash + kibana) to aggregate cassandra and spark logs
Enter the ELK stack. ELK is made up of three products:
- Elasticsearch - a distributed indexing and search platform. It provides a REST interface on top of a fancy distributed, highly available full text and semi-structured text searching system. It uses Lucene internally for the inverted index and searching.
- Logstash - log aggregator and processor. This can take log feeds from lots of different sources, filter and mutate them, and output them somewhere else (elasticsearch in this case). Logstash is the piece that needs to know about whatever structure you have in your log messages so that it can pull the structure out and send that semi-structured data to elasticsearch.
- Kibana - web based interactive visualization and query tool. You can explore the raw data and turn it into fancy aggregate charts and build dashboards.
Spark logs end up in a few places (the forwarder config sketch after this list shows how these get shipped):
- /var/log/spark/worker
- /var/lib/spark/worker/worker-0/app-{somelongtimestamp}/{taskslot#}
- /var/lib/spark/worker/worker-0/driver-{somelongtimestamp}
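Each node runs the logstash-forwarder, which tails those files and ships them to logstash over the lumberjack protocol. As a rough sketch, its JSON config looks something like this (the server name, cert path, and glob patterns are placeholders, not my actual values):

{
  "network": {
    "servers": [ "logstash.example.com:5043" ],
    "ssl ca": "/etc/pki/tls/certs/logstash-forwarder.crt"
  },
  "files": [
    { "paths": [ "/var/log/spark/worker/*" ], "fields": { "type": "spark" } },
    { "paths": [ "/var/lib/spark/worker/worker-0/*/stdout",
                 "/var/lib/spark/worker/worker-0/*/stderr" ], "fields": { "type": "spark" } }
  ]
}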
My logstash configuration is split across a few files, which logstash applies in order:
- 00_input.conf - sets up the lumberjack input (the protocol the forwarder uses), its port, and certs
- 01_cassandra_filter.conf - parses the logback formats that DSE delivers for cassandra (see the filter sketch after this list). I'm not sure whether vanilla open-source cassandra uses the same format by default or not. There are two formats because sometimes there is an extra value in here -- possibly from the logback equivalent of NDC.
- 02_spark_filter.conf - parses the logback formats that DSE delivers for spark. There are two formats here as well: sometimes with a line number, sometimes without.
- 07_last_filter.conf - this is a multiline filter that recognizes java stacktraces and causes them to stay with the original message
- 10_output.conf - sends everything to elasticsearch
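To give a flavor of what the filter files contain, here is a rough sketch of a grok filter. This is not my exact pattern -- the real one has to match whatever logback layout DSE actually ships (including the two variants mentioned above) -- and the field names are illustrative only:

filter {
  if [type] == "cassandra" {
    grok {
      match => { "message" => "%{LOGLEVEL:level}%{SPACE}\[%{DATA:thread}\]%{SPACE}%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{GREEDYDATA:logmessage}" }
    }
    date {
      match => [ "logdate", "ISO8601" ]
    }
  }
}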
To wipe everything and re-ship the logs from scratch:
- Kill the logstash-forwarder:
sudo systemctl stop logstash-forwarder
- Delete the logstash-forwarder metadata so it starts from scratch next time:
sudo rm /var/lib/logstash-forwarder/.logstash-forwarder
- If you need to change any logstash configuration files, do that and then restart logstash:
sudo systemctl restart logstash
- Delete your existing elasticsearch indexes (be careful with this!):
curl -XDELETE 'http://:9200/logstash-*'
- Start the logstash-forwarder again:
sudo systemctl start logstash-forwarder
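Once the forwarder is running again, you can confirm that fresh logstash-* indexes are being created via elasticsearch's cat API (substitute your own host):

curl 'http://localhost:9200/_cat/indices?v'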