Twitter, Fire Hoses and Congress ... Oh My

Streaming Twitter Data to Kinesis Firehose
In this post we'll dive into data mining social media with Kinesis Firehose. Specifically, we'll obtain Twitter API keys and write a Python script to find tweets related to congressional activity by filtering with the word "congress".

Making a Twitter Application
The first thing we'll need is a Twitter account. After you log in, make sure you add your phone number to your account via the settings console here https://twitter.com/settings/. Be sure to use your real phone number because Twitter will text you to verify ownership. You can text back to stop all future text messages. If giving out your number to Twitter makes you nervous, I can assure you I have gotten no spam since attempting this experiment. The phone number turns out to be a necessary step because you cannot register an app to your account without one. Now make your app at https://apps.twitter.com/. Click the Create New App button and fill out the corresponding form. Only the name and description fields are required; any dummy URL can be used. Once this is done you'll have the API keys required to start sending Twitter data to Kinesis Firehose.

Digging into Kinesis
Within AWS, Kinesis is a broad topic. We are going to use Kinesis Firehose specifically and stream tweets straight into an S3 bucket. Before you can start, be sure to give yourself the AmazonKinesisFirehoseFullAccess policy. Notice that this is different from the AmazonKinesisFullAccess policy. Then from the Kinesis Firehose console choose to create a new delivery stream and be sure to name it "twitter-stream". This is important if you are going to run the code I supply here. Be sure to choose to stream your data to S3 rather than Redshift. While there are configurations in here for transforms, logging, and encryption, you can leave them all disabled and create a new bucket for your destination. The only optimization I would recommend is tuning the S3 buffer settings, which control how much data Firehose accumulates (by size and by time) before it flushes an object to your bucket.

Towards the end of the setup process you'll need to choose an IAM role for the stream to run under. The path of least resistance here is to let Amazon create a new role for you, which quickly wraps up the Kinesis setup. To finish up, head over to S3 and see your new, empty Kinesis bucket.
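If you'd rather script this step than click through the console, the same delivery stream can be created with boto3. This is only a minimal sketch: the bucket ARN and role ARN below are placeholders you'd swap for your own.

import boto3

firehose = boto3.client('firehose')

# Placeholder ARNs: substitute the bucket and IAM role from your own setup.
firehose.create_delivery_stream(
    DeliveryStreamName='twitter-stream',
    S3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        'BucketARN': 'arn:aws:s3:::my-twitter-bucket',
        'BufferingHints': {
            'SizeInMBs': 5,           # flush after 5 MB ...
            'IntervalInSeconds': 60   # ... or 60 seconds, whichever comes first
        }
    }
)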

Python Streaming Code
We will be using Python for our application code, and you can clone the repository with the command git clone https://github.com/jdav999/twitter_stream.git. You can do this from an EC2 instance or your local machine. Either way, you'll need to have pip installed and then run pip install tweepy and pip install boto3 (the config parsing uses the standard library's configparser module, so nothing extra is needed there). Next you'll need to add your Twitter API keys to the config file. I've left a skeleton for all the API keys.
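Based on the section and key names the script reads, api_auth.cfg looks like this, with placeholders standing in for your real keys:

[api_tracker]
access_token = YOUR_ACCESS_TOKEN
access_token_secret = YOUR_ACCESS_TOKEN_SECRET
consumer_key = YOUR_CONSUMER_KEY
consumer_secret = YOUR_CONSUMER_SECRET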

Once you are fully configured and authorized you are ready to kick off the stream. Before you do, however, it's worth browsing the code and noticing how it authenticates and then filters for the phrase "congress". This is what gives us congressionally relevant tweets. Check it out here:

import boto3
from configparser import ConfigParser
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener

# Pull the Twitter API keys out of the config file.
parser = ConfigParser()
parser.read('api_auth.cfg')

access_token = parser.get('api_tracker', 'access_token')
access_token_secret = parser.get('api_tracker', 'access_token_secret')
consumer_key = parser.get('api_tracker', 'consumer_key')
consumer_secret = parser.get('api_tracker', 'consumer_secret')

DeliveryStreamName = 'twitter-stream'

client = boto3.client('firehose')


class StdOutListener(StreamListener):

    def on_data(self, data):
        # Tweepy hands us each tweet as a raw JSON string, so there is no
        # need to re-encode it with json.dumps (that would double-encode it).
        # The trailing newline keeps records line-delimited once Firehose
        # concatenates them into S3 objects.
        print(data)
        client.put_record(
            DeliveryStreamName=DeliveryStreamName,
            Record={'Data': (data + '\n').encode('utf-8')}
        )
        return True

    def on_error(self, status):
        print(status)


if __name__ == '__main__':
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, listener)
    # Filter the live stream down to tweets mentioning "congress".
    stream.filter(track=['congress'])

Turning the Hose On
If you have everything set correctly then python twitter_firehose.py will start the stream and you'll see raw tweets fly by in your terminal. More importantly, you'll see your S3 bucket start to populate with objects.

You can even open one of the objects from the bucket and inspect the raw tweets inside.
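If you'd rather script that inspection, here's a minimal boto3 sketch; my-twitter-bucket is a placeholder for whatever bucket you pointed Firehose at.

import boto3

s3 = boto3.client('s3')
bucket = 'my-twitter-bucket'  # placeholder; use your Firehose destination bucket

# Find the most recently written object and print the first tweet in it.
objects = s3.list_objects_v2(Bucket=bucket)['Contents']
newest = max(objects, key=lambda obj: obj['LastModified'])
body = s3.get_object(Bucket=bucket, Key=newest['Key'])['Body'].read().decode('utf-8')
print(body.splitlines()[0])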

Looking at a tweet's actual payload demonstrates how our code filtered for "congress". At this point quite a lot of data science could be thrown at this data to hunt for clues of future congressional plans. Relevant data in here includes mentions of congress, the disseminator's name, the names of those who retweet congress-related information, and their follower counts. Incidentally, there are some really cool big data tools in AWS to help sift through this kind of data, but those will have to wait for another time.
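Before reaching for those tools, though, a few lines of Python are enough to pull the interesting fields out of a single tweet. This sketch assumes the standard Twitter API v1.1 payload, where retweets nest the original post under retweeted_status:

import json

def summarize(raw_line):
    # Each line of an S3 object is one tweet's JSON.
    tweet = json.loads(raw_line)
    summary = {
        'text': tweet.get('text'),
        'user': tweet['user']['screen_name'],
        'followers': tweet['user']['followers_count'],
    }
    # For retweets, the original poster is nested under retweeted_status.
    if 'retweeted_status' in tweet:
        summary['retweet_of'] = tweet['retweeted_status']['user']['screen_name']
    return summary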

The last thing that is interesting to look at here is the stream in the Kinesis console. In particular, you can pull up your stream's monitoring graphs and see how it's performing.
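Those graphs are backed by CloudWatch, so you can also pull the numbers behind them programmatically. A sketch using the standard AWS/Firehose IncomingRecords metric:

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')

# Sum the records the stream ingested over the last hour, in 5-minute buckets.
stats = cloudwatch.get_metric_statistics(
    Namespace='AWS/Firehose',
    MetricName='IncomingRecords',
    Dimensions=[{'Name': 'DeliveryStreamName', 'Value': 'twitter-stream'}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Sum'],
)
for point in sorted(stats['Datapoints'], key=lambda p: p['Timestamp']):
    print(point['Timestamp'], point['Sum'])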

Conclusions
I hope I introduced you to some new technology and showed you how to integrate with the Twitter Platform. Get your hands dirty and have some fun.