Starting a New Application in MUPD8


Table of Contents

Introduction
Install Java (10 minutes, incl download time)
Install Git (8 minutes, incl download time)
Install Maven 3.0+ (3 minutes)
Start Cassandra (4 minutes)
Build MUPD8 (4 minutes)
Write Application (2 minutes)
Run Application (1 minute)
In Review

Introduction

Getting a new application started in the JVM MUPD8 is easy.

First, for a conceptual overview of MapUpdate data-stream processing, please see the PVLDB Vol. 5 No. 12 paper or its corresponding presentation at VLDB 2012: Muppet: MapReduce-Style Processing of Fast Data.

This document details the steps to start a simple example application consisting of one Mapper and one Updater. The application will maintain live counts over an existing input stream of HTTP-request events by the filetype (specifically: the filename extension) of the requested URI.

The steps are straightforward: Fetch and install Apache Cassandra (a durable key-value store for MUPD8); fetch and install MUPD8; then write and run the new MUPD8 application. The commands enumerated can be followed verbatim in about 10–15 minutes.

Note

The commands that follow use the software listed below, and were last tested against a fresh install of Mac OS X 10.9.2. If you already have the listed software installed, please skip to the section titled "Start Cassandra (4 minutes)".

  • Java

  • Maven 3.0+

  • Git

  • curl (already ships with Mac OS X)

You may wish to have three terminals open at the directory where you'd like to run the following commands. (One terminal can run Cassandra, one can run MUPD8, and one can fetch slates to inspect results.) The steps that follow will populate the directory with Cassandra, MUPD8, and an example application written from scratch.

Install Java (10 minutes, incl download time)

To check if you have Java installed:

$ java -version
java version "1.7.0_51"
Java(TM) SE Runtime Environment (build 1.7.0_51-b13)
Java HotSpot(TM) 64-Bit Server VM (build 24.51-b03, mixed mode)

If you do not have Java installed, you'll get a pop-up notification. Click the "More Info" button to go to the Java downloads page, and download the JDK for Mac OS X. Once the download completes, install the JDK.

Install Git (8 minutes, incl download time)

To check if you have Git installed. This can be done while Java is downloading.

$ git --version
git version 1.8.5.2 (Apple Git-48)

If you do not have Git installed, you'll get a pop-up notification. Click the "Install" button to download and install the Apple Command Line Tools, which includes Git.

Install Maven 3.0+ (3 minutes)

Once you've verified that Java is installed, check if you have Maven installed. Java is required for Maven to run, so do not start this section if Java isn't yet fully installed.

$ mvn -v
Apache Maven 3.0.5 (r01de14724cdef164cd33c7c8c2fe155faf9602da; 2013-02-19 05:51:28-0800)
:

If you do not have Maven installed, you'll see "command not found". Follow the directions below to install Maven.

Note

Be sure your JAVA_HOME is not blank.

$ echo $JAVA_HOME
/Library/Java/JavaVirtualMachines/jdk1.7.0_51.jdk/Contents/Home

If "echo $JAVA_HOME" shows an empty path, you need to set your JAVA_HOME. Set your JAVA_HOME with these commands:

$ printf "\nexport JAVA_HOME=$(/usr/libexec/java_home)\n\n" >> ~/.profile
$ source ~/.profile

To install Maven globally for your system, start by downloading Maven from an Apache mirror

$ (cd /usr/share && sudo curl -L 'http://archive.apache.org/dist/maven/binaries/apache-maven-3.0.5-bin.tar.gz' -O)

Verify the download hash

$ openssl md5 /usr/share/apache-maven-3.0.5-bin.tar.gz
MD5(apache-maven-3.0.5-bin.tar.gz)= 94c51f0dd139b4b8549204d0605a5859

Decompress the download

$ (cd /usr/share && sudo tar xf apache-maven-3.0.5-bin.tar.gz)

Create a symlink to the Maven executable in a PATH-included location

$ sudo ln -s /usr/share/apache-maven-3.0.5/bin/mvn /usr/bin

Verify the install

$ mvn -v
Apache Maven 3.0.5 (r01de14724cdef164cd33c7c8c2fe155faf9602da; 2013-02-19 05:51:28-0800)
:

Start Cassandra (4 minutes)

In the directory where we'd like to fetch and install Cassandra locally (your home directory works fine), we can download the Cassandra distribution from an Apache mirror:

$ curl -L 'http://archive.apache.org/dist/cassandra/2.0.6/apache-cassandra-2.0.6-bin.tar.gz' -O

$ openssl md5 apache-cassandra-2.0.6-bin.tar.gz
MD5(apache-cassandra-2.0.6-bin.tar.gz)= c8da1f4f546ea31ab85cfb236374863b

Decompress the file:

$ tar xf apache-cassandra-2.0.6-bin.tar.gz

Configure Cassandra to write in the local directory (which does not require root as writing in /var would):

$ sed -i.original "s#/var#$PWD/var#" apache-cassandra-2.0.6/conf/{cassandra.yaml,log4j-server.properties}

Run Cassandra in one terminal (or run Cassandra in the background if you prefer)—

$ (cd apache-cassandra-2.0.6 && bin/cassandra -f)

—and create a keyspace for our MUPD8-application data in another terminal:

$ (cd apache-cassandra-2.0.6 && echo 'create keyspace Mupd8;' | bin/cassandra-cli --host localhost )
Connected to: "Test Cluster" on localhost/9160
Welcome to Cassandra CLI version 2.0.6

The CLI is deprecated and will be removed in Cassandra 3.0.  Consider migrating to cqlsh.
CQL is fully backwards compatible with Thrift data; see http://www.datastax.com/dev/blog/thrift-to-cql3

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] create keyspace Mupd8;
9ecde970-77bb-11e1-0000-242d50cf1ffd

Build MUPD8 (4 minutes)

Users of Maven 3.0+

$ mvn -v
Apache Maven 3.0.3 (r1075438; 2011-02-28 09:31:09-0800)
:

can simply fetch and build MUPD8 source:

$ git clone git://github.com/walmartlabs/mupd8.git

$ (cd mupd8/ && mvn install assembly:single)

Write Application (2 minutes)

To make the process of writing an application concrete, we write a simple application below. While the steps below use Maven to set up the build, MUPD8 applications can also be written and run without using Maven at all.

Set up the build for a new application mupd8_demo in package namespace com.walmartlabs.example (this example application uses SLF4J, as shown in the code below):

$ mvn archetype:generate \
  -DinteractiveMode=false \
  -DarchetypeGroupId=org.apache.maven.archetypes \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DgroupId=com.walmartlabs.example \
  -DartifactId=mupd8_demo \
  -Dpackage=com.walmartlabs.example \
  -Dversion=1.0-SNAPSHOT

$ cd mupd8_demo/
$ patch pom.xml <<EOF
--- pom.xml	2014-03-28 19:33:21.000000000 -0700
+++ pom.xml	2014-03-30 18:07:04.000000000 -0700
@@ -21,5 +21,34 @@
       <version>3.8.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.walmartlabs.mupd8</groupId>
+      <artifactId>mupd8</artifactId>
+      <version>2.2.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.sun.jmx</groupId>
+          <artifactId>jmxri</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jdmk</groupId>
+          <artifactId>jmxtools</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.jms</groupId>
+          <artifactId>jms</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.6.4</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.6.4</version>
+    </dependency>
   </dependencies>
 </project>
EOF

Write the Mapper code for the new application:

$ cat > src/main/java/com/walmartlabs/example/PathMapper.java <<EOF

package com.walmartlabs.example;

import com.walmartlabs.mupd8.application.Config;
import com.walmartlabs.mupd8.application.binary.Mapper;
import com.walmartlabs.mupd8.application.binary.PerformerUtilities;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

public class PathMapper implements Mapper {
    private final Logger logger = LoggerFactory.getLogger(PathMapper.class);
    private final Charset charset = Charset.forName("UTF-8");

    private String name;

    public PathMapper(Config config, String n) { name = n; }

    @Override
    public String getName() { return name; }

    @Override
    public void map(PerformerUtilities submitter, String stream, byte[] key, byte[] event) {
        String path = new String(key, charset);
        logger.info("Processing event of key "+path);
        String extension = "html";
        int lastDot = path.lastIndexOf('.');
	int lastSlash = path.lastIndexOf('/');
	if ((lastDot != -1) && ((lastSlash == -1) || (lastDot > lastSlash))) {
            extension = path.substring(lastDot+1);
        }
        try {
            logger.info("Publishing event to key "+extension);
            submitter.publish("request_by_extension", extension.getBytes(charset), event);
        } catch(Exception e) {
            logger.error("Could not publish event: "+e.toString());
        }
    }
}
EOF

Write the Updater code for the new application:

$ cat > src/main/java/com/walmartlabs/example/Counter.java <<EOF

package com.walmartlabs.example;

import com.walmartlabs.mupd8.application.Config;
import com.walmartlabs.mupd8.application.binary.PerformerUtilities;
import com.walmartlabs.mupd8.application.binary.Updater;
import com.walmartlabs.mupd8.application.SlateSizeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

public class Counter implements Updater {
    private final Logger logger = LoggerFactory.getLogger(Counter.class);
    private final Charset charset = Charset.forName("UTF-8");

    private String name;

    public Counter(Config config, String n) { name = n; }

    @Override
    public String getName() { return name; }

    @Override
    public void update(PerformerUtilities submitter, String stream, byte[] key, byte[] event, byte[] slate) {
        logger.info("Updating count for key "+new String(key, charset));
        int count = 0;
        try {
            count = Integer.parseInt(new String(slate, charset));
        } catch (NumberFormatException e) {
            count = 0;
        }
        logger.info("Count for key "+new String(key, charset)+": "+count);
        ++count;
        byte[] updatedSlate = Integer.toString(count).getBytes(charset);
        try {
            submitter.replaceSlate(updatedSlate);
        } catch (SlateSizeException e) {
            logger.error("Slate update for slate ["+ new String(slate, charset) +"] failed.");
        }
    }
}
EOF

Finally, configure the application by linking the Mapper and Updater in a directed graph. Notice that the Mapper code publishes to a stream named request_by_extension, so we connect that stream name accordingly in the configuration:

$ mkdir -p etc/ && cat > etc/100-application.cfg <<EOF
{
  "mupd8" : {
    "messageserver":{
      "host": "localhost",
      "port": 8888
    },
    "application" : {
      "mupd8_demo" : {
        "performers" : {
          "request_source" : {
            "mupd8_type"   : "Source",
            "publishes_to"  : [ "source_stream" ]
          },
          "path_map" : {
            "mupd8_type"   : "Mapper",
            "type"          : "java",
            "class"         : "com.walmartlabs.example.PathMapper",
            "subscribes_to" : [ "source_stream" ],
            "publishes_to"  : [ "request_by_extension" ],
            "#clone"         : "false",
            "#column_family" : "mupd8_demo"
          },
          "count_by_extension" : {
            "mupd8_type"   : "Updater",
            "type"          : "java",
            "class"         : "com.walmartlabs.example.Counter",
            "subscribes_to" : [ "request_by_extension" ],
            "publishes_to"  : [ ],
            "slate_ttl"     : 86400,
            "#clone"         : "false",
            "#column_family" : "mupd8_demo"
          }
        }
      }
    },
    "mupd8_status" : {
      "#http_port"      : 6001
    },
    "slate_store" : {
      "port"           : 9160,
      "keyspace"       : "Mupd8",
      "hosts"          : [ "localhost" ],
      "#write_interval" : 15
    },
    "system_hosts"     : [ "localhost" ],
    "#java_class_path"  : "share/java/*",
    "#java_setting"     : "-Xmx200M -Xms200M",
    "sources" : [
         {
             "name" : "source1",
             "source" : "com.walmartlabs.mupd8.JSONSource",
             "host" : "localhost",
             "performer" : "request_source",
             "parameters" : [ "file:sample.txt",  "request:requestUrl" ]
         }
    ]
  }
}
EOF

Note

The configuration keys above that begin with a # symbol simply show some default values for reference; the actual parameter omits the # prefix.

Each string in the mupd8:sources array describes how the application reads its input events: The source performer request_source on system host localhost connects to file:sample.txt to read one event value per line. The source performer parses the line as JSON and extracts its request's requestUrl value as the event key. The source performer publishes the resulting key-value pair as an event.

Finally, build the application:

$ mvn package
:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 5.924s
[INFO] Finished at: Tue Apr 10 20:13:12 PDT 2012
[INFO] Final Memory: 20M/305M
[INFO] ------------------------------------------------------------------------

Run Application (1 minute)

Provide a little input for the application to verify its effect when run:

$ cat > sample.txt <<EOF
{"server":{"requestStartTime":1333683371266,"processId":"10811@localhost","processStartTime":1333683287172,"threadId":49,"threadRequestNumber":1,"localAddress":"127.0.0.1","localName":"127.0.0.1","localPort":60101,"requestEndTime":1333683371283,"applicationMicroseconds":16303},"request":{"remoteAddress":"127.0.0.1","remotePort":50543,"method":"GET","requestUrl":"/path/beacon.gif","queryString":"q=v","protocol":"HTTP/1.0","requestSize":0},"response":{"status":204,"responseHeaderList":[{"name":"Expires","valueList":["Thu, 01 Jan 1970 00:00:00 GMT"]},{"name":"Content-Type","valueList":["image/gif"]},{"name":"Connection","valueList":["close"]},{"name":"Cache-Control","valueList":["no-store, no-cache, must-revalidate"]}],"responseSize":0}}
EOF

$ wc -l sample.txt
1 sample.txt

Tell Cassandra about the new application before running the application for the first time:

$ (cd ../apache-cassandra-2.0.6 && \
      echo 'create column family mupd8_demo;' | \
      bin/cassandra-cli --host localhost --keyspace Mupd8 )

Run MUPD8:

    $ java -cp ~/.m2/repository/org/slf4j/slf4j-simple/1.6.4/slf4j-simple-1.6.4.jar:~/.m2/repository/org/slf4j/slf4j-api/1.6.4/slf4j-api-1.6.4.jar:../mupd8/target/mupd8-2.2.0.jar:../mupd8/target/mupd8-2.2.0-jar-with-dependencies.jar:target/mupd8_demo-1.0-SNAPSHOT.jar \
        com.walmartlabs.mupd8.Mupd8Main -d etc/ -pidFile mupd8.pid

While MUPD8 is running, we can see the result in another terminal:

$ curl http://localhost:6001/mupd8/slate/mupd8_demo/count_by_extension/gif
1

The URI path is /mupd8/slate/ followed by the name of the application (as named inside the application configuration), the name of the updater (as named within performers in the configuration), and the key.

In this case, the "entire" stream of one event (file:stream.txt) is fully processed so the count is stable. When connected to a live input stream (as hostname:port), the count would vary with each fetch as a reflection of the current slate contents.

In Review

What did this example application do?

Recall that a MUPD8 application is a directed graph of Map and Update performers:

  • Each performer takes an input key-value event from one of its input streams, and in response, publishes zero or more key-value events onto its output streams.

  • Each Updater's input also includes a slate associated with the input-event key, and the Updater may modify ("update") that slate in response to maintain its application state.

  • Each time the Updater gets an event of the same key, it gets its slate back as input (as last updated for the most recent event of this key).

This example application reads JSON objects describing HTTP requests and maintains live counters of requests by URI "filename" extension:

  1. The Source receives JSON for each HTTP request extracts the requestUrl field's value as the event key.

  2. A Mapper extracts the URI-filename extension from each request, and publishes each request as an event using the extension as the event key.

  3. An Updater receives the events keyed by extension, and increments a simple counter (as plaintext, for easy reading).

Tip

In more elaborate applications, a serialized data structure for the slate, e.g., JSON or Google Protocol Buffers, may be more convenient for maintaining additional data.

Finally, note these details about the application configuration:

  • Because these example performers are reentrant, the clone setting is (default) false. If set true, a separate (clone) instance of the performer is created for each thread in MUPD8. Cloning performer instances is wasteful of memory better used as slate cache and event queues, so please write reentrant performer methods instead of setting clone.

    Warning

    The clone setting is deprecated, so please update any code that requires it to be true.

  • Consider setting a finite slate TTL for every Updater. Doing so allows long-unmodified slates to expire at Cassandra, which prevents the application from growing its storage requirements without bound with stale or obsolete data; and allows unused slates in a changed application to eventually expire. Cassandra's expired-slate garbage collection is automatic; identifying and deleting select slates by manual sweep is much less so.