Wednesday, May 3, 2017

Elasticsearch. Logstash. R. What?!

Motivation

At bsidesNash, Chris Sanders gave a great talk on threat hunting.  One of his recommendations was to try out an ELK (Elasticsearch, Logstash, Kibana) stack for searching for threats in log data.  ELK is an easy way to stand up a distributed, scalable, stack capable of storing and searching full text records.  The benefit is it's easy ingestion (Logstash), schema-agnostic storage ability, (Elasticsearch), and robust search and dashboards (Kibana) makes it easy platform for threat hunters.

However, because of it's ease, ELK tends to be a one-size-fits-all solution for many tasks.  I had asked Chris about using other tools for analysis such as R by way of Rstudio and dplyr or Microsoft Power BI.  Chris hadn't tried it and, at the time, neither had I.  (My day job is mostly historic data analysis rather than operational monitoring.)

Opportunity

However, the DBIR Cover Challenge presented an opportunity.  For those who are unaware, each year there is a code or codes hidden on the DBIR cover.  That code then leads to a puzzle challenge which has resulted in some nice rewards for the winners; (iPad minis, auto-follow telescopes, Yeti coolers, quadcopters, 3D printers, and more).  The challenge has multiple puzzles of which players must complete 8.  So that they check their answers as they go, the site is a dynamic webapp hosted at Heroku.  Because it is dynamic, I can add my own log messages into the endpoint functions.

But I needed a place to store and search the logs.  Heroku provides some great plugins for this, but, given the conversation with Chris, I figured I'd try to roll my own, starting with ELK.  The first hurdle was that, though there is a lot of hosted Elasticsearch and Kibana, there was much less hosted Logstash (the part I really needed).  Elastic cloud didn't have it.  AWS had their own tools.  Finally I found logit.io which works perfectly.  They provide a full ELK stack as a cloud service for around $20 at the low end with a 14 day trial. I signed up for the trail and was up-and-running in minutes.  They even have an easy one-line instruction on how to set up a Heroku drain to send logs to a logit.io Logstash endpoint.  From there, it is automatically stored in Elasticsearch and searchable through Kibana.

Going beyond ELK

The problem I quickly found out, was that Kibana didn't have the robust manipulation I was used to using R.  While it could find entries and make basic dashboards, I simply couldn't cut the data like I wanted once I'd found the subset of data I was interested in.  I tried passing the data to PowerBI, but on first blush, the streaming API setup was too limited to ingest a heroku drain using the basic setup tools.  Finally, I decided to try and keep the Logstash and Elasticsearch underpinnings, but switch to R for analysis.  R allows for simple pipeline analysis of data as well as robust charting.

Doin it with R

The first step was to install the packages I'd need:
install.packages("dplyr") # for simple piped data processing
install.packages("elastic") # for talking to the Elasticsearch store
install.packages(flexdashboard) # for creating a dashboard to monitor
install.packages("DT") # for displaying a HTML data table in the dashboard
install.packages("stringr") # simple string manipulation
install.packages("ggmaps", "viridis", "rgeolocate", "leaflet") # geocoding IPs and displaying them on a map
install.packages("devtools", "treemap") # create treemaps
devtools::install_github("Timelyportfolio/d3treeR") # create treemaps
After installing packages, the next step was to set up the Elasticsearch connection:
elastic::connect(es_host="<my ES endpoint>", es_port=443, es_path="", es_transport_schema = 'https', headers=list(apikey="<my api key>"))
I also manually visited: "https://<my ES endpoint>/_cat/indices?v&apikey=<my API key>&pretty=true" to see what indexes Logstash was creating.  It appears to create an index per day and keep four indexes in the default logit.io setup.  I stored them into a variable and then ran a query, in this case for the line log line indicating a player had submitted a specific key:
indexes <- c("logstash-2017.04.28", "logstash-2017.04.29", "logstash-2017.04.30", "logstash-2017.05.01") # I should be able to get this from `elastic::cat_indices()`, but it did not apply my apikey correctly
query <- elastic::Search(index=indexes, q="logplex_message:submitted", size=10000)$hits$hits
The following thing we need to do is remove only the fields we want from the query.  The result is a list of query results, each itself a list of key:value pairs.  I used the `lapply` function to extract _just_ the logplex_message field.  (`lapply` takes a function and applies it to each item of a list in R.)  `lapply` returns a list and so I `unlist` the results and make them a column in a dataframe:
submissions <- data.frame(text = purrr::map_chr(query, ~ .$`_source`$logplex_message))
In our puzzle challenge, we have 'trainers' who use 'keys' to indicate they've caught Breachemon.  I can use my normal R skills to separate the trainer name and key from the log message and count how many times each trainer has submitted each key:
submissions <- submissions %>%
    mutate(trainer = gsub("Trainer ([^[:space:]]*).*$", "\\1", text)) %>% # extract 'trainer'
    mutate(key = gsub(".*submitted key (.*) to the bank.$", "\\1", text)) %>% # extract 'key'
    group_by(trainer, key) %>% # group each trainer-key pair
    tally() # short cut for `summarize(n=n())`.  For each trainer-key pair, create a column 'n' with the number of times that pair occurred
From there we can visualize the table with:
DT::datatable(submissions)
We could also visualize the total submissions per trainer:
submitters <- data.frame(text = purrr::map_chr(query, ~ .$`_source`$logplex_message)) %>% # extract the log message and produce a dataframe
mutate(trainer = gsub("Trainer ([^[:space:]]*).*$", "\\1", text)) %>% # extract the trainer
group_by(trainer) %>% # create a group per trainer
tally() # shortcut for `summarize(n=n())`. Count the events per group
d3treeR::d3tree2(treemap::treemap(submitters, "trainer", "n", aspRatio=5/3, draw = FALSE)) # produce a treemap of submissions per person

Dashboard Time

To wrap this all together, I decided to make a simple dashboard.  In the Rstudio menu, File->New File->R Markdown...  In the menu, choose 'From Template' and then Template: 'Flex Dashboard'.  You'll get something like:
---
title: "Untitled"
output:
  flexdashboard::flex_dashboard:
    orientation: columns
    vertical_layout: fill
---
```{r setup, include=FALSE}
library(flexdashboard)
```
Column {data-width=650}
-----------------------------------------------------------------------
### Chart A
```{r}
```
Column {data-width=350}
-----------------------------------------------------------------------
### Chart B
```{r}
```
### Chart C
```{r}
```
Lets add our two charts:
---
title: "Breachemon"
output:
  flexdashboard::flex_dashboard:
    orientation: columns
    vertical_layout: fill
---
```{r setup, include=FALSE}
library(flexdashboard)
library(dplyr)
elastic::connect(es_host="<my ES endpoint>", es_port=443, es_path="", es_transport_schema = 'https', headers=list(apikey="<my api key>"))
query <- elastic::Search(index=indexes, q="logplex_message:submitted", size=10000)$hits$hits
```
Column {data-width=650}
-----------------------------------------------------------------------
### Submissions
```{r fig.keep='none'}
submitters <- data.frame(text = purrr::map_chr(query, ~ .$`_source`$logplex_message)) %>% # extract the log message and produce a dataframe
mutate(trainer = gsub("Trainer ([^[:space:]]*).*$", "\\1", text)) %>% # extract the trainer
group_by(trainer) %>% # create a group per trainer
tally() # shortcut for summarize(n=n()).  Count the events per group
d3treeR::d3tree2(treemap::treemap(submitters, "trainer", "n", aspRatio=5/3, draw = FALSE)) # produce a treemap of submissions per person
```
### Submitters
```{r}
data.frame(text = unlist(lapply(query, function(l) {l$`_source`$logplex_message}))) %>%
    mutate(trainer = gsub("Trainer ([^[:space:]]*).*$", "\\1", text)) %>% # extract 'trainer'
    mutate(key = gsub(".*submitted key (.*) to the bank.$", "\\1", text)) %>% # extract 'key'
    group_by(trainer, key) %>% # group each trainer-key pair
    tally() # short cut for `summarize(n=n())`.  For each trainer-key pair, create a column 'n' with the number of times that pair occurred
  DT::datatable()
```
Column {data-width=350}
-----------------------------------------------------------------------

### Map
```{r}
ips <- data.frame(text = purrr::map_chr(query, ~ .$`_source`$msg_fwd))
geo <- rgeolocate::db_ip(as.character(unique(ips$text)), "<my free db-ip.com api key>") # geocode unique IPs, returns a list
geo <- do.call(rbind.data.frame, geo) # bind the list together as a dataframe
names(geo) <- c("IP", "Country", "State", "City") # set the dataframe column names
geo <- ips %>%
    group_by(text) %>%
    tally() %>% # count per IP
    rename(IP = text) %>%
    right_join(geo, by="IP") # join with geolocation
cities <- unique(as.character(geo$City)) # unique list of cities
cities <- cbind(ggmap::geocode(cities), cities) # geo code the cities
geo <- right_join(geo, cities, by=c("City" = "cities")) #join it back together
pal <- leaflet::colorFactor(viridis::viridis_pal(option = "C")(2), domain = geo$n) # create a color range
leaflet::leaflet(geo) %>% # make a map
  leaflet::addTiles() %>% # add some default shapes to it
  leaflet::addCircleMarkers(color = ~pal(n)) # add a circle with a color based on the count of submissions for each IP
```
Resulting in:

The last block pulls the msg_fwd field which contains the source IP adddress, splits it (as some have multiple), and stores it in a dataframe.  It then geolocates the IPs and binds the cities.  After that it geocodes latitude and longitude and joins it.  Finally it places the geolocated and coded IPs as dots on a map.

Wrapup

That's not to say there aren't hang-ups.  You _are_ pulling the data from the remote cluster to your local machine which is a relatively costly action.  (The queries I ran returned in a fraction of the second, but I can imagine querying a billion record store, returning tens of thousands of hits, would be slower.)  However, as Chris noted during his talk, not being selective in what you retrieve to search is one of the signs of a junior analyst.  Also, I have not automated retrieval of more than 10,000 records or the automatic tracking of indexes as they are created.  Finally, the dashboard must be refreshed manually.  There's a little button to do so in the Rstudio browser, however I think it may make more sense to provide a Shiny button to use to update all or selected portions instead.  Unfortunately, most of this goes beyond the few hours I was willing to put into this. proof of concept.

In the end, it was well worth the experimentation.  It required no hardware and brings the robust slicing and dicing of data that the R ecosystem provides to the easy and scalable storage of ELK. Though the logit.io service doesn't allow direct configurability of most of the ELK stack, they seem responsive to requests.  I'm actually not sure that the ES portion of ELK is really necessary.  If you are working with a limited number of well-defined data sources, a structured store such as Postgres, or a key:value store such as hive/hbase might make more sense.  R has nearly the repository of packages that Python does.  On my mac pro I can work with datasets in the 10's of millions of records, providing all sorts of complex analysis.  All in an easily-documentable and repeatable way.

In the future, I'd love to see the same thing done with MS PowerBI.  It's not a platform I know, but I think it would definitely be an interesting one to explore.  If anyone has any ideas on how to stream data to it, please let me know!

2 comments:

  1. The GitHub repo "timelyportfolio/d3tree2" doesn't exist. I think you are referring to "Timelyportfolio/d3treeR" ?

    ReplyDelete
  2. yes. Thanks. I'll update the post.

    ReplyDelete