Practical Apache Spark in 10 minutes (Data Frames)

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. In this article, we cover the basics of working with DataFrames.

First, let's review the basics of working with RDDs.

Create a simple RDD from a list of smileys using SparkContext’s parallelize() method.

smiles = ['happy', 'sad', 'grinning', 'happy', 'confused', 'kissing', 'kissing', 'sad', 'grinning', 'happy', 'happy', 'happy']
data = sc.parallelize(smiles)
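
These examples assume a running SparkContext named sc, as provided by the PySpark shell. If you run the code as a standalone script instead, you can create the context yourself; a minimal sketch (the application name is arbitrary):

from pyspark import SparkContext

# Create a local SparkContext; skip this in the PySpark shell, where sc already exists
sc = SparkContext(appName='dataframes-tutorial')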

Create a function that tags each smiley with its mood: positive or negative.

def sentiments(smile):
    if smile == 'happy' or smile == 'grinning' or smile == 'kissing':
        sent = 'positive'
    else:
        sent = 'negative'
    return (sent, smile)

Apply the sentiments function to the RDD and view the first 5 elements.

data_tuples = data.map(sentiments)
print(data_tuples.take(5))
[('positive', 'happy'), ('negative', 'sad'), ('positive', 'grinning'), ('positive', 'happy'), ('negative', 'confused')]

Use groupByKey() to group by key and map() to count the number of positive and negative smileys.

data_grouped = data_tuples.groupByKey().map(lambda x: (x[0], len(x[1])))
print(data_grouped.collect())
[('positive', 9), ('negative', 3)]
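
Note that groupByKey() materializes the full list of values for every key. For a simple count, reduceByKey() combines values locally before shuffling and is usually cheaper; a sketch of the same computation:

# Map each pair to (sentiment, 1) and sum the ones per key
data_counts = data_tuples.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)
print(data_counts.collect())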

Now let's move on to DataFrames. To demonstrate DataFrame operations, we will use the "California home prices" dataset (Sacramento real estate transactions).

Import the modules needed to work with PySpark SQL and pandas DataFrames.

from pyspark import sql
from pyspark.sql.types import *
from pyspark.sql import Row
import pandas as pd

Next, read the dataset from the text file.

rdd = sc.textFile('Sacramentorealestatetransactions.csv')
rdd.take(3)
[u'street,city,zip,state,beds,baths,sq__ft,type,sale_date,price,latitude,longitude',
 u'3526 HIGH ST,SACRAMENTO,95838,CA,2,1,836,Residential,Wed May 21 00:00:00 EDT 2008,59222,38.631913,-121.434879',
 u'51 OMAHA CT,SACRAMENTO,95823,CA,3,1,1167,Residential,Wed May 21 00:00:00 EDT 2008,68212,38.478902,-121.431028']

Split each line on commas using map().

rdd = rdd.map(lambda line: line.split(","))
print(rdd.take(2))
[ [u'street', u'city', u'zip', u'state', u'beds', u'baths', u'sq__ft', u'type', u'sale_date', u'price', u'latitude', u'longitude'], [u'3526 HIGH ST', u'SACRAMENTO', u'95838', u'CA', u'2', u'1', u'836', u'Residential', u'Wed May 21 00:00:00 EDT 2008', u'59222', u'38.631913', u'-121.434879'] ]

Drop the header row from the data with the filter() transformation.

header = rdd.first()
rdd = rdd.filter(lambda line: line != header)
print(rdd.take(2))
[ [u'3526 HIGH ST', u'SACRAMENTO', u'95838', u'CA', u'2', u'1', u'836', u'Residential', u'Wed May 21 00:00:00 EDT 2008', u'59222', u'38.631913', u'-121.434879'], [u'51 OMAHA CT', u'SACRAMENTO', u'95823', u'CA', u'3', u'1', u'1167', u'Residential', u'Wed May 21 00:00:00 EDT 2008', u'68212', u'38.478902', u'-121.431028'] ]
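
As a side note, newer Spark versions can do this parsing for us. In Spark 2.x, assuming a SparkSession named spark is available, the CSV (header handling included) can be loaded into a DataFrame in one call; a sketch:

# Spark 2.x: read the CSV with a header row and let Spark infer column types
df_csv = spark.read.csv('Sacramentorealestatetransactions.csv',
                        header=True, inferSchema=True)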

We have prepared the data and can now start working with DataFrames. Let’s create a DataFrame from the data and view the first five rows.

df = rdd.map(lambda line: Row(street = line[0], city = line[1], zip=line[2], beds=line[4], baths=line[5], sqft=line[6], price=line[9])).toDF()
df.show(5)
+-----+----+----------+-----+----+----------------+-----+
|baths|beds|      city|price|sqft|          street|  zip|
+-----+----+----------+-----+----+----------------+-----+
|    1|   2|SACRAMENTO|59222| 836|    3526 HIGH ST|95838|
|    1|   3|SACRAMENTO|68212|1167|     51 OMAHA CT|95823|
|    1|   2|SACRAMENTO|68880| 796|  2796 BRANCH ST|95815|
|    1|   2|SACRAMENTO|69307| 852|2805 JANETTE WAY|95815|
|    1|   2|SACRAMENTO|81900| 797| 6001 MCMAHON DR|95824|
+-----+----+----------+-----+----+----------------+-----+
only showing top 5 rows
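
Individual columns can be pulled out with select(); for example, a quick look at just the street and price columns:

df.select('street', 'price').show(3)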

To view the structure of our DataFrame, we can use the printSchema() method.

df.printSchema()
root
 |-- baths: string (nullable = true)
 |-- beds: string (nullable = true)
 |-- city: string (nullable = true)
 |-- price: string (nullable = true)
 |-- sqft: string (nullable = true)
 |-- street: string (nullable = true)
 |-- zip: string (nullable = true)
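
Every column came out as a string because we built the rows from the output of split(). If you need truly numeric columns, they can be cast with withColumn(); a sketch that puts the result in a separate frame (df_typed is just a name chosen here) so the examples below keep using df unchanged:

from pyspark.sql.types import IntegerType

# Cast the numeric columns from string to integer in a new DataFrame
df_typed = (df.withColumn('price', df.price.cast(IntegerType()))
              .withColumn('beds', df.beds.cast(IntegerType()))
              .withColumn('baths', df.baths.cast(IntegerType()))
              .withColumn('sqft', df.sqft.cast(IntegerType())))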

Get 2 rows where the price is less than 5000.

df.filter(df.price < 5000).show(2)
+-----+----+-------+-----+----+------------------+-----+
|baths|beds|   city|price|sqft|            street|  zip|
+-----+----+-------+-----+----+------------------+-----+
|    0|   0|LINCOLN| 4897|   0|20 CRYSTALWOOD CIR|95648|
|    0|   0|LINCOLN| 4897|   0|24 CRYSTALWOOD CIR|95648|
+-----+----+-------+-----+----+------------------+-----+
only showing top 2 rows

Alternatively, use Pandas-like syntax to get the same result

df[df.price < 5000].show(2)
+-----+----+-------+-----+----+------------------+-----+
|baths|beds|   city|price|sqft|            street|  zip|
+-----+----+-------+-----+----+------------------+-----+
|    0|   0|LINCOLN| 4897|   0|20 CRYSTALWOOD CIR|95648|
|    0|   0|LINCOLN| 4897|   0|24 CRYSTALWOOD CIR|95648|
+-----+----+-------+-----+----+------------------+-----+
only showing top 2 rows
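
Several conditions can be combined with & and |, with each condition wrapped in parentheses; for instance:

# Rows priced under 100000 with at least 3 bedrooms
df[(df.price < 100000) & (df.beds >= 3)].show(2)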

Count the number of rows, grouped by city

df.groupBy("city").count().show(2)
+--------------+-----+
|          city|count|
+--------------+-----+
|RANCHO MURIETA|    3|
|  CAMERON PARK|    9|
+--------------+-----+
only showing top 2 rows
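
groupBy() also supports richer aggregates through agg(). For instance, the average price per city, computed on the typed frame from the casting sketch above:

from pyspark.sql import functions as F

# Average price and number of sales per city, most active cities first
(df_typed.groupBy('city')
         .agg(F.avg('price').alias('avg_price'), F.count('*').alias('sales'))
         .orderBy(F.desc('sales'))
         .show(3))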

Show summary statistics for the numerical features using the describe() function.

df.describe(['baths', 'beds','price','sqft']).show()
+-------+------------------+------------------+------------------+------------------+
|summary|             baths|              beds|             price|              sqft|
+-------+------------------+------------------+------------------+------------------+
|  count|               985|               985|               985|               985|
|   mean|1.7766497461928934|2.9116751269035532|234144.26395939087|1314.9167512690356|
| stddev|0.8949168036438349|1.3072681384582236| 138295.5847828183|  852.615113131778|
|    min|                 0|                 0|            100000|                 0|
|    max|                 5|                 8|             99000|               998|
+-------+------------------+------------------+------------------+------------------+
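
Note the suspicious extremes for price (min 100000, max 99000): since every column is a string, min and max are computed lexicographically. Running describe() on the typed frame from the casting sketch above yields the numerically correct values:

df_typed.describe(['baths', 'beds', 'price', 'sqft']).show()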

We can convert our DataFrame to a pandas DataFrame using the toPandas() function. Note that toPandas() collects the entire dataset to the driver, so it should only be used on data small enough to fit in the driver’s memory.

pandas_df = df.toPandas()
pandas_df[:5]

 

Similar to RDDs, DataFrames are evaluated lazily: computation only happens when an action (e.g. displaying or saving results) requires it. This lets Spark optimize execution by applying techniques such as predicate push-down and bytecode generation. All DataFrame operations are also automatically parallelized and distributed across the cluster.
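
You can inspect the plan Spark builds for a chain of transformations with explain(); nothing is computed until an action such as show() or collect() runs. A quick sketch:

# Transformations only build a query plan; explain() prints it without running a job
df.filter(df.price < 5000).select('street', 'price').explain()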
