Sunday, June 7, 2015

Using ELK (elasticsearch + logstash + kibana) to aggregate cassandra and spark logs

On one of my current projects we are using Cassandra and Spark Streaming to do some somewhat-close-to-real-time analytics. The good folks at Datastax have built a commercial packaging (Datastax Enterprise, aka DSE) of Cassandra and Spark that allows you to get this stack up and running with relatively few headaches. One thing the Datastax offering does not include is a way to aggregate logs across all of these components. There are quite a few processes running across the cluster, each producing log files. Additionally, Spark creates log directories for each application and driver program, each with their own logs. Between the high count of log files and the fact that work happens on different nodes each time depending on how the data gets partitioned, it can become a huge time sink to hunt around and grep for the log messages that you care about.

Enter the ELK stack. ELK is made up of three products:

  • Elasticsearch - a distributed indexing and search platform.  It provides a REST interface on top of a fancy distributed, highly available full text and semi-structured text searching system.  It uses Lucene internally for the inverted index and searching.
  • Logstash - log aggregator and processor.  This can take log feeds from lots of different sources, filter and mutate them, and output them somewhere else (elasticsearch in this case).  Logstash is the piece that needs to know about whatever structure you have in your log messages so that it can pull the structure out and send that semi-structured data to elasticsearch. 
  • Kibana - web based interactive visualization and query tool.  You can explore the raw data and turn it into fancy aggregate charts and build dashboards.
All three of these are open-source, Apache 2 licensed projects built by Elastic, a company founded by the folks that wrote Elasticsearch and Lucene.  They have all of the training, professional services, and production support subscriptions, and a stable of products with confusing names that you aren't quite sure if you need or not...

So how does this look at a high level? Spark and Cassandra run co-located on the same boxes.  This is by design so that your Spark jobs can use RDDs that use a partitioning scheme that is aware of Cassandra's ring topology.  This can minimize over-the-wire data shuffles, improving performance.  This diagram shows at a high level where each of these processes sit in this distributed environment:
This only depicts two "analytics" nodes and one ELK node, but obviously you will have more of each.  Each analytics node will be producing lots of logs.  Spark writes logs to:
  • /var/log/spark/worker
  • /var/lib/spark/worker/worker-0/app-{somelongtimestamp}/{taskslot#}
  • /var/lib/spark/worker/worker-0/driver-{somelongtimestamp}

To collect and forward all of these logs, there is an agent process on each node called logstash-forwarder that monitors user-specified folders for new log files and ships them over TCP to the actual logstash server process running on the ELK node.  Logstash receives these incoming feeds, parses them, and sends them to elasticsearch.  Kibana responds to my interactive queries and delegates all of the search work to elasticsearch.  Kibana doesn't store any results internally or have its own indexes or anything.

Others have already done a good job explaining how to setup ELK and how to use Kibana, and therefore I won't repeat all of that here.  I will only highlight some of the differences, and share my Logstash configuration files that I had to create to handle the out-of-the-box log file formats for Cassandra and Spark (as packaged in DSE 4.7 at least).

I installed elasticsearch from the repo, which already created the systemd entries to run it as a service.  Following the ELK setup link above, I created systemd entries for logstash and kibana.  I also created a systemd unit file for the logstash-forwarder running on each analytics node.  

The logstash-forwarder needs to be configured with all of the locations where spark and cassandra will put logfiles.  It supports glob syntax, including recursive folder searches like "whatever/**/*.log", but I ended up not using that because it was duplicating some of the entries, due to a wonky subfolder called cassandra_logging_unconfigured being created under the spark driver program log folder.  My forwarder configuration picks up all of the output logs for the workers, the applications, and the drivers, and creates a different type for each: spark-worker for generic /var/log spark output, spark-app for the app-* log folders, and spark-driver for the driver programs (where most of the interesting logging happens).  My logstash-forwarder.conf is in the gist.
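As a rough illustration, a minimal logstash-forwarder configuration along these lines might look like the following sketch. The hostname, cert path, and exact globs here are placeholders, not the values from my cluster; the real file is in the gist:

```json
{
  "network": {
    "servers": [ "elk-node:5043" ],
    "ssl ca": "/etc/pki/tls/certs/logstash-forwarder.crt",
    "timeout": 15
  },
  "files": [
    { "paths": [ "/var/log/spark/worker/*" ],
      "fields": { "type": "spark-worker" } },
    { "paths": [ "/var/lib/spark/worker/worker-0/app-*/*/std*" ],
      "fields": { "type": "spark-app" } },
    { "paths": [ "/var/lib/spark/worker/worker-0/driver-*/std*" ],
      "fields": { "type": "spark-driver" } },
    { "paths": [ "/var/log/cassandra/*.log" ],
      "fields": { "type": "cassandra" } }
  ]
}
```

Each entry tags its feed with a "type" field, which is what the logstash filters later use to decide which parsing rules apply.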

For logstash I setup a few files as a pipeline to handle incoming log feeds:
  • 00_input.conf - sets up the lumberjack (protocol the forwarder uses) port and configures certs 
  • 01_cassandra_filter.conf - parses the logback formats that DSE delivers for cassandra.  I'm not sure whether vanilla open-source cassandra uses the same format by default or not.  There are two formats I see here; sometimes there is an extra value present -- possibly from the logback equivalent of NDC.
  • 02_spark_filter.conf - parses the logback formats that DSE delivers for spark.  I see two formats here as well: sometimes with a line number, sometimes without.
  • 07_last_filter.conf - this is a multiline filter that recognizes java stacktraces and causes them to stay with the original message
  • 10_output.conf - sends everything to elasticsearch
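To give a flavor of what these filter files contain, here is a sketch of a grok-based filter. The pattern below is illustrative only -- the actual patterns for the DSE log formats are in the gist, and yours may differ:

```
filter {
  if [type] == "spark-driver" {
    # pull level, thread, timestamp, and logger out of the logback line
    grok {
      match => { "message" => "%{LOGLEVEL:level}%{SPACE}\[%{DATA:thread}\]%{SPACE}%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{DATA:logger}%{SPACE}-%{SPACE}%{GREEDYDATA:logmessage}" }
    }
    # use the log's own timestamp instead of the time logstash saw the event
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss,SSS" ]
    }
  }
}
```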

All of my configuration files are available through the links above in this gist.  Between the linked guides above and the configuration here that should get you going if you need to monitor cassandra and spark logs with ELK!

Quick tip: While you're getting things configured and working, you might need to kill the currently indexed data and resend everything (so that it can be reparsed and re-indexed). The logstash-forwarder keeps a metadata file called .logstash-forwarder in the working directory where you started the forwarder.   To kill all of the indexed data and resend everything, follow these steps:
  1. Kill the logstash-forwarder:
    sudo systemctl stop logstash-forwarder
  2. Delete the logstash-forwarder metadata so it starts from scratch next time:
    sudo rm /var/lib/logstash-forwarder/.logstash-forwarder
  3. If you need to change any logstash configuration files, do that and then restart logstash:
    sudo systemctl restart logstash
  4. Delete your existing elastic search indexes (be careful with this!):
    curl -XDELETE 'http://:9200/logstash-*'
  5. Start the logstash-forwarder again:
    sudo systemctl start logstash-forwarder
Also note that by default the logstash-forwarder will only pick up files less than 24 hours old.  So if you're configuring ELK and playing with filters on stagnant data, make sure at least some files have been touched recently so it will pick them up.  Check out the log file in /var/log/logstash-forwarder to see if it's skipping particular entries.  You can also run the logstash-forwarder with -verbose to see additional debug information.

Quick tip: use the wonderfully useful online grok debugger to test out your grok regex patterns to make sure they match and pull out the fields you want.

Sunday, August 24, 2014

Bushwhacker to help team members debug unit test failures

Right now I'm working on a project with a few people who have never worked in Java before. There have been times where they'll experience some problem that they can't easily figure out. They send me the stack trace and instantly I know what the problem is. I wanted a simple way to automate this.

So I created Bushwhacker, a simple library to help provide more helpful messages at development time.  Senior members of the team edit and maintain a simple xml file that describes rules to detect particular exceptions and either replace or enhance the exception message in some way.  The rules are fairly sophisticated -- allowing you to, for example, only detect IllegalArgumentException if it was thrown from a stack frame in MyClassWhatever.  Since the xml rules file is read off of the classpath, you can version it along with your source code (in src/test/resources for example).  You can even maintain multiple rules files for different modules and compose them all together.  So if there are common pitfalls in your corporate "commons" module, you can keep the bushwhacker rules for it there, and put your team's additional rules in your module.

It's in the Maven Central Repo, so it's easy to add to your project. Check out usage instructions here:

Here are a few of the rules that we're using right now:

Tuesday, June 24, 2014

Single record UPSERT in MSSQL 2008+

I guess I just don't use SQL's MERGE statement enough to remember its syntax off the top of my head. When I look at the Microsoft documentation for it, I can't find just a simple example to do a single record UPSERT. When I google, I don't get any brief blog posts that show it. So here's a brief one to add to the pile:


create table sequences (   -- illustrative table name
    datakey varchar(16) not null primary key,
    seqno bigint not null,
    last_updated datetime not null
)

The table just keeps track of sequence numbers for things identified by 'datakey'.  The first time we want to increment a value for a key, we need to insert a new record starting with seqno = 1; every time after that we just want to increment the existing row.
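A single-statement UPSERT with MERGE can then look like the following sketch. The table and column names match the illustrative schema above, 'some-key' stands in for whatever key you're incrementing, and the holdlock hint guards against a race between two concurrent upserts for the same new key:

```sql
merge sequences with (holdlock) as target
using (select 'some-key' as datakey) as source
on (target.datakey = source.datakey)
when matched then
    update set seqno = target.seqno + 1, last_updated = getdate()
when not matched then
    insert (datakey, seqno, last_updated)
    values (source.datakey, 1, getdate());
```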


Sunday, March 30, 2014

Spring Test Context Caching + AspectJ @Transactional + Ehcache pain

Are you using AspectJ @Transactional support with Spring? Do you have multiple SessionFactory instances -- maybe one for an embedded database for unit testing and one for the real database for integration testing? Are you getting one of these exceptions?
org.springframework.transaction.CannotCreateTransactionException: Could not open Hibernate Session for transaction; nested exception is org.hibernate.service.UnknownServiceException: Unknown service requested 
java.lang.NullPointerException at net.sf.ehcache.Cache.isKeyInCache(...) at org.hibernate.cache.ehcache.internal.regions.EhcacheDataRegion.contains(...)
Then you are running into a problem where multiple cached application contexts are stepping on each other. This blog post describes some strategies to deal with the problems that we have encountered.


Spring's Test Context framework by default tries to minimize the number of times the spring container has to start by caching the containers. If you are running multiple tests that all use the same configuration, then the container is created once for all of those tests instead of before each test. If you have 1000's of tests and the container takes 10-15 seconds to start up, this makes a real difference in build/test time.

This only works if everyone (you and all of the libraries that you use) avoids static fields (global state), and unfortunately there are places where this is difficult or impossible to avoid -- even spring violates this! A few places that have caused us problems:
  • Spring AspectJ @Transactional support
  • EhCache cache managers
Aspects are singletons by design. Spring uses this to place a reference to the BeanFactory as well as the PlatformTransactionManager on the aspect. If you have multiple containers, each with their "own" AnnotationTransactionAspect, they are in fact sharing the AnnotationTransactionAspect, and whichever container starts up last is the "winner", causing all kinds of unexpected, hard-to-debug problems. 

Ehcache is also a pain here. The ehcache library maintains a static list of all of the cache managers that it has created in the VM. So if you use multiple containers, they will all share a reference to the same cache manager. Spring Test gives you a mechanism to indicate that a test has "dirtied" the container and that it needs to be re-created; this translates to destroying the container after the test class is finished. That is fine, but if your container holds objects that are shared with other containers, then destroying a shared object breaks the other containers.


The easiest solution is to disable the application context caching entirely. This can be done simply by placing @DirtiesContext on every test, or (better) -- since you probably should use super classes ("abstract test fixtures") to organize your tests anyway -- just add @DirtiesContext on the base class. Unfortunately you then lose all of the benefit of caching and your build times will increase. 

There is no general mechanism for the spring container to "clean itself up", because this sharing of state across containers is certainly an anti-pattern. The fact that spring itself does it (AnnotationTransactionAspect, EhCacheManagerFactoryBean.setShared(true), etc.) is an indication that perhaps they should add some support. If you want to keep caching, then step 1 is making sure that you don't use any "static field" singletons in your own code. Also make sure that any external resources that you write to are separated so that multiple containers can co-exist in the same JVM.

To address the AspectJ problem, the best solution that I have found is to create a TestExecutionListener that "resets" the AnnotationTransactionAspect to point to the correct bean factory and PlatformTransactionManager before test execution. The code for such a listener is in this gist.

To use the listener, you put @TestExecutionListeners on your base class test fixture so that all tests run with the new listener. Note that when you use the @TestExecutionListeners annotation you then have to specify all of the execution listeners, including the existing Spring ones. There is an example in the gist.

The workaround for Ehcache is to not allow CacheManager instances to be shared between containers. To do this, you have to ensure that the cache managers all have unique names. This is actually pretty easy to configure.
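For example, with Spring's EhCacheManagerFactoryBean you can assign a name per context. This is a sketch, not my exact configuration; the SpEL UUID expression is just one convenient way to guarantee uniqueness:

```xml
<bean id="cacheManager"
      class="org.springframework.cache.ehcache.EhCacheManagerFactoryBean">
  <!-- unique name per application context so cached test contexts don't collide -->
  <property name="cacheManagerName"
            value="cacheManager-#{ T(java.util.UUID).randomUUID().toString() }"/>
  <!-- make sure the manager is not registered as the VM-wide shared instance -->
  <property name="shared" value="false"/>
</bean>
```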

Related Issues

Here are some links to spring jira issues covering this problem:

Thursday, September 5, 2013

Java final fields on x86 a no-op?

I have always enjoyed digging in to the details of multi-threaded programming, and always enjoy that despite reading for years about CPU memory consistency models, wait-free and lock-free algorithms, the java memory model, java concurrency in practice, etc. etc. -- I still create multi-threaded programming bugs. It's always a wonderfully humbling experience that reminds me how complicated of a problem this is.

If you've read the JMM, then you might remember that one of the areas they strengthened was the guarantee of visibility of final fields after the constructor completes. For example,
public class ClassA {
   public final String b;

   public ClassA(String b) {
      this.b = b;
   }
}

ClassA x = new ClassA("hello");

The JMM states that every thread (even threads other than the one that constructed the instance, x, of ClassA) will always observe x.b as "hello" and would never see a value of null (the default value for a reference field).

This is really great! It means that we can create immutable objects just by marking the fields final, and any constructed instance is automatically able to be shared amongst threads with no other work to guarantee memory visibility. Woot! The flip side is that if ClassA.b were not marked final then you would have no such guarantee, and other threads could observe x.b == null (if no other "safe publication" mechanisms were employed to get visibility)
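To make the contrast concrete, here is a small self-contained sketch (the class and field names are my own, not from the original code). Note that it uses join(), which is itself a happens-before edge, so this program cannot actually exhibit the race; the point of the final-field rule is that readers of ClassA get the guarantee even without such an edge:

```java
public class FinalDemo {
    static class ClassA {
        final String b;            // guaranteed visible after construction
        ClassA(String b) { this.b = b; }
    }

    static class ClassB {
        String b;                  // no guarantee without safe publication
        ClassB(String b) { this.b = b; }
    }

    // plain (non-volatile) field: written by one thread, read by another
    static ClassA sharedA;

    public static void main(String[] args) throws InterruptedException {
        Thread writer = new Thread(() -> sharedA = new ClassA("hello"));
        writer.start();
        writer.join();             // happens-before edge; safe either way here
        System.out.println(sharedA.b);
    }
}
```

If ClassB were published the same way through a plain field and without the join(), the JMM would permit a reader to observe sharedB.b == null on architectures with weaker memory models.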

Well when they created the new JMM, everyone's favorite JCP member, Doug Lea, created a cookbook to help JVM developers implement the new memory model rules. If you read this, then you will see that the "rules" state that JIT compilers should emit a StoreStore memory barrier, right before the constructor returns. This StoreStore barrier is a kind of "memory fence". When emitted in the assembly instructions, it means that no memory writes (stores) after the fence can be re-ordered before memory writes that appear before the fence. Note that it doesn't say anything about reads -- they can "hop" the fence in either direction.

So what does this mean? Well, if you think about what the compiler does when you call a constructor:
ClassA x = new ClassA("hello");
  gets broken down into pseudo-code steps of:

1. pointer_to_A = allocate memory for ClassA 
    (mark word, class object pointer, one reference field for String b)
2. pointer_to_A.whatever class meta data = ...
3. pointer_to_A.b = address of "hello" string
4. emit a StoreStore memory barrier per the JMM
5. x = pointer_to_A
The StoreStore barrier at step 4 ensures that the writes (such as the class meta-data and field b) are not re-ordered with the write to x in step 5. This is what makes sure that if x is visible to any other thread, that thread can't see x without seeing x.b as well. Without the StoreStore memory barrier, steps 3 and 5 could be re-ordered and the write to main memory for x could show up before the write to x.b, and another cpu core could observe pointer_to_A.b to be 0 (null), which would violate the JMM.

Great news! However, if you look at that cookbook you'll see some interesting things: (1) a lot of people are writing JVMs on lots of processor architectures! (2) *all* of the memory barriers on x86 are no-ops except the StoreLoad barrier! This means that on x86 this StoreStore memory barrier above is a no-op and thus no assembly is emitted for it. It does nothing! This is because the x86's memory model is a strong "total store ordering" (TSO). X86 makes sure that all memory writes are observed as if they were all made in the same order. Thus, the write of 5 would never appear before 3 to any other thread anyways due to TSO, and there is no need to emit a memory fence. Other cpu architectures have weaker memory models which do not make such guarantees, and thus the StoreStore memory fence is necessary. Note that weaker memory models, while perhaps harder or less-intuitive to program against, are generally much faster as the cpu can re-order things to make more efficient use of cache writes and reduce cache coherency work.

Obviously you should continue to write correct code that follows the JMM. BUT it also means (unfortunately or fortunately) that forgetting this will not lead to bugs if you're running on x86, like I do at work.

To really drill this home and ensure that there are no other side effects that maybe aren't being described in the cookbook, I ran the x86 assembly outputter as described here and captured the output of calling the constructor for ClassA (with the final on the reference type field) and the constructor for a ClassB, which was identical to ClassA except without the final keyword on the class member. The x86 assembly output is identical. So from a JIT perspective, on x86 (not itanium, not arm, etc), the final keyword has no impact.

If you're curious what the assembly code looks like, here it is below. Note the lack of any locked instructions. When Oracle's 7u25 JRE emits an x86 StoreLoad memory fence, it does so by emitting lock addl $0x0,(%rsp), which just adds zero to the stack pointer -- a no-op, but since it's locked it has the effect of a full fence (which meets the criteria of a StoreLoad fence). There are a few different ways in x86 to cause the effect of a full fence, and they are discussed in the OpenJDK mailing list. They observed that, at least on Intel Nehalem, the locked add of 0 was the most compact/efficient.
  0x00007f152c020c60: mov    %eax,-0x14000(%rsp)
  0x00007f152c020c67: push   %rbp
  0x00007f152c020c68: sub    $0x20,%rsp         ;*synchronization entry
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@-1 (line 60)
  0x00007f152c020c6c: mov    %rdx,(%rsp)
  0x00007f152c020c70: mov    %esi,%ebp
  0x00007f152c020c72: mov    0x60(%r15),%rax
  0x00007f152c020c76: mov    %rax,%r10
  0x00007f152c020c79: add    $0x18,%r10
  0x00007f152c020c7d: cmp    0x70(%r15),%r10
  0x00007f152c020c81: jae    0x00007f152c020cd6
  0x00007f152c020c83: mov    %r10,0x60(%r15)
  0x00007f152c020c87: prefetchnta 0xc0(%r10)
  0x00007f152c020c8f: mov    $0x8356f3d0,%r11d  ;   {oop('com/argodata/match/profiling/FinalConstructorMain$ClassA')}
  0x00007f152c020c95: mov    0xb0(%r11),%r10
  0x00007f152c020c9c: mov    %r10,(%rax)
  0x00007f152c020c9f: movl   $0x8356f3d0,0x8(%rax)  ;   {oop('com/argodata/match/profiling/FinalConstructorMain$ClassA')}
  0x00007f152c020ca6: mov    %r12d,0x14(%rax)   ;*new  ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
  0x00007f152c020caa: mov    %ebp,0xc(%rax)     ;*putfield a
                                                ; - com.argodata.match.profiling.FinalConstructorMain$ClassA::@6 (line 17)
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@6 (line 60)
  0x00007f152c020cad: mov    (%rsp),%r10
  0x00007f152c020cb1: mov    %r10d,0x10(%rax)   ;*new  ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
  0x00007f152c020cb5: mov    %rax,%r10
  0x00007f152c020cb8: shr    $0x9,%r10
  0x00007f152c020cbc: mov    $0x7f152b765000,%r11
  0x00007f152c020cc6: mov    %r12b,(%r11,%r10,1)  ;*synchronization entry
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@-1 (line 60)
  0x00007f152c020cca: add    $0x20,%rsp
  0x00007f152c020cce: pop    %rbp
  0x00007f152c020ccf: test   %eax,0x9fb932b(%rip)        # 0x00007f1535fda000
                                                ;   {poll_return}
  0x00007f152c020cd5: retq   
  0x00007f152c020cd6: mov    $0x8356f3d0,%rsi   ;   {oop('com/argodata/match/profiling/FinalConstructorMain$ClassA')}
  0x00007f152c020ce0: xchg   %ax,%ax
  0x00007f152c020ce3: callq  0x00007f152bfc51e0  ; OopMap{[0]=Oop off=136}
                                                ;*new  ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
                                                ;   {runtime_call}
  0x00007f152c020ce8: jmp    0x00007f152c020caa  ;*new
                                                ; - com.argodata.match.profiling.FinalConstructorMain::callA@0 (line 60)
  0x00007f152c020cea: mov    %rax,%rsi
  0x00007f152c020ced: add    $0x20,%rsp
  0x00007f152c020cf1: pop    %rbp
  0x00007f152c020cf2: jmpq   0x00007f152bfc8920  ;   {runtime_call}

Monday, July 8, 2013

Database intro for developers

Most of my academic background focused on low-level database research. I wrote a dynamic-programming based join-order optimizer for the open-source SQLite database as a project in school. I have submitted a paper for publication in an academic conference that deals with optimizing low-level page structures and optimizer heuristics to improve throughput for certain kinds of workloads on solid-state drives. All in all, I've been a "nuts and bolts" database geek for a while -- more on the systems-level side than on the data mining and data warehousing side. I can talk about exotic fractal trees, various B*-tree variants for efficient indexing, and lock-free versions ad nauseam ;)

In any case, the level of willful ignorance about the basics of how databases work among typical enterprise developers has always surprised me. They are the workhorses of most enterprise apps, and getting them to perform well requires some knowledge of how they work. At least a basic understanding of how indexing works is useful in building up your intuition about what will perform well in production and what won't. Everyone should also at least know the basic query processing pipeline: parsing, optimization, execution.

This talk was meant to be a simple introduction to some of the basics of how databases process your queries for you. I'd like to do more follow on talks to add more depth in the future. It is geared towards developers -- not DBAs. So there is nothing in here about best practices for backups or transaction log shipping.


Introduction to multi-threaded java programming

I gave a talk for our group about multi-threaded java programming for some of our junior members. I lifted the bulk of the slides from the interwebz, but added a fair amount of new content and commentary. It's not really that good of an introduction, but it's something.