
BI & Warehousing

Rittman Mead's First Courses in Spanish in Latin America

Rittman Mead Consulting - Fri, 2015-09-04 08:04

Last week, as part of the OTN Tour LA 2015, we delivered our first courses in Spanish, in Quito, Ecuador and Cali, Colombia.

It was a one-day (8-hour) workshop on OBIEE 11g Front-End Development.

On this occasion the courses were delivered in person, with all the theoretical and hands-on material in Spanish, and each student had a dedicated virtual machine for the practical exercises, with the support and assistance of the instructor at all times.

I have been teaching courses for many years. I gave my first course back in 1999 (ahem, yes, in the last century). Since then I have delivered many more, on a range of Oracle products, including more than 10 years as an Oracle University instructor. In total I must have trained more than 500 people.

For the past year and a half I have been happily working at Rittman Mead, developing and delivering courses in English, which has been a wonderful challenge. But teaching in Spanish again was, as they say, "like coming home". A truly lovely experience.


The feedback we received on the course was really very good, and it fills us with joy to have once again exceeded the participants' expectations. These are some examples of the feedback we received.



We would like to thank all the course attendees for their participation, as well as our partners in Ecuador, Bluefrog and Refundation, and the Oracle User Groups of Ecuador and Colombia for organising the events.

If you would like to receive our courses in your own language, whether Spanish or Portuguese, or want to become one of our training partners, send us an email at and we will get in touch with you right away.

Categories: BI & Warehousing

Schema On Demand and Extensibility

Dylan's BI Notes - Fri, 2015-09-04 00:44
Today I saw a quite impressive demo at the Global Big Data Conference. AtScale provides a BI metadata tool for data stored in Hadoop. At first, I thought this was just another BI tool that accesses Hadoop via Hive, like what we have in OBIEE.  I heard that the SQL performance for BI […]
Categories: BI & Warehousing

UKOUG Partner of the Year Awards 2015

Rittman Mead Consulting - Thu, 2015-09-03 08:48


It’s that time of year again for the UKOUG Partner of the Year Awards. This year we have been nominated for 4 awards:

  • Engineered Systems Partner of the Year Award
  • Business Analytics Partner of the Year Award
  • Training Partner of the Year Award
  • Emerging Partner of the Year Award

The awards are decided by “end users of Oracle-related products or services” i.e. you, so we would like to ask you to vote for us by going to this link.

I would like to propose four reasons why I think we deserve these awards.

Research, development and sharing

The culture at Rittman Mead has always been to delve into the tools we use, push their boundaries and share what we learn with the community. Internally, we have formalised this by having our own in house R&D department. People like Robin Moffatt, Jordan Meyer and Mark Rittman spend a lot of time and effort looking at the core Oracle data and analytics toolset to determine the optimal way to use it and see which other leading edge tools can be integrated into it.

This has given rise to a huge amount of freely available information ranging from a whole series on OBIEE performance tuning to drinks cabinet optimisation.

We have also worked with Oracle to produce a new version of their reference architecture that was the first one to incorporate the new wave of big data technologies and approaches such as Hadoop and a data reservoir.


One of the main drivers for our R&D department is to make us more effective at delivering data and analytics projects.

We are continually investigating common and new approaches and design patterns found in the world of ETL, data warehousing, business intelligence, analytics, data science, big data and agile project delivery, and combining them with our experience to define optimal ways to deliver projects.

Again, we share a lot of these approaches through talks at Oracle and community conferences, blog posts and routines shared on our GitHub repository.

Learning and education

Learning is key to everything we do in life, and as such, we see the provision of independent courses for Oracle business intelligence and data integration tools as key for the industry. We have developed our own training materials based on the different roles people play on projects, for example we have a Business Enablement Bootcamp aimed at end users and OBIEE Bootcamp aimed at developers. We know from our feedback forms how effective this training is.

To supplement the training materials we also wrote the official OBIEE Oracle Press book based around the same examples and data sets.


Our key role as an Oracle partner and member of the Oracle community is to optimise the value any organisation gets from investing in Oracle data and analytics related software and hardware.

This is something that requires a long-term commitment, a high level of investment and a deep level of knowledge and experience, which is hopefully demonstrated above. To this end, we often go beyond the level of information that Oracle can offer and, in certain cases, challenge their own understanding of the tools.

We were the first UK partner to buy an Exalytics server, for example, and have written a whole host of articles around the subject. Similarly, we are the proud owners of a BICS pod, and we are now evaluating how organisations can effectively use cloud in their strategic business intelligence architectures and, if they do, the best approach to integrating it.

Finally, we are also investing heavily in user engagement, providing the capability to measure then optimise an organisation’s data and analytics systems. We believe user engagement is directly and measurably linked to the return organisations get from their investment in Oracle data and analytics software and hardware.

In Summary

So, in summary, I hope the reasons outlined above explain why we deserve some or all of these awards; they act as a great way to recognise the effort put in by all our staff over the course of the year. The voting link is here.

Categories: BI & Warehousing

Using Data Relationship Management to Maintain Hierarchies for BI Apps (1)

Dylan's BI Notes - Wed, 2015-09-02 10:52
DRM is a generic data management application. It provides a web-based application that allows the deploying company to maintain its data. It is a collaboration tool that lets you define validations, set up data security duties, and share the maintenance work. The tool was originally designed to maintain account information.  However, […]
Categories: BI & Warehousing

The greatest OBIEE usage study ever…

Rittman Mead Consulting - Tue, 2015-09-01 09:03

Rittman Mead is excited to announce the first ever global OBIEE usage survey. Our goal is to provide deep analysis into how organisations use their OBIEE systems and to create a set of industry benchmarks. We will be releasing the results of this research for free to the OBIEE community.


The types of metrics we are looking at are:

  • DAU/MAU (daily active users/monthly active users) – this gives an approximation of utility;
  • Frequency of use – how often your users access dashboards;
  • Recency of use – when was the last time reports and dashboards were accessed;
  • Reach of system – how many users are accessing the system.
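For illustration, the first of these metrics can be computed from any log of (user, day) access events. The sketch below uses made-up rows rather than real Usage Tracking data; the field layout and values are purely an assumption for the example:

```python
from datetime import date

# Hypothetical usage-tracking rows: (username, access_date) pairs,
# similar in spirit to what OBIEE's usage tracking records per query.
events = [
    ("alice", date(2015, 9, 1)), ("bob", date(2015, 9, 1)),
    ("alice", date(2015, 9, 2)),
    ("carol", date(2015, 9, 15)),
]

def dau_mau(events, day, month):
    """DAU/MAU ratio: distinct users on `day` over distinct users in `month`."""
    dau = {u for u, d in events if d == day}
    mau = {u for u, d in events if (d.year, d.month) == month}
    return len(dau) / float(len(mau))

# 2 distinct users on 1 Sep out of 3 distinct users in September
print(dau_mau(events, date(2015, 9, 1), (2015, 9)))
```

A ratio near 1 suggests users who touch the system at all use it daily; a low ratio suggests occasional, ad-hoc use.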

Here’s how it works: we need your Usage Tracking data. To make providing this data easier we can send you a script for your system administrator to run to extract this. We even have an option to obfuscate key attributes so we can’t see any usernames or sensitive details.

Once we receive your data, we will analyse it individually and provide you with a free report designed to give you unique insight into your system's usage, an example of which is available here.

We will also add your obfuscated, depersonalised and aggregated data to our benchmarking database and let you know how your system performs against industry standards.

Please note: you do have to be running the usage tracking feature of OBIEE for this to work. We strongly advise having this running in any system and can help you get it turned on, if required. Also any data you send to Rittman Mead is completely anonymous and holds no personal or sensitive attributes. It will only be used for benchmarking.

At the end of the survey we will perform a detailed analysis of the complete OBIEE usage database and publish the results.

How do I take part?

Please email us at and we will send over the scripts and full instructions.

Why are we doing this?

We are currently focused on user engagement of BI and analytics systems and have been conducting research over the last few months. We have found very few tangible studies about enterprise BI usage, in particular OBIEE usage.

We are creating this database from OBIEE users around the world and will use this as the academic basis for furthering our research into user engagement and OBIEE.


Categories: BI & Warehousing

Three Easy Ways to Stream Twitter Data into ElasticSearch

Rittman Mead Consulting - Sat, 2015-08-29 00:46

For the past few months a friend has been driving me crazy with all his praise for Splunk. He was going on about how easy it is to install, integrate different sources and build reports. I eventually started playing around to see if it could be used for a personal project I’m working on. In no time at all I understood what he was on about and I could see the value and ease of use of the product. Unfortunately the price of such a product means it is not a solution for everyone so I started looking around for alternatives and ElasticSearch caught my eye as a good option.

In this post we will focus on how we can stream Twitter data into Elasticsearch and explore the different options for doing so. Storing data in Elasticsearch is just the first step, but you only gain real value when you start analysing this data. In the next post we will add sentiment analysis to our Twitter messages and see how we can analyse this data by building Kibana dashboards. But for now we will dig a bit deeper into the following three configuration options:

  • logstash
  • Twitter River Plugin
  • Tweepy

We will look at the installation and configuration of each of these and see how we can subscribe to Twitter using the Twitter API. Data will then be processed, if required, and sent to Elasticsearch.


Why Use Elasticsearch

Elasticsearch has the ability to store large quantities of semi-structured (JSON) data and provides the ability to quickly and easily query this data. This makes it a good option for storing Twitter data which is delivered as JSON and a perfect candidate for the project I’m working on.
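To make that concrete, here is a sketch of what a (heavily trimmed, illustrative) tweet document and a typical Elasticsearch query body look like as plain JSON. The field names mirror the Twitter API, but the document is invented for the example and is not taken from a live cluster:

```python
import json

# A trimmed, illustrative tweet document -- real tweets carry many more fields.
tweet = {
    "created_at": "Sat Aug 29 00:46:00 +0000 2015",
    "text": "Streaming Twitter data into Elasticsearch #obiee",
    "user": {"screen_name": "rittmanmead", "followers_count": 1000},
    "entities": {"hashtags": [{"text": "obiee"}]},
}

# The kind of query body you would send to an index's _search endpoint
# to find tweets mentioning a term -- queries are JSON too.
query = {"query": {"match": {"text": "elasticsearch"}}}

# Both documents and queries round-trip as plain JSON, which is why
# semi-structured tweet data needs no up-front schema design.
print(json.dumps(query))
```

This "document in, JSON query out" model is what makes Elasticsearch such a natural fit for the nested, sparsely-populated structure of tweets.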


You will need a server to host all the required components. I used an AWS free tier (t2.micro) instance running Amazon Linux 64-bit. This post assumes you already have an elasticsearch cluster up and running and that you have a basic understanding of elasticsearch. There are some good blog posts, written by Robin Moffatt, which were very useful during the installation and configuration stages.

Twitter Stream API

In order to access the Twitter Streaming API, you need to register an application with Twitter. Once created, you should be redirected to your app's page, where you can get the consumer key and consumer secret and create an access token under the "Keys and Access Tokens" tab. These values will be used in all the sample configurations that follow.

The API allows two types of subscription: either to specific keywords or to a user timeline (similar to what you see as a Twitter user).


We'll start with logstash, as this is probably the easiest one to configure and seems to be the recommended approach for integrating sources with Elasticsearch in recent versions. At the time of writing, logstash only supported streaming based on keywords, which meant it was not suitable for my needs, but it's still a very useful option to cover.

logstash installation

To install logstash you need to download the correct archive based on the version of elasticsearch you are running.

curl -O

Extract the archived file and move the extracted folder to a location of your choice

tar zxf logstash-x.x.x.tar.gz
mv logstash-x.x.x /usr/share/logstash/

logstash configuration

To configure logstash we need to provide input, output and filter elements. For our example we will only specify input (twitter) and output (elasticsearch) elements as we will be storing the full twitter message.

For a full list of logstash twitter input settings see the official documentation.

Using your favourite text editor, create a file called twitter_logstash.conf and copy the text below into it. Update the consumer_key, consumer_secret, oauth_token and oauth_token_secret values with the values from your Twitter Stream App created earlier.

input {
    twitter {
        # add your data
        consumer_key => "CONSUMER_KEY_GOES_HERE"
        consumer_secret => "CONSUMER_SECRET_GOES_HERE"
        oauth_token => "ACCESS_TOKEN_GOES_HERE"
        oauth_token_secret => "ACCESS_TOKEN_SECRET_GOES_HERE"
        keywords => ["obiee","oracle"]
        full_tweet => true
    }
}

output {
    elasticsearch_http {
        host => "localhost"
        index => "idx_ls"
        index_type => "tweet_ls"
    }
}

This configuration will receive all tweets tagged with obiee or oracle and store them to an index called idx_ls in elasticsearch.

To run logstash, execute the following command from the installed location

bin/logstash -f twitter_logstash.conf

If you subscribed to active twitter tags you should see data within a few seconds. To confirm if your data is flowing you can navigate to http://server_ip:9200/_cat/indices?v which will show you a list of indices with some relevant information.
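If you'd rather check this from a script than a browser, the sketch below parses the tabular output of _cat/indices?v. The sample response here is invented for illustration, and the exact column set varies by Elasticsearch version:

```python
def parse_cat_indices(text):
    """Parse the whitespace-delimited table returned by _cat/indices?v
    into a list of dicts keyed by the header row."""
    lines = [l for l in text.splitlines() if l.strip()]
    headers = lines[0].split()
    return [dict(zip(headers, row.split())) for row in lines[1:]]

# Illustrative response -- fetch the real thing from your own cluster.
sample = """health status index  pri rep docs.count store.size
yellow open   idx_ls 5   1   42         12.3kb
"""

indices = parse_cat_indices(sample)
print(indices[0]["index"], indices[0]["docs.count"])  # idx_ls 42
```

A non-zero docs.count against your index confirms tweets are arriving.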


With this easy configuration you can get Twitter data flowing in no time at all.

Twitter River Plugin

Next we will look at using the River Plugins to stream Twitter data. The only reason to use this approach over logstash is if you want to subscribe to a user timeline. Using this feature will show the same information as the Twitter application or viewing your timeline online.

Note: Twitter River is not supported from Elasticsearch 2.0+ and should be avoided if possible. Thanks to David Pilato for highlighting this point. It is still worth knowing about for the rare case where it might apply.

Twitter River Plugin installation

Before installing the plugin you need to determine which version is compatible with your version of Elasticsearch. You can confirm this at and select the correct one.

To install you need to use the elasticsearch plugin installation script. From the elasticsearch installation directory, execute:

bin/plugin -install elasticsearch/elasticsearch-river-twitter/x.x.x

Then restart your Elasticsearch service.

Twitter River Plugin configuration

To configure the twitter subscriber we will again create a .conf file with the necessary configuration elements. Create a new file called twitter_river.conf and copy the following text. As with logstash, update the required fields with the values from the twitter app created earlier.

{
  "type": "twitter",
  "twitter" : {
        "oauth" : {
            "consumer_key" : "CONSUMER_KEY_GOES_HERE",
            "consumer_secret" : "CONSUMER_SECRET_GOES_HERE",
            "access_token" : "ACCESS_TOKEN_GOES_HERE",
            "access_token_secret" : "ACCESS_TOKEN_SECRET_GOES_HERE"
        },
        "filter" : {
            "tracks" : ["obiee", "oracle"]
        },
        "raw" : true,
        "geo_as_array" : true
  },
  "index": {
    "index": "idx_rvr",
    "type": "tweet_rvr",
    "bulk_size": 100,
    "flush_interval": "5s"
  }
}

This configuration is equivalent to the logstash configuration and will receive the same tweets from Twitter. To subscribe to a user timeline instead of keywords, replace the filter configuration element:

"filter" : {
      "tracks" : ["obiee", "oracle"]
},

with a user type element

"type" : "user",

To start the plugin you need to execute the following from a terminal window.

curl -XPUT localhost:9200/_river/idx_rvr/_meta -d @twitter_river.conf

Depending on how active your subscribed tags are you should see data within a few seconds in elasticsearch. You can again navigate to http://server_ip:9200/_cat/indices?v to confirm if your data is flowing. Note this time that you should see two new rows, one index called _river and the other idx_rvr. idx_rvr is where your twitter data will be stored.

To stop the plugin (or change between keywords and user timeline), execute the following from a terminal window:

curl -XDELETE 'localhost:9200/_river/idx_rvr';


Finally, we will look at the most flexible solution of them all. It is a bit more complicated to install and configure but, given what you gain, the small amount of extra time spent is well worth the effort. Once you have Tweepy working you will be able to write your own Python code to manipulate the data as you see fit.

Tweepy installation

As Tweepy is a Python package, we will use pip to install the required packages. If you don't have pip installed, execute one of the following, depending on your Linux distribution:

yum -y install python-pip

or

apt-get install python-pip

Next we will install the Tweepy and elasticsearch packages

pip install tweepy
pip install elasticsearch

Tweepy configuration

Create a new Python file and copy the following text into it:

import tweepy
import sys
import json
from textwrap import TextWrapper
from datetime import datetime
from elasticsearch import Elasticsearch

# Fill in with the values from your Twitter app created earlier
consumer_key = "CONSUMER_KEY_GOES_HERE"
consumer_secret = "CONSUMER_SECRET_GOES_HERE"
access_token = "ACCESS_TOKEN_GOES_HERE"
access_token_secret = "ACCESS_TOKEN_SECRET_GOES_HERE"

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

es = Elasticsearch()

class StreamListener(tweepy.StreamListener):
    status_wrapper = TextWrapper(width=60, initial_indent='    ', subsequent_indent='    ')

    def on_status(self, status):
        try:
            #print '\n%s %s' % (, status.created_at)

            json_data = status._json
            #print json_data['text']

            # Index the raw tweet JSON (index/type names here are illustrative)
            es.index(index="idx_twp", doc_type="tweet_twp", body=json_data)
        except Exception, e:
            print e

streamer = tweepy.Stream(auth=auth, listener=StreamListener(), timeout=3000000000)

#Fill in your own keywords below
terms = ['obiee','oracle']

streamer.filter(track=terms)


As with the river plugin, you can subscribe to the user timeline by changing the subscription type. To do this, replace the last line in the script with:

streamer.userstream()
To start the listener, execute the Python file you just created with the python command.


Navigate to the elasticsearch index list again to ensure you are receiving data.


Getting Twitter data into Elasticsearch is actually pretty simple. Logstash is by far the easiest one to configure and, if subscribing to keywords is your only requirement, it should be the preferred solution. Now that we have the foundation in place, in the next post we will look at how we can enhance this data by adding sentiment analysis, and how we can use this data to make decisions.

Categories: BI & Warehousing

Page Borders and Title Underlines

Tim Dexter - Wed, 2015-08-26 15:32

I have taken to recording screen grabs to help some folks out on 'how do I' scenarios. Sometimes a 3 minute video saves a couple of thousand words and several screen shots.

So, per chance you need to know:

1. How to add a page border to your output and/or

2. How to add an underline that runs across the page

Watch this!

If you need the template, sample data and output, get them here.

I'm taking requests if you have them.

Categories: BI & Warehousing

MRP process on standby stops with ORA-00600

Amardeep Sidhu - Thu, 2015-08-20 03:03

A rather unremarkable post about an ORA-00600 error I faced on a standby database. The environment was a Sun SuperCluster machine. The MRP process was hitting ORA-00600 while trying to apply a specific archive log.

The error message was something like this

MRP0: Background Media Recovery terminated with error 600
Errors in file /u01/app/oracle/product/
ORA-00600: internal error code, arguments: [2619], [539], [], [], [], [], [], [], [], [], [], []
Recovery interrupted!

Some Googling and MOS searches revealed that the error was due to a corrupted archive log file. Recopying the archive file from the primary and restarting recovery resolved the issue. The first argument of the ORA-600 is actually the sequence number of the archive log it is trying to apply.
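As a quick illustration of reading the sequence number out of that message (a throwaway sketch for parsing the alert-log line, not a DBA tool):

```python
import re

msg = ("ORA-00600: internal error code, arguments: "
       "[2619], [539], [], [], [], [], [], [], [], [], [], []")

# The arguments appear as a bracketed list: the first bracket is the
# internal error number (2619) and the next one is the first argument,
# which for ORA-600 [2619] is the archive log sequence number.
args = re.findall(r"\[([^\]]*)\]", msg)
error_number, seq = args[0], args[1]
print(error_number, seq)  # 2619 539
```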

Categories: BI & Warehousing

Combining Spark Streaming and Data Frames for Near-Real Time Log Analysis & Enrichment

Rittman Mead Consulting - Sat, 2015-08-01 08:51

A few months ago I posted an article on the blog around using Apache Spark to analyse activity on our website, using Spark to join the site activity to some reference tables for some one-off analysis. In this article I'll be taking an initial look at Spark Streaming, a component within the overall Spark platform that allows you to ingest and process data in near real-time whilst keeping the same overall codebase as your batch-style Spark programs.


Like regular batch-based Spark programs, Spark Streaming builds on the concept of RDDs (Resilient Distributed Datasets) and provides an additional high-level abstraction called a "discretized stream" or DStream, representing a continuous stream of RDDs over a defined time period. In the example I'm going to create I'll use Spark Streaming's DStream feature to hold in memory the last 24 hours' worth of website activity, and use it to update a "Top Ten Pages" Impala table that'll get updated once a minute.


To create the example I started with the Log Analyzer example in the set of DataBricks Spark Reference Applications, and adapted the Spark Streaming / Spark SQL example to work with our CombinedLogFormat log format that contains two additional log elements. In addition, I’ll also join the incoming data stream with some reference data sitting in an Oracle database and then output a parquet-format file to the HDFS filesystem containing the top ten pages over that period.

The bits of the Log Analyzer reference application that we reused comprise two scripts that compile into a single JAR file: a script that creates a Scala object to parse the incoming CombinedLogFormat log files, and another containing the main program. The log parsing object contains a single function that takes a set of log lines and returns a Scala class that breaks the log entries down into their individual elements (IP address, endpoint (URL), referrer and so on). Compared to the DataBricks reference application I had to add two extra log file elements to the ApacheAccessLog class (referer and agent), and add some code to deal with "-" values that could appear in the log for the content size; I also added some extra code to ensure the URLs (endpoints) quoted in the log matched the format used in the data extracted from our WordPress install, which stores all URLs with a trailing forward-slash ("/").

package com.databricks.apps.logs

case class ApacheAccessLog(ipAddress: String, clientIdentd: String,
 userId: String, dateTime: String, method: String,
 endpoint: String, protocol: String,
 responseCode: Int, contentSize: Long,
 referer: String, agent: String)

object ApacheAccessLog {
 val PATTERN = """^(\S+) (\S+) (\S+) \[([\w\d:\/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)"""".r

 def parseLogLine(log: String): ApacheAccessLog = {
  val res = PATTERN.findFirstMatchIn(log)
  if (res.isEmpty) {
   ApacheAccessLog("", "", "", "", "", "", "", 0, 0, "", "")
  } else {
   val m = res.get
   val contentSizeSafe : Long = if ( == "-") 0 else
   val formattedEndpoint : String = (if ( == "/") else"/"))
   ApacheAccessLog(,,,,,
    formattedEndpoint,,,
    contentSizeSafe,,
  }
 }
}

The body of the main application script looks like this; I'll go through it step-by-step afterwards:

package com.databricks.apps.logs.chapter1

import com.databricks.apps.logs.ApacheAccessLog
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

object LogAnalyzerStreamingSQL {
  val WINDOW_LENGTH = new Duration(86400 * 1000)
  val SLIDE_INTERVAL = new Duration(60 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val postsDF = sqlContext.load("jdbc", Map(
                  "url" -> "jdbc:oracle:thin:blog_refdata/",
                  "dbtable" -> "BLOG_REFDATA.POST_DETAILS"))
    postsDF.registerTempTable("posts")

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.textFileStream("/user/oracle/rm_logs_incoming")

    val accessLogsDStream = =>

    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No logs received in this time interval")
      } else {
        // Filter out bots
        val accessLogsFilteredDF = accessLogs
                                      .filter( r => ! r.agent.matches(".*(spider|robot|bot|slurp|bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*"))
                                      .filter( r => ! r.endpoint.matches(".*(wp-content|wp-admin|wp-includes|favicon.ico|xmlrpc.php|wp-comments-post.php).*")).toDF()
        accessLogsFilteredDF.registerTempTable("accessLogsFiltered")

        val topTenPostsLast24Hour = sqlContext.sql("SELECT p.POST_TITLE, p.POST_AUTHOR, COUNT(*) as total FROM accessLogsFiltered a JOIN posts p ON a.endpoint = p.POST_SLUG GROUP BY p.POST_TITLE, p.POST_AUTHOR ORDER BY total DESC LIMIT 10 ")

        // Persist top ten table for this window to HDFS as parquet file"/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet", "parquet", SaveMode.Overwrite)
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}


The application code starts by importing Scala classes for Spark, Spark SQL and Spark Streaming, and then defines two variables that determine the amount of log data the application will consider: WINDOW_LENGTH (86,400,000 milliseconds, or 24 hours), which determines the window of log activity the application will consider, and SLIDE_INTERVAL, set to 60,000 milliseconds or one minute, which determines how often the statistics are recalculated. Using these values means that our Spark Streaming application will recompute, every minute, the top ten most popular pages over the last 24 hours.
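To make the window arithmetic concrete, here is a small sketch (plain Python rather than Spark) of how many one-minute slides fit inside a 24-hour window, and how often results are recomputed:

```python
WINDOW_LENGTH_MS = 86400 * 1000   # 24 hours of log activity per window
SLIDE_INTERVAL_MS = 60 * 1000     # recompute every minute

# Each recomputation covers the last WINDOW_LENGTH of data and fires
# every SLIDE_INTERVAL, so one window spans this many slide intervals:
batches_per_window = WINDOW_LENGTH_MS // SLIDE_INTERVAL_MS
recomputes_per_hour = 3600 * 1000 // SLIDE_INTERVAL_MS

print(batches_per_window, recomputes_per_hour)  # 1440 60
```

In other words, every minute Spark Streaming re-evaluates a window spanning the previous 1,440 minutes of activity.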

package com.databricks.apps.logs.chapter1
import com.databricks.apps.logs.ApacheAccessLog
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}
object LogAnalyzerStreamingSQL {
 val WINDOW_LENGTH = new Duration(86400 * 1000)
 val SLIDE_INTERVAL = new Duration(60 * 1000)

In our Spark Streaming application, we're also going to load up reference data from our WordPress site, exported and stored in an Oracle database, to add post title and post author values to the raw log entries that come in via Spark Streaming. In the next part of the script we define a new Spark context, then a Spark SQL context off of the base Spark context, then create a Spark SQL data frame to hold the Oracle-sourced WordPress data, to join later to the incoming DStream data. Using Spark's new Data Frame feature and the Oracle JDBC drivers that I downloaded separately from the Oracle website, I can pull in reference data from Oracle or other database sources, or bring it in from a CSV file as I did in the previous Spark example, to supplement my raw incoming log data.

def main(args: Array[String]) {
 val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
 val sc = new SparkContext(sparkConf)

 val sqlContext = new SQLContext(sc)
 import sqlContext.implicits._

 val postsDF = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:oracle:thin:blog_refdata/",
  "dbtable" -> "BLOG_REFDATA.POST_DETAILS"))

Note also how Spark SQL lets me declare a data frame (or indeed any RDD with an associated schema) as a Spark SQL table, so that I can later run SQL queries against it; I'll come back to this at the end.

Now comes the first part of the Spark Streaming code. I start by defining a new Spark Streaming context off of the same base Spark context that I created the Spark SQL context from, then use that Spark Streaming context to create a DStream that reads newly-arrived files landed in an HDFS directory. For this example I'll manually copy the log files into an "incoming" HDFS directory, whereas in real life I'd connect Spark Streaming to Flume using FlumeUtils for a more direct connection to activity on the webserver.

val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)
val logLinesDStream = streamingContext.textFileStream("/user/oracle/rm_logs_incoming")

Then I call the Scala “map” transformation to convert the incoming DStream into an ApacheAccessLog-formatted DStream, and cache this new DStream in-memory. Next and as the final part of this stage, I call the Spark Streaming “window” function which packages the input data into in this case a 24-hour window of data, and creates a new Spark RDD every SLIDE_INTERVAL – in this case 1 minute – of time.

val accessLogsDStream = =>
val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

Now that Spark Streaming is creating RDDs for me to represent all the log activity over my 24 hour period, I can use the .foreachRDD control structure to turn that RDD into its own data frame (using the schema I’ve inherited from the ApacheAccessLog Scala class earlier on), and filter out bot activity and references to internal WordPress pages so that I’m left with actual page accesses to then calculate the top ten list from.

windowDStream.foreachRDD(accessLogs => {
 if (accessLogs.count() == 0) {
 println("No logs received in this time interval")
 } else {
// Filter out bots 
 val accessLogsFilteredDF = accessLogs
 .filter( r => ! r.agent.matches(".*(spider|robot|bot|slurp|bot|monitis|Baiduspider|AhrefsBot|EasouSpider|HTTrack|Uptime|FeedFetcher|dummy).*"))
 .filter( r => ! r.endpoint.matches(".*(wp-content|wp-admin|wp-includes|favicon.ico|xmlrpc.php|wp-comments-post.php).*")).toDF()

Then I use Spark SQL's ability to join tables, created against the windowed log data and the Oracle reference data I brought in earlier, to create a parquet-formatted file containing the top ten most popular pages over the past 24 hours. Parquet is the default storage format used by Spark SQL and is best suited to BI-style columnar queries, but I could use Avro, CSV or another file format if I brought the correct library imports in.

val topTenPostsLast24Hour = sqlContext.sql("SELECT p.POST_TITLE, p.POST_AUTHOR, COUNT(*) as total FROM accessLogsFiltered a JOIN posts p ON a.endpoint = p.POST_SLUG GROUP BY p.POST_TITLE, p.POST_AUTHOR ORDER BY total DESC LIMIT 10 ")

// Persist top ten table for this window to HDFS as parquet file"/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet", "parquet", SaveMode.Overwrite)

Finally, the last piece of the code starts off the data ingestion process, which then continues until the process is interrupted or stopped:

streamingContext.start()
streamingContext.awaitTermination()


I can now go over to Hue and move some log files into the HDFS directory that the Spark application is running on, like this:


Then, based on the SLIDE_INTERVAL I defined in the main Spark application earlier on (60 seconds, in my case) the Spark Streaming application picks up the new files and processes them, outputting the results as a Parquet file back on the HDFS filesystem (these two screenshots should display as animated GIFs)


So what to do with the top-ten pages parquet file that the Spark Streaming application creates? The most obvious thing to do would be to create an Impala table over it, using the schema metadata embedded into the parquet file, like this:

CREATE EXTERNAL TABLE rm_logs_24hr_top_ten
LIKE PARQUET '/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet/part-r-00001.parquet'
STORED AS PARQUET
LOCATION '/user/oracle/rm_logs_batch_output/topTenPostsLast24Hour.parquet';

Then I can query the table using Hue again, or I can import the Impala table metadata into OBIEE and analyse it using Answers and Dashboards.


So that's a very basic example of Spark Streaming, and I'll be building on this example over the next few weeks to add features such as persistent storage of all processed data, and classification and clustering of the data using Spark MLlib. More importantly, copying files into HDFS for ingestion into Spark Streaming adds quite a lot of latency, and it'd be better to connect Spark directly to the webserver using Flume or, even better, Kafka; I'll add examples showing these features in the next few posts in this series.

Categories: BI & Warehousing

Data Mining Scoring Development Process

Dylan Wan - Thu, 2015-04-02 23:39

I think that the process of building a data mining scoring engine is similar to developing an application.

We have the requirement analysis, functional design, technical design, coding, testing, deployment, etc. phases.

Categories: BI & Warehousing