Visualizing the Twitter Tweet Stream (Observability)


Build it Yourself and Analyze Your Heart Out.


This project assumes no knowledge beyond navigating a Linux command line. This is Part 1 of a larger project to bring data observability and monitoring into an arena that more people can understand. It is easy to get lost in the tech speak of data stores, data lakes, observability, machine learning, artificial intelligence, and so on, so I am trying to create a middle road, written in a more relaxed way. As I worked through the ideas, the data, and the code, I took over 15 pages worth of notes. This first project is the culmination of that work.

Observability is a word that is thrown around a lot these days when it comes to monitoring, securing, and optimizing pretty much anything that produces data. Broken down to its simplest form, this word just means the ability to observe. It does not matter what we are observing. It is just the ability to do so.

After working for more than twenty years in the Information Technology industry, I have gained a lot of insight across all aspects of IT infrastructure. During this IT tenure, I designed a monitoring and provisioning system for cable modems based completely on Perl and MySQL. I built monitoring systems to monitor the monitoring systems, using Perl for one use case and C# for another. I have worked with small regional ISPs with dozens of customers, up to supporting observability platforms that took nearly a thousand processing nodes to handle the amounts of data being ingested and queried. I have also handled single-node data stores and everything in between. There is not much I do not know about enriching, ingesting, querying, and using data of any type. This post is the beginning of my sharing of this knowledge.

The main hope is that after reading this post the reader will have a better understanding of what Observability is and how to build the simple platform that is Part 1 of this project. All of the code that will be used in this project is available at the GitHub repo Twitter Feed Repo. If you follow along through the whole process, we will eventually download this repo directly from the zip link here Twitter Feed Source. The first step is to set up the machine that we will use to build this project.

There are other posts on the burner that mesh directly with this one. These range from adding unique WebSocket controls that allow each user to manipulate the system directly, to adapting the polling server and WebSocket code to poll any data that you have access to. Those are much more advanced topics, but we have to start somewhere, and this is where we start: basic data with basic visualizations.

When we wrap up this part of the project, anyone who follows along should be fairly versed in setting up a system like this and understand what each part is doing. This includes being able to set up a system to produce at least the D3 visualizations shown below.

Fig.1 Working Twitter Visualizations

I hope you enjoy this post and find it as interesting as I did while building and testing all of this. Taking a bunch of JSON-formatted text and turning it into what you see above is exhilarating if you love data. First, we must build the system this will all run on.

But even before that, you need to have a Twitter developer key. Please head to this link, sign up for a developer key, and then create a project. If you already have a Twitter developer account, then you can use an existing key or create a new project.

Setting Up The Project Platform

When I started this project, I hosted everything, including a 4-node Elasticsearch cluster, on a VMware vSphere based home lab. As I approached the end of coding and testing, then finally pushing everything to GitHub, I started to think about those who may not have access to a cluster but still want to work with these projects. So we will go the route of installing a single-node Elasticsearch "cluster" on the same host as the rest of the project. This way, it is all compacted onto one host. These instructions are based on Ubuntu 20.04 server, so we will reference the aptitude package manager a lot. However, this should work on any Linux distribution as long as the needed packages are installed. So let's get started, assuming that you have a brand new Ubuntu 20.04 system sitting at the prompt, ready to use.

Basic System Preparation

For this project I have built a VM with 4 processor cores, 4GB of RAM, and 20GB of disk space. If you wish to add more resources to your VM, feel free to do so; this would let you work with a more active Twitter stream. But as long as we stick with the stream options that will be presented, the specifications above should work just fine.

Setting The IP Address

If you already know the IP address of the machine you will be using for this project or have some other IP addressing scheme you can skip this part. This is only for those who are new to Ubuntu and want or need to set a static IP address.

In my setup, I will let the project machine find its own IP address using DHCP and then statically assign it once I can log into the machine. If you do not have DHCP available, you will have to set the IP address statically in the file /etc/netplan/00-installer-config.yaml while still at the install console. There are many other ways to set a static IP address on an Ubuntu install, even more if there is a desktop such as GNOME installed. Since one goal of this project is to understand how all of this works, we will keep things as raw and close to the root system as possible and just edit the configuration file directly. Of course, if you are using a different Linux distribution, setting a static IP address can be completely different. In the end, we just need to make sure that the machine has an IP address that will not change through the course of this project. Follow the instructions below if you are using Ubuntu 20.04 and want to manually set the static IP address.

Get the current IP address of the server by running the command ip route. This command will provide an output similar to what is below.

default via 192.168.2.1 dev ens160 proto dhcp src 192.168.2.248 metric 100
192.168.2.0/24 dev ens160 proto kernel scope link src 192.168.2.248
192.168.2.1 dev ens160 proto dhcp scope link src 192.168.2.248 metric 100
        

The IP address assigned to this server will be the src address on the default route, the first line. In this line, you can also see that the address was assigned through DHCP. So, in this case, 192.168.2.248 is the source IP address of this server. This line also shows us that the ethernet device is named ens160 and that the default gateway is 192.168.2.1. We will need all of this to update the network configuration file at /etc/netplan/00-installer-config.yaml. If the address was assigned through DHCP, the contents of this file will look like what is below.

# This is the network config written by 'subiquity'
network:
  ethernets:
    ens160:
      dhcp4: true
  version: 2

        

This verifies that the interface ens160 is set up to request an address via DHCP, which we verified in the output of the ip route command. Now let's plug in the information that we collected from that command and make a static configuration as shown below.

# This is the network config written by 'subiquity'
network:
  ethernets:
    ens160:
      addresses:
        - 192.168.2.248/24
      gateway4: 192.168.2.1
      nameservers:
          addresses: [192.168.2.149, 8.8.8.8]
        

For your configuration, change everything to match your network. Everything except the nameservers will have come from the ip route command. The name servers above point to my internal caching DNS server, with one Google server as a backup. Set your information accordingly and then run sudo netplan apply. If all goes well, running ip route now should provide the output below.

192.168.2.0/24 dev ens160 proto kernel scope link src 192.168.2.248
        

UPDATING THE OPERATING SYSTEM AND LIBRARIES

Since we are starting with a newly installed Ubuntu 20.04 machine we first need to do the standard updates that should always be done on a new system, the kernel and library updates. From the command prompt run the commands shown below.

sudo apt update
sudo apt upgrade
sudo apt install unzip
        

Follow the instructions and let the packages update. These updates are for security and stability, and they sometimes add new features. It is good to do this for the security angle if nothing else. For people who have been working in IT as long as I have, these updates just become second nature, and that should be true for everyone. The final line installs the unzip utility, which we will use to decompress the project package downloaded from GitHub later.

Once these updates are done, we move on to installing the core components of this project: Elasticsearch, Node.js, and NGINX.

Installing Elasticsearch


**** Warning ****

These Elasticsearch installation instructions are for versions before 8 ONLY. None of this will work with version 8 or later.

Everything after these install instructions is basically the same, so if you are going to test this on Elasticsearch 8 or later, please refer to the Elasticsearch document Install Elasticsearch with Debian Package.


After much thought about this project, I decided I wanted anyone to be able to build it with no external server dependencies. Though my work on this project was based around a medium-powered 4-node Elasticsearch cluster, it can be tested on a single-node development cluster running on the same server. If you already have access to a cluster, or at some point want to use an external cluster, it only takes one configuration change to point to it. We will get into that when we go over the dotenv settings used by the JavaScript later in this post.
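To give away the punchline a little, pointing at an external cluster later is just a one-line change in the .env file that we will configure in the dotenv section; the second host below is only a placeholder for whatever your cluster node is called.

# single-node install on this host (the default used throughout this post)
ELASTICSEARCH_HOST="localhost:9200"
# example pointing at an external cluster node instead (hypothetical hostname)
# ELASTICSEARCH_HOST="es01.example.lan:9200"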

Elasticsearch has a robust installation guide that explains how to install it; you can always find that document here: Elasticsearch Debian Install Guide. You should check this document when installing a new Elasticsearch server. This project is based on Elasticsearch version 7.17.0, so I will quickly run through the install instructions for this version below.

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
sudo apt-get install apt-transport-https
echo "deb https://artifacts.elastic.co/packages/7.x/apt stable main" | sudo tee /etc/apt/sources.list.d/elastic-7.x.list
sudo apt-get update
sudo apt-get install elasticsearch
        

We can't just start Elasticsearch after the installation is complete. There are a few configuration options that we need to set first. As I mentioned above, if you already have a cluster to use or would rather install a full cluster, you are free to do that. We will just change the Elasticsearch host when we get into the configuration later.

If you would like to install a complete cluster, I put together a video tutorial that steps through every part of installing and securing a cluster. The Elasticsearch version in the video is a little older, but the install instructions are still the same. It starts with building the required VMs using VMware vSphere and goes all the way through to a completely functioning cluster with SSL transport. You can check it out here if needed.

1 hour 46 minutes: full cluster install from scratch

Now that Elasticsearch is installed, it is time for a bit of configuration. Again, this is only needed if you are installing a new single-node cluster for this project. If you already have a cluster, do not do any of this.

Configuring Elasticsearch

Since later versions of Elasticsearch automatically configure the Java heap memory settings, there is now only one configuration file that we have to work with for this project: /etc/elasticsearch/elasticsearch.yml. The node may start without any changes, but it is good to know what you are working with, so we will make a few configuration changes for this project.

cluster.name: observability-project
node.name: node-1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
#bootstrap.memory_lock: true
network.host: localhost
#http.port: 9200
#discovery.seed_hosts: ["host1", "host2"]
#cluster.initial_master_nodes: ["node-1", "node-2"]
#action.destructive_requires_name: true
        

By default, Elasticsearch will now use 50% of the system RAM with no changes needed to the jvm.options file. If you do need to adjust heap RAM usage, this can still be done with a custom configuration file in /etc/elasticsearch/jvm.options.d. If you feel that you need to make changes here, please see the document Setting Advanced JVM options available at the Elastic website.
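As a quick illustration, a custom override is nothing more than a file in that directory containing standard JVM flags. The sketch below pins the heap to 1GB, which would be a sensible cap for the 4GB VM described earlier; the filename is my own choice, it just has to end in .options.

# /etc/elasticsearch/jvm.options.d/heap.options
# Pin the minimum and maximum heap to the same value (1GB here)
-Xms1g
-Xmx1g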


**** End Warning ****

These Elasticsearch installation instructions are for versions before 8 ONLY. None of this will work with version 8 or later.

Everything after these install instructions is basically the same, so if you are going to test this on Elasticsearch 8 or later, please refer to the Elasticsearch document Install Elasticsearch with Debian Package.



Now let's start up Elasticsearch with sudo service elasticsearch start. This may take a few seconds since it is a new install and needs to build its file trees. Once the prompt returns, run the command curl localhost:9200; if everything is working, you should see output similar to the one below.

{
  "name" : "node-1",
  "cluster_name" : "observability-project",
  "cluster_uuid" : "UhRnZ5a6R1mTNT2hG-x3ZQ",
  "version" : {
    "number" : "7.17.0",
    "build_flavor" : "default",
    "build_type" : "deb",
    "build_hash" : "bee86328705acaa9a6daede7140defd4d9ec56bd",
    "build_date" : "2022-01-28T08:36:04.875279988Z",
    "build_snapshot" : false,
    "lucene_version" : "8.11.1",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}
        

This lets us know that the Elasticsearch single-node cluster is answering. Now, let's run one more curl command: curl localhost:9200/_cat/health. This should print output that looks like what is below.

1644203705 03:15:05 observability-project green 1 1 3 3 0 0 0 0 - 100.0%
        

This consists of a timestamp, the cluster name, the status of the cluster, some index and shard information, and finally the percentage of shards that are available. Since this is a single-node cluster, the 100% figure is slightly misleading because of the way index sharding and replicas work, but it is fine for this project.
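If you would like to see what each of those columns is called, the _cat endpoints accept a v parameter that adds a header row, for example:

curl 'localhost:9200/_cat/health?v'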

By this point the operating system and libraries are updated and we have installed Elasticsearch. Now we move on to the other two core components of this project: Node.js and NGINX.

Installing Node

The Javascript Server

Now we move on to installing Node.js. Node.js is a JavaScript runtime environment that allows JavaScript to be executed outside of the web browser. Back in the olden days of web browsing, only browsers had JavaScript engines. In 2009 Ryan Dahl started the Node.js project out of frustration with the way the popular web servers of the time handled concurrent connections and executed code in a blocking manner.

In many ways, JavaScript is well suited to server-side web programming. Though your code runs on a single CPU thread, Node.js uses an event loop and non-blocking I/O to juggle many connections at once, handling the bookkeeping internally instead of forking a process or spawning a thread per request. This is especially handy when working with WebSockets, one of the web technologies that this homebrew observability project is based on.
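As a tiny illustration of that event-driven model, and of the ws module that this project's WebSocket server uses later, a complete echo server is only a few lines; a single Node.js process will happily hold open many of these connections at once. This is just a sketch, not part of the project code.

// ws-echo.js -- minimal WebSocket echo server sketch
const WebSocket = require('ws');

// One process, one event loop, many concurrent connections
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', ws => {
  // Each incoming message is handled as an event; nothing here blocks the loop
  ws.on('message', message => {
    ws.send(`echo: ${message}`);
  });
});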

The Node environment that I use consists of Node.js, the Node package manager, otherwise known as npm, and PM2, or Process Manager 2, which is used to execute, monitor, and, where possible, repair running processes.

The aptitude stock version of Node.js on Ubuntu 20.04 is always a few revisions behind. Since this project has been tested with Node v17.4.0, this is what we will install. I always follow the vendor documentation for installing Node.js; that document is available here at the NodeSource README. Below, I have included the short version that installs the latest version of v17.x. This flow installs nodejs first, which also installs npm, and then we use npm to install pm2.

# Using Ubuntu
curl -fsSL https://deb.nodesource.com/setup_17.x | sudo -E bash -
sudo apt-get install -y nodejs
sudo npm install pm2 -g
        

The nodejs installer installs node and npm, but the PM2 process manager has to be installed separately using npm. Going this route makes sure that you have the latest version of each utility. Once these installs are complete, verify everything by running the commands below and checking for similar output.

bvest@observability-project:~$ node -v
v17.4.0
bvest@observability-project:~$ npm -v
8.3.1
bvest@observability-project:~$ pm2 -v

                        -------------

__/\\\\\\\\\\\\\____/\\\\____________/\\\\____/\\\\\\\\\_____
 _\/\\\/////////\\\_\/\\\\\\________/\\\\\\__/\\\///////\\\___
  _\/\\\_______\/\\\_\/\\\//\\\____/\\\//\\\_\///______\//\\\__
   _\/\\\\\\\\\\\\\/__\/\\\\///\\\/\\\/_\/\\\___________/\\\/___
    _\/\\\/////////____\/\\\__\///\\\/___\/\\\________/\\\//_____
     _\/\\\_____________\/\\\____\///_____\/\\\_____/\\\//________
      _\/\\\_____________\/\\\_____________\/\\\___/\\\/___________
       _\/\\\_____________\/\\\_____________\/\\\__/\\\\\\\\\\\\\\\_
        _\///______________\///______________\///__\///////////////__


                          Runtime Edition

        PM2 is a Production Process Manager for Node.js applications
                     with a built-in Load Balancer.

                Start and Daemonize any application:
                $ pm2 start app.js

                Load Balance 4 instances of api.js:
                $ pm2 start api.js -i 4

                Monitor in production:
                $ pm2 monitor

                Make pm2 auto-boot at server restart:
                $ pm2 startup

                To go further checkout:
                http://pm2.io/


                        -------------

[PM2] Spawning PM2 daemon with pm2_home=/home/bvest/.pm2
[PM2] PM2 Successfully daemonized
5.1.2
        

If everything looks like the flow above, then this part is complete. We can now move on to the final core component, NGINX.

Installing NGINX

Why go through the trouble of installing NGINX when we could use Node to serve the pages with the http and express modules? Because NGINX is easy. Yes, we could serve the pages from Node with the express and http modules, but why do that if we don't have to? NGINX is a proven web server and proxy engine. It requires very few resources and can be easily installed with most Linux package managers. It already handles HTTP and HTTPS failures and redirection, and, last but not least, it is a fast and lean compiled program. So, with my banter out of the way, see below to install NGINX.

sudo apt install nginx
        

That was simple. We will do some configuration on this later, after we download the GitHub repo.
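For contrast, the sketch below is roughly what serving the same html folder straight from Node with express would look like. We are not going to use it; it just shows the kind of code NGINX saves us from writing and maintaining (the folder path matches the NGINX configuration we will set up later, and the port is arbitrary).

// serve-static.js -- a sketch only; NGINX does this job in the actual project
const express = require('express');
const app = express();

// Serve everything in the project's html folder as static files
app.use(express.static('/home/yourusername/tmp/twitterfeed-main/html'));

// Port 8081 is arbitrary; binding to port 80 would require elevated privileges
app.listen(8081, () => console.log('Static file server listening on port 8081'));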

SYSTEM PREPARATION WRAP-UP

At this point we have completed all of the core setup needed to complete this project. Now we get to move on to the part that is much more exciting: the software.

The Software

Disclaimer
I am not a professional programmer in any language. I write what I need for whatever project I am working on in whatever language fits. This is what I call idea code. Use it to form ideas and better understand how the code works. With so many things based on frameworks these days, I think it is a good idea to look at the vanilla JavaScript that does the work behind the framework. I do not suggest copying any of this code directly into production.
Disclaimer

Installing The Software

So now we move on to the main, and also the more fun, part of this project: installing the system and explaining how it works. We will step through the software in order of data flow, starting with getting the stream data from the Twitter API. Each script is heavily commented and explained in detail. More than just installing and running this package, I believe it is always worth understanding the data and what the scripts do with it.

Now is when you should download the GitHub package that holds all of this code, plus some extras that will be needed to configure the system. Follow along as we download, decompress, and set up this package. As should always be done, it should be downloaded to a temporary folder before being moved to its active folders. We could use the global /tmp/ folder, but for this project we will create one in our home folder. Follow the command flow below to make this folder, download the package, and uncompress it.

mkdir ~/tmp/
cd ~/tmp/
wget https://github.com/b-vest/twitterfeed/archive/refs/heads/main.zip
unzip main.zip
        

Once all of these commands are complete, there will be a folder in ~/tmp/ named twitterfeed-main. This is where the whole project lives. As this is a test project that could be run on any Linux server and is only meant to be temporary, we will just leave the whole thing in this folder. The only change outside of the code itself is the NGINX configuration. This configuration is available in the twitterfeed-main/extras folder, but I am also including it here for reference. See below.

server {
    listen 80 default_server;
    listen [::]:80 default_server;
    root /home/yourusername/tmp/twitterfeed-main/html;
    index index.html index.htm index.nginx-debian.html;

    server_name _;

    location / {
        try_files $uri $uri/ =404;
    }
}
        

The only change you should need to make here is to change yourusername to the name of the user that you are running this project as; more or less, the owner of the home folder we have been working with. Once this is complete, we need to put it in place. If you have been following along and installed NGINX fresh, then after making the username change, just copy this configuration to /etc/nginx/sites-available/default. This will overwrite the default configuration, so only do this if this NGINX server is only being used for this project. If there are other configurations and you have experience with NGINX, then add this where needed.

You will notice that this configuration does not mention SSL. I left SSL out of the lab version, as it will be much easier for many to work on local IP addressing schemes without worrying about self-signed certificates. In a future chapter we will push this onto a public-facing server with both the site and the WebSocket secured with SSL using certbot.

Once you have replaced the NGINX default config with the one from above, it is good to test the configuration before starting NGINX. Do this by running the command sudo nginx -t as shown below.

>:~/tmp$ sudo nginx -t
nginx: the configuration file /etc/nginx/nginx.conf syntax is ok
nginx: configuration file /etc/nginx/nginx.conf test is successful
        

If the output looks like this, then everything is good to go and we can restart NGINX with the command sudo service nginx restart. At this point, open a web browser and navigate to the IP address of this host. You should see the screen below.

Blank Project Web Page

This is the simple interface. Of course it is blank, because there is no data to query and no socket to send it over. But it is there, just waiting for the data that we will ingest, query, and send to it. This process starts with gathering the data. To get started with this part, move to the folder ~/tmp/twitterfeed-main/servers, where we will go through the code and then set up the configuration.

Decoding The Software

The code on this site and at the GitHub repository is heavily commented. There really is no better way to explain code; it can get confusing if you try to chunk it down into smaller pieces. Since this is follow-along project code, this seems to be the best way to me.

Anyone who works with data at any level should know exactly what that data says and exactly what happens to it as it is converted from raw data to enriched, ingested data, then to queried data, and finally sent to the user.

twitter-stream.js

This is the script that will connect to the Twitter API stream and ingest that data into Elasticsearch. For this project I have heavily documented this code. Any explanations that are needed are included below the code.

          
//twitter-stream.js Read twitter Stream and ingest it into Elasticsearch
//Load Modules
const Twitter = require('twitter');
const elastic = require('elasticsearch');
require('dotenv').config()

//Build Elasticsearch Client using dotenv variables
var elasticClient = new elastic.Client({
  host: process.env.ELASTICSEARCH_USER+":"+process.env.ELASTICSEARCH_PASSWORD+"@"+process.env.ELASTICSEARCH_HOST,
  log: 'info',
    sniff: true,
    apiVersion: '7.x',
});

//Build Twitter Client using dotenv Variables
var twitterClient = new Twitter({
  consumer_key: process.env.TWITTER_CONSUMER_KEY,
  consumer_secret: process.env.TWITTER_CONSUMER_SECRET,
  access_token_key: process.env.TWITTER_TOKEN_KEY,
  access_token_secret: process.env.TWITTER_TOKEN_SECRET
});

//Array to hold what will be sent to Elasticsearch
var sendArray = [];
//Count the tweets to compare to ELASTICSEARCH_BATCH_SIZE
var tweetCounter = 0;

twitterClient.stream('statuses/filter',{lang: process.env.TWITTER_LANG, track: process.env.TWITTER_TRACK, follow: process.env.TWITTER_FOLLOW},  function(stream) {
  stream.on('data', function(tweet) {
    //Add Tweet to Array
    sendArray.push(tweet);
    if(tweetCounter >= process.env.ELASTICSEARCH_BATCH_SIZE){
      //Send Tweet Array
      bulkIndexTweets(sendArray);
      //Tweets have been ingested, reset array and counter
      sendArray = [];
      tweetCounter = 0;
    }
    ++tweetCounter;
    //Set nowDate for checking checkLogFrequency() to exit the process if the stream stalls
    nowDate = Date.now();

  });
  //Handle twitter stream errors
 stream.on('error', function(error) {
    console.log(error);
  });
});

//set now Date for Stream health checking
var nowDate = Date.now();
//start interval time to check stream every 20 seconds
var myVar = setInterval(checkLogFrequency, 20000);

async function bulkIndexTweets(sendArray){
  try{
      //create flatmap out of array, this adds the _index: field to every element of the array created above.
      const body = sendArray.flatMap(doc => [{ index: { _index: process.env.ELASTICSEARCH_INDEX } }, doc])
      //Send the body flatmap to Elasticsearch and wait for response, this is why this is an async function
      const { body: bulkResponse } = await elasticClient.bulk({ refresh: true, body })
      //if there is a response it may contain errors. Check for that here.
      if (bulkResponse) {
        const erroredDocuments = []
        bulkResponse.items.forEach((action, i) => {
            const operation = Object.keys(action)[0]
            if (action[operation].error) {
              erroredDocuments.push({
                  status: action[operation].status,
                  error: action[operation].error,
                  operation: body[i * 2],
                  document: body[i * 2 + 1]
              })
            }
        })
      }
    return sendArray;
  }catch(error){
    console.log(error);
  }
}

function checkLogFrequency(){
  var checkDate = Date.now();
  //subtract the time the last tweet arrived (nowDate) from the current time
  var difference = checkDate - nowDate;
  //if the difference is greater than TWITTER_SOCKET_TIMEOUT exit the process
  //we depend on pm2 to restart it if this happens.
  if(difference >= process.env.TWITTER_SOCKET_TIMEOUT){
    console.log("Exiting. Stream stalled longer than TWITTER_SOCKET_TIMEOUT ms")
    process.exit(1)
  }
  return
}

        

This code is heavily commented to the point that I think it explains itself. There is really no data parsing or enrichment going on here; this is just a pipeline to get Twitter data ingested into Elasticsearch. We will do all of the parsing and enrichment when we query the data later. But it would be a good idea to take a look at the data that we are ingesting. The data from a tweet is quite large and robust, which allows us to do a lot of linkage between users and topics. The data from a single tweet takes up a whole page, especially if it is a retweet.
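Later, once twitter-stream.js is running, you can pull a sample document straight out of Elasticsearch with a query like the one below and compare it against the record shown here (twitter-stream is the default index name from the .env file we will configure shortly):

curl 'localhost:9200/twitter-stream/_search?size=1&pretty'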


Full Tweet Record with Retweet (Long)

{
  created_at: 'Wed Feb 09 23:47:27 +0000 2022',
  id: 1491559399944503300,
  id_str: '1491559399944503299',
  text: 'RT @porteressays: Pay us to do or write your \n' +
    '#Assignments \n' +
    '#calculus \n' +
    '#homework\n' +
    '#Homeworkhelp \n' +
    '#summerclasses \n' +
    '#Onlineclass \n' +
    '#essayhelp \n' +
    '#…',
  source: 'Udemy Offer',
  truncated: false,
  in_reply_to_status_id: null,
  in_reply_to_status_id_str: null,
  in_reply_to_user_id: null,
  in_reply_to_user_id_str: null,
  in_reply_to_screen_name: null,
  user: {
    id: 811058560868159500,
    id_str: '811058560868159488',
    name: 'ProgrammingBuddyClub',
    screen_name: 'programminbuddy',
    location: 'United States',
    url: 'http://www.programmingbuddy.club/',
    description: 'Learning Should be free.. Free courses from Udemy, Coursera, Udacity and many more!',
    translator_type: 'none',
    protected: false,
    verified: false,
    followers_count: 1200,
    friends_count: 96,
    listed_count: 119,
    favourites_count: 52,
    statuses_count: 60588,
    created_at: 'Tue Dec 20 04:00:04 +0000 2016',
    utc_offset: null,
    time_zone: null,
    geo_enabled: false,
    lang: null,
    contributors_enabled: false,
    is_translator: false,
    profile_background_color: '000000',
    profile_background_image_url: 'http://abs.twimg.com/images/themes/theme1/bg.png',
    profile_background_image_url_https: 'https://abs.twimg.com/images/themes/theme1/bg.png',
    profile_background_tile: false,
    profile_link_color: 'ABB8C2',
    profile_sidebar_border_color: '000000',
    profile_sidebar_fill_color: '000000',
    profile_text_color: '000000',
    profile_use_background_image: false,
    profile_image_url: 'http://pbs.twimg.com/profile_images/811059415646425088/jc_6H5t8_normal.jpg',
    profile_image_url_https: 'https://pbs.twimg.com/profile_images/811059415646425088/jc_6H5t8_normal.jpg',
    profile_banner_url: 'https://pbs.twimg.com/profile_banners/811058560868159488/1482820838',
    default_profile: false,
    default_profile_image: false,
    following: null,
    follow_request_sent: null,
    notifications: null,
    withheld_in_countries: []
  },
  geo: null,
  coordinates: null,
  place: null,
  contributors: null,
  retweeted_status: {
    created_at: 'Wed Feb 09 23:46:43 +0000 2022',
    id: 1491559214132650000,
    id_str: '1491559214132649988',
    text: 'Pay us to do or write your \n' +
      '#Assignments \n' +
      '#calculus \n' +
      '#homework\n' +
      '#Homeworkhelp \n' +
      '#summerclasses \n' +
      '#Onlineclass… https://t.co/qwpEzaeMK8',
    display_text_range: [ 0, 140 ],
    source: 'Twitter for Android',
    truncated: true,
    in_reply_to_status_id: null,
    in_reply_to_status_id_str: null,
    in_reply_to_user_id: null,
    in_reply_to_user_id_str: null,
    in_reply_to_screen_name: null,
    user: {
      id: 1288142066082558000,
      id_str: '1288142066082557954',
      name: 'PORTER ASSIGNMENT, HOMEWORK & ONLINE CLASS HELP',
      screen_name: 'porteressays',
      location: 'Texas',
      url: null,
      description: 'Assignment,Homework&Onlineclasses Help\n' +
        'Email: @essaysporter34@gmail.com\n' +
        '\n' +
        '                                             WhatsApp : +1 (763) 309-4996',
      translator_type: 'none',
      protected: false,
      verified: false,
      followers_count: 2179,
      friends_count: 299,
      listed_count: 2,
      favourites_count: 204214,
      statuses_count: 7428,
      created_at: 'Tue Jul 28 15:59:59 +0000 2020',
      utc_offset: null,
      time_zone: null,
      geo_enabled: false,
      lang: null,
      contributors_enabled: false,
      is_translator: false,
      profile_background_color: 'F5F8FA',
      profile_background_image_url: '',
      profile_background_image_url_https: '',
      profile_background_tile: false,
      profile_link_color: '1DA1F2',
      profile_sidebar_border_color: 'C0DEED',
      profile_sidebar_fill_color: 'DDEEF6',
      profile_text_color: '333333',
      profile_use_background_image: true,
      profile_image_url: 'http://pbs.twimg.com/profile_images/1461703754743484423/KvN1L4Pe_normal.jpg',
      profile_image_url_https: 'https://pbs.twimg.com/profile_images/1461703754743484423/KvN1L4Pe_normal.jpg',
      profile_banner_url: 'https://pbs.twimg.com/profile_banners/1288142066082557954/1607497298',
      default_profile: true,
      default_profile_image: false,
      following: null,
      follow_request_sent: null,
      notifications: null,
      withheld_in_countries: []
    },
    geo: null,
    coordinates: null,
    place: null,
    contributors: null,
    is_quote_status: false,
    extended_tweet: {
      full_text: 'Pay us to do or write your \n' +
        '#Assignments \n' +
        '#calculus \n' +
        '#homework\n' +
        '#Homeworkhelp \n' +
        '#summerclasses \n' +
        '#Onlineclass \n' +
        '#essayhelp \n' +
        '#Essaydue \n' +
        '#essaywrite \n' +
        '#Essays\n' +
        '#Music\n' +
        '#Art\n' +
        '#Law \n' +
        '#Javascript\n' +
        '#Python\n' +
        '#Programming\n' +
        '#CodeNewbies \n' +
        '#100daysofcode\n' +
        '#webdevelopment \n' +
        '#Datascience\n' +
        '\n' +
        'Dm @porteressays https://t.co/TUbBU83L9S',
      display_text_range: [Array],
      entities: [Object],
      extended_entities: [Object]
    },
    quote_count: 0,
    reply_count: 0,
    retweet_count: 4,
    favorite_count: 1,
    entities: {
      hashtags: [Array],
      urls: [Array],
      user_mentions: [],
      symbols: []
    },
    favorited: false,
    retweeted: false,
    possibly_sensitive: false,
    filter_level: 'low',
    lang: 'en'
  },
  is_quote_status: false,
  quote_count: 0,
  reply_count: 0,
  retweet_count: 0,
  favorite_count: 0,
  entities: {
    hashtags: [
      [Object], [Object],
      [Object], [Object],
      [Object], [Object],
      [Object]
    ],
    urls: [],
    user_mentions: [ [Object] ],
    symbols: []
  },
  favorited: false,
  retweeted: false,
  filter_level: 'low',
  lang: 'en',
  timestamp_ms: '1644450447437'
}

Everywhere in this data output that you see [Object] or [Array], there is a structure nested more than two levels deep, which console.log() will not dig into by default. If we were to print this with JSON.stringify() we could see those deeper nestings, but for this display this version works best.
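If you do want to see every level while you are experimenting, either of the lines below will do it; tweet here simply stands in for whatever object you are logging.

// Pretty-print the whole object as JSON with 2-space indentation
console.log(JSON.stringify(tweet, null, 2));

// Or tell console.dir to descend through every nesting level
console.dir(tweet, { depth: null });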

Out of all of that data we are only using a few of the keys for this project. Those keys are shown in the section below.

Bucket Keys

user.screen_name
This is the key that will build the aggregate bucket called ScreenName.

entities.hashtags.text
This is the key that creates the Hashtags bucket that holds all of the hashtags used by ScreenName.

timestamp_ms
This is a timestamp in milliseconds that tells us when the tweet was first created.
This is not displayed but is used in the query to set the range filter.

Though we will touch on it again later, since we are talking about the data let's quickly cover buckets. When you perform an aggregate query with Elasticsearch, the data that is returned is compiled into buckets. Each bucket can contain many other buckets, and those buckets can contain other buckets, and so on. The depth of aggregations can be quite complex or as simple as one bucket inside one bucket, which is what we will be working with here. In JavaScript object/array notation, the bucket structure for this project will look like this: Object{ Array[ Object{ Array[ Object{ } ] } ] }. Or, an array inside of an object, inside an array inside of an object.

The first level of the aggregate query finds all documents whose terms match and counts them up in a bucket; this is what the doc_count in the query output tells you. Then it looks at the second key, counts how many documents match both the first key and the second key, and that count becomes the doc_count for the nested bucket.
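Stripped of the time-range filter and the limits that the real query pulls from the .env file, the request side of that two-level aggregation looks roughly like this:

curl -s 'localhost:9200/twitter-stream/_search?pretty' -H 'Content-Type: application/json' -d'
{
  "size": 0,
  "aggs": {
    "ScreenName": {
      "terms": { "field": "user.screen_name" },
      "aggs": {
        "Hashtags": {
          "terms": { "field": "entities.hashtags.text" }
        }
      }
    }
  }
}'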

The response looks like the JSON I have provided below. With Elasticsearch, the array sections of an aggregation response always start with an array called buckets.

  "aggregations" : {
    "ScreenName" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 178,
      "buckets" : [
        {
          "key" : "BendevBot",
          "doc_count" : 10,
          "Hashtags" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 56,
            "buckets" : [
              {
                "key" : "100DaysOfCode",
                "doc_count" : 2
              },
              {
                "key" : "AI",
                "doc_count" : 2
              },
              {
                "key" : "DataScience",
                "doc_count" : 2
              },
              {
                "key" : "python",
                "doc_count" : 2
              }
            ]
          }
        },
        {
          "key" : "_FunBot",
          "doc_count" : 8,
          "Hashtags" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 42,
            "buckets" : [
              {
                "key" : "AI",
                "doc_count" : 2
              },
              {
                "key" : "DataScience",
                "doc_count" : 2
              },
              {
                "key" : "JavaScript",
                "doc_count" : 2
              },
              {
                "key" : "github",
                "doc_count" : 2
              }
            ]
          }
        },
        {
          "key" : "botfordev",
          "doc_count" : 5,
          "Hashtags" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 10,
            "buckets" : [
              {
                "key" : "Analytics",
                "doc_count" : 1
              },
              {
                "key" : "Assignmentdue",
                "doc_count" : 1
              },
              {
                "key" : "BigData",
                "doc_count" : 1
              },
              {
                "key" : "JAVA",
                "doc_count" : 1
              }
            ]
          }
        },
        {
          "key" : "full_bugs",
          "doc_count" : 5,
          "Hashtags" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 25,
            "buckets" : [
              {
                "key" : "100DaysOfCode",
                "doc_count" : 1
              },
              {
                "key" : "AI",
                "doc_count" : 1
              },
              {
                "key" : "Analytics",
                "doc_count" : 1
              },
              {
                "key" : "BigData",
                "doc_count" : 1
              }
            ]
          }
        },
        {
          "key" : "suhmiren",
          "doc_count" : 5,
          "Hashtags" : {
            "doc_count_error_upper_bound" : 0,
            "sum_other_doc_count" : 0,
            "buckets" : [ ]
          }
        }
      ]
    }
  }
      

When we process this data with the next script, twitter-websocket.js, you will see where the conversion happens to take this data from the Elasticsearch output seen above into formats that D3 can use to build visualizations. More or less, it loops through this array of arrays, matches each ScreenName key to its Hashtags keys, and uses the doc_count values to give D3 the numbers it needs. The network graph visualization, which is part of this project, does not work exactly the way a standard key/value chart does; with the network graph you have to link objects to objects. We will dive deeper into that after we take a peek at the next script, twitter-websocket.js.
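To make the target format concrete, here is a small hand-written example of the shape the script builds; the names come from the sample output above and the counts are made up for illustration. The nodes and links arrays feed the network graph, and barchart feeds the bar chart.

{
  "nodes":    [ { "id": "BendevBot", "group": 1 }, { "id": "#DataScience", "group": 1 } ],
  "links":    [ { "source": "BendevBot", "target": "#DataScience", "value": 1 } ],
  "barchart": [ { "User": "BendevBot", "Value": 10 } ]
}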

twitter-websocket.js

        
// twitter-websocket.js elasticsearch to browser query engine
const WebSocket = require('ws')
const elastic = require('elasticsearch');
require('dotenv').config()
//set wss var here since it could be used for SSL or NO SSL
var wss;
//This script uses the .env setting SSL to decide if the script should load the SSL
//processing code or not. In a lab environment we can run all of this without SSL
//on the public web SSL is required. I wanted this to be compatible with either.
if(process.env.SSL === "NO"){
  wss = new WebSocket.Server({ port: process.env.WS_SOCKET_PORT || 8080 })
}else{
  //fs modules required to process SSL keys
  const fs = require('fs')
  const util = require('util');
  const readFile = util.promisify(fs.readFile);
  //fs modules
  //handles http and https for the websocket
  const express = require('express');
  const http = require('http');
  const serverPort = process.env.SERVER_PORT || '9889';
  const app = express();
  const server = app.listen(serverPort, () => console.log(`Server listening on ${serverPort}: `));
  const SocketServer = require('ws').Server;
  //https websocket
  //load SSL certificates and set to credentials variable
  var privateKey = fs.readFileSync('ssl-cert/privkey.pem', 'utf8');
  var certificate = fs.readFileSync('ssl-cert/fullchain.pem', 'utf8');
  var credentials = { key: privateKey, cert: certificate };
  //load https module
  var https = require('https');
  //create https object
  var httpsServer = https.createServer(credentials);
  //load websocket module
  var WebSocketServer = require('ws').Server;
  //start https server
  httpsServer.listen(8553);

//open web socket, route through https server 
  wss = new WebSocketServer({
    server: httpsServer
  });
}

//build elasticsearch client with values from process.env
var elasticClient = new elastic.Client({
  host: process.env.ELASTICSEARCH_USER+":"+process.env.ELASTICSEARCH_PASSWORD+"@"+process.env.ELASTICSEARCH_HOST,
  log: 'info',
    sniff: true,
    apiVersion: '7.x',
});

//create work object. I use this work object layout when working with async variables.
//this is a global variable that passes all process information around to the functions
var workObject = {
  queries:{
    //The only query used for this part of the project. Index is set from .env
    userHashtagAggregateQuery: {"index":process.env.ELASTICSEARCH_INDEX,"body":{"aggs":{"ScreenName":{"terms":{"field":"user.screen_name","order":{"_count":"desc"},"size":process.env.QUERY_USER_LIMIT},"aggs":{"Hashtags":{"terms":{"field":"entities.hashtags.text","order":{"_count":"desc"},"size":process.env.QUERY_TAG_LIMIT}}}}},"size":0,"fields":[{"field":"retweeted_status.timestamp_ms","format":"date_time"},{"field":"timestamp_ms","format":"date_time"}],"script_fields":{},"stored_fields":["*"],"runtime_mappings":{},"_source":{"excludes":[]},"query":{"bool":{"must":[],"filter":[{"range":{"timestamp_ms":{"format":"strict_date_optional_time","gte":"now-"+process.env.QUERY_SPAN,"lte":"now"}}}],"should":[],"must_not":[]}}}}
  },
  groupCounter: 1,
  runningQuery: "userHashtagAggregateQuery", // here we set the first query to run and track process progress. For future expansion.
  network:{
    nodes:[],
    links:[],
    barchart:[]
  }
};

//refresh the data in memory on load
refreshData(workObject);
//start web socket processing
//currently we do nothing here as every update will be sent to all clients
//by looping wss.clients.forEach
wss.on('connection', ws => {
  //refreshData(workObject);
  ws.on('message', message => {
  })
});

setInterval(refreshData, process.env.DATA_UPDATE_INTERVAL, workObject);

//We will use this async function to actually run the function chain
//so we can call it from an Interval timer.
async function refreshData(workObject){
  try{
    runElasticsearchQuery(workObject).
    then((workObject => prepareData(workObject).
    then((workObject => sendToClients(workObject)))));
  }catch(error){
    console.log(error);
  }
}


async function runElasticsearchQuery(workObject){
  try{
    // send query to elasticsearch, wait for response. 
    // elasticsearch query errors are handled by catch(error)
    const response = await elasticClient.search(workObject.queries[workObject.runningQuery]);
    workObject.response = response.aggregations
    return workObject;
  }catch(error){
    console.log(error);
  }
}

async function prepareData(workObject){
  try{
    //Shorten bucket variable inside this function
    var data = workObject.response;
    //Create name object so the same names do not end up in nodes twice
    //We will simply set the node name with a value of 1 to mark it as used.
    //In this project only the hashtag nodes could be duplicated so it will
    //be used in that loop
    var usedNodes = new Object();
    //Clear the workObject.network key
    workObject.network = {
      nodes: [],
      links: [],
      barchart: []
    }
    if(workObject.runningQuery === "userHashtagAggregateQuery"){
      //Loop through the buckets
      for (const object of data.ScreenName.buckets){
        //We only want to send data that has hashtags.
        //Since the response is in an array we can check
        //the length of the array to see if we should
        //process this bucket
        if(object.Hashtags.buckets.length > 0){
          //this is added to the barchart array that will be sent to the client
          // for barcharts you only need key and a value.
          var barChart = {User: object.key, Value: object.doc_count};
          //push to barchart array
          workObject.network.barchart.push(barChart);
          //this is added to the network.nodes array for use by the network graph
          var nodeData = {
            id: object.key,
            group: workObject.groupCounter, //Each screen_name will be its own group
          };

          //push to the network.nodes array
          workObject.network.nodes.push(nodeData);
          //Loop through the hash tag bucket
          for (const tagObject of object.Hashtags.buckets){
            //the hashtags also need a node
            //these we have to check for duplicates
            if(!usedNodes[tagObject.key]){
              //this is used by the d3 network graph
              var nodeData = {
                id: "#"+tagObject.key, 
                group: workObject.groupCounter,
              }//push to the network.nodes array
              workObject.network.nodes.push(nodeData);
              //Add entry to duplicate checker
              usedNodes[tagObject.key] = 1;
            }else{
              ++usedNodes[tagObject.key]
            }
            //this is needed by the d3 network graph
            var linkData = {
              source: object.key, //the source will be the user set above
              target: "#"+tagObject.key, //this is the destination hashtag
              value: Math.round(object.doc_count/9) //This is optional. It could just be 1, sets the width of the connecting line.
            };
            //push to network.links array
            workObject.network.links.push(linkData);
          }
          //Increment the group counter since we are done with it
          ++workObject.groupCounter
        }else{
        }
      }
    }
    return workObject;
  }catch(error){
    console.log(error);
  }
}

async function sendToClients(workObject){
  try{
    //We will actually send another object
    //so we can add a few things that
    //can change depending on the query.
    var sendObject = {};
    if(workObject.runningQuery === "userHashtagAggregateQuery"){
      sendObject.data = workObject.network;
      sendObject.function = "userHashtagAggregateQuery"
    }
    //loop through wss clients to send new JSON packet to all connected hosts.
    wss.clients.forEach(function(client) {
      client.send(JSON.stringify(sendObject));  
    });
    return workObject;
  }catch(error){
    console.log(error)
  }
}
        
      

As you can see, this code is also heavily commented. I looked around the web for the best way to explain code, looking for something newer and flashier, but in the end it is easier just to comment the hell out of the code, summarize what it is doing, and dig into the more detailed parts if needed. This script does not really enrich the data; it just converts it from one format to another. There are many ways to do this and many languages to do it in. I have chosen JavaScript because it is still very current, handles concurrency well, and has a very active community. JavaScript is going to be here for a while. On top of that, when used in conjunction with Node.js, it can be both the client and the server. Soon we will jump into that client code, where all of the visual magic happens.

We already looked at the raw data and the query response data above, so let's summarize it here. We are taking the raw tweet data from Twitter, ingesting it into Elasticsearch, and then having Elasticsearch calculate aggregate screen_name buckets full of aggregate hashtag counts. This data is then parsed into three arrays called nodes, links, and barchart inside of an object named network, and pushed across the WebSocket to every connected browser every DATA_UPDATE_INTERVAL milliseconds. Now say that in one breath.

We are almost ready to test these scripts, but we need to do a bit of setup and configuration first. These scripts ship with a package.json file that lists their dependencies and can be used to install all of the required Node.js modules. Let's head on over to the setup instructions.

Script Setup and Configuration

Hopefully the descriptions above help you understand the data and the scripts that we will be using to process this data. We are almost ready to start these scripts and work on visualizing the data. We just need to do a bit of configuration first.

Elasticsearch Index Template

Elasticsearch will define many field types through a best-effort guess. It is pretty good at detecting field types, but sometimes it needs a little help. This is where an Elasticsearch index template comes in. For this project there are also a few other settings besides the field types that we need to configure. This template, like the NGINX configuration we worked with earlier, is in the repo at ~/tmp/twitterfeed-main/extras/twitter-stream-template.json. Below I have added comments directly to the template for ease of explaining.

        
{
  "index_patterns": ["twitter-stream*"], //Only use on indices that match this
  "settings": {
    "number_of_shards": 1, //the default, added for reference
    "number_of_replicas": 0 //this is set because we are using 1 node.
                            //replicas would never be provisioned because
                            //of the 1 shard per node rule.
  },
    "mappings": {
      "dynamic_templates": [ //dynamic mappings apply to every field that matches
        {                    //in this case we need keyword for aggregations
                             //So everything that looks like a string will use
                             //field type keyword
          "strings_as_keyword": {
            "match_mapping_type": "string",
            "mapping": {
              "ignore_above": 1024,
              "type": "keyword"
            }
          }
        }
      ],
      "properties": {
        "retweeted_status": { 
          "properties": {
            "timestamp_ms": { //tell elastic that retweeted_status.timestamp_ms is a date field
              "type": "date"
            }
          }
        },
        "timestamp_ms": { //tell elastic that timestamp_ms is a date field
          "type": "date"
        }
      }
    }
}
        
      

This bit of JSON is commented inline; do not copy-paste it into your project as shown, since JSON does not allow comments. If you want direct access to the template, use the file mentioned above. Below I have included a full curl command to apply this template to the local Elasticsearch host that we built earlier.

curl -X PUT "localhost:9200/_template/twitter_stream_template?pretty" -H 'Content-Type: application/json' -d'{"index_patterns":["twitter-stream*"],"settings":{"number_of_shards":1,"number_of_replicas":0},"mappings":{"dynamic_templates":[{"strings_as_keyword":{"match_mapping_type":"string","mapping":{"ignore_above":1024,"type":"keyword"}}}],"properties":{"retweeted_status":{"properties":{"timestamp_ms":{"type":"date"}}},"timestamp_ms":{"type":"date"}}}} '

If there are no errors this command should return the message below.

{
  "acknowledged" : true
}
      

Now, all we have left to complete is the dotenv configuration, and we should be able to run these scripts. At one time these two scripts were split into separate project folders. In this fourth revision of the project, I decided to put the Twitter streamer and the WebSocket server into the same folder for ease of node module and configuration management. This change came from a lesson-learned moment when I figured out that .env would overwrite the variable stack in memory if loaded from two different scripts. Also, the more I thought about it, the more I wanted these projects to be as easy to run as possible, and it is easier to run npm install once in one folder than to navigate around and run it twice.

A sample .env file is included with this project at ~/tmp/twitterfeed-main/servers/default.env. All of the default settings work just fine except for the TWITTER key settings; you have to have these for the Twitter streamer to work. If you do not have keys yet, please check out the Twitter Developer site. Since the scripts load their settings with dotenv, copy default.env to .env in the servers folder and make your changes there. I have included the default.env configuration below.

SOCKET_DEBUG="1"
STREAM_DEBUG="1"
SSL="NO"

TWITTER_CONSUMER_KEY="longencryptedstring"
TWITTER_CONSUMER_SECRET="longencryptedstring"
TWITTER_TOKEN_KEY="longencryptedstring"
TWITTER_TOKEN_SECRET="longencryptedstring"

TWITTER_TRACK="javascript, programming, bigdata"
TWITTER_FOLLOW=""
TWITTER_LANG="en"

ELASTICSEARCH_HOST="localhost:9200"
ELASTICSEARCH_INDEX="twitter-stream"
ELASTICSEARCH_USER="elasticuser"
ELASTICSEARCH_PASSWORD="elasticpassword"
ELASTICSEARCH_BATCH_SIZE="5"

QUERY_SPAN="15m"
QUERY_USER_LIMIT="10"
QUERY_TAG_LIMIT="10"
      

If you are following the project from the beginning, these settings will all work fine, except of course for the Twitter keys. SOCKET_DEBUG enables debug for twitter-websocket.js; this prints the fresh workObject to the console every time it is generated. STREAM_DEBUG enables debug for twitter-stream.js; this prints DEBUG: running bulkIndexTweets every time a batch is sent to Elasticsearch. These are the only things DEBUG does. For our project setup, SSL="NO" is what we need. As mentioned above, in another chapter we will get to deploying this on a public-facing web server and applying SSL to both the domain and the WebSocket using certbot.

The Twitter keys are fairly self-explanatory. The next three TWITTER settings are stream filter options. TWITTER_TRACK is a comma-delimited list of keywords that you would like to, more or less, search for. The three included with default.env will give a decent tweet stream without overloading our small single-node data store. TWITTER_FOLLOW is a comma-delimited list of user IDs that you wish to follow. This needs to be the numerical ID of the user. You can use this website to get that number for any Twitter screen name: https://tweeterid.com. We could get the same information out of the tweet data, but we would not find the ID of a user that the system has not yet seen a tweet from. You can also use the Twitter API to do the same conversion; those things are just a bit beyond where I want to go with this project right now.

The ELASTICSEARCH section will work just fine for our localhost install. If you are working with an existing cluster or decide to build a full cluster, adjust the HOST, USER, and PASSWORD to match. The INDEX is, of course, where we will write and read the Twitter stream. The BATCH_SIZE is how many records twitter-stream.js will ingest into the system at once. For this project, 5 seems to work fine. This is very low for most production situations, but it works fine here and lets us look at the data in smaller chunks instead of a log output of thousands of ingests. Setting the batch size too high also delays ingestion: you cannot query a document until it has been ingested into the system and verified, so if you have a slow stream that only gets 1,000 tweets every few minutes, the client would wait minutes before sending a large batch. You can use timeouts to force the client to send the batch early, as in the sketch below, but it is best to just have your system tuned to the expected ingest load. Happily, the batch size is a client setting that can be quickly adjusted.
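If you did want that early-flush behavior, one way to bolt it onto twitter-stream.js would be a simple interval that sends whatever has accumulated. This is not in the project code, just a sketch that reuses the script's existing sendArray, tweetCounter, and bulkIndexTweets.

// Hypothetical addition to twitter-stream.js: flush a partial batch every 30 seconds
setInterval(function () {
  if (sendArray.length > 0) {
    bulkIndexTweets(sendArray); // reuse the existing bulk helper
    sendArray = [];
    tweetCounter = 0;
  }
}, 30000);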

The last three options control the queries that the websocket runs. QUERY_SPAN is the time span to run the aggregation over. The notation is based on seconds, minutes, hours, days, and so on; for this project 15 minutes is a good timeframe to start with, so the value is 15m. QUERY_USER_LIMIT tells the query to return at most that many users. QUERY_TAG_LIMIT tells the query how many individual hashtags to return for each user. The two default limits could return up to 100 records: 10 users and 10 hashtags per user. The query runs in descending order, so you can think of this as the top 10 hashtags of the top 10 tweet senders. How much stress this puts on our Elasticsearch node depends on how active your Twitter stream is and how long the QUERY_SPAN is. You can adjust the limits all the way down to 1 even with a very large QUERY_SPAN and it will still work, but as you increase the QUERY_SPAN more and more documents have to be queried, so performance will eventually suffer. If you are using a full cluster or an ultra powerful VM for this project, you have a lot more room to adjust the query.
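To show how these three settings come together, here is a sketch of the kind of aggregation the websocket runs. The field names below ('@timestamp', 'user.screen_name', 'entities.hashtags.text') are assumptions for illustration; the real ones depend on the index template we loaded earlier.

// Sketch only: a time-bounded terms aggregation shaped by the three settings.
const elasticsearch = require('elasticsearch');
const esClient = new elasticsearch.Client({ host: process.env.ELASTICSEARCH_HOST });

const aggQuery = {
  index: process.env.ELASTICSEARCH_INDEX,
  body: {
    size: 0, // we only want the aggregation buckets, not the raw documents
    query: {
      range: { '@timestamp': { gte: 'now-' + process.env.QUERY_SPAN } }
    },
    aggs: {
      top_users: {
        terms: { field: 'user.screen_name', size: Number(process.env.QUERY_USER_LIMIT) },
        aggs: {
          top_tags: {
            terms: { field: 'entities.hashtags.text', size: Number(process.env.QUERY_TAG_LIMIT) }
          }
        }
      }
    }
  }
};

// Up to QUERY_USER_LIMIT user buckets come back, each holding up to
// QUERY_TAG_LIMIT hashtag buckets, sorted by document count descending.
esClient.search(aggQuery).then(resp => console.log(resp.aggregations));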

Now, just one last thing before we can run this script: we have to install the needed node modules with npm. This is much simpler since I moved both server scripts into one folder. If you have followed along up until now, navigate to the servers folder with cd ~/tmp/twitterfeed-main/servers/. From inside this folder we only need to run npm install. This will install six modules: elasticsearch, twitter, ws, express, http, and https. That is all we need to test the first script. Launch it with node twitter-stream.js. If everything is working you should see output similar to what is below.

>~/tmp/twitterfeed-main/servers$ node twitter-stream.js
DEBUG ENABLED 1
Elasticsearch INFO: 2022-02-10T14:29:42Z
  Adding connection to http://localhost:9200/

DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: BashNano -> Thu Feb 10 14:29:37 +0000 2022
DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: AbereAllan -> Thu Feb 10 14:29:43 +0000 2022
DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: cacareliefange1 -> Thu Feb 10 14:29:47 +0000 2022
      

Since STREAM_DEBUG is enabled, this list of users that tweeted and the times that they tweeted will run forever. This is good for now, but we will need to turn it off when we turn the script over to PM2 or it could create huge log files. If you do run into PM2 log size problems you can check the folder ~/.pm2/logs, which is also where you would check for script errors once the scripts are running under PM2.

Now that twitter-stream.js is running, let's open another terminal window to this project host and see what Elasticsearch has to say about this data. In the new terminal run curl 'localhost:9200/_cat/indices/twitter-stream?v&pretty'

curl 'localhost:9200/_cat/indices/twitter-stream?v&pretty'
health status index          uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   twitter-stream uQR-x14DRmGbpjnfLjJrxA   1   0         11            0    115.3kb        115.3kb

      

This can be a little hard to follow even inside a pre tag, so I have replicated the output in the table below.

Header         | Value                  | Description
health         | green                  | Overall health of the index
status         | open                   | Current status of the queried index
index          | twitter-stream         | Name of the queried index
uuid           | uQR-x14DRmGbpjnfLjJrxA | Unique ID of the index, used internally. Can be used externally.
pri            | 1                      | Number of primary index shards
rep            | 0                      | Number of replica index shards
docs.count     | 11                     | Number of documents available in this index
docs.deleted   | 0                      | Number of documents marked as deleted but not actually removed from the index
store.size     | 115.3kb                | Index size across all shards, primary and replica. If replicas were active this would be roughly twice pri.store.size
pri.store.size | 115.3kb                | Index size across only primary shards

Through this we can see that data is being ingested from twitter-stream.js into Elasticsearch. If for some reason you want to delete this index and start over, you can run curl -X DELETE "localhost:9200/twitter-stream?pretty" at any time and it will completely dump the twitter-stream index. Because of the template we entered earlier, all you need to do is start the twitter-stream.js script again and a new index will be built as soon as the first batch of tweets is sent.

Now we need to start the twitter-websocket.js script. We already installed the modules and completed the configuration when we set up twitter-stream.js, so the only thing left to do here is run the script with node twitter-websocket.js. First, though, check that the twitter-stream.js script is still running; the Twitter socket does time out sometimes. When I went back to check mine, it had timed out. The output looks like the example below.

DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: opensource_orgs -> Thu Feb 10 14:41:02 +0000 2022
DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: py_data_sci -> Thu Feb 10 14:41:05 +0000 2022
DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: sails_casts -> Thu Feb 10 14:41:06 +0000 2022
DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: Market4_success -> Thu Feb 10 14:41:11 +0000 2022
DEBUG: running bulkIndexTweets
SAMPLE:
Tweet: XeronBot -> Thu Feb 10 14:41:15 +0000 2022
Exiting. Time difference is > 120000ms
      

If you see this, and eventually you will, it means that the Twitter socket has timed out. I am not sure why this happens, but I have found that the easiest way to deal with it is to just exit the script. Since we will be using pm2, it can handle restarting the script. This is the quick and dirty way out; there are ways to write error catching routines that restart the socket internally, and eventually we will upgrade this project to go that route. For now, though, we are just worried about getting data from one place to another.
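For the curious, the timeout check itself is simple in concept. Here is a minimal sketch of the idea, not the exact code from twitter-stream.js: remember when the last tweet arrived and exit once the gap passes 120000ms so pm2 can start a fresh copy.

// Sketch of the watchdog idea behind the "Time difference" message above.
let lastTweetTime = Date.now();

// Call this from the stream's 'data' handler each time a tweet arrives.
function markTweetSeen() {
  lastTweetTime = Date.now();
}

setInterval(() => {
  const diff = Date.now() - lastTweetTime;
  if (diff > 120000) {
    console.log('Exiting. Time difference is > 120000ms');
    process.exit(1); // pm2 sees the exit and restarts the script
  }
}, 10000);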

Whether this script has been running all this time or has now timed out, let's turn it over to pm2 so we can focus on the rest of the project. First we want to turn off stream debugging: edit the .env file and change STREAM_DEBUG="1" to STREAM_DEBUG="0". If you run the script again after this change you should see the output below.

Elasticsearch INFO: 2022-02-10T16:18:38Z
  Adding connection to http://localhost:9200/
      

This will always show up, as it is our marker that the server has started. The only other thing that will appear is the timeout notification when a timeout happens. Once started with pm2, the logs for this script will be available at ~/.pm2/logs. If the script is still running, cancel it with CTRL-C and then run pm2 start twitter-stream.js. You should see output similar to the image below.

Output of -- pm2 start twitter-stream.js

Just to be sure it started, we can tail the logs at ~/.pm2/logs/twitter-stream-out.log and ~/.pm2/logs/twitter-stream-error.log. We can also see if the script is working by checking the status of the twitter-stream index with the curl command we used before: curl 'localhost:9200/_cat/indices/twitter-stream?v&pretty'. Run it a few times and keep an eye on the docs.count column; that number should keep increasing if the stream is working. If everything looks good here, let's finally start the websocket server.

The websocket server still has debug turned on, so we are going to start it without pm2 the first time to take a look at the workObject output, since this is what we will be working with when we get to the section on the web interface. Just run node twitter-websocket.js from this same folder and it should start. Below is a sample of the data that is printed when you run this script with debugging turned on.

        
         network: {
    nodes: [
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object],
      ... 173 more items
    ],
    links: [
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object], [Object], [Object],
      [Object], [Object], [Object], [Object],
      ... 777 more items
    ],
    barchart: [
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object], [Object], [Object],
      [Object]
    ]
  }

      

I chose to display the compressed console.log() version of this data since I think it gives a better sense of what the data actually looks like. If printed with JSON.stringify(), it would be a huge chunk of almost undecipherable text, which would not get the point across. So as you can see, this is an object with three arrays named nodes, links, and barchart. Each of these arrays is full of objects that tell D3 what to do with the data. We saw how this data is built earlier when we looked at the twitter-websocket.js script. It was all processed at the server, so all the client has to do is apply it to the proper function and the visuals will be rendered.
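If you expand those [Object] entries, the shapes line up with what twitter-client.js reads further down. The values below are invented examples, but the field names come straight from the client code (d.id and d.group for nodes, source/target/value for links, User and Value for the bar chart):

// Invented example entries; only the field names matter here.
nodes:    [ { id: "BashNano", group: 1 }, { id: "#javascript", group: 2 } ]
links:    [ { source: "BashNano", target: "#javascript", value: 3 } ]
barchart: [ { User: "BashNano", Value: 12 } ]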

Since we have an example of the data, let's go ahead and turn control of twitter-websocket.js over to pm2. If the script is running, stop it with CTRL-C. Then we need to turn off SOCKET_DEBUG: edit the .env file and change SOCKET_DEBUG="1" to SOCKET_DEBUG="0". Now we can let pm2 take over by starting this script with pm2 start twitter-websocket.js.

The Web Interface

To quickly get a look at what all of our hard work has accomplished, let's hop over to the html folder and make a few changes. You can get there with cd ~/tmp/twitterfeed-main/html/. In here you will see index.html, the css folder, and the js folder. Before we can look at what we have created, we have to make one change to js/twitter-client.js: we have to tell the browser code where it can connect to the websocket. I do not know of a clean way to pass this to the client, since the client has to know where to connect before it can receive anything, so we just edit the code. On the second line, modify var ws = new WebSocket("ws://localhost:8080/"); and change localhost to the IP address of the VM this project is running on. Once this is done, refresh the browser window we opened way back at the beginning of this document, or open a new browser and navigate to http://ip_of_this_vm. You should see something similar to the screenshot below.

Working Twitter Stream Website

If you watch this page long enough you will see that the bar chart updates automatically while the network graph does not. The network graph uses force and gravity settings to create the network map by pushing points away from each other, so a single update could move a node to the other side of the window, which would get confusing. In Part 2 of this project, when we add user interaction to the queries, we will give the user an option to update the network graph manually. The bar chart, on the other hand, is easier to follow when it changes and is somewhat more useful that way. Why one updates and the other does not is explained in the commented twitter-client.js code below.

        
          //set to client websocket. For this project this will be the websocket we created with twitter-websocket.js
var ws = new WebSocket("ws://192.168.2.192:8080/");
//Set new graph variable for testing if a new network graph needs to be built
var newGraph;
//When the websocket connection opens
ws.onopen = function(e) {
    //send a websocket message that we want the data now so
    //we do not have to wait for the websocket server update timer
    ws.send("start");
};
//Do this when a new message comes from the websocket
ws.onmessage = function(evt) {
    var received_msg = evt.data;
    console.log("Message is received...");
    //convert data from websocket to JSON object
    var jsonData = JSON.parse(received_msg);
    console.log(jsonData);
    if(!newGraph){
        //if this is the first data packet build the network graph
        buildD3NetworkGraph(jsonData.data)
        //set newGraph variable so this will only be built once per session
        newGraph = 1;
    }
    //always build the bar chart.
    buildD3BarChart(jsonData.data.barchart)

};

function buildD3BarChart(d3Data) {
    //clear the barchart div otherwise new axis will stack on top of old axis
    d3.selectAll("#userBarDiv > *").remove();
    //create the svg object and give it the bargraph div
    const svg = d3.select("#userBarDiv"),
        margin = {
            top: 20,
            right: 20,
            bottom: 80,
            left: 40
        },
        //height and width are hardset. 
        //For some reason reading the dimensions of the div does not work here
        width = 800, 
        height = 300,
        // set the x and y scales based on width and height
        x = d3.scaleBand().rangeRound([0, width]).padding(0.2),
        y = d3.scaleLinear().rangeRound([height, 0]),
        g = svg.append("g")
        .attr("transform", `translate(${margin.left},${margin.top})`);
    console.log(width, height)
    var data = d3Data;

    //These set the extent of the x and y domains. The x domain is the list of
    //users, hence the number of bars that will be drawn. The y domain runs from
    //zero to the maximum Value, which sets the tallest a single bar can be.
    x.domain(data.map(d => d.User));
    y.domain([0, d3.max(data, d => d.Value)]);

    //This sets the html object attributes for the graph
    g.append("g")
        .attr("class", "axis axis-x")
        .attr("transform", `translate(0,${height})`)
        .call(d3.axisBottom(x))
        .selectAll("text")
        .style("text-anchor", "end")
        .attr("dx", "-.8em")
        .attr("dy", ".15em")
        .attr("transform", "rotate(-65)");;

    g.append("g")
        .attr("class", "axis axis-y")
        .call(d3.axisLeft(y).ticks(10));

        //Besides setting maximums on the x and y domains this is where the 
        //data we prepared comes into play. This will draw a standard vertical
        //bar chart. the x values are read from the User key and the y, or height
        //of the bars will be set by the Value key that we sent from the websocket.
        //the .attr height sets the height of the bar based on the Value
    g.selectAll(".bar")
        .data(data)
        .enter().append("rect")
        .attr("class", "bar")
        .attr("x", d => x(d.User))
        .attr("y", d => y(d.Value))
        .attr("width", x.bandwidth())
        .attr("height", d => height - y(d.Value));

}

function buildD3NetworkGraph(d3Data) {
    console.log(d3Data)

    //As complex as this function might seem, our data
    //only comes into play in three places. Everything else
    //is helper code to assist the zooming and dragging of
    //the network graph.
    var width = document.getElementById("networkSVG").offsetWidth;
    var height = document.getElementById("networkSVG").offsetHeight;

    var svg = d3.select("#networkSVG")
        .append("svg")
        .attr("width", width)
        .attr("height", height)
        .call(d3.zoom().on("zoom", function() {
            svg.attr("transform", d3.event.transform)
        }))
        .append("g")

    var color = d3.scaleOrdinal(d3.schemeCategory20);

    //This is the first place that our data is used.
    //here it is used to calculate the force simulation between
    //the nodes and the links. 
    const simulation = d3.forceSimulation(d3Data.nodes)
        .force('charge', d3.forceManyBody().strength(-100))
        .force('link', d3.forceLink(d3Data.links).id(d => d.id)
            .distance(35))
        .force('center', d3.forceCenter(width / 2, height / 2))

    //The data is used again here to actually draw the links that were calculated above.
    var link = svg.append("g")
        .attr("class", "links")
        .selectAll("line")
        .data(d3Data.links)
        .enter().append("line")
        .attr("stroke-width", function(d) {
            return Math.sqrt(d.value);
        });
    //The data is used again here to create the node object that will be turned into a circle
    //below at var circles.
    var node = svg.append("g")
        .attr("class", "nodes")
        .selectAll("g")
        .data(d3Data.nodes)
        .enter().append("g")
    //add drag capabilities  

    var circles = node.append("circle")
        .attr("r", 3)
        .attr("fill", function(d) {
            return color(d.group);
        });

    // Create a drag handler and append it to the node object instead
    var drag_handler = d3.drag()
        .on("start", dragstarted)
        .on("drag", dragged)
        .on("end", dragended);

    drag_handler(node);
    //Create label for adding text to node
    var lables = node.append("text")
        .text(function(d) {
            return d.id;
        })
        .attr('x', 4)
        .attr('y', 4);

    node.append("title")
        .text(function(d) {
            return d.id;
        });
        //Start the simulation of nodes.
    simulation
        .nodes(d3Data.nodes)
        .on("tick", ticked);

    var g = svg.append("g")
        .attr("class", "everything");
    var drag_handler = d3.drag()
        .on("start", drag_start)
        .on("drag", drag_drag)
        .on("end", drag_end);

    drag_handler(node);


    //add zoom capabilities 
    var zoom_handler = d3.zoom()
        .on("zoom", zoom_actions);

    zoom_handler(svg);

    function drag_start(d) {
        if (!d3.event.active) simulation.alphaTarget(0.3).restart();
        d.fx = d.x;
        d.fy = d.y;
    }

    //make sure you can't drag the circle outside the box
    function drag_drag(d) {
        d.fx = d3.event.x;
        d.fy = d3.event.y;
    }

    function drag_end(d) {
        if (!d3.event.active) simulation.alphaTarget(0);
        d.fx = null;
        d.fy = null;
    }

    //Zoom functions 
    function zoom_actions() {
        g.attr("transform", d3.event.transform)

    }

    function ticked() {
        link
            .attr("x1", function(d) {
                return d.source.x;
            })
            .attr("y1", function(d) {
                return d.source.y;
            })
            .attr("x2", function(d) {
                return d.target.x;
            })
            .attr("y2", function(d) {
                return d.target.y;
            });

        node
            .attr("transform", function(d) {
                return "translate(" + d.x + "," + d.y + ")";
            })
    }

    function dragstarted(d) {
        if (!d3.event.active) simulation.alphaTarget(0.3).restart();
        d.fx = d.x;
        d.fy = d.y;
    }

    function dragged(d) {
        d.fx = d3.event.x;
        d.fy = d3.event.y;
    }

    function dragended(d) {
        if (!d3.event.active) simulation.alphaTarget(0);
        d.fx = null;
        d.fy = null;
    }
}
        
      

D3 is a feature rich visualization library that can do much more than what we have done with it in this simple project. As the other parts of this project move forward, though, we will use more of its features. For a better understanding of how D3 works, it is best to check out the documentation at the D3 V4 API Reference.

And with that we have come to the end of twitter-websocket.js, the script that sends data to the webpage and delivers realtime updates to the bar chart. This is technically the end of this project: you should have a working, single VM, data observability test platform. The great thing is that the process is the same for visualizing any data. The hard work always comes in where you have to convert data from one format to another, which we will do on a few different data sets in this project and others on this site.

Stay tuned for Part 2, where we add user controls to the interface, which will have us change the scripts to track individual websockets. We will also be adding system metrics to the dashboard page, collected through the same websocket.
