Kai Waehner <[email protected]> 18 Jan 2019
This script assumes that all components (ZooKeeper, Kafka, Kafka Connect, KSQL Server, Jupyter) use their default configuration values.
We use the following test data (each row is one single payment):
Id bigint, Timestamp varchar, User varchar, Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string
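You can sanity-check the test data with pandas before pushing it into Kafka. A minimal sketch (assuming the CSV sits in the repository's data folder and has no header row, matching the schema above; adjust the path to your checkout):
# Peek at the first rows of the credit card test data
import pandas as pd

columns = (["Id", "Timestamp", "User", "Time"]
           + ["V%d" % i for i in range(1, 29)]
           + ["Amount", "Class"])
df = pd.read_csv("data/creditcard_extended.csv", names=columns, header=None)
print(df.head())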
First, we need to start a local Kafka ecosystem so that we can use KSQL. This can be done from Jupyter, from your development environment, or from the command line.
We also need to create some test data: either start a data generator to create a continuous feed of streaming data, ingest a file via a bash script, or use Kafka Connect to build a real continuous data stream from any source.
This is not part of the ML-related tasks, but just to get some test data into a Kafka topic:
// Start KSQL with Kafka and other dependencies
confluent start ksql-server
// Optional: Start Kafka Connect
confluent start connect
// Create Kafka topic
kafka-topics --zookeeper localhost:2181 --create --topic creditcardfraud_source --partitions 3 --replication-factor 1
// Produce several test messages at once:
cat /Users/kai.waehner/git-projects/python-jupyter-apache-kafka-ksql-tensorflow-keras/data/creditcard_extended.csv | kafka-console-producer --broker-list localhost:9092 --topic creditcardfraud_source
// Produce a single test message manually
confluent produce creditcardfraud_source
1,"2018-12-18T12:00:00Z","Hans",0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,"0"
// Keep the last column empty => this sends NULL => the record gets filtered out as part of the preprocessing stream!
1,"2018-12-18T12:00:00Z","Hans",0,-1.3598071336738,-0.0727811733098497,2.53634673796914,1.37815522427443,-0.338320769942518,0.462387777762292,0.239598554061257,0.0986979012610507,0.363786969611213,0.0907941719789316,-0.551599533260813,-0.617800855762348,-0.991389847235408,-0.311169353699879,1.46817697209427,-0.470400525259478,0.207971241929242,0.0257905801985591,0.403992960255733,0.251412098239705,-0.018306777944153,0.277837575558899,-0.110473910188767,0.0669280749146731,0.128539358273528,-0.189114843888824,0.133558376740387,-0.0210530534538215,149.62,
As an alternative, you can ingest test data via Kafka Connect from a continuous data stream. In this example - ironically - we read from a static CSV file 'creditcard_extended.csv' (just like you would without Kafka / KSQL in a Jupyter ML notebook, but feel free to use a real continuous data source instead):
// Start File Connector to consume data from CSV file:
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "file-source",
"config" : {
"connector.class" : "org.apache.kafka.connect.file.FileStreamSourceConnector",
"tasks.max" : "1",
"file": "/Users/kai.waehner/git-projects/python-jupyter-apache-kafka-ksql-tensorflow-keras/data/creditcard_extended.csv",
"topic": "creditcardfraud_source",
"name": "file-source",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}'
// Check the connector status
confluent status file-source
// Delete the connector when done
curl -s -X DELETE localhost:8083/connectors/file-source
You can also use an easy-to-use Kafka data generator, either as a standalone script or as a Kafka Connect connector. See the blog post "Easy Ways to Generate Test Data in Kafka" (https://www.confluent.io/blog/easy-ways-generate-test-data-kafka) for details and examples.
Now go to the Jupyter Notebook 'python-jupyter-apache-kafka-ksql-tensorflow-keras.ipynb' (https://github.com/kaiwaehner/python-jupyter-apache-kafka-ksql-tensorflow-keras/blob/master/live-demo___python-jupyter-apache-kafka-ksql-tensorflow-keras.adoc) to do the preprocessing and interactive analysis with Python + KSQL, then the model training with Python + TensorFlow / Keras.
// Terminal
jupyter notebook
These KSQL statements can be used from the KSQL CLI (instead of via the Python API):
// KSQL-CLI
SELECT * FROM creditcardfraud_source;
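For reference, the Jupyter notebook submits the same kind of statements through the ksql-python client. A minimal sketch (assuming ksql-python is installed and the KSQL server's REST API runs on its default port 8088):
# Query the source stream from Python via the KSQL REST API (ksql-python)
from ksql import KSQLAPI

client = KSQLAPI("http://localhost:8088")
# query() returns a generator that streams result rows as they arrive
for row in client.query("SELECT * FROM creditcardfraud_source LIMIT 5"):
    print(row)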
KSQL commands (if you want to run them from the KSQL CLI instead of using the Jupyter notebook):
CREATE STREAM creditcardfraud_source (Id bigint, Timestamp varchar, User varchar, Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string) WITH (kafka_topic='creditcardfraud_source', value_format='DELIMITED', KEY='Id');
// Preprocessed KSQL Stream:
// Filter columns
// Filter messages where class is empty
// Change data format to Avro
CREATE STREAM creditcardfraud_preprocessed_avro WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_source WHERE Class IS NOT NULL;
You can also consume the data outside of Jupyter (helpful, for instance, if you need to find out whether a problem is caused by Python or by Kafka / KSQL):
// Terminal
confluent consume creditcardfraud_source --from-beginning
Some more examples of how to use KSQL to preprocess and analyze the source data:
- Anonymization
- Augmentation
- Merge / join data frames
// Anonymization
SELECT Id, MASK_LEFT(User, 2) FROM creditcardfraud_source;
// Augmentation
SELECT Id, IFNULL(Class, '-1') FROM creditcardfraud_source;
// STREAM-TABLE-JOIN
kafka-topics --zookeeper localhost:2181 --create --topic users --partitions 3 --replication-factor 1
CREATE TABLE users (userid varchar, gender varchar, regionid varchar) WITH (kafka_topic='users', value_format='AVRO', key = 'userid');
CREATE STREAM creditcardfraud_per_user WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_per_user') AS SELECT Time, Amount, Class FROM creditcardfraud_source c INNER JOIN USERS u on c.user = u.userid WHERE u.USERID = 1;
Further KSQL statements for analysis and preprocessing (not integrated into Jupyter and the demo yet)
// kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic creditcardfraud --from-beginning
// confluent consume creditcardfraud --value-format avro --from-beginning
//KSQL
// shows dropping columns (e.g. Timestamp, Id, User, etc.)
// shows dropping rows (where V1 is greater than 5, V2 is not null, and the joined user's city starts with 'Premium')
// also switches to Avro & illustrates using a bespoke Kafka topic name
CREATE STREAM creditcardfraud WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='creditcardfraud_preprocessed_avro') AS SELECT Time, V1 , V2 , V3 , V4 , V5 , V6 , V7 , V8 , V9 , V10 , V11 , V12 , V13 , V14 , V15 , V16 , V17 , V18 , V19 , V20 , V21 , V22 , V23 , V24 , V25 , V26 , V27 , V28 , Amount , Class FROM creditcardfraud_enhanced c INNER JOIN USERS u on c.userid = u.userid WHERE V1 > 5 AND V2 IS NOT NULL AND u.CITY LIKE 'Premium%';
// DESCRIBE creditcardfraud;
// ^ shows the schema
// DESCRIBE EXTENDED creditcardfraud;
// ^ shows the schema, the underlying query, and the number of messages processed -> this is an app we've built and it's continually running
// Create a delimited version of this stream
// Now the app that *needs* CSV gets it, but other users of the data benefit from the explicitly declared schema and don't have to type it out each time
CREATE STREAM creditcardfraud_csv WITH (VALUE_FORMAT='DELIMITED') AS SELECT * FROM creditcardfraud;
// KSQL => Extended CSV
Add a column:
SELECT 'hsbc.csv' AS SOURCE_FILE, * FROM creditcardfraud;
Remove NAs / missing values
SELECT * FROM creditcardfraud WHERE V1 IS NOT NULL;
SELECT * FROM creditcardfraud WHERE (V1 IS NOT NULL AND V2 IS NOT NULL);
Restrict date range
// there isn't a NOW() function
// NOW - 1HOUR doesn't exist :(
// i.e. you have to hard-code the epoch
SELECT * FROM creditcardfraud WHERE ROWTIME > {epoch value};
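One way to compute the epoch value (ROWTIME is in milliseconds since the epoch), e.g. one hour ago, from Python:
# Epoch milliseconds for "one hour ago", to plug into the ROWTIME filter above
import time

epoch_one_hour_ago = int((time.time() - 3600) * 1000)
print(epoch_one_hour_ago)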
Timestamp handling
// See the ATM fraud slides for illustration: https://speakerdeck.com/rmoff/atm-fraud-detection-with-kafka-and-ksql
// Code: https://github.com/confluentinc/demo-scene/blob/master/ksql-atm-fraud-detection/ksql-atm-fraud-detection-README.adoc
// this changes the way KSQL parses the timestamp of the message and uses a timestamp col from the payload - very important for time-based aggregations & time-based joins (e.g. stream-stream windowing)
CREATE STREAM creditcardfraud … WITH (TIMESTAMP='timestamp_col', TIMESTAMP_FORMAT='YYYY etc')
ROWTIME then inherits timestamp_col, _not_ the Kafka message timestamp
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss Z'), ROWTIME , timestamp_col from creditcardfraud limit 1;
// or you can leave the timestamp of the message alone and just filter as required
// useful for standard data prep & filtering
SELECT * FROM creditcardfraud where STRINGTOTIMESTAMP(timestamp_col,'YYYY etc') > {epoch value}
Drop column / row
// drop row -> WHERE clause
// drop column -> just SELECT the columns you need
// Concatenate
SELECT COL1 + COL2 AS NEW_COL FROM MY_STREAM;
SELECT CAST(COL1 AS VARCHAR) + CAST(COL2 AS VARCHAR) FROM MY_STREAM;
SELECT COL1 || ': static value : ' || COL2 AS NEW_COL // not sure if this is still supported
SELECT CONCAT(COL1,COL2) // SQL users might expect it but it's ugly
// splitting a col - can't be done
// there is no INSTR/INDEXOF, there's no SPLIT
// SELECT SUBSTRING(FULL_NAME,1,INDEXOF(FULL_NAME,' '))
// -> please go and upvote these on github
SELECT SUBSTR(FULL_NAME, 1,5) FROM MY_STREAM
// COALESCE / CASE are the other huge missing ones
https://github.com/confluentinc/ksql/issues/620
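As a workaround until these functions exist, you can do the split on the Python side once the rows are in a pandas DataFrame. A sketch (FULL_NAME is the hypothetical column from the examples above; the sample values are made up):
# Split FULL_NAME into first and last name in pandas instead of KSQL
import pandas as pd

df = pd.DataFrame({"FULL_NAME": ["Hans Mustermann", "Kai Waehner"]})
df[["FIRST_NAME", "LAST_NAME"]] = df["FULL_NAME"].str.split(" ", n=1, expand=True)
print(df)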
// Merge / Join data frames
// e.g. two sources of data with the same structure
CREATE STREAM website_source (SAME SCHEMA) WITH (KAFKA_TOPIC='from website')
CREATE STREAM api_source (SAME SCHEMA) WITH (KAFKA_TOPIC='api')
// also different geos etc
CREATE STREAM UNIFIED AS SELECT 'website' AS SOURCE, * FROM WEBSITE_SOURCE;
INSERT INTO UNIFIED SELECT 'api' AS SOURCE, * FROM API_SOURCE;
// Single resulting stream (-> Kafka topic), but continually populated with data from BOTH sources
// basically a UNION of the data sets
What else?
CREATE STREAM creditcardfraud (Id bigint, Time int, V1 double, V2 double, V3 double, V4 double, V5 double, V6 double, V7 double, V8 double, V9 double, V10 double, V11 double, V12 double, V13 double, V14 double, V15 double, V16 double, V17 double, V18 double, V19 double, V20 double, V21 double, V22 double, V23 double, V24 double, V25 double, V26 double, V27 double, V28 double, Amount double, Class string) WITH (kafka_topic='creditcardfraud', value_format='DELIMITED');
describe creditcardfraud;
SET 'auto.offset.reset'='earliest';
select * from creditcardfraud;
select TIME, V1, V2, AMOUNT, CLASS FROM creditcardfraud;
// Error: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
// TODO Start data generator (continuous flow of data instead of CSV file)
// TODO Use Kafka Connect Datagen for this
// TODO Create / fix creditcardtransactions.avro file
ksql-datagen quickstart=users format=json topic=users maxInterval=1000 propertiesFile=etc/ksql/datagen.properties
To open the Jupyter notebook, go to the folder where the '.ipynb' files are. Then:
// Open Jupyter and select the notebook 'python-jupyter-apache-kafka-ksql-tensorflow-keras.ipynb'
jupyter notebook
Some common commands for Jupyter, pip, conda to manage Python packages like ksql-python:
conda info
conda create --name ksql-python python=3.4 tensorflow ksql
conda info --envs
// Add to .bash_profile
source activate ksql-python
// Add Python packages
conda install --name ksql-python tensorflow numpy pandas keras seaborn matplotlib scipy scikit-learn
conda remove -n ksql-python scipy
conda install -n ksql-python pip
pip list
pip install ksql
// Note: pickle is part of the Python standard library, no 'pip install pickle' needed
// Start TensorBoard to visualize the training logs
tensorboard --logdir logs
tensorboard --logdir=logs/keras-fraud
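The 'logs/keras-fraud' directory is produced by Keras' TensorBoard callback during model training. A minimal sketch of that hook (the notebook may use different parameters):
# Write Keras training metrics to logs/keras-fraud so TensorBoard can visualize them
from keras.callbacks import TensorBoard

tensorboard_callback = TensorBoard(log_dir="logs/keras-fraud")
# model.fit(x_train, y_train, epochs=5, callbacks=[tensorboard_callback])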