Live Twitter Data Analysis and Visualization using Python and Plotly Dash

Introduction

Twitter carries an enormous flow of information every single second, which is worth tapping into if we want to explore how online communities react to real-life events as they happen. I thought it would be cool to have an application that collects, stores, analyzes, and finally visualizes Twitter data in real time, so that we can see exactly what is happening at the moment the application is running.

With this idea in mind, I implemented the app and deployed it to the cloud; it can be accessed through this link (Figure 1). The app collects every tweet related to the gaming community as it is posted and visualizes a few statistics. The panel on the left-hand side is a line plot of the word-count trend, showing how the top-5 most frequently mentioned words change over time. The top panel on the right-hand side is a bar chart of the top-10 words for easier comparison. The panel below it is a time-shifting scatter plot of the average real-time sentiment score of all tweets, grouped by the top-5 mentioned words.

With the predefined tracking keywords, the user can see the Twitter community's real-time reaction (the gaming community in this example) to a specific topic. The same setup could also be integrated into a recommendation system that requires short reaction times, or into a Twitter monitoring system during an event. In this post, I will walk through the framework and tools I used to build this app.

Figure 1. A demo showing the interface of the live Twitter streaming and visualization app.

Application Framework

I covered how to stream tweets with Python using the Tweepy package in an earlier post. There, however, data collection, migration, storage, and processing were handled as separate steps, sometimes on completely different timelines. For example, if the goal is to collect all the tweets posted over the next two days, we have to wait 48 hours for the streaming to finish before we can start processing the data. No matter how quickly we finish the downstream work, there is still at least a two-day gap between the moment we collect the first record and the moment we gain any insight from the data.

For this live streaming app, however, the traditional workflow is split into two independent pipelines that work together (Figure 2). Data processing starts as soon as the first record is received, followed by analysis, visualization, and so on. While the visualization server (Plotly Dash) is processing data, the streaming server (Tweepy) brings in the next batch of newly generated data. Each pipeline keeps looping at its own pace. Figure 2 shows the underlying framework of these processes:

Figure 2. The underlying framework for the live app.

As shown in Figure 2, to make the application react promptly, we break the traditional data science pipeline into two modules, each handled by its own local server. Tweepy is responsible for streaming the data, i.e., controlling the flow of tweets, and we attach extra functionality to its trigger behavior, such as pushing data into a database or deleting old records from it (optional). Note that this extra functionality is triggered whenever a tweet falls under the predefined tracking conditions and is therefore heard by the server, so for the Tweepy server the interval between one operation and the next is random, as shown in Figure 2.

Meanwhile, data processing and visualization are carried out by another platform, Dash by Plotly. Dash controls the rendering and refreshing of the visualization in the browser, so a constant time interval for this trigger is preferred. Depending on the data throughput, the time window can range from 0.5 seconds to 5 seconds (2 seconds for this app). In the big picture, the two servers execute their loops independently, and data flows between them at the predefined time interval to update the application's graphical interface.

Data Ingestion

For the data collection module, we use Tweepy to set up a local server that listens for tweets from the Internet. First, create a file named streaming.py, which sets up the authentication and the trigger conditions. The code for streaming.py is shown below:

# import packages
from tweepy import OAuthHandler
from tweepy import API
from tweepy import Stream
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database
from urllib3.exceptions import ProtocolError
from slistener import SListener
from key_secret import consumer_key, consumer_secret
from key_secret import access_token, access_token_secret

In the script above, we first import all the necessary packages, including Tweepy and SQLAlchemy. We also import ProtocolError from urllib3.exceptions so the app can automatically resume streaming if a connection error occurs later on. slistener and key_secret are simply two other Python scripts in the same folder: key_secret.py contains the credentials (keys and tokens), and slistener.py contains a custom SListener class that inherits from Tweepy's standard StreamListener class (more on this in a bit).
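
For reference, key_secret.py only needs to define the four credential strings imported above. A minimal sketch with placeholder values (substitute your own Twitter API credentials) looks like this:

# key_secret.py -- placeholder values, replace with your own credentials
consumer_key = 'YOUR_CONSUMER_KEY'
consumer_secret = 'YOUR_CONSUMER_SECRET'
access_token = 'YOUR_ACCESS_TOKEN'
access_token_secret = 'YOUR_ACCESS_TOKEN_SECRET'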

# consumer key authentication
auth = OAuthHandler(consumer_key, consumer_secret)
# access key authentication
auth.set_access_token(access_token, access_token_secret)
# set up the API with the authentication handler
api = API(auth)
# instantiate the SListener object
listen = SListener(api)
# instantiate the stream object
stream = Stream(auth, listen)
# set up words to hear
keywords_to_hear = ['#Fortnite',
                    '#LeagueOfLegends',
                    '#ApexLegends']

Back in streaming.py, the block of code above is the standard routine for setting up a streaming API with the authentication handler and the predefined SListener object. keywords_to_hear is the list of keywords we want to catch; our streaming server only captures tweets that contain one or more of these keywords.

# create an engine to the database
engine = create_engine("sqlite:///tweets.sqlite")
# if the database does not exist
if not database_exists(engine.url):
    # create a new database
    create_database(engine.url)
# begin collecting data
while True:
    # maintain the connection unless interrupted
    try:
        stream.filter(track=keywords_to_hear)
    # reconnect automatically if an error arises
    # due to an unstable network connection
    except (ProtocolError, AttributeError):
        continue

Next, we use the create_engine function from SQLAlchemy to create the database that will store our data if it does not exist yet. We create the database here because we want it ready to receive data before the stream is launched. Finally, stream.filter(track=keywords_to_hear) launches the server and starts streaming tweets that contain the keywords we specified in the list. We put this line inside a while-loop so that the streaming does not terminate if the network connection is randomly interrupted; it only stops when we shut the server down manually.

Once the stream is launched, tweets start flowing into our local machine. How the raw data is handled is specified by the SListener object, and the code below is a sample script defining it. Create a new file called slistener.py and put the following blocks of code into it, beginning with the imports:

# import packages
from tweepy.streaming import StreamListener
import json
import pandas as pd
from sqlalchemy import create_engine

First, we import the base StreamListener class from tweepy so we can add extra functionality to it later on. To process the raw Twitter status objects, the json package is needed to parse each status into a dictionary from which we can select the useful fields. pandas is also necessary since it provides a convenient bridge between our Python working environment and the database.

# inherit from StreamListener class
class SListener(StreamListener):
    # initialize the API and a counter for the number of tweets collected
    def __init__(self, api = None, fprefix = 'streamer'):
        self.api = api or API()
        # instantiate a counter
        self.cnt = 0
        # create an engine to the database
        self.engine = create_engine('sqlite:///tweets.sqlite')
    # for each tweet streamed
    def on_status(self, status): 
        # increment the counter
        self.cnt += 1
        # parse the status object into JSON
        status_json = json.dumps(status._json)
        # convert the JSON string into dictionary
        status_data = json.loads(status_json)        

The rest of the script customizes our StreamListener class. In addition to what the parent class provides, we initialize a database engine in the __init__ method, and the cnt variable simply counts the number of tweets streamed so far. The on_status method is the most essential part of the whole class: it is triggered whenever a new tweet is heard. Its first few lines parse the incoming raw status into a Python dictionary containing all the information about that specific tweet, such as the timestamp when it was created, the user, its text, and so on.

        # initialize a list of potential full-text
        full_text_list = [status_data['text']]
        # add full-text field from all sources into the list
        if 'extended_tweet' in status_data:
            full_text_list.append(status_data['extended_tweet']['full_text'])
        if 'retweeted_status' in status_data and 'extended_tweet' in status_data['retweeted_status']:
            full_text_list.append(status_data['retweeted_status']['extended_tweet']['full_text'])
        if 'quoted_status' in status_data and 'extended_tweet' in status_data['quoted_status']:
            full_text_list.append(status_data['quoted_status']['extended_tweet']['full_text'])
        # only retain the longest candidate
        full_text = max(full_text_list, key=len)

The block above extracts the full text of the tweet. Depending on how the tweet was produced, the complete text can live in different places: in the extended_tweet field for tweets that exceed the classic length limit, or inside retweeted_status or quoted_status for retweets and quoted tweets. We collect every available candidate in a list and keep the longest one, which is the most complete version of the text.

        # keep the creation time and the full text of the tweet
        tweet = {
            'created_at': status_data['created_at'],
            'text':  full_text,
        }
        # print the tweet to the console to verify that the stream is working
        print("Writing tweet # {} to the database".format(self.cnt))
        print("Tweet Created at: {}".format(tweet['created_at']))
        print("Tweets Content:{}".format(tweet['text']))
        #print("User Profile: {}".format(tweet['user']))
        print()
        # convert into dataframe
        df = pd.DataFrame(tweet, index=[0])
        # convert the time string into a datetime object
        df['created_at'] = pd.to_datetime(df.created_at)
        # push tweet into database
        df.to_sql('tweet', con=self.engine, if_exists='append')

The timestamp can be accessed directly as we build the tweet dictionary, alongside the full text extracted above. The print statements help us verify that the stream is working. We then convert the dictionary into a pandas DataFrame; since there is just one observation (row), we pass index=[0] so pandas knows there is a single row. Next, we turn the timestamp from a string into a datetime object for time-series analysis, and finally push the record into the database using the engine we initialized in the __init__ method.

        with self.engine.connect() as con:
            con.execute("""
                        DELETE FROM tweet
                        WHERE created_at IN(
                            SELECT created_at
                                FROM(
                                    SELECT created_at, strftime('%s','now') - strftime('%s',created_at) AS time_passed
                                    FROM tweet
                                    WHERE time_passed >= 60))""")

Since the application is hosted on the cloud 24/7 and the host machine has a storage limit, I do not want the tweets to stay on the remote machine forever. So I connect to the database again and remove records older than 60 seconds with a SQL query. If you are running the application on your local machine, or you do not care about the storage limit, you can drop this query.

Now, to launch the streaming, simply run python streaming.py in the console, and you should see something like Figure 3 (make sure you have SQLite 3 installed):

Figure 3. Tweets flow in once the stream is launched.

Data Visualization

The framework we use to visualize the data is Dash by Plotly, a Python framework built on top of Flask, Plotly.js, and React.js for building analytical web applications. It lets us implement fairly complicated web functionality with much less code than Flask or Django. The two most essential libraries for creating an interactive visualization app with Dash are dash_core_components and dash_html_components, both installed automatically alongside Dash. dash_core_components provides the interactive components that go on the page, such as graphs, sliders, and dropdown menus. The general layout of the page, on the other hand, is specified with dash_html_components, which wraps every HTML tag in a Python function so we can build the web content in pure Python with a code structure similar to an HTML file.
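
As a quick, self-contained illustration (not part of this app's code), the two libraries work together like this: the html functions mirror HTML tags, while the dcc components supply the interactive pieces. The component id and the toy data here are arbitrary:

# toy example: an html.Div wrapping a header and an interactive graph
import dash
import dash_core_components as dcc
import dash_html_components as html

toy_app = dash.Dash(__name__)
toy_app.layout = html.Div([
    # an <h1> header built with dash_html_components
    html.H1('Hello Dash'),
    # an interactive bar chart built with dash_core_components
    dcc.Graph(id='toy_graph',
              figure={'data': [{'x': [1, 2, 3], 'y': [2, 4, 1], 'type': 'bar'}]}),
])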

To kick things off, create a new file app.py. The code inside app.py can usually be divided into two parts: the first specifies the layout, and the second specifies the interactive behavior (i.e., the callbacks). Start with the imports:

# import basic packages
import os, datetime, re, nltk
import numpy as np
# import data ingestion packages
from collections import deque, Counter
from data_gathering.api import get_tweet_data
# import Dash packages
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Output, Input, State
# import Plotly Package
import plotly.graph_objs as go
# import NLTK packages
from nltk import word_tokenize
from nltk.corpus import stopwords
from nltk.sentiment.vader import SentimentIntensityAnalyzer

For this app specifically, we need to import a few more packages to handle the extra functionality, such as datetime for accessing the current time and nltk for word counting and sentiment analysis. We also import plotly.graph_objs, which contains all the graph objects, for instance scatter plots, bar charts, and pie charts.

# download nltk dependencies
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('vader_lexicon')
# stop words for the word-counts
stops = stopwords.words('english')
stops.append('https')
# global refresh interval for the application, ms
GRAPH_INTERVAL = os.environ.get("GRAPH_INTERVAL", 2000)
# initialize a sentiment analyzer
sid = SentimentIntensityAnalyzer()
# keywords for the tweets
keywords_to_hear = ['#Fortnite',
                    '#LeagueOfLegends',
                    '#ApexLegends']
# X_universal is the x-axis with time stamps
X_universal = deque(maxlen=30)
# initialize dictionaries to store Y axis values for plots
scatter_dict = {}
sentiment_dict = {}

Next up are some pre-configurations. First, we download the NLTK dependencies needed for tokenization: punkt contains pre-trained tokenizers that detect sentence boundaries, and stopwords provides lists of stopwords in different languages. We only need the English stopwords here, and we also append https to the list since it is a high-frequency token that carries little information. Then we read the refresh rate GRAPH_INTERVAL from the environment variables, defaulting to 2000 ms (2 seconds) if it is not set. Finally, the shared x-axis of the plots, X_universal, is a deque with a maximum length of 30, since we only want to show the last 30 timestamps, and two dictionaries are initialized to hold the per-keyword y-values for the word-count and sentiment plots.
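
As a side note, the sentiment scores visualized in the third panel come from the sid analyzer initialized above: VADER's polarity_scores method returns a dictionary of scores for a piece of text, and its compound value is the single-number summary that the app averages per keyword. A quick example (the tweet text is made up):

# score a single (made-up) tweet with the VADER analyzer initialized above
example_tweet = "I love the new season of #Fortnite, the map is amazing!"
scores = sid.polarity_scores(example_tweet)
# scores is a dict of the form {'neg': ..., 'neu': ..., 'pos': ..., 'compound': ...}
print(scores['compound'])  # between -1 (most negative) and 1 (most positive)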

# initialize the app and server
app = dash.Dash(__name__, meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}])
server = app.server
# add layout to the app
app.layout = html.Div(
    [
        # header
        html.Div(...
        ),
        # graph layout
        html.Div(
            [
                # left hand side, word count line plot
                html.Div(...
                ),
                # right hand side, bar plot and scatter chart
                html.Div(...
                ),
            ],
            className="app__content",
        ),
    ],
    className="app__container",
)

The code above shows how to initialize the application and its server with some default meta settings, followed by the layout object. Note that I replaced the detailed layout settings with "…" to give you an overview of the page structure. The layout is straightforward to specify and does not require much coding skill; if you are curious about the details, you can refer to this page for the full layout code of this application.
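
To make the skeleton a little more concrete, below is a simplified sketch of what one of the elided blocks could contain. The component ids (number_of_words and query_update) match the callbacks that follow, but the class name and the exact nesting here are placeholders rather than the repo's actual layout:

# a simplified sketch of one graph block plus the shared refresh timer
html.Div(
    [
        html.H6("Word-count trend of the top-5 words"),
        # the figure is filled in by the update_graph_scatter callback below
        dcc.Graph(id="number_of_words"),
        # the timer that fires every GRAPH_INTERVAL milliseconds
        # and triggers all of the callbacks
        dcc.Interval(id="query_update",
                     interval=int(GRAPH_INTERVAL),
                     n_intervals=0),
    ],
    className="graph__container",
)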

Next, we add some helper functions and callback functions to the application:

def hashtag_counter(series):...
def bag_of_words(series):...
def preprocess_nltk(row):...
# define callback function for number_of_tweets scatter plot
@app.callback(
    Output('number_of_words', 'figure'),
    [Input('query_update', 'n_intervals')])
def update_graph_scatter(n):...
# define callback function for word-counts
@app.callback(
    Output('word_counts', 'figure'),
    [Input('query_update', 'n_intervals')])
def update_graph_bar(interval):...
# define callback function for sentiment_score
@app.callback(
    Output('sentiment_score', 'figure'),
    [Input('query_update', 'n_intervals')])
def update_graph_sentiment(interval):...
# define callback function indicating number of tweets
@app.callback(
    Output("num_tweets", "children"),
    [Input("query_update", "n_intervals")])
def show_num_tweets(n_intervals):...
# run the app
if __name__ == '__main__':
    app.run_server(debug=True, host='0.0.0.0')

Again, I hide the function bodies to give you a general idea of the structure and keep it from being overwhelming. Helper functions always differ between applications; refer to the link above if you want the details. In short, hashtag_counter takes a pandas Series and returns a dictionary with the number of rows that fall into each keyword category, while bag_of_words and preprocess_nltk are text-cleaning tools, sketched just below.
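
For illustration, here is one way the two text-cleaning helpers could be implemented, consistent with how they are used in the callbacks below (bag_of_words has to return a Counter so that most_common works); the versions in the repo may differ in the details:

def preprocess_nltk(row):
    # lowercase the tweet and keep only alphabetic tokens that are not stopwords
    tokens = word_tokenize(row.lower())
    return ' '.join(t for t in tokens if t.isalpha() and t not in stops)

def bag_of_words(series):
    # clean every tweet, then count word frequencies across the whole column
    cleaned = series.apply(preprocess_nltk)
    return Counter(' '.join(cleaned).split())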

For the callback functions, we use the app.callback decorator to register each function with Dash, which specifies its inputs and outputs as well as its refresh behavior. Here is the callback function for the line plot as an example:

@app.callback(
    Output('number_of_words', 'figure'),
    [Input('query_update', 'n_intervals')])
def update_graph_scatter(n):
    # query tweets from the database
    df = get_tweet_data()
    # get the word count
    cnt = bag_of_words(df['text'])
    # get the current time for x-axis
    time = datetime.datetime.now().strftime('%D, %H:%M:%S')
    X_universal.append(time)
    # initialize a list of items to pop
    to_pop = []
    # loop through the dictionary to filter out outdated keywords
    for keyword, cnt_queue in scatter_dict.items():
        # if the count queue for current keyword is not empty
        if cnt_queue:
            # pop all the outdated count values
            while cnt_queue and (cnt_queue[0][1] < X_universal[0]):
                cnt_queue.popleft()
        # if the queue for the keyword is empty
        else:
            # append it to the pop list
            to_pop.append(keyword)
    # pop all the outdated keywords
    for keyword in to_pop:
        scatter_dict.pop(keyword)
    # update the top_N keywords
    top_N = cnt.most_common(num_tags_scatter)
    # loop through the new top_N to update the dictionary
    for keyword, cnt in top_N:
        # if the current keyword is newly appeared
        if keyword not in scatter_dict:
            # initialize a new deque for it and append its count and time
            scatter_dict[keyword] = deque(maxlen=30)
            scatter_dict[keyword].append([cnt, time])
        # if it is not a new one, just append the count and time
        else:
            scatter_dict[keyword].append([cnt, time])
    # update the colors for the new dictionary
    new_colors = chart_colors[:len(scatter_dict)]
    # plot the scatter plot
    data=[go.Scatter(
        x=[time for cnt, time in cnt_queue],
        y=[cnt for cnt, time in cnt_queue],
        name=keyword,
        mode='lines+markers',
        opacity=0.7,
        marker=dict(color=color, size=15)
    ) for color, (keyword, cnt_queue) in list(zip(new_colors, scatter_dict.items()))]
    # specify the layout
    layout = go.Layout(
            xaxis={
                'automargin': False,
                'range': [min(X_universal), max(X_universal)],
                'title': 'Current Time',
                'nticks': 8},
            yaxis={
                'type': 'log',
                'autorange': True,
                'title': 'Number of Tweets'},
            height=700,
            plot_bgcolor=app_color["graph_bg"],
            paper_bgcolor=app_color["graph_bg"],
            font={"color": app_color["graph_font"]},
            autosize=False,
            legend={
                'orientation': 'h',
                'xanchor': 'center',
                'yanchor': 'top',
                'x': 0.5,
                'y': 1.025},
            margin=go.layout.Margin(
                l=75, r=25, b=45, t=25, pad=4))
    return go.Figure(
        data=data,
        layout=layout)

While the majority of the code above explains itself by those comments, there are a few extra notes:

  1. The input/output of the callback is declared in the decorator: the Output object tells Dash that this callback updates the figure of the component with id number_of_words, and the Input object tells Dash that the callback is triggered by the component with id query_update (the dcc.Interval timer).
  2. get_tweet_data() is another helper function that sends a SQL query to the database and fetches all the records from the table the tweets are pushed into (see SListener.on_status() above); a sketch of it is shown after this list. Since the database deletes old records while receiving new ones, it is fine for me to query all the current records. If that is not what you want, you can query a specific time window instead.
  3. The x and y coordinates of the scatter plot are updated by passing the newly calculated values into go.Scatter(), which creates one trace per keyword. The resulting traces are collected in a list called data, the data portion of this callback's output. Similarly, the layout is set with go.Layout().
  4. Last but not least, the callback returns a go.Figure object that contains the newly updated data and layout.
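
As promised above, here is a rough sketch of what get_tweet_data (imported from data_gathering/api.py) might look like; the actual implementation in the repo may differ:

# data_gathering/api.py -- a possible implementation of get_tweet_data
import pandas as pd
from sqlalchemy import create_engine

# connect to the same SQLite database that the streaming server writes into
engine = create_engine('sqlite:///tweets.sqlite')

def get_tweet_data():
    # fetch every record currently stored in the tweet table
    return pd.read_sql('SELECT created_at, text FROM tweet',
                       con=engine, parse_dates=['created_at'])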

The other callback functions can be written in a similar format. 
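
For instance, the bar chart of the top-10 words could be produced by a callback like the simplified sketch below; it reuses get_tweet_data, bag_of_words, and the chart_colors and app_color settings from the full script, while the version in the repo adds more styling:

@app.callback(
    Output('word_counts', 'figure'),
    [Input('query_update', 'n_intervals')])
def update_graph_bar(interval):
    # query the latest tweets and count the words
    df = get_tweet_data()
    cnt = bag_of_words(df['text'])
    # keep the ten most frequent words
    top_10 = cnt.most_common(10)
    words = [word for word, _ in top_10]
    counts = [count for _, count in top_10]
    # build a simple bar chart of the counts
    data = [go.Bar(x=words, y=counts,
                   marker=dict(color=chart_colors[:len(words)]))]
    layout = go.Layout(
        xaxis={'title': 'Top-10 Words'},
        yaxis={'title': 'Word Count'},
        plot_bgcolor=app_color["graph_bg"],
        paper_bgcolor=app_color["graph_bg"],
        font={"color": app_color["graph_font"]})
    return go.Figure(data=data, layout=layout)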

Conclusion

The last line of code in app.py sets the debug mode to True so that, once the server is launched, code changes are applied automatically without restarting the server. When the application is ready to deploy, you might want to set the debug mode to False. Setting the host IP address to 0.0.0.0 allows access from outside the localhost.

Now if you open a terminal and execute python app.py, the server should be up and running on your local workstation at 0.0.0.0:8050 (default port for Dash is 8050).

For dockerizing and deploying the software to the cloud, please refer to my previous post about Docker Containers and Amazon Web Services. You can also access all the code for this application here (including Docker files). And the application is available here.

Please feel free to reach out to me if you have any questions.

Thanks for reading 🙂