ACID refers to the set of properties required of a transaction system.
Atomicity – Transactions complete fully or fail without any partial changes. If one bank account is debited, another account must be credited; atomicity means that either both operations succeed or both fail. There should be no scenario in which one succeeds and the other fails.
Consistency – The database permits only those transactions that comply with its rules. For instance, if the rule is that an account balance must not be negative and a transaction tries to debit an amount greater than the available balance, the transaction is rejected. The database must be in a consistent state before and after every transaction.
Isolation – One transaction must not affect another. If multiple transactions occur concurrently, the end result is as if they occurred in sequence, one after another. If a bank account has a balance of $500 and two transactions execute concurrently – one debiting $100 and another crediting $300 – the result is a balance of $700 (as if the transactions occurred one after another).
Durability – All committed transactions must be retained even if the system faces a hardware or software failure.
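In a transactional RDBMS these properties surface directly in the transaction syntax. As a minimal sketch (generic SQL, with a hypothetical accounts table):

-- Transfer $100 from account 1 to account 2 atomically.
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;  -- both updates become visible together, or (on rollback/failure) neither does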
ACID and Druid
Druid is a database for building modern analytics applications. Druid cannot be used as a system of record and does not support transactions. Given the focus on analytics, ACID guarantees are not fully relevant to Druid. However, it is interesting to consider some of Druid's capabilities in this light.
Atomicity – There are two scenarios in which everything is committed or nothing is committed:
Batch ingestion – A batch ingestion succeeds and commits data to deep storage only if all rows in the batch succeed. Otherwise the entire ingestion fails and no rows are committed.
Real-time ingestion – Real-time ingestion proceeds from a checkpoint; after one hour (configurable) it creates another checkpoint and commits the data up to that checkpoint. However, if any of the rows fail, the ingestion fails in its entirety and begins again from the previous checkpoint.
The above provides a sufficient atomicity guarantee to ensure that Druid does not present an incorrect picture and that users don't have to track which data went through and which did not. Taking the example of a debit and credit across bank accounts, these records will have the same timestamp in the transaction system, and when they are ingested into Druid either both will be ingested or neither will be (with the caveat that both records are part of the same batch ingestion or the same Kafka topic). This ensures that the analytics results are correct.
Consistency – Druid does not enforce many rules on the data. There are no primary keys and no foreign keys. If data types for columns are specified in the ingestion spec, any data that does not conform to the spec is rejected. When an entity is updated, Druid holds multiple records for that entity, as each update is an event in time. Druid provides the LATEST SQL aggregation to get a consistent picture of the latest state of an entity.
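As a minimal sketch of LATEST (the accounts datasource and its columns here are hypothetical):

-- For each account, return the balance from the row with the most recent __time.
SELECT
  account_id,
  LATEST(balance) AS current_balance
FROM accounts
GROUP BY account_id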
Isolation – Isolation can be seen in Druid's ability to ingest millions of rows per second while handling hundreds of queries per second at the same time. Historicals handle queries on data that is already in the system, while the indexers respond to queries using the data that is being indexed. Indexers publish the indexed data to deep storage as segments, the Historicals download the segments from deep storage, and the indexing task completes only when a Historical has downloaded the segment. Any query on the data being indexed is handled by the indexer until the Historical has downloaded the segment. This ensures that there are no read-write conflicts (consider the alternative of indexers writing data directly to the Historicals, which would have made read-write conflicts very difficult to avoid). Additionally, segments (which contain the indexed data stored in a columnar format) are immutable, further isolating reads from writes.
Durability – The primary persistent store is deep storage, which is usually a blob store. Any data that has been committed to deep storage remains available even if the entire cluster goes down; a new cluster can be brought up using the data in deep storage. Batch ingestion succeeds only if all the data in the batch is committed to deep storage, and real-time ingestion succeeds only when all data from the previous checkpoint is committed to deep storage.
Conclusion
Druid provides sufficient ACID guarantees from an analytics perspective; full transactional ACID is not relevant to Druid. The guarantees Druid does provide enable building a modern analytics application with large data volumes, high-throughput real-time ingestion and high query concurrency.
Apache Druid is an amazing database for powering a modern analytics application. Typically an analytics application requires sub-second response times under heavy load at TB/PB scale. This means that doing as much processing as possible (other than aggregations, which Druid does really well) during ingestion, or in the pipeline before Druid, is often helpful.
One use case where processing needs to be done in the pipeline before Druid is when multiple streams of data flow into Druid and the streams need to be joined for analytics. With the most recent release of Druid, joining data while ingesting from batch sources is well supported using the multi-stage query engine (MSQ); however, joining streaming sources is not possible during ingestion, so the join has to be done at query time. This impacts query performance, hence the need to do the join in the pipeline upstream of Druid.
A typical example of this use case is analysing order information. Orders are usually in one stream and order details (order line items) in another, and the data needs to be joined on orderid for analysis. One approach is to ingest orders and order details into separate datasources and join them in the query. However, both orders and details typically run into millions of records, so the preferred approach is to build a pipeline with Flink, Kafka and Druid.
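For contrast, the query-time join that this pipeline avoids might look like the following Druid SQL (a sketch assuming the two streams were ingested as separate orders and orderdetails datasources):

-- Join order headers to line items at query time; with millions of rows
-- on each side, this join dominates the query cost.
SELECT o.orderid, SUM(d.qty * d.rate) AS order_value
FROM orders o
JOIN orderdetails d ON o.orderid = d.orderid
GROUP BY o.orderid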
Data and setup
The data setup consists of two files:
orders.csv – This is a CSV file with order information. The columns are orderid, orderdate, deliverydate, shipppeddate, status, comment and orderno.
orderdetails.csv – This is a CSV file with columns orderid, itemname, qty, rate and lineno.
For this demo I have created two Kafka topics – orders and orderdetails – which contain the contents of these two files, and a third topic called joineddata which receives the joined stream from Flink. The aim is to put together the following pipeline: the orders and orderdetails topics flow into Flink, Flink joins them and writes the result to the joineddata topic, and Druid ingests joineddata.
Install Druid first (it includes ZooKeeper, which Kafka needs), then Kafka, then Flink. Use the quick start guides for each of these products to start the services.
Setting up the Flink Pipeline
Download the data files orders.csv and orderdetails.csv. Create three topics in Kafka – orders, orderdetails and joineddata.
The Flink job first creates a stream execution environment (env) and a stream table environment (tableEnv). It then sets up two Kafka source tables ingesting from the orders and orderdetails topics, joins them on orderid, and publishes the join results to the Kafka topic joineddata before executing the streaming environment.
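As a minimal sketch, the statements such a job might execute through tableEnv.executeSql could look like the following Flink SQL (the Kafka address, column types and formats are assumptions):

-- Source table over the orders topic (connector options are illustrative).
CREATE TABLE orders (
  orderid STRING,
  orderdate STRING,
  deliverydate STRING,
  shipppeddate STRING,
  status STRING,
  `comment` STRING,   -- quoted: COMMENT is a keyword in Flink SQL
  orderno STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-- Source table over the orderdetails topic.
CREATE TABLE orderdetails (
  orderid STRING,
  itemname STRING,
  qty INT,
  rate DOUBLE,
  lineno INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'orderdetails',
  'properties.bootstrap.servers' = 'localhost:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-- Sink table over the joineddata topic. An inner equi-join of two
-- append-only streams is itself append-only, so a plain Kafka sink works.
-- Note that a regular join keeps both sides in Flink state indefinitely;
-- an interval join or a state TTL bounds this in production.
CREATE TABLE joineddata (
  orderid STRING,
  orderdate STRING,
  status STRING,
  itemname STRING,
  qty INT,
  rate DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'joineddata',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- The join itself: each order line item enriched with its order header.
INSERT INTO joineddata
SELECT o.orderid, o.orderdate, o.status, d.itemname, d.qty, d.rate
FROM orders o
JOIN orderdetails d ON o.orderid = d.orderid;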
Build the code as a Maven project and run it in Flink:
./flink run <jar file>
Use the Druid ingestion spec (github) to start a Druid Kafka ingestion task. This should get the joined data into Druid, and you should see the datasource in the Druid console.
Conclusion
There are many use cases that require joining data arriving through multiple Kafka streams before it lands in Druid. One way of handling such use cases is to join the data outside Druid; in this blog an approach using Flink was demonstrated. This is a scalable approach that works for high-throughput streams and leaves Druid free to handle the ad hoc aggregations.
With all the excitement about ChatGPT, I started wondering how Druid can be used to help with training machine learning models. A typical model requires exploratory data analytics to identify features that can contribute, quick access to large amounts of data for training, and real-time monitoring of the model's inputs and outputs. Starting with this post I will be putting together a series on the role Apache Druid can play in the model pipeline. In this first post I will go over some of the capabilities of Druid and how they help with ML models. I will then lay out an example of training a regression model using Druid as a source.
Where does Druid fit?
Exploratory Data Analytics
Exploratory analytics refers to understanding the different dimensions and metrics and how they behave in relation to the attribute being predicted. Some examples of the kinds of analytics required (a Druid SQL sketch follows this list):
Identify the number of rows with null values in categorical attributes and drop those rows from the training data
Most ML models do not deal well with null values in categorical variables
This is a filter operation – very fast in Druid, which uses bitmap indexes to make filters blazingly fast
Look at row counts for different categorical variables
Helps in identifying skew
This is an aggregation – very fast in Druid, which uses a shared-nothing architecture, data pre-fetching and partitioning to speed up aggregations
Compute mean, standard deviation and variance for numeric values
Identify outliers when training regression models to predict numeric data
Eliminate skewed data that could affect training output
Statistics on petabytes of data – very fast in Druid
Aggregate at different time granularities
Eliminate noisy data when using forecasting models
Time series queries – very fast in Druid, since data in Druid is partitioned by time as the primary dimension
Look at time variation of metrics with different dimensions
Identify closely correlated variables and remove them from the training dataset
Time series queries – very fast in Druid
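As a minimal sketch of these queries in Druid SQL, against the regression_demo datasource built later in this post (VAR_POP and STDDEV_POP assume the druid-stats extension is loaded):

-- Null counting is a filter – served by bitmap indexes.
SELECT COUNT(*) AS null_rows
FROM "regression_demo"
WHERE "CO (ppm)" IS NULL

-- Mean, variance and standard deviation are plain aggregations.
SELECT
  AVG("Humidity (%r.h.)") AS mean_humidity,
  VAR_POP("Humidity (%r.h.)") AS var_humidity,
  STDDEV_POP("Humidity (%r.h.)") AS stddev_humidity
FROM "regression_demo"

-- Re-aggregating at a coarser time grain smooths out noisy data.
SELECT TIME_FLOOR(__time, 'PT1H') AS t, SUM("CO (ppm)") AS co
FROM "regression_demo"
GROUP BY 1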
Model training
Model training requires one or more of the capabilities below:
Extract different data points from multiple time intervals to capture patterns in the data.
Train models on data with different granularities and compare accuracy
Extract specific patterns from data and check prediction accuracy
Extract 1000s of data points to train a genetic algorithm
All of the above while many data scientists concurrently query the data lake
With Druid one can easily extract data at different granularities and from different time periods, and Druid is one of the best databases for handling high-concurrency workloads.
Monitoring models
While there are many specialised tools to both serve and monitor models, Druid is very useful for monitoring the inputs and outputs of models. Monitoring requires anomaly detection: one must look back in time to detect patterns, compute statistics on real-time and historical data together, and do all of this in under a second.
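As a minimal sketch of such a check in Druid SQL (the model_metrics datasource and prediction column are hypothetical placeholders for whatever the model emits):

-- Compare the last hour's mean against the all-time historical mean;
-- a large divergence flags a possible anomaly in the model's inputs.
SELECT
  AVG("prediction") FILTER (WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR) AS last_hour_mean,
  AVG("prediction") FILTER (WHERE __time < CURRENT_TIMESTAMP - INTERVAL '1' HOUR) AS historical_mean
FROM "model_metrics"

Setting up the data
Each file in the dataset is a CSV named with the day and timestamp of the recording. The SQL-based (MSQ) ingestion below loads one file into a regression_demo datasource: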
INSERT INTO "regression_demo"
WITH "ext" AS (SELECT *
FROM TABLE(
EXTERN(
'{"type":"local","baseDir":"<folder where dataset is unzipped>","filter":"20160930_203718.csv"}',
'{"type":"csv","findColumnsFromHeader":true}',
'[{"name":"Time (s)","type":"double"},{"name":"CO (ppm)","type":"long"},{"name":"Humidity (%r.h.)","type":"double"},{"name":"Temperature (C)","type":"double"},{"name":"Flow rate (mL/min)","type":"double"},{"name":"Heater voltage (V)","type":"double"},{"name":"R1 (MOhm)","type":"double"},{"name":"R2 (MOhm)","type":"double"},{"name":"R3 (MOhm)","type":"double"},{"name":"R4 (MOhm)","type":"double"},{"name":"R5 (MOhm)","type":"double"},{"name":"R6 (MOhm)","type":"double"},{"name":"R7 (MOhm)","type":"double"},{"name":"R8 (MOhm)","type":"double"},{"name":"R9 (MOhm)","type":"double"},{"name":"R10 (MOhm)","type":"double"},{"name":"R11 (MOhm)","type":"double"},{"name":"R12 (MOhm)","type":"double"},{"name":"R13 (MOhm)","type":"double"},{"name":"R14 (MOhm)","type":"double"}]'
)
))
SELECT
MILLIS_TO_TIMESTAMP(TIMESTAMP_TO_MILLIS(TIME_PARSE('2016-09-30T20:37:18Z'))+CAST (1000*("Time (s)") as INT)) __time,
"CO (ppm)",
"Humidity (%r.h.)",
"Temperature (C)",
"Flow rate (mL/min)",
"Heater voltage (V)",
"R1 (MOhm)",
"R2 (MOhm)",
"R3 (MOhm)",
"R4 (MOhm)",
"R5 (MOhm)",
"R6 (MOhm)",
"R7 (MOhm)",
"R8 (MOhm)",
"R9 (MOhm)",
"R10 (MOhm)",
"R11 (MOhm)",
"R12 (MOhm)",
"R13 (MOhm)",
"R14 (MOhm)"
FROM "ext"
PARTITIONED BY HOUR
Since each file name carries the day and timestamp of the recording, the TIME_PARSE literal in the SQL above changes per file: '2016-09-30T20:37:18Z' is for 20160930_203718.csv, '2016-10-01T23:18:09Z' is for 20161001_231809.csv, and so on. Next, install and configure Apache Superset.
Exploratory data analytics
Follow this post to link up Superset with Druid. Create a time series line chart using the regression_demo datasource in Druid.
Add the CO (ppm) metric, choose SUM as the aggregation, and create a line chart.
Add Humidity (%r.h.) as a second metric to the chart.
Note that the correlation between humidity and CO is fairly consistent, so humidity would be a good measurement to use in predicting CO. Now add temperature and change the time grain to Hour.
Clearly temperature does not follow the CO concentration very closely at the hourly grain, so I decided to drop temperature. With Druid it is easy to add or remove metrics and change the time grain. Using a similar analysis I narrowed the variables down to:
sum("Humidity (%r.h.)") AS Humidity, sum("R1 (MOhm)") AS R1, sum("R3 (MOhm)") AS R3, sum("R4 (MOhm)") AS R4, sum("R14 (MOhm)") AS R14
Training the model
Install Jupyter Notebook and download the notebook for this blog. The notebook uses pydruid to connect from Python to Druid and sklearn to train a machine learning model (you need to install numpy and pandas along with sklearn and pydruid to make use of it).
Import the required libraries:
from pydruid.db import connect  # DB API-style connection to Druid SQL
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsRegressor
Connect to Druid, changing the host and port to those of the machine on which your Druid router is running:
conn = connect(
    host='localhost',     # Druid router host
    port=9999,            # Druid router port
    path='druid/v2/sql',  # Druid SQL endpoint
    scheme='http')
curs = conn.cursor()
Get data from Druid and load it into a pandas DataFrame. Note that the code below extracts the selected metrics from Druid at an hourly grain:
df = pd.DataFrame(curs.execute("""
SELECT time_floor(__time,'PT1H') AS t1,
sum("CO (ppm)") AS CO,
sum("Humidity (%r.h.)") AS Humidity,
sum("R1 (MOhm)") AS R1,
sum("R3 (MOhm)") AS R3,
sum("R4 (MOhm)") R4,
sum("R14 (MOhm)") R14
FROM "druid"."regression_demo"
GROUP BY 1
"""))
Create the feature and label vectors:
X = df[['Humidity','R1','R3','R4','R14']]
y = df[['CO']]
Create a train/test split and train a KNN regression model:
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
from sklearn.neighbors import KNeighborsRegressor
knnr = KNeighborsRegressor(n_neighbors = 50)  # predict the mean of the 50 nearest training points
knnr.fit(X_train, y_train)
Compute the mean squared error:
print ("The MSE is:",format(np.power(y_test-knnr.predict(X_test),2).mean()))
I started with an hourly grain, which gave a very large error (9.501070e+08), and then reduced the grain to one second, which resulted in a much smaller error (11.576547).
I further reduced the grain to 1 ms (the time grain of the raw data) and the error dropped to 8.891102. However, training the model took less time at the 1 s grain than at the 1 ms grain.
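The only change between these runs is the period string passed to TIME_FLOOR in the query above; for example, the one-second-grain query is:

SELECT time_floor(__time,'PT1S') AS t1,
  sum("CO (ppm)") AS CO,
  sum("Humidity (%r.h.)") AS Humidity,
  sum("R1 (MOhm)") AS R1,
  sum("R3 (MOhm)") AS R3,
  sum("R4 (MOhm)") AS R4,
  sum("R14 (MOhm)") AS R14
FROM "druid"."regression_demo"
GROUP BY 1

At the raw 1 ms grain there is no need to floor at all – grouping by __time itself returns the data at its native resolution.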
Conclusion
I trained a regression model using Druid as the source of training data. With Druid I was able to add and remove metrics and look at time series charts to identify metrics that could be used in the model. I was then able to train models at different time granularities to progressively reduce the error. This analysis is useful: we discovered that at 1 s granularity the error is in the same ballpark as at 1 ms granularity, but the model training time is significantly lower. This dataset had 3,547,481 rows; a production data lake will likely have billions of rows, and there are usually tens or hundreds of data scientists training different models. Druid's ability to handle ad hoc aggregations under high concurrency enables models to be trained quickly, whereas bringing large amounts of data into Python and aggregating there slows down model training.
This post focussed on the ability to do ad hoc aggregations in Druid with different metrics and different time granularities. In subsequent posts I will focus on some of Druid's other capabilities.