
I want to translate the following routine, which averages the word vectors of each sentence, from Spark ML's Scala Word2VecModel class into PySpark:

    override def transform(dataset: Dataset[_]): DataFrame = {
      transformSchema(dataset.schema, logging = true)
      val vectors = wordVectors.getVectors
        .mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
        .map(identity) // mapValues doesn't return a serializable map (SI-7005)
      val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
      val d = $(vectorSize)
      val word2Vec = udf { sentence: Seq[String] =>
        if (sentence.isEmpty) {
          Vectors.sparse(d, Array.empty[Int], Array.empty[Double])
        } else {
          val sum = Vectors.zeros(d)
          sentence.foreach { word =>
            bVectors.value.get(word).foreach { v =>
              BLAS.axpy(1.0, v, sum)
            }
          }
          BLAS.scal(1.0 / sentence.size, sum)
          sum
        }
      }
      dataset.withColumn($(outputCol), word2Vec(col($(inputCol))))
    }

I have managed to translate some portions in bits and pieces, but I'm not able to put the whole thing together.

For the Scala identity function I created the same function in Python, since PySpark doesn't have one:

    def identity(x):
        return x
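
(Though from what I can tell, the .map(identity) call in the Scala code is only a workaround for a Scala serialization bug (SI-7005), so the PySpark translation may not need an equivalent at all, since Python dicts broadcast fine.)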

Similarly, I looked up the inner contract of BLAS.axpy(), which I can replicate for PySpark:

    axpy(double a, Vector x, Vector y)
        y += a * x

and likewise for BLAS.scal():

    scal(double a, Vector x)
        x = a * x
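
Since PySpark doesn't expose its internal BLAS module publicly, my plan is to mimic these two in-place operations with NumPy. A minimal sketch, assuming the vectors are numpy arrays:

    import numpy as np

    def axpy(a, x, y):
        # y += a * x, in place, mirroring BLAS.axpy(a, x, y)
        y += a * x

    def scal(a, x):
        # x = a * x, in place, mirroring BLAS.scal(a, x)
        x *= a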

I also tried to convert the following lines:

val vectors = wordVectors.getVectors
      .mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
      .map(identity)

And I came up with the following, but I'm not sure how to do vv.map(_.toDouble) in PySpark. Is this right?

  vectors_final = model.getVectors().rdd.mapValues(lambda vv: Vectors.dense(vv)).map(lambda x: identity(x))
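
Putting the pieces together, here is my rough sketch of the whole transform in PySpark. I'm assuming model is a fitted pyspark.ml.feature.Word2VecModel, dataset is the input DataFrame, spark is my SparkSession, and input_col/output_col are my column names; I haven't verified this end to end:

    import numpy as np
    from pyspark.ml.linalg import Vectors, VectorUDT
    from pyspark.sql.functions import col, udf

    # In PySpark, model.getVectors() returns a DataFrame with "word" and
    # "vector" columns, so mapValues/identity isn't needed here, and neither
    # is vv.map(_.toDouble): the vectors already hold doubles.
    vectors = {row.word: row.vector.toArray()
               for row in model.getVectors().collect()}

    b_vectors = spark.sparkContext.broadcast(vectors)
    d = len(next(iter(vectors.values())))  # $(vectorSize) in the Scala code

    @udf(returnType=VectorUDT())
    def word2vec(sentence):
        if not sentence:
            return Vectors.sparse(d, [], [])
        total = np.zeros(d)
        for word in sentence:
            v = b_vectors.value.get(word)
            if v is not None:
                total += v             # BLAS.axpy(1.0, v, sum)
        total /= len(sentence)         # BLAS.scal(1.0 / sentence.size, sum)
        return Vectors.dense(total)

    result = dataset.withColumn(output_col, word2vec(col(input_col)))

Does this look like a faithful translation?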
