0 votes
2 views
in Big Data Hadoop & Spark by (11.5k points)

I have kryo serialization turned on with this:


conf.set( "spark.serializer", "org.apache.spark.serializer.KryoSerializer" )


I want to ensure that a custom class is serialized using kryo when shuffled between nodes. I can register the class with kryo this way:


conf.registerKryoClasses(Array(classOf[Foo]))


As I understand it, this does not actually guarantee that kyro serialization is used; if a serializer is not available, kryo will fall back to Java serialization.
To guarantee that kryo serialization happens, I followed this recommendation from the Spark documentation:


conf.set("spark.kryo.registrationRequired", "true")


But this causes IllegalArugmentException to be thrown ("Class is not registered") for a bunch of different classes which I assume Spark uses internally, for example the following:


org.apache.spark.util.collection.CompactBuffer
scala.Tuple3

 

1 Answer

0 votes
by (31.4k points)

Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. Spark aims to strike a balance between convenience (allowing you to work with any Java type in your operations) and performance. It provides two serialization libraries:

Java serialization: By default, Spark serializes objects using Java’s ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. You can also control the performance of your serialization more closely by extending java.io.Externalizable. Java serialization is very flexible, and leads to large serialized formats for many classes. But it is quiet slow.

Kryo serialization: Spark can also use the Kryo v4 library in order to serialize objects more quickly. Kryo is significantly faster and more compact as compared to Java serialization (approx 10x times), but Kryo doesn’t support all Serializable types and requires you to register the classes in advance that you’ll use in the program in advance in order to achieve best performance.

When Kryo serializes an instance of an unregistered class it has to output the fully qualified class name. That's a lot of characters. Instead, if a class gets pre-registered, Kryo can simply output a numeric reference to this pre-registered class, which is generally 1-2 bytes.

This gets very crucial in cases where each row of an RDD is serialized with Kryo. You don't want to include the same class name for each of a billion rows. So you pre-register these classes. But it's easy to forget to register a new class and then you're wasting bytes again. The solution is to require every class to be registered:

conf.set("spark.kryo.registrationRequired", "true")

Now Kryo will never output full class names. If it encounters an unregistered class, that's a runtime error.

Unfortunately it's hard to enumerate all the classes that you are going to be serializing in advance. The idea is that Spark registers the Spark-specific classes, and you register everything else. You have an RDD[(X, Y, Z)]? You have to register classOf[scala.Tuple3[_, _, _]].


 

If your objects are large, you may also need to increase the spark.kryoserializer.buffer config. This value needs to be large enough to hold the largest object you will serialize.


The list of classes that Spark registers actually includes CompactBuffer, so if you see an error for that, you're doing something wrong. You are bypassing the Spark registration procedure. You have to use either spark.kryo.classesToRegister or spark.kryo.registrator to register your classes. (See the config options. If you use GraphX, your registrator should call GraphXUtils. registerKryoClasses).

Welcome to Intellipaat Community. Get your technical queries answered by top developers !


Categories

...