05.10.2015

Apache Spark – setting up Spark + setting up Cassandra + using Spark with Cassandra

Author: artur

Today we have a guest post by Marta Karaś about Big Data. It is in English, to keep things simpler.

I hope this will be the start of a series of articles on this topic.

Big Data tools are developing very dynamically. This post waited a while for publication, and by now Spark 1.5.1 is out. We will update the post when we find a spare moment.

Apache Spark is a "new generation" tool for Big Data processing that is gaining popularity worldwide at a tremendous pace. Spark is worth looking into for at least a few reasons:

  1. It is fast. Spark can run complex operations faster than the popular MapReduce model. For example, with Spark we can query Big Data interactively (we do not have to wait long minutes for a result).

  2. Spark covers a wide range of workloads that previously required separate distributed systems, including batch applications, iterative algorithms, interactive queries and streaming. By supporting these workloads within a single engine, Spark makes it easy and cheap to combine different types of data processing.

  3. Spark is designed to be easily accessible. It provides simple APIs in Python, Java and Scala, and ships with rich built-in libraries. It also integrates tightly with other Big Data tools. In particular, Spark can run on Hadoop clusters and use various other data sources (e.g. a local filesystem or the Cassandra database).

The note below contains instructions for installing Apache Spark 1.2.0 on Ubuntu 14.04. It also serves as a Proof of Concept of connecting Spark to an Apache Cassandra 2.1.4 database as a data source, so it includes instructions for setting up the database and for using the library that provides this connection.

Table of Contents

  1. Introduction
  2. Installation of Apache Cassandra (single-node cluster)
    1. Run Cassandra
    2. Query Cassandra
    3. Stop Cassandra
  3. Installation of Apache Spark
    1. Scala installation
    2. Spark installation
  4. Using the Spark Cassandra Connector
    1. Downloading and building
    2. Running spark-shell with spark-cassandra-connector jar

Introduction

This note shows the steps for launching the Apache Spark 1.2.0 framework on Ubuntu 14.04, with an Apache Cassandra 2.1.4 database as a possible data source. To make use of Cassandra, the Spark Cassandra Connector 1.2.0 library (by DataStax) is used.

The steps include:

  1. installation of the Apache Cassandra 2.1.4 database (single-node cluster),
  2. examples of querying Cassandra, including loading and saving data to/from the database,
  3. installation of the Apache Spark 1.2.0 framework,
  4. installation of the spark-cassandra-connector 1.2.0 library,
  5. an example of playing with Spark and Cassandra, with the use of the spark-cassandra-connector library.

Remark. The instructions below cover the minimum needed to launch this environment. I do not touch on some configuration aspects of the installation unless they are necessary for basic operations to run. At the same time, I try to supply references and sources for those of you who are more interested in those areas.

In general, I was inspired by the Installing the Cassandra/Spark OSS Stack post, written by Al Tobey from DataStax.

Installation of Apache Cassandra (single-node cluster)

In this part we are going to download and run the Apache Cassandra 2.1.4 database. It is going to be a single-node setup (we will set up our database locally, on a single machine).

Move to the /tmp directory. Download the apache-cassandra-2.1.4-bin archive and unpack it into the ~/cassandra directory. (Here it is done with wget, but you can download the files directly from the Cassandra download page.)

cd /tmp
wget https://archive.apache.org/dist/cassandra/2.1.4/apache-cassandra-2.1.4-bin.tar.gz
sudo mkdir ~/cassandra
sudo tar xvf apache-cassandra-2.1.4-bin.tar.gz -C ~/cassandra

Cassandra makes use of dedicated data and log directories. Create these directories and make sure your Ubuntu user has write access to them.

sudo mkdir /var/lib/cassandra
sudo mkdir /var/log/cassandra
sudo chown -R $USER:$GROUP /var/lib/cassandra
sudo chown -R $USER:$GROUP /var/log/cassandra

Export Cassandra-related variables. (Note that we assume the downloaded files are located in the ~/cassandra directory, as presented above.)

export CASSANDRA_HOME=~/cassandra
export PATH=$PATH:$CASSANDRA_HOME/bin

For further information about Apache Cassandra, refer to the Cassandra version 2.1 documentation.

Run Cassandra

To run Cassandra, type:

sudo sh ~/cassandra/apache-cassandra-2.1.4/bin/cassandra

The output should be something like this:

CompilerOracle: inline org/apache/cassandra/db/AbstractNativeCell.compareTo (Lorg/apache/cassandra/db/composites/Composite;)I
CompilerOracle: inline org/apache/cassandra/db/composites/AbstractSimpleCellNameType.compareUnsigned (Lorg/apache/cassandra/db/composites/Composite;Lorg/apache/cassandra/db/composites/Composite;)I
(...)
INFO  17:22:24 Compacting [SSTableReader(path='/home/martakarass/cassandra/apache-cassandra-2.1.4/bin/../data/data/system/local-7ad54392bcdd35a684174e047860b377/system-local-ka-1-Data.db'), SSTableReader(path='/home/martakarass/cassandra/apache-cassandra-2.1.4/bin/../data/data/system/local-7ad54392bcdd35a684174e047860b377/system-local-ka-3-Data.db'), SSTableReader(path='/home/martakarass/cassandra/apache-cassandra-2.1.4/bin/../data/data/system/local-7ad54392bcdd35a684174e047860b377/system-local-ka-2-Data.db'), SSTableReader(path='/home/martakarass/cassandra/apache-cassandra-2.1.4/bin/../data/data/system/local-7ad54392bcdd35a684174e047860b377/system-local-ka-4-Data.db')]
INFO  17:22:25 Node localhost/127.0.0.1 state jump to normal

Press Enter. Our database is running and we can query it.

Query Cassandra

Cassandra Query Language (CQL) is the default interface into the Cassandra Database Management System (DBMS). Using CQL is similar to using SQL. A few basic queries are shown below. For more information, refer to the CQL documentation.

To start cqlsh, the CQL interactive terminal, type:

~/cassandra/apache-cassandra-2.1.4/bin/cqlsh

It should result in the console ready to be used:

Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 2.1.4 | CQL spec 3.2.0 | Native protocol v3]
Use HELP for help.
cqlsh>

KEYSPACE

We start with creating a keyspace. A keyspace is the container for your application data, similar to a schema in a relational database. Keyspaces are used to group column families together.

Below we have a few examples of creating a keyspace with different parameters (for more details, go to the keyspace documentation).

CREATE KEYSPACE example_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
CREATE KEYSPACE Risky WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 } AND DURABLE_WRITES = false;
CREATE KEYSPACE Excalibur WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 3, 'dc2' : 2};
CREATE KEYSPACE NTSkeyspace WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 };

Let’s list the keyspaces we have created.

SELECT * FROM system.schema_keyspaces;
 keyspace_name    | durable_writes | strategy_class                                       | strategy_options
------------------+----------------+------------------------------------------------------+----------------------------
 example_keyspace |           True |          org.apache.cassandra.locator.SimpleStrategy | {"replication_factor":"1"}
      ntskeyspace |           True | org.apache.cassandra.locator.NetworkTopologyStrategy |        {"datacenter1":"1"}
            risky |          False | org.apache.cassandra.locator.NetworkTopologyStrategy |        {"datacenter1":"3"}
           system |           True |           org.apache.cassandra.locator.LocalStrategy |                         {}
        excalibur |           True | org.apache.cassandra.locator.NetworkTopologyStrategy |      {"dc2":"2","dc1":"3"}
    system_traces |           True |          org.apache.cassandra.locator.SimpleStrategy | {"replication_factor":"2"}
 
(6 rows)

Then we define which keyspace we will be using from now on.

USE example_keyspace;

TABLE

Let’s create a simple table and insert some data into it.

CREATE TABLE users 
(
  KEY varchar PRIMARY KEY,
  password varchar,
  gender varchar,
  session_token varchar,
  state varchar,
  birth_year bigint
);
INSERT INTO users (KEY, password) VALUES ('jsmith', 'ch@ngem3a');
INSERT INTO users (KEY, gender, password) VALUES ('jessie', 'f', 'avlrenfls');
INSERT INTO users (KEY, gender, password) VALUES ('kate', 'f', '897q7rggg');
INSERT INTO users (KEY, gender, password) VALUES ('mike', 'm', 'mike123');

Selecting all values results in 4 rows being displayed:

SELECT * FROM users;
 key    | birth_year | gender | password  | session_token | state
--------+------------+--------+-----------+---------------+-------
   kate |       null |      f | 897q7rggg |          null |  null
 jessie |       null |      f | avlrenfls |          null |  null
   mike |       null |      m |   mike123 |          null |  null
 jsmith |       null |   null | ch@ngem3a |          null |  null

Please note that a table was previously known as a column family (as explained in this StackOverflow question).

INDEXING AND FILTERING

To filter table rows, we first need to create an index on the table.

CREATE INDEX idx_gender ON users (gender);
SELECT * FROM users WHERE gender='f';
 key    | birth_year | gender | password  | session_token | state
--------+------------+--------+-----------+---------------+-------
   kate |       null |      f | 897q7rggg |          null |  null
 jessie |       null |      f | avlrenfls |          null |  null
 
(2 rows)

If you want to filter rows using multiple indexed columns, Cassandra requires the ALLOW FILTERING directive:

CREATE INDEX idx_password ON users (password);
SELECT * FROM users WHERE gender='f' AND password ='avlrenfls' ALLOW FILTERING;

If we want to drop a table, it is important to first drop any indexes we have set on that particular table. Otherwise some index corruption may occur (compare with this question). Thus, dropping users would look like:

DROP INDEX IF EXISTS idx_gender; 
DROP INDEX IF EXISTS idx_password;
DROP TABLE IF EXISTS users;

(but refrain from dropping it now =] we will use users shortly!)

When it comes to filtering row content, you may be interested in reading more about retrieving and sorting results and slicing over partition rows.

COPYING DATA FROM TABLE TO CSV FILE

To copy table content to a CSV file, type:

COPY users (key, gender, password) TO '/home/martakarass/Documents/users_dump.csv';
3 rows exported in 0.005 seconds.

COPYING DATA FROM CSV FILE TO TABLE

Similarly, we can copy data from a CSV file to a table in the Cassandra database. Let’s use a sample of simulated invoice data, containing 1043 rows. The head of the invoices.csv file looks like the following:

2/1/2015,Product_15702,Client_100,6,123.41960784000
1/9/2015,Product_15715,Client_100,3,44.94048390000
1/9/2015,Product_15715,Client_100,39,414.58094748000

We have to remember that a unique PRIMARY KEY has to be specified. To provide one, we insert a unique value (here: simply the row number) and a separator (here: a comma) at the beginning of each line in the file. Rumor has it that vim handles this well:

vim /home/martakarass/Documents/invoices.csv 
:%s/^/\=line('.').','/
:wq

Now we have a data file which starts with:

1,2/1/2015,Product_15702,Client_100,6,123.41960784000
2,1/9/2015,Product_15715,Client_100,3,44.94048390000
3,1/9/2015,Product_15715,Client_100,39,414.58094748000

In the next step, we create an empty table in the CQL console:

CREATE TABLE invoices (
  id int PRIMARY KEY,
  date text,
  product_id text,
  customer_id text,
  quantity int,
  sales float
);

and perform the actual copying into that table:

COPY invoices (id, date, product_id, customer_id, quantity, sales) 
FROM '/home/martakarass/Documents/invoices.csv' 
WITH DELIMITER=',' AND HEADER=false;
1043 rows imported in 0.614 seconds.

EXITING CONSOLE

To exit the CQL console, type:

exit;

Stop Cassandra

The way I stop Cassandra is to find the Cassandra-related process ID (a pid value) and kill the process.

ps auwx | grep cassandra
root      3271  4.0 28.2 3848524 2267484 pts/11 SLl 19:22   0:24 java -ea -javaagent:/home/martakarass/cassandra/apache-cassandra-2.1.4/bin/../lib/jamm-0.3.0.jar -XX:+CMSClassUnloadingEnabled -XX:+UseThreadPriorities -XX:ThreadPriorityPolicy=42 -Xms1960M -Xmx1960M -Xmn490M -XX:+HeapDumpOnOutOfMemoryError -Xss280k -XX:StringTableSize=1000003 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -X
(...)
sudo kill 3271

Installation of Apache Spark

Scala installation

To install the Apache Spark framework, we first need an appropriate version of Scala installed. According to the Apache Spark 1.2.0 documentation, Spark 1.2.0 uses Scala 2.10, so you will need a compatible Scala version (2.10.x).

The steps below are inspired by the Install Apache Spark on Ubuntu-14.04 post by Prabeesh K.

Download a 2.10.x maintenance release (here: Scala 2.10.4) and unpack it.

cd /tmp
wget http://www.scala-lang.org/files/archive/scala-2.10.4.tgz
sudo mkdir /usr/local/src/scala
sudo tar xvf scala-2.10.4.tgz -C /usr/local/src/scala/

Then edit the .bashrc file, e.g. in vim (sudo is not needed for a file in your home directory):

vim ~/.bashrc

by adding the following content at the end of the file:

export SCALA_HOME=/usr/local/src/scala/scala-2.10.4
export PATH=$SCALA_HOME/bin:$PATH

Reload .bashrc:

cd
. .bashrc

To check if Scala is properly installed, type:

scala -version
Scala code runner version 2.10.4 -- Copyright 2002-2013, LAMP/EPFL

To go to Scala interactive shell, type:

scala
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31).
Type in expressions to have them evaluated.
Type :help for more information.
 
scala>

Spark installation

Remark. I have chosen version 1.2.0 of Spark because the examples from Advanced Analytics with Spark: Patterns for Learning from Data at Scale are presented with the use of Spark 1.2.0.

Download the Spark distribution, version 1.2.0 (you may want to do it manually from here). Unpack the files into their dedicated directory.

cd /tmp
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.2.0.tgz
sudo mkdir /usr/local/src/spark/
sudo tar xvf spark-1.2.0.tgz -C /usr/local/src/spark/

BUILDING

Spark is built with SBT (Simple Build Tool), which is bundled with the distribution. To compile the code, type:

cd /usr/local/src/spark/spark-1.2.0
sbt/sbt assembly

Building takes some time. When the process is completed, you may see something like:

(...)
[warn] Strategy 'first' was applied to 2175 files
[info] SHA-1: 8b22235f78ab7ab174d6743bb2c0bf7771ec88a7
[info] Packaging /usr/local/src/spark/spark-1.2.0/examples/target/scala-2.10/spark-examples-1.2.0-hadoop1.0.4.jar ...
[info] Done packaging.
[success] Total time: 234 s, completed Apr 6, 2015 9:06:40 PM

SIMPLE APPLICATION RUN EXAMPLE

After building, we can run a sample program which estimates the value of pi:

./bin/run-example SparkPi 10
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
15/04/03 02:27:21 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
(...)
15/04/06 21:08:33 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:35, took 2.191087 s
Pi is roughly 3.141984
15/04/03 02:27:31 INFO SparkUI: Stopped Spark web UI at http://marta-komputer.home:4040
(...)

INTERACTIVE SHELL USAGE EXAMPLE

You can run Spark interactively through the Scala shell:

./bin/spark-shell
scala>

Let’s run a simple example:

val textFile = sc.textFile("README.md")
textFile.count()
res0: Long = 127
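If you would like to experiment a bit more before moving on, the same textFile RDD supports further transformations and actions. Below is a minimal sketch following the Spark quick start (the exact numbers depend on the contents of your README.md):

// count the lines that mention Spark
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark.count()

// find the largest number of words on a single line
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)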

Using the Spark Cassandra Connector

The Spark Cassandra Connector is a library by DataStax that enables cluster computing with Spark and Cassandra.

According to the datastax/spark-cassandra-connector README.md, this library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications. According to the compatibility table from the README.md, we need version 1.2 of the Connector (as we have installed Spark 1.2.0 and Cassandra 2.1.4).

At the moment of writing this note, the final release of Spark Cassandra Connector 1.2 was not ready. I use the Release 1.2.0 RC 3 version, which is a release candidate.

Downloading and building

Download the Release 1.2.0 RC 3 version of the spark-cassandra-connector library directly from the Release 1.2.0 RC 3 page (choose the tar.gz file). Below, we assume the downloaded files are located in the ~/Downloads directory.

cd ~/Downloads
sudo tar xvf spark-cassandra-connector-1.2.0-rc3.tar.gz -C /usr/local/src/spark/spark-1.2.0

To build it, type:

cd /usr/local/src/spark/spark-1.2.0/spark-cassandra-connector-1.2.0-rc3
sudo ./sbt/sbt assembly

Building takes time (here: almost 10 minutes). A successful build should finish with something like:

(...)
[info] Packaging /usr/local/src/spark/spark-1.2.0/spark-cassandra-connector-1.2.0-rc3/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-rc3.jar ...
[info] Done packaging.
[success] Total time: 541 s, completed Apr 6, 2015 10:19:48 PM

Running spark-shell with spark-cassandra-connector jar

According to the Installing the Cassandra / Spark OSS Stack post, the easiest way to get spark-shell to pick the jar up is to put the jar on the classpath while launching spark-shell:

/usr/local/src/spark/spark-1.2.0/bin/spark-shell --jars /usr/local/src/spark/spark-1.2.0/spark-cassandra-connector-1.2.0-rc3/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.2.0-rc3.jar
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/04/06 23:40:23 INFO SecurityManager: Changing view acls to: martakarass
(...)
15/04/06 23:40:27 INFO SparkILoop: Created spark context..
Spark context available as sc.
 
scala>

MAKING CONNECTION TO CASSANDRA FROM SPARK

First, we stop the current Spark context:

sc.stop

The command above will print some log information and return to the scala> prompt (if not, press Enter and the scala> prompt should appear).

Then we import the Connector:

import com.datastax.spark._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import org.apache.spark.{SparkConf, SparkContext, Logging}

and define a new Spark context with the connection to Cassandra configured. (As mentioned above, Cassandra is running on localhost; in my case its address is 127.0.0.1.)

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1")
val sc = new SparkContext("local[*]", "test", conf)
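With the context configured this way, we can also execute arbitrary CQL statements from Spark, one of the capabilities the connector README lists. Here is a minimal sketch, reusing the imports loaded above (the system.local query is just an illustration):

// borrow a session from the connector's pool and run plain CQL with it
CassandraConnector(conf).withSessionDo { session =>
  val row = session.execute("SELECT cluster_name FROM system.local").one()
  println(row.getString("cluster_name"))
}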

LOADING AND ANALYZING DATA FROM CASSANDRA

To define an RDD object from a Cassandra table, we use:

  • keyspace name (here: example_keyspace)
  • table name (here: invoices)

val rdd = sc.cassandraTable("example_keyspace", "invoices")

Let’s perform a few basic operations on the newly created RDD object.

We can count rows in the table:

println(rdd.count)
1043

To see the first row of the table, type:

println(rdd.first)
CassandraRow{id: 886, customer_id: Client_1013, date: 1/9/2015, product_id: Product_7705, quantity: 3, sales: 14.858835}

To sum the values of the sales column, type:

println(rdd.map(_.getInt("sales")).sum)
267749.0
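Note that sales was defined as a float column in the CQL schema, so getInt truncates each value to an integer before summing. A small sketch that keeps the decimals and adds a per-customer aggregate (the exact totals will differ slightly from the truncated sum above):

// sum the sales column without truncating the decimals
println(rdd.map(_.getFloat("sales")).sum)

// total sales per customer; print the three largest totals
val salesByCustomer = rdd
  .map(row => (row.getString("customer_id"), row.getFloat("sales")))
  .reduceByKey(_ + _)
salesByCustomer.collect().sortBy(-_._2).take(3).foreach(println)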

SAVING DATA FROM RDD TO CASSANDRA TABLE

According to the documentation, it is possible to save an RDD to an existing Cassandra table, as well as to let the connector create an appropriate table automatically based on the definition of the RDD item class; a sketch is shown below.

However, there is a known bug in the 1.2.0-rc3 version of the connector, because of which I was not able to save data to Cassandra. The bug is fixed in the 1.2.0-rc4 version, which is not available yet (see available releases). The note will be updated after the 1.2.0-rc4 release.
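For reference, saving according to the connector documentation would look roughly like the sketch below. I could not verify it because of the bug above, so treat it as an untested illustration (the row values are made up):

// save a one-row RDD of tuples to the existing invoices table,
// mapping the tuple elements to the listed columns
val newRows = sc.parallelize(Seq(
  (2000, "4/1/2015", "Product_15702", "Client_100", 1, 9.99f)))
newRows.saveToCassandra("example_keyspace", "invoices",
  SomeColumns("id", "date", "product_id", "customer_id", "quantity", "sales"))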
