Parallel apply in Python

Often in text analytics, we need to process many sentences as part of the text pre-processing (e.g. stemming, stop word removal, punctuation removal) prior to creating the DTM.

Processing the corpus serially can take quite a long time, so we often use symmetric multiprocessing (SMP) to speed up the process. One of the most useful functions to parallelise is apply.

In R, this is relatively simple using parApply or %dopar%. We simply use these in place of the familiar apply family of functions. You can learn more about the apply family of functions from the links below:

However, with Python there is no out-of-the-box implementation of parallel apply. While it is not difficult to create your own parallel apply function, a common limitation is that you cannot pass in a dynamic list of parameters (unlike R). There are different solutions available (see links below), but they require a lot of wrapper functions, and using the zip function may not allow you to pass in the partitioned data frame easily.

Note: There’s a fork of multiprocessing called pathos that looks to resolve this issue – however I have not been able to test it.

This short article will show you an example of how you can create your own parallel apply function that allows you to pass in a dynamic list of parameters – thereby making your code more generic and applicable across many different types of functions.

As this parallel apply function requires only the multiprocessing and pandas packages, it should be easily portable.

I have used code from two separate sites as references to create this tutorial and you can reference them here:

Base parApply Function

We start with the base parApply function as follows:

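from multiprocessing import Pool, cpu_count

import pandas as pd

def parApply(dfGrouped, func):
    # Run func on every group in a pool of worker processes, one worker per CPU core
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    # Stitch the processed groups back into a single data frame
    return pd.concat(ret_list)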

This function uses the pool's map method to execute “func” on each partition of the dataframe. Once the execution is done, the partitions are concatenated back to form the full dataframe.

However, one of the major issues is that “map” here does not allow multiple parameters as inputs to the function.

Create Class wrapper for dynamic parameter list

The following code creates a wrapper class to capture the dynamic parameter list:


import pandas as pd

class WithExtraArgs(object):
    def __init__(self, func, **args):
        # Remember the function and the keyword arguments to pass along later
        self.func = func
        self.args = args
    def __call__(self, df):
        # Pool.map supplies only df; the stored keyword arguments are added here
        return self.func(df, **self.args)

def applyParallel(dfGrouped, func, kwargs):
    with Pool(cpu_count()) as p:
        ret_list = p.map(WithExtraArgs(func, **kwargs), [group for name, group in dfGrouped])
    return pd.concat(ret_list)

The special syntax, *args and **kwargs in function definitions is used to pass a variable number of arguments to a function. The single asterisk form (*args) is used to pass a non-keyworded, variable-length argument list, and the double asterisk form is used to pass a keyworded, variable-length argument list.

Since it is possible that the list of arguments for functions may be unknown, it may be better to use a keyworded, variable-length argument list.
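To see what the wrapper does without starting a process pool, here is a minimal sketch that applies it with the built-in map (Python 3). The function scale and its parameter names are made up for illustration. functools.partial is shown only as a standard-library alternative that binds keyword arguments in the same way; it is not what the code above uses.

```python
from functools import partial


class WithExtraArgs(object):
    """Callable that remembers keyword arguments for a later call."""

    def __init__(self, func, **args):
        self.func = func
        self.args = args

    def __call__(self, item):
        # The mapped item is always passed first; the stored keywords follow.
        return self.func(item, **self.args)


def scale(value, factor, offset):
    # Hypothetical stand-in for a function that would receive a data frame partition.
    return value * factor + offset


kwargs = {"factor": 10, "offset": 1}

# map() supplies one argument per call; the wrapper supplies the rest.
via_class = list(map(WithExtraArgs(scale, **kwargs), [1, 2, 3]))
via_partial = list(map(partial(scale, **kwargs), [1, 2, 3]))

print(via_class)    # [11, 21, 31]
print(via_partial)  # [11, 21, 31]
```

Both approaches give the same result. The class version makes it explicit which argument comes from map and which come from the stored keywords.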

Using the applyParallel function

To use the applyParallel function, you first define the function to be applied (in this example, remove_punct).

The remove_punct function strips out all the punctuation characters from a text column in the data frame and inserts the result as a new column at the given index with the given name. Note that the function has 4 parameters: the first is the partitioned data frame, followed by 3 others (destination column index, destination column name and source column name).


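import re
import string

def remove_punct(df, dest_col_ind, dest_col_name, src_col_name):
    # Escape the punctuation so that no character is read as regex syntax
    pattern = '[' + re.escape(string.punctuation) + ']'
    # Apply to the source column only (raw=True would pass a NumPy array, which cannot be indexed by column name)
    df.insert(dest_col_ind, dest_col_name, df[src_col_name].apply(lambda text: re.sub(pattern, '', text)))
    return df

#kwargs = {"dest_col_ind": 4, "dest_col_name": "q1nopunct", "src_col_name": "question1"}

applyParallel(df_train.groupby(df_train.grpId), remove_punct, {"dest_col_ind": 4, "dest_col_name": "TargetCol", "src_col_name": "OriginalCol"})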

Next, simply call the applyParallel function with the grouped source data frame, the function, and a dictionary of keyword and value pairs.

Please note the following:

  • The keywords in the argument list and the function (i.e. remove_punct) parameters have the same names. This allows automatic matching of each keyword to its parameter.
  • The remove_punct function has 4 parameters but only 3 are in the dynamic argument list. This is intentional because the following line in the wrapper class WithExtraArgs already maps the first parameter to the data frame:
self.func(df, **self.args)

And so with just the above few lines of additional Python code, you are now able to parallelise the apply function for use with your functions without having to always redefine the applyParallel function.
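If you want to try the punctuation-stripping step on its own before running it across partitions, the following is a small sketch on plain strings (Python 3). The names strip_punct and PUNCT_PATTERN are illustrative and not part of the downloadable source.

```python
import re
import string

# Escape the punctuation characters so that none of them is read as regex syntax.
PUNCT_PATTERN = re.compile("[" + re.escape(string.punctuation) + "]")


def strip_punct(text):
    """Return text with every ASCII punctuation character removed."""
    return PUNCT_PATTERN.sub("", text)


print(strip_punct("What's the best way, really?"))  # Whats the best way really
```

Note that string.punctuation covers ASCII punctuation only, so typographic quotes and dashes would be left in place.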

You can download the source code on Github for your easy reference.

Hope this helps in your coding!

Simple Twitter Sentiment Analytics Using Apache Flume and Spark – Part 3

In my last 2 posts (Part 1 and Part 2), I outlined the steps to set up Hive tables (on HDFS) and described how to configure Flume to receive Twitter posts and store them in the Hive tables.

In the last part of this series, I will cover the analytical portion of the Twitter sentiment analysis by explaining how basic sentiment analysis works and how to write the PySpark file to do the processing.

In this tutorial, we will use Twitter feeds to determine the sentiment towards each of the candidates in the 2016 US Election.

Let's start!

Brief Discussion on Sentiment Analysis

There are many different methods and approaches to sentiment analysis. Here we cover only the most basic approaches to sentiment analysis. However, nothing prevents you from adopting a more sophisticated approach using NLP and other tools. The following diagram illustrates the approach taken in this how-to.

twitter-sentiment-spark-20

The sentiment figures will be a rough gauge of how positive or negative the tweets are about the subject. There are several key simplifications taken in the interest of time that you may want to explore further to improve the accuracy of sentiment analysis:

  • Retweets are considered as “having the same sentiment” – obviously, this may not be true but it simplifies the way tweets are processed
  • The subject present in a tweet will “take on” the sentiment of the tweet. This means that if Samsung S7 is mentioned in the tweet, the sentiment of the tweet is attributed to “Samsung S7”. Obviously, in order to be more accurate, we would need to do entity-name resolution, sentence parsing and aspect-based sentiment analysis. But to stay focused on this tutorial, we will not be covering these topics (perhaps in a subsequent post)
  • Sarcasm and word inflexions are not taken into account, which may lead to incorrect sentiment scoring as shown below:

twitter sentiment spark-25.png

 

Nonetheless, this is not to say we cannot do sentiment analysis – but rather to highlight the difficulties in getting the right sentiment, which will not be addressed in this tutorial.

 

Introduction to PySpark

It has been several years since the introduction of Hadoop and Spark, and for a while there was some confusion about their roles as Big Data engineering tools. However, in recent times, their role as supporting components rather than competing products has been widely accepted. Nonetheless, I am sharing some definitions of what Spark is from multiple sources which I found useful to highlight the key capabilities / purposes of Spark:

  • Apache Spark is a lightning-fast cluster computing technology, designed for fast computation. It is based on Hadoop MapReduce and it extends the MapReduce model to efficiently use it for more types of computations, which includes interactive queries and stream processing. The main feature of Spark is its in-memory cluster computing that increases the processing speed of an application. (https://www.tutorialspoint.com/apache_spark/apache_spark_introduction.htm)

The components of Spark are as shown below and we will be mainly using SparkSQL and PySpark.

twitter-sentiment-spark-21

To find out more about Spark you can refer to the following: http://spark.apache.org/docs/latest/index.html

PySpark 

Spark is written in Scala, and Spark applications written in Scala or Java are compiled to Java bytecode for execution by the JVM. PySpark is the Python API for Spark, which allows users to write Spark programs in Python. The Python code itself is not compiled to Java bytecode: the Python driver talks to the JVM through the Py4J bridge, and Python functions such as lambdas run in Python worker processes on the executors.

I recommend that you read through the following to understand what you will be coding in the following sections

 

Twitter Message and Candidate Attribution

To simplify this tutorial, we determine whether a tweet is to be attributed to a candidate / subject by referencing the candidate's / subject's handle. For example:

  • Tweet #2
    • @realDonaldTrump wining, lying, Donald Trump, does not make a president
  • Tweet #3
    • @Colonel_Ted: Even if @realDonaldTrump picked @tedcruz (MY fave) for VP I can NEVER in good conscience vote for this megalomaniac. https…

In the tweets above, we will identify the various named entities by their handles @realDonaldTrump, @Colonel_Ted, @tedcruz. Hence the sentiment for the tweet will be attributed to the entities referenced by their Twitter handles.

So in the example above, the sentiment for tweets #1 and #2 will be attributed to Donald Trump, while tweet #3 will be attributed to Donald Trump AND Ted Cruz.
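The attribution rule can be sketched in a few lines of plain Python (Python 3). This is only an illustration: the lookup table HANDLE_TO_CANDIDATE and the function name are made up, and the script later in this post reads the mentions from the tweet's entities.user_mentions field rather than parsing the text with a regular expression.

```python
import re

# Hypothetical handle-to-candidate lookup; only the handles come from the tweets above.
HANDLE_TO_CANDIDATE = {
    "realDonaldTrump": "Donald Trump",
    "tedcruz": "Ted Cruz",
}

HANDLE_PATTERN = re.compile(r"@(\w+)")


def attributed_candidates(tweet):
    """Return the candidates whose handles appear in the tweet, in order of mention."""
    found = []
    for handle in HANDLE_PATTERN.findall(tweet):
        candidate = HANDLE_TO_CANDIDATE.get(handle)
        if candidate and candidate not in found:
            found.append(candidate)
    return found


tweet3 = ("@Colonel_Ted: Even if @realDonaldTrump picked @tedcruz (MY fave) for VP "
          "I can NEVER in good conscience vote for this megalomaniac.")
print(attributed_candidates(tweet3))  # ['Donald Trump', 'Ted Cruz']
```

Handles that are not in the lookup, such as @Colonel_Ted, are simply ignored.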

As we are taking a very simplistic approach to sentiment attribution, this is sufficient. However, for a more accurate sentiment analysis, I would strongly recommend using NLP, entity-name resolution (ENR) and aspect-based sentiment analysis.

Sentiment Analysis Code

The following steps outline the approach in determining the sentiments of the tweets.

Step 1: Create a simple mapping to label the tweet name. Because each candidate is referred to by different names in tweets, we need to map each of those names to the candidate.

Step 2: Create a dictionary of sentiment words and their associated scores. This will be used to calculate the overall sentiment score of the tweet.

Step 3: For each tweet, calculate the sentiment score and total the score for each candidate.
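Before looking at the Spark version, here is a minimal plain-Python sketch of the scoring in Steps 2 and 3 (Python 3). The word scores in SAMPLE_SCORES are a small made-up sample in the style of an AFINN word list, not the actual file. One small deviation from the script that follows: empty tokens produced by the split are dropped before counting words.

```python
import math
import re

# Tiny made-up sample in the style of AFINN: word -> integer score.
SAMPLE_SCORES = {"good": 3, "love": 3, "bad": -3, "lying": -2}

WORD_SPLIT = re.compile(r"\W+")


def sentiment(text, scores=SAMPLE_SCORES):
    """Sum the word scores and divide by the square root of the word count."""
    # Empty tokens from leading or trailing punctuation are dropped here.
    words = [w for w in WORD_SPLIT.split(text.lower()) if w]
    if not words:
        return 0.0
    total = sum(scores.get(word, 0) for word in words)
    return total / math.sqrt(len(words))


print(sentiment("I love this good day"))
print(sentiment("bad bad bad bad"))  # -6.0
```

Dividing by the square root of the word count dampens the effect of tweet length, so a long tweet with many scored words does not dominate a short one.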

Copy the following code into a file “SentimentAnalysis.py” and save it on the ./SentimentAnalysis folder (refer to Tutorial 1 for the folder location)

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Code to score tweets using AFINN and to generate a set of sentiment score for each person mentioned.
#   usage: ./bin/pyspark SentimentAnalysis.py
#

"""SentimentAnalysis.py"""

import math
import re
import sys

from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter

# Note - SparkContext available as sc, HiveContext available as sqlCtx.
from pyspark import SparkContext
from pyspark import HiveContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="PythonSentimentAnalysis")
sqlCtx = HiveContext(sc)

# Read in the word-sentiment list and create a static RDD from it
filenameAFINN = "/home/training/TwitterSentimentAnalysis/AFINN/AFINN-111.txt"

# map applies the lambda function (create a tuple of word and sentiment score) to every item of iterable
# within [ ] and returns a list of results. The dictionary is used here to be able to quickly lookup the
# sentiment score based on the key value
afinn = dict(map(lambda (w, s): (w, int(s)), [ ws.strip().split('\t') for ws in open(filenameAFINN) ]))

# Read in the candidate mapping list and create a static dictionary from it
filenameCandidate = "file:///home/training/TwitterSentimentAnalysis/Candidates/Candidate Mapping.txt"

# map applies the lambda function
candidates = sc.textFile(filenameCandidate).map(lambda x: (x.strip().split(",")[0],x.strip().split(","))) \
				  	   .flatMapValues(lambda x:x).map(lambda y: (y[1],y[0])).distinct()

# word splitter pattern
pattern_split = re.compile(r"\W+")

# use sqlCtx to query the HIVE table
tweets = sqlCtx.sql("select id, text, entities.user_mentions.name from incremental_tweets")

# This Python function calculates the sentiment score of the entire tweet
def sentiment(text):
    words = pattern_split.split(text.lower())
    sentiments = map(lambda word: afinn.get(word, 0), words)
    if sentiments:
        sentiment = float(sum(sentiments))/math.sqrt(len(sentiments))
        #sentiment = float(sum(sentiments))
    else:
        sentiment = 0
    return sentiment

sentimentTuple = tweets.rdd.map(lambda r: [r.id, r.text, r.name]) \
			   .map(lambda r: [sentiment(r[1]),r[2]]) \
			   .flatMapValues(lambda x: x) \
			   .map(lambda y: (y[1],y[0])) \
			   .reduceByKey(lambda x, y: x+y) \
			   .sortByKey(ascending=True)

scoreDF = sentimentTuple.join(candidates) \
			.map(lambda (x,y): (y[1],y[0])) \
			.reduceByKey(lambda a,b: a+b) \
			.toDF()

scoreRenameDF = scoreDF.withColumnRenamed("_1","Candidate").withColumnRenamed("_2","Score")

sqlCtx.registerDataFrameAsTable(scoreRenameDF, "SCORE_TEMP")

sqlCtx.sql("INSERT OVERWRITE TABLE candidate_score SELECT Candidate, Score FROM SCORE_TEMP")

The statements above are self-explanatory but I will take some time to discuss the key statements in this entire script as follows:

sentimentTuple = tweets.rdd.map(lambda r: [r.id, r.text, r.name]) \
			   .map(lambda r: [sentiment(r[1]),r[2]]) \
			   .flatMapValues(lambda x: x) \
			   .map(lambda y: (y[1],y[0])) \
			   .reduceByKey(lambda x, y: x+y) \
			   .sortByKey(ascending=True)

scoreDF = sentimentTuple.join(candidates) \
			.map(lambda (x,y): (y[1],y[0])) \
			.reduceByKey(lambda a,b: a+b) \
			.toDF()

The following describes what each line in the above statement is doing.

sentimentTuple = tweets.rdd.map(lambda r: [r.id, r.text, r.name])

The result set from the query of tweets is accessed as a Resilient Distributed Dataset (via .rdd), and the lambda function is then applied to every row in the RDD using the map method. The “map” method can be thought of as the “map” in “map-reduce”.

Note that the lambda function takes in a row of the RDD (in parameter r) and returns a 3-element list (r.id, r.text, r.name), for example:

[724232069421768704, u”@LewisHShupe I’m tired of being penalized for not having #Obamacare because I can’t afford it/ @BarackObama is a #POS #terrorist #TrumpTrain”, [u’Lewis Shupe’, u’Barack Obama’]]

r.id | r.text | r.name
724232069421768704 | “@LewisHShupe I’m tired of being penalized for not having #Obamacare because I can’t afford it/ @BarackObama is a #POS #terrorist #TrumpTrain” | Lewis Shupe, Barack Obama

.map(lambda r: [sentiment(r[1]),r[2]]) \

With the 3-element lists, we again apply a lambda function to each row. It calls the sentiment function defined earlier and returns a 2-element list: the sentiment score and the list of mentioned names, r.name (r[2]).

[1.0425720702853738, [u'Hillary Clinton']],
[-0.47140452079103173, [u'RTina']],
[-0.4082482904638631, [u'Lewis Shupe', u'Barack Obama']],
[-1.3093073414159544, [u'CJLB', u'Ted Cruz', u'Jim Hoft']]

sentiment(r[1]) | r[2]
1.0425720702853738 | Hillary Clinton
-0.47140452079103173 | RTina
-0.4082482904638631 | Lewis Shupe, Barack Obama
-1.3093073414159544 | CJLB, Ted Cruz, Jim Hoft

.flatMapValues(lambda x: x) \

With the 2-tuple dataset, we apply a flatMapValues function to flatten out the structure within r[2]. This will make it easier to process in the subsequent steps

(1.0425720702853738, u'Hillary Clinton'), (-0.47140452079103173, u'RTina'), (0.0, u'Halli Casser-Jayne'), (0.0, u'Yvonne Slee '), (0.0, u'Raymond W Clarke'), (-1.3093073414159544, u'CJLB'), (-1.3093073414159544, u'Ted Cruz'), (-1.3093073414159544, u'Jim Hoft')

sentiment(r[1]) | r[2]
1.0425720702853738 | Hillary Clinton
-0.47140452079103173 | RTina
-0.4082482904638631 | Lewis Shupe
-0.4082482904638631 | Barack Obama
-1.3093073414159544 | CJLB
-1.3093073414159544 | Ted Cruz
-1.3093073414159544 | Jim Hoft

.map(lambda y: (y[1],y[0])) \

This is a simple step to swap the fields so that the first column is the name and the second is the sentiment score

y[1] | y[0]
Hillary Clinton | 1.0425720702853738
RTina | -0.47140452079103173
Lewis Shupe | -0.4082482904638631
Barack Obama | -0.4082482904638631
CJLB | -1.3093073414159544
Ted Cruz | -1.3093073414159544
Jim Hoft | -1.3093073414159544

.reduceByKey(lambda x, y: x+y) \

The reduceByKey is a function where the values of the 2-tuples are added together. By default the first column in the data set will be the key – hence this statement will add all the y[0] having the same key value in y[1]. The result of summing each name found in the entire set of tweets is as shown below (first 6 only, not sorted in any order)

(u'Ava Guthrie', -1.6329931618554523)
(u'K Kelly', 0.0)
(u'AUBURN FAN 4 RUBIO', -0.6255432421712244)
(u'Royal Jordanian', 0.47140452079103173)
(u'JOEY MANNARINO', 6.4236405483757295)
(u'Jimmy Kimmel', -0.75)

.sortByKey(ascending=True)

Finally, the sortByKey method sorts the 2-tuple RDD in ascending order by name. The first 10 2-tuples sorted by name are shown below:

(u'#$@%&+?', 0.47140452079103173)
(u'#1 Road Warrior Fan', 0.0)
(u'#4thewin', 0.8121063121025829)
(u'#Alwaystrump', -1.4278802916547295)
(u'#AngryMajorityNY', 1.6417736995057388)
(u'#BERNIEFACTS', 0.20062843367526345)
(u'#BLACKS FOR TRUMP!!!', 0.0)
(u'#BargainingChip', 0.4364357804719848)
(u'#BlackLivesMatter-LA', 0.0)
(u'#Chemtrails', -0.5345224838248488)

scoreDF = sentimentTuple.join(candidates) \
			.map(lambda (x,y): (y[1],y[0])) \
			.reduceByKey(lambda a,b: a+b) \
			.toDF()

The last step is to join (an inner join) with the candidates RDD so that only the sentiment scores for names that map to a candidate we are interested in are kept. We then sum them by the mapped candidate name and convert the result to a DataFrame holding the final sentiment score.

Row(_1=u’Hillary Clinton’, _2=2.508604778151232)
Row(_1=u’Donald Trump’, _2=76.03149029470246)
Row(_1=u’Ted Cruz’, _2=162.63942375987867)
Row(_1=u’Bernie Sanders’, _2=-10.372575217548247)
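To make the chain of RDD operations easier to follow, here is a rough plain-Python emulation of the same steps on a handful of made-up rows (Python 3, no Spark required). The scores, the names and the aliases dictionary are all hypothetical; the dictionary stands in for the candidates RDD built from the mapping file.

```python
from collections import defaultdict

# (sentiment score, [mentioned names]) pairs, as produced by the second map step.
scored = [
    (1.0, ["Hillary Clinton"]),
    (0.5, ["Hillary"]),
    (-0.5, ["Ted Cruz", "Donald Trump"]),
    (2.0, ["Ted Cruz"]),
    (0.25, ["Somebody Else"]),
]

# Hypothetical alias -> candidate mapping, standing in for the candidates RDD.
aliases = {
    "Hillary Clinton": "Hillary Clinton",
    "Hillary": "Hillary Clinton",
    "Ted Cruz": "Ted Cruz",
    "Donald Trump": "Donald Trump",
}

# flatMapValues + swap: one (name, score) pair per mentioned name.
pairs = [(name, score) for score, names in scored for name in names]

# reduceByKey: add up the scores for each name.
by_name = defaultdict(float)
for name, score in pairs:
    by_name[name] += score

# join + reduceByKey: keep only known aliases and total by candidate.
by_candidate = defaultdict(float)
for name, score in by_name.items():
    if name in aliases:
        by_candidate[aliases[name]] += score

for candidate, score in sorted(by_candidate.items()):
    print(candidate, score)
```

Names with no entry in the mapping ("Somebody Else" here) drop out at the join, which is why only the candidates of interest appear in the final result.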

Copy Supporting Files

Ensure that the following are in the same folder as the “SentimentAnalysis.py” file.

  • AFINN folder with AFINN-96.txt, AFINN-111.txt and AFINN-README.txt
  • Candidates folder with Candidate Mapping.txt
  • hive-site.xml

The folder should look like the one below:

twitter-sentiment-spark-22

Running the PySpark Script

Before running the SentimentAnalysis.py script, ensure that you are at the /TwitterAnalysis/ folder.

twitter-sentiment-spark-23

You should eventually see the Spark job complete and you can then navigate back to the Hive Editor and execute the query:

select * from candidate_score to view the results of the Spark job.

twitter-sentiment-spark-24

Finally, you can then use your favorite visualization tool to plot the results.

 

Possible Errors Encountered

During the execution of the Python script you may encounter the following error:

Py4JJavaError: An error occurred while calling o44.sql.
: java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
......
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
……
Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
NestedThrowables:
java.lang.reflect.InvocationTargetException

This could be caused by a missing hive-site.xml file in the /TwitterAnalysis/ folder, or by running the script from a folder other than /TwitterAnalysis/.

Resolution:

 

Final Words

I hope you have found the 3-part tutorial helpful in understanding Hadoop, Hive, Flume, Spark and PySpark. Here are some of my afterthoughts.

 

Observations

  • Cloudera VM is preloaded with older versions of Spark and HIVE
  • Further configuration is required to ensure SparkSQL works on HIVE files. Additional libraries and configuration files are required
  • Flume configuration is a non-trivial activity, and you need to find the correct libraries for Twitter integration (Apache vs Cloudera)

 

Further Architecture Improvements

  • HIVE on HDFS performance can be improved through partitioning by date – this will allow us to query the sentiment and compare across dates more easily.
  • Consider using Impala (however, the configuration setup required, libraries and config files need to be explored)
  • Move to Spark Streaming, ingesting directly from HIVE, to stream-process the sentiment scoring (currently using batch processing)
  • HIVE has supported ACID since 0.13; however, there is a performance impact due to transaction management. Hence a workflow (e.g. Oozie) needs to be put in place to process inbound messages in staging and move processed messages to a processed table instead of using transactional updates.

 

Further Analytic Improvements

  • Use Stanford NLP and Python NLTK to do entity-based sentiment analysis. This would provide a more accurate sentiment scoring for each candidate
  • Plot and track sentiment scoring over time to understand how each candidate is being viewed in the social media space

Simple Twitter Sentiment Analytics Using Apache Flume and Spark – Part 2

In my last post, on the same topic above, I outlined the steps, possible issues and how to overcome them when setting up Hive tables, Flume and getting to query the data through Hive.

I realized that there wasn’t any explanation on the structure of the Flume configuration file. Understanding how the Flume configuration file is structured enabled me to quickly configure and understand the basics of setting up Flume and I think this would be useful to anyone who wants to start working with Flume.

In my experience with new tools (at least new to me), the documentation took me some time to understand and digest, and I had to trawl through the web for examples to learn from.

I will give a brief description of the Flume configuration file, section by section, so that it will at least make sense when you work on this tutorial / exercise.

Let's start!

Flume Sources, Channels and Sinks

A typical way to view how data will be ingested is to think in terms of Sources, Channels and Sinks. Sources define, well, the sources of data. Sinks define the targets to which the data will be persisted. Channels refer to the way the data is transferred between sources and sinks.

The first thing to do is to define the Sources, Channels and Sinks for a Flume connection.

# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

Typically, the above section details the source, channel and sink for Flume to operate on. Note that Flume can handle multiple sources, channels and sinks within a single configuration file. However, for this exercise / tutorial we will keep it simple with only 1 source, 1 channel and 1 sink.

Here, the string “TwitterAgent” refers to the name of the Flume agent that the properties of “sources”, “channels” and “sinks” belong to.

In essence, by specifying “TwitterAgent” as the string before “sources”, we are configuring the sources property of the Flume agent named “TwitterAgent”. This is important as we will refer to this agent name when running Flume.

Here we have specified the sources to be “Twitter”, using “MemChannel” as the channel and “HDFS” as the sink. The reason why I have put them in quotes will be revealed later. You could have also used any other string – e.g. “Twtr”, “M1”, “S1” etc … to refer to the sources, channels and sinks, but I chose this naming convention to ensure readability.

Generally, naming the components in Flume takes on the following convention:

<agent_name>.sources = <source_name>
<agent_name>.channels = <channel_name>
<agent_name>.sinks = <sink_name>

 

Describing the Source

A source will have a list of properties. The property “type” is common to every source, and it is used to specify the type of the source we are using. Examples of source types are: HTTP, Twitter, UDP etc.

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = <consumer key>
TwitterAgent.sources.Twitter.consumerSecret = <consumer secret>
TwitterAgent.sources.Twitter.accessToken = <access token>
TwitterAgent.sources.Twitter.accessTokenSecret = <access token secret>
TwitterAgent.sources.Twitter.keywords = @realDonaldTrump, @HillaryClinton, @SenSanders, @BernieSanders, @tedcruz, #election2016, #hillaryclinton, #hillary, #hillary2016, #Hillary2016, #donaldtrump, #trump, #dumptrump, #pooptrump, #turdtrump, #sanders, #tedcruz, #feelthebern, #dontfeelthebern, #bernie2016, #trump2016, #whybother2016, #trumptrain, #notrump, #whichhillary, #voteforbernie, #sandersonly, #americafortrump, #berniecrats, #berniestrong, #berniesanders2016, #imwithher, #killary, #stepdownhillary, #stophillary, #vote2016

Some sources such as Twitter may require specific Java classes (and the libraries that provide them) such as:

com.cloudera.flume.source.TwitterSource
org.apache.flume.source.twitter.TwitterSource

You will need to find out if your desired source is supported and if there are any implementation specific libraries required.

Other than the property “type”, each source type may have further required properties of its own. For example, if you use the Twitter source as described above, you will be required to include the consumerKey, consumerSecret, accessToken and accessTokenSecret properties. If you had chosen HTTP, a different set of properties would be required.

For each source type, you will need to find out which specific properties you must provide in the configuration. This will probably be the most challenging part of creating the Flume config script.

In the above example, the keywords to filter the tweets are defined in the property “keywords”. This means that Flume will capture tweets with any of the above listed keywords.

Specifying the source properties in Flume takes on the following convention:

<agent_name>.sources.<source_name>.type = <value>
<agent_name>.sources.<source_name>.<property2> = <value>
<agent_name>.sources.<source_name>.<property3> = <value>

 

Describing the Sink

Similar to the source, each sink (in the case of multiple sinks) will have a separate list of properties. The property “type” is common to every sink, and it is used to specify the type of the sink we are using. Other than “type”, each sink type may have further required properties of its own, as shown below.

# Describing/Configuring the sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = /twitteranalytics/incremental
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.filePrefix = twitter-
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 0
TwitterAgent.sinks.HDFS.hdfs.rollSize = 524288
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0
TwitterAgent.sinks.HDFS.hdfs.idleTimeout = 0
TwitterAgent.sinks.HDFS.hdfs.batchSize = 100
TwitterAgent.sinks.HDFS.hdfs.threadsPoolSize = 2
TwitterAgent.sinks.HDFS.hdfs.round = true
TwitterAgent.sinks.HDFS.hdfs.roundUnit = hour

In the example above, the sink type is “hdfs” and the properties belonging to the sink type HDFS are shown above. You can refer to this website for the list of properties and their definitions: http://hadooptutorial.info/flume-data-collection-into-hdfs/

Specifying the sink properties in Flume takes on the following convention:

<agent_name>.sinks.<sink_name>.type = <value>
<agent_name>.sinks.<sink_name>.<property2> = <value>
<agent_name>.sinks.<sink_name>.<property3> = <value>

Note that for HDFS, the property format is hdfs.<property>

 

Describing the Channel

The channel configuration is similar to that of sources and sinks and is as shown below:

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

You can refer to this website for the list of properties and their definitions: http://archive.cloudera.com/cdh4/cdh/4/flume-ng/FlumeUserGuide.html#flume-channels

 

Bindings

Finally, to connect the source, channel and sinks together the following needs to be declared in the configuration file. The following configuration specifies that the Twitter source and HDFS sink are both using the same channel “MemChannel”. This effectively binds the source and the sink.

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel
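A binding that points at a channel name which was never declared is an easy mistake to make. The following is a small, hypothetical Python helper (not part of Flume) that reads the naming and binding lines and reports any source or sink whose channel is not declared for the agent. It assumes the simple "key = value" layout used in this post, with multiple names separated by spaces.

```python
CONFIG = """
# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel
"""


def parse_properties(text):
    """Turn 'key = value' lines into a dict, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def unbound_components(props, agent):
    """Return sources and sinks that point at a channel the agent does not declare."""
    channels = set(props.get(agent + ".channels", "").split())
    problems = []
    for source in props.get(agent + ".sources", "").split():
        # A source may write to several channels (property name is plural).
        used = props.get("%s.sources.%s.channels" % (agent, source), "").split()
        if not used or not set(used) <= channels:
            problems.append(source)
    for sink in props.get(agent + ".sinks", "").split():
        # A sink reads from exactly one channel (property name is singular).
        used = props.get("%s.sinks.%s.channel" % (agent, sink), "")
        if used not in channels:
            problems.append(sink)
    return problems


print(unbound_components(parse_properties(CONFIG), "TwitterAgent"))  # []
```

An empty list means every source and sink of the agent is bound to a declared channel. Note the difference in property names: "channels" for a source and "channel" for a sink.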

 

Starting Flume Agent

When starting Flume, you will now specify the specific agent and its relevant configuration to be started as follows:

flume-ng agent -f flume_process_twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

As you will notice, the --name (or -n <name>) command line parameter specifies the agent to execute. Since the agent name in the configuration file is TwitterAgent, all the configuration settings beginning with TwitterAgent will be applied.

Note that this allows us to specify multiple different agents within the same single configuration file.

That’s it!

We have covered a brief overview and walkthrough on configuring Flume. I hope this will help you understand the structure of the Flume configuration file and how to set it up for your next big data project!

Here are links to some of the websites and how-tos which I have found helpful in understanding how to setup Flume.

 

In the next post, I will go through in detail how to use Spark to execute the sentiment analysis.

 

Simple Twitter Sentiment Analytics Using Apache Flume and Spark – Part 1

It's been some time since my last post, but I am excited to be sharing my learnings and adventures with Big Data and Data Analytics.

Recently I had the opportunity to do some simple Twitter sentiment analytics using a combination of HDFS, Hive, Flume and Spark and wanted to share how it was done.

While many other blogs do cover a great deal on how to do the above, I wanted to also share some of the errors I encountered and how to resolve them, hopefully saving you time from searching the web and trying all kinds of solutions.

You can download the source files in this how-to for your easy reference here. Remember to save them in your local folders on the Cloudera VM.

Ready? Let's start!

Step 1: Getting Cloudera Hadoop CDH5.4.3a ready

We begin by setting up and installing Cloudera Hadoop CDH5.4.3a. Run any preconfigured scripts to ensure that Flume, Spark, Python, HDFS, Hive, Hue, Impala, Zookeeper, Kafka and Solr are set up and configured.

For this exercise, I am using a pre-configured Hadoop stack setup from Cloudera. If you have another distribution, you should still be able to run this how-to. However the issues encountered in this tutorial may differ for different distributions.

The version of HDFS used in this tutorial is Hadoop 2.6.0-cdh5.4.3; however, the instructions and steps here should be applicable to any subsequent versions.

This tutorial assumes that you are familiar with hdfs commands. If not, you can refer to this link here.

Step 2: Ensuring that Hive is working

In the VM environment, ensure that Hive2 server is started. Run the following command to start Hive2 server.

sudo service hive-server2 start

Once the server is successfully started, login to Hue and click on Query Editors > Hive to view the Query Editor page.

twitter sentiment spark-1

Step 3: Create HDFS Folder

In this project, we will access the Twitter API to download the tweets. The downloaded files will be saved onto HDFS and accessed through Hive tables. First, create the following directory structure in HDFS.

twitter sentiment spark-2

Running the above command instructs HDFS to create a folder “twitteranalytics” at the top level of the HDFS directory. HDFS uses a standard directory structure similar to a typical Unix file system. However, one of the key differences is that there is no concept of a current directory within HDFS. Hence HDFS files are referred to by their fully qualified names, which are passed as parameters in many of the interactions between the client and the other elements of the HDFS architecture. See this site for more details on the HDFS architecture.
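If you prefer to script this step, the following is a rough Python sketch. It assumes the standard hdfs dfs -mkdir -p command is what creates the folder (the exact command in the screenshot is not reproduced here), and the helper names hdfs_mkdir_command and make_hdfs_dir are my own. In keeping with the fully qualified naming described above, it refuses anything that is not an absolute path.

```python
import posixpath
import subprocess


def hdfs_mkdir_command(path):
    """Build the argument list for creating an HDFS directory.

    Assumption: the stock `hdfs dfs -mkdir -p` CLI is available.
    Relative paths are rejected so every name stays fully qualified.
    """
    if not posixpath.isabs(path):
        raise ValueError("use a fully qualified HDFS path, e.g. /twitteranalytics")
    return ["hdfs", "dfs", "-mkdir", "-p", posixpath.normpath(path)]


def make_hdfs_dir(path):
    """Run the command; raises CalledProcessError if the CLI reports failure."""
    subprocess.run(hdfs_mkdir_command(path), check=True)


cmd = hdfs_mkdir_command("/twitteranalytics")
print(" ".join(cmd))
```

Calling make_hdfs_dir("/twitteranalytics") on the VM would then actually create the folder; the sketch only prints the command so it can be checked first.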

If you do an “ls” command in HDFS you should see the directory you have just created as shown below.

twitter sentiment spark-11

Use the File Browser in Hue to view the folder you have just created.

twitter sentiment spark-3

Once done, you can now start to create the table schema from the Hive script – “Create Twitter Schema.hql” (Note: This can be found in the Github repository)

Step 4: Create Hive Tables

Before you can run the Hive script to create the tables, you must ensure that the JSON SerDes (Serializer / Deserializer) library is available, otherwise you will get the following error:

“FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Cannot validate serde: com.cloudera.hive.serde.JSONSerDe”

Run the following command to copy the hive-serdes-1.0-SNAPSHOT.jar file to the Hive lib directory.

sudo cp hive-serdes-1.0-SNAPSHOT.jar /usr/lib/hive/lib

Next restart the Hive2 server with the following commands

sudo service hive-server2 stop
sudo service hive-server2 start

Run the Hive script in the command line to create the tables as follows

hive -f Create\ Twitter\ Schema.hql

The result should be as shown below and this confirms that the script has been successfully executed and the tables created.

twitter sentiment spark-4

Go back to Hue > Query Editors > Hive and refresh the database list. You should now see the following tables created against the default database. Note that one of the tables is actually a VIEW.

twitter sentiment spark-5

Congratulations! You have successfully created Hive tables on HDFS. Let's take a look at the tables in detail. In Hue, navigate to Data Browsers > Metastore Tables and click on the base_tweets table.

twitter sentiment spark-6

You can view the table structure and will notice that several columns have a struct as their definition. This is how a JSON file is represented in Hive, and it is the reason why you need a JSON SerDes library: to interpret and translate the JSON structure into a “query-able” schema.

For more information about JSON, Hive and HDFS, please click on the links below:

https://cwiki.apache.org/confluence/display/Hive/Json+SerD
http://stackoverflow.com/questions/14705858/using-json-serde-in-hive-tables
http://stackoverflow.com/questions/11479247/how-do-you-make-a-hive-table-out-of-json-data

The reason we are using JSON structure is because the Twitter feed is in the form of a JSON file. For more information on the Twitter JSON structure please refer to Twitter developer documentation here – https://dev.twitter.com/streaming/overview

Step 5: Configure Flume

The next step would be to create the Flume configuration file to connect to Twitter (source) and persist the JSON files on HDFS (sink). Conceptually the flow is as illustrated from the Apache Flume website:

devguide_image00

Create a local folder for this project and name it “TwitterSentimentAnalysis”. This folder can be in your home directory. Navigate to the folder and create a Flume configuration file as follows:

vi flume_process_twitter.conf

twitter sentiment spark-7

Copy and paste the following code and save the file.

# Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

# Describing/Configuring the source
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = <enter your consumer key>
TwitterAgent.sources.Twitter.consumerSecret = <enter your consumer secret>
TwitterAgent.sources.Twitter.accessToken = <enter your access token>
TwitterAgent.sources.Twitter.accessTokenSecret = <enter your access token secret>
TwitterAgent.sources.Twitter.keywords = @realDonaldTrump, @HillaryClinton, @SenSanders, @BernieSanders, @tedcruz, #election2016, #hillaryclinton, #hillary, #hillary2016, #Hillary2016, #donaldtrump, #trump, #dumptrump, #pooptrump, #turdtrump, #sanders, #tedcruz, #feelthebern, #dontfeelthebern, #bernie2016, #trump2016, #whybother2016, #trumptrain, #notrump, #whichhillary, #voteforbernie, #sandersonly, #americafortrump, #berniecrats, #berniestrong, #berniesanders2016, #imwithher, #killary, #stepdownhillary, #stophillary, #vote2016

# Describing/Configuring the sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = /twitteranalytics/incremental
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text

TwitterAgent.sinks.HDFS.hdfs.filePrefix = twitter-
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 0
TwitterAgent.sinks.HDFS.hdfs.rollSize = 524288
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0
TwitterAgent.sinks.HDFS.hdfs.idleTimeout = 0
TwitterAgent.sinks.HDFS.hdfs.batchSize = 100
TwitterAgent.sinks.HDFS.hdfs.threadsPoolSize = 2
TwitterAgent.sinks.HDFS.hdfs.round = true
TwitterAgent.sinks.HDFS.hdfs.roundUnit = hour

# Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

# Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

Please note that you would need to have a Twitter Dev account and create a Twitter App so as to get your consumerKey, consumerSecret, accessToken and accessTokenSecret.
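A common slip when editing this file is binding a source or sink to a channel name that was never declared. The sketch below is a small, hypothetical checker (not part of Flume) that works on the keys from the configuration above, held in a plain Python dictionary. It relies on the convention visible in the file: sources use the plural channels key, while sinks use the singular channel key.

```python
def unbound_components(props, agent):
    """List sources/sinks whose channel binding is missing or undeclared."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        # A source may feed several channels (space-separated "channels" key).
        bound = set(props.get(f"{agent}.sources.{source}.channels", "").split())
        if not bound or not bound <= declared:
            problems.append(f"source {source}")
    for sink in props.get(f"{agent}.sinks", "").split():
        # A sink drains exactly one channel (singular "channel" key).
        if props.get(f"{agent}.sinks.{sink}.channel") not in declared:
            problems.append(f"sink {sink}")
    return problems


# Only the naming and binding keys from the configuration above.
props = {
    "TwitterAgent.sources": "Twitter",
    "TwitterAgent.channels": "MemChannel",
    "TwitterAgent.sinks": "HDFS",
    "TwitterAgent.sources.Twitter.channels": "MemChannel",
    "TwitterAgent.sinks.HDFS.channel": "MemChannel",
}

print(unbound_components(props, "TwitterAgent"))
```

An empty list means every source and sink points at a declared channel; a misspelt channel name would show up as an entry such as "sink HDFS".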

Once the config file has been successfully created, enter the following Flume command to start the Flume agent.

flume-ng agent -f flume_process_twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

When trying to execute the flume agent, here are some possible errors you may encounter and how to resolve them:

  • Unable to load source type: com.cloudera.flume.source.TwitterSource
“ERROR node.PollingPropertiesFileConfigurationProvider: Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: com.cloudera.flume.source.TwitterSource, class: com.cloudera.flume.source.TwitterSource”

Ensure that “flume-sources-1.0-SNAPSHOT.jar” is copied to the following directories:

/var/lib/flume-ng/plugins.d/twitter-streaming/lib
/usr/lib/flume-ng/plugins.d/twitter-streaming/lib

If the specific folders are not created, please create them as per structure above

Refer to the StackOverflow threads here for more information: http://stackoverflow.com/questions/19189979/cannot-run-flume-because-of-jar-conflict

  • java.lang.NoSuchMethodError:twitter4j.FilterQuery.setIncludeEntities(Z)Ltwitter4j/FilterQuery
“ERROR lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } - Exception follows.
java.lang.NoSuchMethodError: twitter4j.FilterQuery.setIncludeEntities(Z)Ltwitter4j/FilterQuery;”

This is probably because of a conflict in the twitter4j-stream libraries. You would need to rename the following jar files: twitter4j-stream-3.0.3.jar, twitter4j-core-3.0.3.jar and twitter4j-media-support-3.0.3.jar

sudo mv /usr/lib/flume-ng/lib/twitter4j-stream-3.0.3.jar /usr/lib/flume-ng/lib/twitter4j-stream-3.0.3.jarx
sudo mv /usr/lib/flume-ng/lib/twitter4j-core-3.0.3.jar /usr/lib/flume-ng/lib/twitter4j-core-3.0.3.jarx
sudo mv /usr/lib/flume-ng/lib/twitter4j-media-support-3.0.3.jar /usr/lib/flume-ng/lib/twitter4j-media-support-3.0.3.jarx

Refer to the StackOverflow threads here for more information: http://stackoverflow.com/questions/24322028/cdh-twitter-example-java-error

Step 6: Monitoring Flume Agent and Querying Tweets

Once the Flume agent is successfully started, you would be able to see the console logs as shown below. The console will refresh as the tweets are being received by Flume and persisted in HDFS.

twitter sentiment spark-8

You can verify that Flume is reading from Twitter and creating the JSON files by navigating to Hue > File Browser > /twitteranalytics/incremental as shown below.

twitter sentiment spark-9

To verify that the tweet data can be viewed through Hive, you can navigate in Hue to Query Editors > Hive and on the query editor enter the following SQL

select id, entities.user_mentions.screen_name screen_name, text from incremental_tweets

The above SQL will query the Hive table incremental_tweets for the ID, screen_name field that is part of the user_mentions structure and the tweet text. You should get the following result:

twitter sentiment spark-10

The result is presented just like any SQL result set – with the exception of columns where the “[]” represent the JSON substructure.
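To see why the screen_name column comes back as a bracketed list, here is a small Python sketch that mirrors the query on a single made-up tweet. The tweet content and the helper name select_row are hypothetical; the point is only that user_mentions is an array of structs, so picking one field out of it yields an array of values.

```python
import json

# A made-up tweet with the same nesting as the Twitter JSON used above.
raw_tweet = """{"id": 1, "text": "hello @alice and @bob",
 "entities": {"user_mentions": [{"screen_name": "alice"},
                                {"screen_name": "bob"}]}}"""


def select_row(tweet):
    """Mimic: select id, entities.user_mentions.screen_name, text."""
    mentions = tweet.get("entities", {}).get("user_mentions", [])
    return {
        "id": tweet["id"],
        # One value per mention, hence the "[]" in the result grid.
        "screen_name": [m["screen_name"] for m in mentions],
        "text": tweet["text"],
    }


row = select_row(json.loads(raw_tweet))
print(row)
```

A tweet with no mentions would simply give an empty list in that column.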

That’s it! Well done!

You have successfully used Flume to receive streaming tweets, created Hive tables to store the data on HDFS and used SQL to retrieve the stored information.

Hope you have found this how-to useful! In the next post, we will create a Spark job in Python to determine the sentiment of the tweets.

You can download the Flume configuration and source files for your easy reference here.

rJava load error in RStudio/R after upgrading to OS X El Capitan

It appears that every time there is an upgrade of OS X, rJava will break. I had this experience with Yosemite and now El Capitan as well.

After many hours of figuring out what went wrong, it came down to the “Rootless” security feature in OS X.

Previously, I was able to run rJava on Yosemite following the instructions found on this forum post on Stack Overflow here. When you eventually run the command:

sudo ln -f -s $(/usr/libexec/java_home)/jre/lib/server/libjvm.dylib /usr/lib

the following error will appear:

rJava Issues-1

This is because the “Rootless” security feature in OS X El Capitan will no longer allow us to write to /usr/lib. Instead, you should be running the command

sudo ln -f -s $(/usr/libexec/java_home)/jre/lib/server/libjvm.dylib /usr/local/lib

Note that the path should now be /usr/local/lib. This will create the symbolic link that allows R/RStudio to load libjvm.dylib and enables rJava to run properly.

Hope this will be helpful to those who are struggling with rJava on El Capitan!

Here are some other links about the Rootless security feature in OSX (courtesy of Arstechnica and Apple)

Cheaper & Greener – Transportation Multi-Objective Optimisation

My job as a Supply Chain / Transportation Solution Architect requires me to work with the best-in-class transportation optimisation solutions in the market (OTM, SAP TM etc.) to model solutions for logistics companies, 3PLs and shippers. I have often wondered what actually goes on behind the scenes, under all the proprietary code and maths, to determine the optimal solution given a huge number of constraints.

So I was happy that, as part of my Masters programme in Enterprise Data Analytics, we learnt how to model real world optimisation problems and solve them using linear programming and multi-objective optimisation. It was really interesting to finally take a peek under the hood and understand what is actually going on in some of the complex TMS tools used by logistics companies today to optimise their transportation fleets.

While this was a “classroom type” project, it definitely reveals some of the maths that is going on behind the scenes. This does not mean that we can now go and build our own transport management solutions – but it certainly sheds some light on what was previously a black box :). Many of the leading TMSes may not be using the same approach as discussed here – but one common aspect is that the satisfaction of constraints is at the heart of such solution approaches.

So for this blog post, I will be sharing how we approached a “real world like” problem of optimising a fleet of mixed sized trucks for a supplier of automotive parts delivering into manufacturing assembly lines. The actual supplier and assembly line locations were masked but the volumes are scaled based on the actual volume received.

Because the tool we used for this exercise is trial software, there were limitations to the size and complexity of the problem we could model. So to simplify the problem, the following assumptions were made:

  • There are 4 (01 – 04) assembly lines the supplier needs to deliver the parts to
  • The supplier has a fleet of 3 different truck types which they use to transport the auto parts to each of the four assembly lines. The parts can be loaded into a 5 Tonne (5T), 8 Tonne (8T) or 10 Tonne (10T) truck type based on the volume of the truck.
  • Each truck has a different rental cost and once a truck is used, regardless of the number of orders on the truck, the cost of the truck is expensed.
  • The cost of the truck would include driver hourly rate and fuel surcharge by distance per delivery trip.
  • The distance and time matrix between the supplier and the plant is given
  • Dimensional modelling was not considered and only pure volume was used

The supplier is looking at optimising the use of their fleet of trucks to meet the contracted demand of the car manufacturer to each assembly line. In addition, the supplier’s customer has a commitment to ensure CO2 is minimised because of the environmental policy of the customer.

truck fleet optimisation-4
This is definitely not good for the environment!

Hence the objectives are to minimise the cost of the truck and trip assignments as well as to minimise the CO2 emission. The key decision variables for this problem are:

  1. The optimal no. of trucks of each type (e.g. 2 x 5T)
  2. The optimal no. of trips each of the trucks need to make (e.g. 2 x 5T trucks with 3 trips each, resulting in 6 deliveries using 5T trucks)

And the result needs to consider the least no. of trucks and the lowest CO2 emission as a result of delivering all the orders.

Solution Modelling

Table 1 below shows the necessary parameters and decision variables.

truck fleet optimisation-0

The first objective function of the proposed model, given in Eq. (1), minimises the transportation cost of the total orders to each of the assembly lines.

truck fleet optimisation-0d

Cost of delivering the good using truck type  to assembly line  is dependent on the distance travelled to assembly line  based on the rate per distance for the truck and the fuel surcharge per distance to assembly line .

The second objective function of the proposed model minimises carbon dioxide (CO2) emissions and is given in Eq. (2).

truck fleet optimisation-0e

Carbon cost of delivering the good using truck type  to assembly line  is dependent on the distance travelled to assembly line  and the carbon emission rate per KM.

truck fleet optimisation-3

Constraints (3) to (6) determine the maximum supply using each truck type. Eq. (7) ensures that the maximum number of trucks used for each type is within the fleet x trip size. Eq. (8) to (11) determine the maximum demand for each assembly line. Finally, Eq. (12) to (18) ensure the non-negativity of the variables.

Efficient Frontier Curve

When we solved for both objectives as part of the multi-objective optimisation, we plotted an efficient frontier curve as shown:

truck fleet optimisation-0f

The unusual shape of the frontier curve might be explained by the quadratic cost objective functions as well as the quadratic CO2 emission cost objective function.

But as observed from the chart, the transportation costs are relatively insensitive to the CO2 values over a certain value range. This suggests that unless there is a high penalty cost attached to keeping the CO2 emission level at a certain standard, the company is likely to place greater emphasis on minimising transportation cost and sideline the CO2 emission costs if it has to compromise one objective for the other. In other words, unless the CO2 emission cost is high enough, there isn’t an incentive for the supplier to change the fleet configuration so as to further minimise the overall cost of transport and CO2 emissions.
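For readers who want to see how such a frontier can arise, here is a toy Python sketch. It is not the model from this post: every figure is hypothetical, the costs are linear per trip rather than quadratic, and the solver is simple brute-force enumeration instead of an LP tool. It enumerates trip counts per truck type, keeps the plans that cover demand, and then filters out every plan that another plan beats on both cost and CO2.

```python
from itertools import product

# Hypothetical per-trip figures; NOT the values from the post's spreadsheet.
TRUCKS = {
    "5T": {"capacity_cbm": 20, "cost": 100, "co2_kg": 30},
    "8T": {"capacity_cbm": 30, "cost": 140, "co2_kg": 50},
    "10T": {"capacity_cbm": 40, "cost": 200, "co2_kg": 55},
}
DEMAND_CBM = 100  # hypothetical total demand across all assembly lines
MAX_TRIPS = 5     # hypothetical cap on trips per truck type


def feasible_plans():
    """Yield every trip combination whose total capacity covers demand."""
    names = list(TRUCKS)
    for trips in product(range(MAX_TRIPS + 1), repeat=len(names)):
        plan = dict(zip(names, trips))
        capacity = sum(n * TRUCKS[t]["capacity_cbm"] for t, n in plan.items())
        if capacity >= DEMAND_CBM:
            yield {
                "trips": plan,
                "cost": sum(n * TRUCKS[t]["cost"] for t, n in plan.items()),
                "co2": sum(n * TRUCKS[t]["co2_kg"] for t, n in plan.items()),
            }


def pareto_front(plans):
    """Keep plans that no other plan beats on both cost and CO2."""
    front = []
    for p in plans:
        dominated = any(
            q["cost"] <= p["cost"] and q["co2"] <= p["co2"]
            and (q["cost"] < p["cost"] or q["co2"] < p["co2"])
            for q in plans
        )
        if not dominated:
            front.append(p)
    return sorted(front, key=lambda p: p["cost"])


all_plans = list(feasible_plans())
front = pareto_front(all_plans)
for plan in front:
    print(plan["cost"], plan["co2"], plan["trips"])
```

Reading the printed list from top to bottom, cost rises while CO2 falls. Each step shows how much extra transport cost buys a given CO2 reduction, which is the trade-off the frontier curve visualises.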

Scenario Testing

We ran multiple scenarios through this model to determine the impact of the various key constraints modelled. The following details the scenarios as well as the interpretation of the results of the LP. The scenarios are detailed in the Transport Data and Matrix.xlsx – Scenarios tab. (refer to Github link below for the xls file)

Scenario: Truck Rental Change

With the reduction in the rental cost of 5 Tonne trucks, the supplier should generally maximise the usage of 5 Tonne trucks, i.e. number of trucks by number of trips. The remaining auto-parts demand that cannot be fulfilled by the 5 Tonne trucks is then delivered using either 8 Tonne or 10 Tonne trucks. However, the changed amount in the scenario might not be significant enough to cause a change in the allocation of supply among the different truck types given the two objectives.

Scenario: Fuel Surcharge Rate Change

With the reduction in the Fuel Surcharge Rate of 5 Tonne trucks by 1.2% and an increase for both 8 Tonne and 10 Tonne trucks, the supplier should maximise the usage of 5 Tonne trucks, i.e. number of trucks by number of trips, which offers the benefit of lower cost from both the truck rental and the fuel usage for delivery. The remaining auto-parts demand that cannot be fulfilled by the 5 Tonne trucks is then delivered using either 8 Tonne or 10 Tonne trucks.

Scenario: Fleet Size Change

With the reduction in the number of 5 Tonne trucks and an increase in the number of 8 Tonne trucks, the supplier should still maximise the usage of 5 Tonne trucks. However, the remaining auto-parts demand should be allocated to the 8 Tonne trucks first where possible to benefit from the lower truck rental cost.

Scenario: Volume Change

With the increase in the capacity of the 5 Tonne truck by 3 cbm, the supplier should maximise the usage of 5 Tonne trucks, i.e. number of trucks by number of trips, which offers the benefit of lower truck rental cost together with higher delivery capacity. The remaining auto-parts demand that cannot be fulfilled by the 5 Tonne trucks is then delivered using either 8 Tonne or 10 Tonne trucks.

However, no optimal solution that meets both objectives can be obtained, as no supply can be delivered to assembly line 2. In other words, the demand constraint from assembly line 2 is not met.

Scenario: Demand Change

With an increase in demand from assembly lines 1 and 2 and a decrease in demand from assembly lines 3 and 4, it is suggested that we arrange for bigger trucks to fulfil the increased load from lines 1 and 2 rather than resort to using more 5 Tonne trucks.

Scenario: CO2 Adjusted Emission Factor

When there is a change in the carbon dioxide (CO2) adjusted emission factor, the bigger impact is on the carbon dioxide emission cost, while the impact on the transportation cost is not significant most of the time. It probably also depends on the importance the supplier places on corporate social responsibility and on the penalty cost arising from environmental policy. Technology breakthroughs would probably also play a part in moderating the impact of this parameter.

Results

The scenario analysis seems to suggest that the optimisation models here can be used to support decision making on the company’s new strategy or on adapting to new environmental factors. For example, the supplier must plan to expand their fleet of trucks before they accept any new orders or in anticipation of expanding their business. By simulating decision variable values for both the demand change and the new fleet size for the various truck types, the supplier can use the information in their decision making when conceptualising their expansion strategy.

I really enjoyed tackling this problem and I hope you have also enjoyed reading this very very long post. But hey, who says optimisation was easy anyway 🙂

You can download the files and play around with the various constraints to explore and see what the results show. All I kindly request is that you make a link back to this post (as an acknowledgement) if you use the code in any of your work or assignments.

Thanks for reading and keep optimising!

A Step by Step How To for Extracting Twitter Messages from R

I recently started a small hobby project to analyse accident frequency on Singapore roads. I decided to extract this information from the Singapore Land Transport Authority Twitter feed. (I could have gotten the data through the Singapore Government's DataMall initiative using Python, but that will be the subject of another how-to later.)

I thought I would share my experience and steps to do this and hopefully you will find this useful.

So what are we waiting for? Let’s begin!

Step 1: Download the twitteR package

We need to ensure that the latest twitteR package is installed in your R environment. Run the following command in RStudio:

install.packages("twitteR")

This will download and install the twitteR package and all its required packages.

Step 2: Setup a Twitter App

We need to create a Twitter App so that we can access the Twitter platform through this web API. Before you can create a Twitter App, you need to create an account first. You can do so on the Twitter Apps page.

Once you are done, you can start by clicking on the Create New App button.

Twitter Application Management landing page

Proceed to enter the required mandatory fields as shown below.

Enter required mandatory fields

The Website address can be a temporary one for now. However, ensure that the Callback URL is left blank for now.

Acknowledge the developer agreement and click on the “Create your Twitter application” button. The following page will appear confirming that you have successfully created the web application.

Successfully created a Twitter web application

Click on the Keys and Access Token tab to view the Consumer Key and Consumer Secret keys.

View keys and access tokens

At this point, you have not created your Access Token yet. Hence click on “Create my access token” button to do so.

Create access token

Your access tokens will be generated and displayed on the refreshed page.

Access token generated

Click on the Application Management icon above and you will see your new application created as shown below.

Twitter application successfully created

Step 3: Create R code to Access Twitter Feeds

Go back to RStudio and enter the following R code:

#load the necessary packages
library(twitteR)

#necessary file for Windows
#download.file(url="http://curl.haxx.se/ca/cacert.pem", destfile="cacert.pem")

#to get your consumerKey and consumerSecret see the twitteR documentation for instructions
consumer_key <- 'your consumer key'
consumer_secret <- 'your consumer secret key'
access_token <- 'your access token'
access_secret <- 'your access secret'

setup_twitter_oauth(consumer_key, consumer_secret, access_token, access_secret)

Note that I have commented out the download.file command since I am running OS X in this example. I have not tested whether adding this download.file(…) code snippet will work.

Once you have entered the above, run the code and you will see the following prompt on the RStudio console

Running R code for twitter integration

You can select 1 or 2 depending on your preference. Regardless of the choice, you should see the “>” on the next line on the console indicating that the setup_twitter_oauth command was successfully executed.

Step 4: Extract your Twitter Feed

Once you have completed the above step, enter the following R code.

ltaTwtr <- searchTwitter("LTATrafficNews + Accident", n=500)
length(ltaTwtr)

#make data frame
tmpDf <- do.call("rbind", lapply(ltaTwtr, as.data.frame))

The searchTwitter command will issue a search of Twitter based on a supplied search string – based on your subscribed twitter feeds. Because the return value of searchTwitter is a list, we need the do.call("rbind", …) call to convert it into a data frame for subsequent processing.

Data from twitter feed based on search string

The above table is an example of the twitter messages that match my search criterion.

That’s it!

You can download my sample code on Github for those who want the code directly.

I hope this short how-to has helped with your data science tasks! Happy coding!

What is analytics? A quick dip in the pool.

I came across this great visualisation that introduces basic concepts in analytics and the differences between Big Data and predictive analytics. I found this to be a good summary for the busy senior executive who wants to understand what analytics is all about.

Most certainly there is so much more to analytics than just this summary – but it's a good dip in the water! 🙂

http://www.microsoft.com/en-gb/enterprise/it-trends/big-data/articles/getting-started-with-predictive-analytics.aspx?linkId=15086654#fbid=mq4jQRs-uCm

Thanks to Jesse Stanchak, Social Media Community Manager @ Microsoft!

Wearables – Can They Be Reliable?

My wife bought me a Polar RS300x 5 years ago after a close friend of mine died of a sudden heart attack after running. He was young and fit, had a young daughter less than a year old and was expecting a second child. I used it and its usefulness became apparent: even while doing a simple 2km run, I was stressing my heart beyond the recommended limit and I wasn’t even feeling it. It was a great tool and helped me understand my body better.

However, I could never use this tool and its results to tell my doctor how my heart is performing and that it's in good shape. It was just a simple heart rate monitor (with GPS and a lot of other cool functionality that captured data I could not submit as evidence to a medical professional).

So when I wrote about clinical agreement and wearables using the example of the Apple Watch a few weeks ago, I mentioned the importance of reliability and consistency in capturing and communicating health data to healthcare providers.

It is again of interest that I read an article mentioning that the Apple Watch will capture your heart rate every 10 mins but won’t record it when you’re in motion or your arm is moving. You can read the article on Engadget here and Apple’s site here. You can still measure your heart rate continuously but only when running the Workout app.

Presumably this is to ensure battery performance and it is understandable why this approach was taken. However, it means that:

1) A consistent measurement of your heart rate cannot be done (even with 10 min intervals) since arm movement is almost always happening – while talking, walking, eating and even typing. I think the only time when your arm is not moving is probably when you are sleeping or watching a movie. That makes the heart rate measurements at 10 min intervals (or less) meaningless and possibly even misleading.

2) Tracking of your heart rate will be limited to activity / exercise based monitoring (for example through the Workout app)

Given these understandable constraints, the promised benefits of healthcare management through wearables are at best cursory and surface-level, and at worst dangerous and misleading, because capturing health-related information requires devices that are specifically built to specifications that healthcare practitioners can rely upon.

Wearables at this stage of development appear to be consumer based fashion devices that try to double up as healthcare management tools. To truly revolutionise wearables and their applications we need to address:

1) Battery life.

The use of devices that capture healthcare information cannot be limited by battery life. They need to be able to capture, store and transmit information consistently over the daily routine of the patient.

2) Reliability, Accuracy and Consistency

The device must be able to measure and capture information reliably across different segments of the population. An example of a limitation here is that the Apple Watch does not work with people who have tattoos on their wrists. Data that is accurate, reliable and consistent would be usable by medical professionals to truly transform healthcare management by reducing the cost of information capture.

3) Fashion, Size and Fit

The device must be an integral part of fashion and I think Google’s Project Jacquard is a great step in the right direction. Market forces that drive demand and encourage the supply of wearables will enable a lot more people to adopt the technology and thus drive further innovations (better bio-data measurement capabilities) in healthcare wearables.

IoT and the ability to capture information from everyday devices to transform healthcare is a very real and tangible future we can all look forward to – and it is coming.

I am excited about it having seen the benefits from my RS300x 🙂

Would love to hear your thoughts and comments

Clinical Agreement – Potential Issues with Wearables?

I am currently reading about analytics for Pharmaceutical and Healthcare and the most recent topic I am learning about is Clinical Agreement.

Clinical Agreement, as I understand it, is how well measurements captured by multiple raters (a device, machine, person, technical method or trial) agree with each other. By ‘agreement’ I mean reliable, repeatable and consistent.

And so it is with great interest I came upon the following post from MacRumors about users who reported that heart rate data captured on the Apple Watch was being sent sporadically to the iPhone.

Some Apple Watch Users Experiencing Issues With Inconsistent Heart Rate Data Following Update

While the issue discussed is about the inconsistency in sending data to the iPhone, it inevitably raises the question of how well wearable devices perform in the area of clinical agreement. I do wonder what the impact would be if:

  • Consumers start to provide doctors and healthcare professionals with information from wearables and devices that may not be reliable, consistent or repeatable.
  • Consumer wearable devices when reading biometric information do not record or transmit consistently.

While it is great that consumer electronic companies are now getting into the healthcare space by enabling devices and applications in the form of wearables and making the individual responsible for staying healthy, I do think that when it comes to the area of health, extra care and attention need to be applied to such devices.

When we start to rely on such devices to track our health, these devices must not only stand up to the current expectations of consumer hardware (e.g. durability, performance, quality etc) but also be tested to ensure clinical reliability and safety.

I am excited about the future of wearables and their health applications – but I also anticipate that eventually consumer electronic companies will need to work closely with established healthcare device manufacturers to learn and apply the same rigour when introducing such devices.