Pages

Monday, August 20, 2018

Spark, Dataframes, PostgreSQL

Spark is one of the most successful projects at Apache with one of the most popular skills for Big data engineers and a lot of companies look out for this specific skills while hiring.

Spark is a distributed computing software where it can employ multiple machines (cluster) which means you can scale horizontally (scale out) by adding more and more computers instead of having to buy/rent computers with higher CPU and Memory (scaling vertically/ scaling up ).

Setting up (Standalone Mode)
  1. brew install apache-spark
  2. Run master : /usr/local/Cellar/apache-spark/2.3.1/bin/spark-class org.apache.spark.deploy.master.Master
  3. Run Slave(s) : /usr/local/Cellar/apache-spark/2.3.1/bin/spark-class org.apache.spark.deploy.worker.Worker  spark://:7077 -c 1 -m 512M 
    • you will get the master url in the console output after running step 1 and you can run slaves either in another terminal or on another computer which is connected to the same network.
  1. Run example on master : /usr/local/Cellar/apache-spark/2.3.1/bin/run-example SparkPi
You can run a program by submitting the jar file of the code to spark

usr/local/Cellar/apache-spark/2.3.1/bin/spark-submit —class WordCountTask  --master local[2] /Users/dev/spark/spark-example/target/first-example-1.0-SNAPSHOT.jar /Users/dev/spark/spark-example/src/test/resources/loremipsum.txt /Users/dev/spark/spark-example/src/test/resources/

Primarily you can deploy scala or python code in a spark cluster. You can get an interactive shell to try out bits of code and for production, the code is generally shipped (submitting spark job) to the cluster.

If you are already familiar with Python dataframes, spark data frames are very similar, you can slide and dice two dimentional data using dataframes.

One can directly load the CSV file into a spark dataframe or just can load CSV file into PostgreSQL from where data frame can be loaded using SQL queries.
COPY MyTable FROM '/Users/myUser/myFile.csv' DELIMITER ',' CSV HEADER;
or
\copy  MyTable FROM '/usr/myFile.csv'  DELIMITER ',' CSV HEADER;
or
CREATE EXTERNAL TABLE IF NOT EXISTS MyTable(
itemId STRING,
itemCount INT )
COMMENT 'MyTable'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/myFile'
  TBLPROPERTIES('skip.header.line.count'='1');
To deal with PostgreSQL you would need the driver library in the cluster. So you can mention the library while starting the spark shell.
pyspark --conf spark.executor.extraClassPath=/home/sshuser/postgresql-42.2.4.jar  --driver-class-path /home/sshuser/postgresql-42.2.4.jar --jars /home/sshuser/postgresql-42.2.4.jar
If you want to start a spark shell on your local computer but if the master is running somewhere else
pyspark --conf spark.executor.extraClassPath= --driver-class-path --jars --master
For example
pyspark --conf spark.executor.extraClassPath=/Users/postgresql-42.2.4.jar  --driver-class-path /Users/postgresql-42.2.4.jar  --master spark://192.168.1.199:7077 --executor-memory 512m
 Although PostgreSQL is RDBMS, you can always use JSON data structure to store variable schema.
create table items_var(id VARCHAR(100), attributes jsonb);
insert into items_var values('id1', '{"color": "Red", "style": "stripe"}');
select attributes -> 'color' from items_var;
select attributes @> '{"color":"Red"}' from items_var;
select id, attributes  from items_var where attributes @> '{"color":"Red"}';
Now its time to load the data to dataframe from SQL using spark.

Load Data from PostgreSQL to dataframe
df = spark.read \
    .format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql:dbName") \
    .option("dbtable", "MyTable") \
    .option("user", "myUser") \
    .option("password", "") \
    .load()
Write data to PostgreSQL
df.write \
    .format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql:dbName") \
    .option("dbtable", "MyTable") \
    .option("user", "myUser") \
    .option("password", "")
Alternatively
mode = "overwrite"
url = "jdbc:postgresql:dbName"
properties = {"user": "myUser","password": "","driver": "org.postgresql.Driver"}
df1.write.jdbc(url=url, table="myTable", mode=mode, properties=properties)

Perform aggregations
df.groupBy("itemid").count()
    .filter("count" >= 2)
    .show()
 Most of the time in a production environment you would want to do all these programmatically e.g. from a java based microservice. Check out this GitHub repository has some sample code for the same.

You can also configure such that pyspark command launches Jupytr notebook and you can interactively run spark commands.