A zero-cost, type-safe Apache Spark API.
Why Wick?
Using Wick gives you the guarantee that your code is correct at compile time, full IDE support with suggestions for
available columns and their types, and better guidance for AI agents such as Claude.
Using Wick will save you HOURS of work because you won't have to go through lengthy packaging and
deployment to a cluster to test if your job works... repeatedly because it never works the first time 😫
Jump to:
- Getting started
- Creating a DataSeq
- Filtering
- Selecting
- Joining
- Aggregating
- Related projects
- Contributing
(All code snippets in this markdown documentation are compiled and guaranteed to work)
Important
Wick requires Scala 3.7.0 or higher.
This is because Wick relies on Named Tuples
to provide all this compile-time safety and IDE auto-completion.
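As a quick illustration of the feature Wick builds on, here is a minimal sketch in plain Scala 3.7+ (no Wick needed) showing Named Tuples: fields are accessed by name and checked at compile time.

```scala
// Named Tuples: an anonymous record whose field names are part of the type.
// Accessing a field that does not exist is a compile error, not a runtime one.
@main def namedTupleDemo =
  val person = (name = "Alice", age = 30)
  println(person.name) // Alice
  println(person.age)  // 30
  // person.salary     // would not compile: no field named `salary`
```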
Add the dependency to Gradle:
com.netflix.wick:wick_$scalaBinaryVersion:<version>
Or to Scala CLI:
com.netflix.wick::wick:<version>
Check the latest version on Artifactory.
It is recommended to enable Explicit Nulls
with the Scala compiler option -Yexplicit-nulls. This option lets the type system track nullable columns.
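A minimal sketch of what this looks like in practice: with explicit nulls, nullable values get the union type `T | Null` and must be checked before use (this example uses only plain Scala, not Wick).

```scala
// With -Yexplicit-nulls, a value that may be absent is typed `String | Null`
// and the compiler forces you to handle the null case before using it.
def describe(name: String | Null): String =
  name match
    case null      => "unknown"
    case s: String => s.toUpperCase

@main def explicitNullsDemo =
  println(describe(null))    // unknown
  println(describe("alice")) // ALICE
```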
Import everything you need from Wick:
import com.netflix.wick.{*, given}
Create a SparkSession:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
Here is an example you can copy/paste into a file like wick.scala and run with
scala wick.scala (no prerequisites needed):
//> using scala 3.7.4
//> using options -Yexplicit-nulls
//> using jvm 17
//> using dep com.netflix.wick::wick:<version>
//> using dep org.apache.spark:spark-sql_2.13:3.5.1
//> using javaOptions --add-exports java.base/sun.nio.ch=ALL-UNNAMED
import org.apache.spark.sql.SparkSession
import com.netflix.wick.{*, given} // Import everything you need from Wick
// Write your case class models here (Spark does not like when they are defined inside a def or a class)
@main def wick =
val spark = SparkSession.builder().master("local").getOrCreate()
// Write your job here
Note
DataSeq is the type-safe representation that Wick introduces in place of the completely untyped DataFrame and of
Dataset, which comes with potential performance drawbacks.
DataSeq is as performant as a DataFrame can be, but with even better typing than Dataset.
Given this model:
case class Department(id: Int, name: String)
case class Employee(id: Int, name: String, dept_id: Int, title_id: Int)
case class Title(id: Int, name: String, managing: Boolean)
We can generate some dummy data:
val employees = spark.createDataSeq(
Seq(
Employee(id = 1, name = "Alice", dept_id = 1, title_id = 2),
Employee(id = 2, name = "Bob", dept_id = 2, title_id = 1),
Employee(id = 3, name = "Charlie", dept_id = 1, title_id = 2)
)
)
val departments = spark.createDataSeq(
Seq(
Department(id = 1, name = "Engineering"),
Department(id = 2, name = "Marketing")
)
)
val titles = spark.createDataSeq(
Seq(
Title(id = 1, name = "Ad Manager", managing = true),
Title(id = 2, name = "Software Engineer", managing = false)
)
)
employees.show()
// +---+-------+-------+--------+
// | id|   name|dept_id|title_id|
// +---+-------+-------+--------+
// |  1|  Alice|      1|       2|
// |  2|    Bob|      2|       1|
// |  3|Charlie|      1|       2|
// +---+-------+-------+--------+
Or load the data from actual tables:
val employees = spark.tableDataSeq[Employee]("employees")
val departments = spark.tableDataSeq[Department]("departments")
val titles = spark.tableDataSeq[Title]("titles")
☝️ this will need to run in the cluster (not local) due to ACLs.
Note
All following examples will assume the above setup (imports and classes).
Wick provides type-safe filtering with compile-time guarantees that your filter conditions are valid:
case class Person(name: String, age: Int | Null)
val persons = spark.createDataSeq(Seq(
Person("Alice", age = 30),
Person("Bob", age = 15),
Person("Charlie", age = 35)
))
// Filter with simple conditions
val adults = persons.filter(_.age.orElse(0) > 18)
adults.show()
// +-------+---+
// |   name|age|
// +-------+---+
// |  Alice| 30|
// |Charlie| 35|
// +-------+---+
// Equality conditions
val alice = persons.filter(_.name === "Alice")
alice.show()
// +-----+---+
// | name|age|
// +-----+---+
// |Alice| 30|
// +-----+---+
// Inequality conditions
val notBob = persons.filter(_.name !== "Bob")
notBob.show()
// +-------+---+
// |   name|age|
// +-------+---+
// |  Alice| 30|
// |Charlie| 35|
// +-------+---+
// Negation
val notBob2 = persons.filter(person => !(person.name === "Bob"))
notBob2.show()
// +-------+---+
// |   name|age|
// +-------+---+
// |  Alice| 30|
// |Charlie| 35|
// +-------+---+
// Combined logical operators
val middleAged = persons.filter(person => nullable(person.age.? > 25 && person.age.? < 35).orElse(false))
middleAged.show()
// +-----+---+
// | name|age|
// +-----+---+
// |Alice| 30|
// +-----+---+
Joining is described in a later section, but here is an example of how to filter joined data
using the departments and employees DataSeqs defined in the Creating a DataSeq
section:
// Joining employees and departments
val joined = employees.join(departments, (emp, dept) => emp.dept_id === dept.id)
// Filter joined data for only engineering employees
val engineeringEmployees = joined.filter((emp, dept) => dept.name === "Engineering")
engineeringEmployees.show()
// +---+-------+-------+---+-----------+
// | id|   name|dept_id| id|       name|
// +---+-------+-------+---+-----------+
// |  1|  Alice|      1|  1|Engineering|
// |  3|Charlie|      1|  1|Engineering|
// +---+-------+-------+---+-----------+
Wick provides a type-safe way to select and transform columns from your DataSeq:
// Select and transform columns with named tuples
val doubleAges = persons.select(person => (double_age = nullable(person.age.? * 2)))
doubleAges.show()
// +----------+
// |double_age|
// +----------+
// |        60|
// |        30|
// |        70|
// +----------+
// Chain selections for multiple transformations
val quadrupleAges = doubleAges.select(row => (quadruple_age = nullable(row.double_age.? * 2)))
quadrupleAges.show()
// +-------------+
// |quadruple_age|
// +-------------+
// |          120|
// |           60|
// |          140|
// +-------------+
Wick supports type-safe joins between DataSeqs.
Using the departments, employees and titles DataSeqs defined in the Creating a DataSeq
section:
// Join two DataSeqs
val empDepts = employees.join(departments, _.dept_id === _.id)
// Select from joined data with type-safe column access
val selected = empDepts.select((emp, dept) => (
emp_name = emp.name,
dept_name = dept.name
))
selected.show()
// +--------+-----------+
// |emp_name|  dept_name|
// +--------+-----------+
// |   Alice|Engineering|
// |     Bob|  Marketing|
// | Charlie|Engineering|
// +--------+-----------+
Multiple joins:
// Chain multiple joins
val empDeptTitle = employees
.join(departments, (emp, dept) => emp.dept_id === dept.id)
.join(titles, (emp, dept, title) => emp.title_id === title.id)
val finalResult = empDeptTitle.select((emp, dept, title) => (
emp_name = emp.name,
dept_name = dept.name,
title_name = title.name,
managing = title.managing
))
finalResult.show()
// +--------+-----------+-----------------+--------+
// |emp_name|  dept_name|       title_name|managing|
// +--------+-----------+-----------------+--------+
// |   Alice|Engineering|Software Engineer|   false|
// |     Bob|  Marketing|       Ad Manager|    true|
// | Charlie|Engineering|Software Engineer|   false|
// +--------+-----------+-----------------+--------+
Wick provides type-safe grouping and aggregation operations:
import com.netflix.wick.functions.{count, `*`}
// Group by a computed column and aggregate
val aggregated = employees
.groupBy(emp => (dept_id = emp.dept_id))
.agg(emp => (population = count(`*`)))
aggregated.show()
// +-------+----------+
// |dept_id|population|
// +-------+----------+
// |      1|         2|
// |      2|         1|
// +-------+----------+
// Access both grouping keys and aggregated values
val summary = aggregated.select(row => (
dept_id = row.dept_id,
population = row.population
))
Aggregations of grouped data only compile for scalar expressions. For example, the following does not compile:
persons
.groupBy(person => (age_group = person.age))
.agg(person => (age_sum = person.age)) // This does not compile because person.age is not an aggregation
^
(age_sum : com.netflix.wick.column.Expr[Int | Null]) is not a NamedTuple of ScalarExpr[?].
Instead, maybe you wanted to do:
import com.netflix.wick.functions.sum
persons
.groupBy(person => (age_group = person.age))
.agg(person => (age_sum = sum(person.age))) // With the sum, it's better!
Wick provides type-safe ordering operations that ensure you can only sort by orderable columns at compile time:
// Sort by a single column (ascending by default)
val byAge = persons.orderBy(_.age)
byAge.show()
// +-------+---+
// |   name|age|
// +-------+---+
// |    Bob| 15|
// |  Alice| 30|
// |Charlie| 35|
// +-------+---+
// Sort by multiple columns
val byAgeAndName = persons.orderBy(person => (person.age, person.name))
byAgeAndName.show()
// +-------+---+
// |   name|age|
// +-------+---+
// |    Bob| 15|
// |  Alice| 30|
// |Charlie| 35|
// +-------+---+
For explicit control over sort direction, use the asc and desc functions:
import com.netflix.wick.functions.{asc, desc}
// Sort by age descending
val byAgeDesc = persons.orderBy(person => desc(person.age))
byAgeDesc.show()
// +-------+---+
// |   name|age|
// +-------+---+
// |Charlie| 35|
// |  Alice| 30|
// |    Bob| 15|
// +-------+---+
// Sort by multiple columns with mixed directions
val mixedSort = persons.orderBy(person => (desc(person.age), asc(person.name)))
mixedSort.show()
// +-------+---+
// |   name|age|
// +-------+---+
// |Charlie| 35|
// |  Alice| 30|
// |    Bob| 15|
// +-------+---+
Sorting a DataSeq only compiles for supported types, shielding your job from errors such as:
ExtendedAnalysisException: [DATATYPE_MISMATCH.INVALID_ORDERING_TYPE] Cannot resolve "col ASC NULLS FIRST" due to data type mismatch: The `sortorder` does not support ordering on type "MAP<STRING, INT>".
case class ComplexData(name: String, metadata: Map[String, Int])
val complex = spark.createDataSeq(Seq(
ComplexData("Alice", Map("score" -> 100)),
ComplexData("Bob", Map("score" -> 85))
))
// This will NOT compile - Map types are not orderable
// complex.orderBy(_.metadata) // Compilation error!
Wick has safer column operations that make sure the transformations you write are legal at compile time rather than at runtime, after you have packaged and deployed your job.
The || and && boolean operators are only valid on Expr[Boolean] and Expr[Boolean | Null], so code like
person.age || person.name will simply not compile because it does not make any sense!
The +, -, *, /, <, <=, > and >= numeric operators only compile if used on supported
numeric columns and their nullable types (like Option[Int]).
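To see how this kind of restriction works, here is a toy sketch (hypothetical types, not Wick's real implementation): by defining `&&` only on `Expr[Boolean]`, applying it to an `Expr[Int]` becomes a type error instead of a runtime Spark failure.

```scala
// A toy expression type: the phantom type parameter T records the column type.
final case class Expr[T](sql: String)

// `&&` is only available when both operands are Expr[Boolean].
extension (a: Expr[Boolean])
  def &&(b: Expr[Boolean]): Expr[Boolean] = Expr(s"(${a.sql} AND ${b.sql})")

@main def exprDemo =
  val isAdult  = Expr[Boolean]("age > 18")
  val isActive = Expr[Boolean]("active")
  println((isAdult && isActive).sql) // (age > 18 AND active)
  // Expr[Int]("age") && isActive   // would not compile: `&&` needs Expr[Boolean]
```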
It is recommended to use .orElse() over coalesce() since it can track whether the expression is still nullable
when the -Yexplicit-nulls Scala compiler option is enabled.
import com.netflix.wick.column.orElse
val nullFreePerson = persons.select(person => (name = person.name, age_or_zero = person.age.orElse(0)))
Given that person.age is of type Int | Null, person.age.orElse(0) changes the type to Int, making sure the
resulting age_or_zero column does not have nulls.
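The type-narrowing idea can be sketched in plain Scala with a hypothetical helper (not Wick's actual API): an orElse-style operation turns `Int | Null` into plain `Int`, which is what lets the nullability disappear from the column's type.

```scala
// Hypothetical helper mirroring orElse's narrowing: Int | Null  =>  Int.
extension (x: Int | Null)
  def orDefault(default: Int): Int = x match
    case null   => default // null case is replaced by the default...
    case i: Int => i       // ...so the result is a plain, non-nullable Int

@main def orDefaultDemo =
  val age: Int | Null = null
  println(age.orDefault(0)) // 0
```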
Now that age_or_zero is not nullable, calling orElse on this column does not work:
nullFreePerson.select(person => (name = person.name, age_or_zero = person.age_or_zero.orElse(-1)))
[error] ^^^^^^^^^^^^^^^^^^^^^^^^^
[error] value orElse is not a member of com.netflix.wick.column.Expr[Int].
Using any of the asc, desc, min and max operators only compiles if used on supported types, shielding your job from
resulting in a:
ExtendedAnalysisException: [DATATYPE_MISMATCH.INVALID_ORDERING_TYPE] Cannot resolve "col ASC NULLS FIRST" due to data type mismatch: The `sortorder` does not support ordering on type "MAP<STRING, INT>".
For more comprehensive examples, please check the test folder.
- Wick is heavily inspired by Going structural with Named Tuples by Jamie Thompson (demo repo)
- Iskra (uses macros and is not very IDE friendly)
- Frameless (Scala 2 and outdated)
- spark-compiletime