Spark Streaming is used to filter live tweets coming in, only accepting those that are classified as the specified cluster (type) of tweets. It takes the following arguments:
This program is very simple - this is the bulk of the code below. First, load up a Spark Streaming Context. Second, create a Twitter DStream and map it to grab the text. Third, load up the K-Means model that was trained in step 2. Finally, apply the model on the tweets, filtering out only those that match the specified cluster, and print the matching tweets.
println("Initializing Streaming Spark Context...")
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
val ssc = new StreamingContext(conf, Seconds(5))
println("Initializing Twitter stream...")
val tweets = TwitterUtils.createStream(ssc, Utils.getAuth)
val statuses = tweets.map(_.getText)
println("Initializing the KMeans model...")
val model = new KMeansModel(ssc.sparkContext.objectFile[Vector](
modelFile.toString).collect())
val filteredTweets = statuses
.filter(t => model.predict(Utils.featurize(t)) == clusterNumber)
filteredTweets.print()
Now, run Predict.scala:
% ${YOUR_SPARK_HOME}/bin/spark-submit \
--class "com.databricks.apps.twitter_classifier.Predict" \
--master ${YOUR_SPARK_MASTER:-local[4]} \
target/scala-2.10/spark-twitter-lang-classifier-assembly-1.0.jar \
${YOUR_MODEL_DIR:-/tmp/tweets/model} \
${CLUSTER_TO_FILTER:-7} \
--consumerKey ${YOUR_TWITTER_CONSUMER_KEY} \
--consumerSecret ${YOUR_TWITTER_CONSUMER_SECRET} \
--accessToken ${YOUR_TWITTER_ACCESS_TOKEN} \
--accessTokenSecret ${YOUR_TWITTER_ACCESS_SECRET}