
Spark for Dummies - Part 6 - Toolkit by Data Type

In this chapter we will look at the different data types managed by Spark and the main functions available for each of them.
This list covers most needs but is not exhaustive; you will find everything you need in the org.apache.spark.sql.functions class (exposed in Python as the pyspark.sql.functions module).
For this article we used the Databricks platform and the Python language to write the code.
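The examples below assume small DataFrames created beforehand; as an illustration, the first one (df, with columns id, Nom and Profession) could be built as follows. This is only a minimal sketch: the values are purely hypothetical, only the column names match the code that follows.

# Hypothetical sample data, only to make the first examples reproducible
donnees = [(1, "Laurent", "Docteur"), (2, "Brice", "professeur"), (3, "Agnès", "ingenieur"), (4, "Olivier", None)]
df = spark.createDataFrame(donnees, ["id", "Nom", "Profession"])
df.show()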

Conversion to Spark-type column

The lit method converts a literal value into a column containing this value as a Spark type.
This is very useful to convert constants from the host language into Spark types, for example:

lit(3), lit("three")

from pyspark.sql.functions import col, lit
df1 = df.select(col("id"), col("Nom"), col("Profession"), lit("2").alias("lit_valeur"))
df1.show(truncate=False)

Working with Booleans

The where method filters the data according to a condition passed as a parameter, which can be:
• An SQL expression, for example: where("c = 1")

df1.where("id==2").show()

• A condition in the form of a column, for example: where(col("c") == 1)

df1.where(col("Profession")=="professeur").show()

It is also possible to chain where filters, which is equivalent to combining the conditions with the & operator (note that the Python keyword and does not work on columns).

df1.where(df1.Nom == 'Laurent').where(df1.Profession == 'Docteur').show()

df1.where((df1.Nom == 'Laurent') & (df1.Profession == 'Docteur')).show()

Attention, an "==" comparison involving null values does not return true (it yields null, which filters treat as false); if you want to compare values while treating two null values as equal, you must use the eqNullSafe operator.

df.select(
    df['Profession'],
    (df['Profession'] == 'ingenieur').alias("EgalIngenieur"),
    (df['Profession'] == None).alias("EgalNone"),
    df['Profession'].eqNullSafe('ingenieur').alias("EqNullSafeIngenieur"),
    df['Profession'].eqNullSafe(None).alias("EqNullSafeNone")
).show()

Numbers

Numbers can be used directly in an expression, or through their value in the code.
To round numbers you can either convert them to integers with the following command:

df1.select(col("c").cast("int").alias("c")).show()

or use the rounding functions (round, ceil, floor).
The corr method is used to calculate the correlation between two columns: DataFrame.corr("col1", "col2").
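As an illustration, here is a minimal sketch of these functions, assuming df1 has a numeric column c; the column names col1 and col2 are hypothetical:

from pyspark.sql.functions import col, round, ceil, floor

# round to one decimal place, round up (ceil) and round down (floor)
df1.select(round(col("c"), 1), ceil(col("c")), floor(col("c"))).show()

# Pearson correlation between two numeric columns (hypothetical names)
print(df1.corr("col1", "col2"))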

Character strings

Spark offers a set of tools that allows us to do most of the basic string operations:
• The initcap method capitalizes the first character of each word in a string

 

from pyspark.sql.functions import initcap
df1.select(initcap("Sport")).show()

• To capitalize all the characters of a string, just use the upper method

from pyspark.sql.functions import upper, lower, col
df1.select(upper("Name")).show()

• The lower method converts the characters of a string to lower case

df1.select(lower("Name")).show()

• The lpad and rpad functions pad the data on the left or right to a given length, if necessary

from pyspark.sql.functions import lpad, rpad, trim
df1.select(
    "Sport",
    "salary",
    lpad(trim(col("salary")), 4, "0").alias("formatted_data")
).show()

• The trim, ltrim and rtrim functions remove white space at the beginning and/or end of data

from pyspark.sql.functions import trim, ltrim, rtrim
df1.select(ltrim("salary")).show()

• With the regexp_replace function we can replace any substring matching a regular expression pattern with another string:

from pyspark.sql.functions import regexp_replace
reg_exp = "foot"
df1.select(
    regexp_replace(col("Sport"), reg_exp, "football").alias("sports"),
    "Sport"
).show()

• The translate function performs a character-by-character substitution: each character of a given set is replaced by the character at the same position in another set

from pyspark.sql.functions import translate
df1.select(translate(col("Sport"), "ito", "ITO").alias("Nouveau")).show()

• The instr function returns the position (starting at 1) of the first occurrence of a substring in a string, or 0 if the substring is not found

from pyspark.sql.functions import instr
df1.select("Sport", instr(col("Sport"), "foot")).show()

Dates and timestamps

There are date and timestamp functions in the Spark DataFrame API, which allow you to perform various date and time operations.

The current_date method returns the current date

from pyspark.sql.functions import current_date
dataf.select(current_date().alias("current_date")).show(1)

The date_add method adds a given number of days to a date, while date_sub removes them

dataf.select(col("date"), 
    date_add(col("date"),4).alias("date_add"), 
    date_sub(col("date"),4).alias("date_sub")
  ).show()

The datediff function returns the number of days between two dates

dataf.select(col("date"), 
  datediff(current_date(),col("date")).alias("datediff")  
  ).show()
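The same module also offers timestamp functions; here is a minimal sketch, reusing the dataf DataFrame and its date column assumed above:

from pyspark.sql.functions import col, current_timestamp, date_format, hour

dataf.select(
    current_timestamp().alias("maintenant"),                  # current date and time
    date_format(col("date"), "dd/MM/yyyy").alias("date_fr"),  # format a date as a string
    hour(current_timestamp()).alias("heure")                  # extract the hour of a timestamp
).show(1, truncate=False)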

Processing null data

The null value is used to represent missing or empty data. Spark makes it easy to perform cleaning operations on it, such as:

• Deleting rows containing null values

  df.na.drop("any").show()

• Replacing null values with other values (integers or strings)

df.na.fill("Incomplet").show()

df.na.fill(0).show()

For double values, the syntax is the same: df.na.fill(double).
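For example, a minimal sketch (the column name salaire is hypothetical):

# replace nulls in the numeric columns with 0.0
df.na.fill(0.0).show()

# or target specific columns with the subset parameter (hypothetical column name)
df.na.fill(0.0, subset=["salaire"]).show()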

 

Complex types

Complex types contain multiple values within a single column. There are several complex data types, such as structures (struct), arrays (array), key-value maps (map), and the standard JSON format.

Structs

The struct type defines a set of values whose types can be different (primitive or complex).
To use it, all you have to do is define a schema.
For example, for the data below:

from pyspark.sql.types import StructType,StructField, StringType,IntegerType
struct_donnee= [
  (("jean","Brice"),"M",23),
  (("Olivier","Benoit"),"M",40),
  (("Agnès","Piccardi"),"F",26)
  ]

The corresponding schema is as follows:

struct_schema=StructType([
  StructField('Infos', StructType([
    StructField('Prenom',StringType(), True),
    StructField('Nom',StringType(), True)
  ])),
  StructField('genre', StringType(), True),
  StructField('Age', IntegerType(),True)
])
df_struct = spark.createDataFrame(data=struct_donnee, schema=struct_schema)
df_struct.select("Infos.*", "genre", "Age").printSchema()

Arrays

An array is an ordered collection of elements of complex or primitive type.
Warning: all elements of an array must be of the same data type, for example:

from pyspark.sql.functions import col,array_contains
from pyspark.sql import Row
df_array = spark.createDataFrame([
    Row(chiffre=[1, 2, 3], Nom="Brice"),
    Row(chiffre=[11, 22, 33], Nom="Olivier"),
    Row(chiffre=[111, 222, 333], Nom="Agnès")
])
df_array.show(truncate=False)

• The split method splits a string into an array around a given separator pattern; with an empty pattern, each element of the resulting array is a single character of the string, for example:

from pyspark.sql.functions import split
df_array.select(split(col("Nom"),"")).show()

• To get the element of an array at a given position, we can use selectExpr with the position in square brackets

df_array.selectExpr("chiffre[0]").show()

• The size function returns the size of an array

from pyspark.sql.functions import size
df_array.select(size(col("chiffre"))).show()

• To check whether an array contains a given value, we can use the array_contains function:

df_find=df_array.filter(array_contains(df_array.chiffre,33))
df_find.show()

• To "explode" an array over several rows, in other words to obtain one row for each element of the array, we use the explode method:

from pyspark.sql.functions import explode
df_explode = df.select(df.Nom, explode(df.langage).alias("explode"))
df_explode.printSchema()
df_explode.show(truncate=False)

Maps

Unlike arrays, maps are unordered collections of key-value pairs.
The values can be of primitive or complex type; the keys, on the other hand, must be of primitive type.
Example:

data= [({'a':1,'b':2},),({'c':3},), ({'a':4,'c':5},)]
df_map=spark.createDataFrame(data, ["map"])
df_map.show()
df_map.printSchema()
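To work with these maps, pyspark.sql.functions provides among others map_keys and map_values, and a value can be retrieved by key with getItem; here is a minimal sketch reusing the df_map DataFrame above:

from pyspark.sql.functions import map_keys, map_values, col

df_map.select(
    map_keys(col("map")).alias("cles"),       # array of the keys of each map
    map_values(col("map")).alias("valeurs"),  # array of the values of each map
    col("map").getItem("a").alias("val_a")    # value associated with key "a" (null if absent)
).show()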

JSON

Spark also provides functions dedicated to the JSON format that allow us to perform various operations.
For example:

from pyspark.sql.functions import from_json,col
df=spark.read.json("/FileStore/tables/devices.json")

• The following command displays specific columns from the JSON data we have just read

df.select("device_id","zipcode").show(3)

• To convert strings in JSON format into Struct or Map type, we use the from_json method

from pyspark.sql.types import MapType, StringType
df.withColumn("device_name", from_json(col("device_name"), MapType(StringType(), StringType())))

• The to_json() method is used to convert Map or Struct types into json format

from pyspark.sql.functions import to_json
df.withColumn("device_name",to_json(col("device_name")))

Conclusion

We have seen in this article that Spark offers a very wide variety of tools, which we have presented here in a simple and non-exhaustive way. We will see later that Spark offers many other possibilities.