0 votes
in Data Science by (17.6k points)

Flink version : 1.2.0

Scala version : 2.11.8

I want to use a DataStream to make predictions with a model in Flink, using Scala. I have a DataStream[String] that contains JSON-formatted data from a Kafka source, and I want to run predictions on it with a FlinkML model that is already trained. The problem is that all the FlinkML examples use the DataSet API for prediction. I am relatively new to Flink and Scala, so any help in the form of a code solution would be appreciated.

Input :


Code :

package org.apache.flink.quickstart


import java.util.Properties

import org.apache.flink.api.scala._



import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.util.parsing.json.JSON

//kafka consumer imports

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09

import org.apache.flink.streaming.util.serialization.SimpleStringSchema

//kafka json table imports

import org.apache.flink.table.examples.scala.StreamTableExample

import org.apache.flink.table.api.TableEnvironment

import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource


//JSon4s imports

import org.json4s.native.JsonMethods

// Case class

case class CC(FC196:String,FC174:String,FC195:String,FC176:String,FC198:String,FC175:String,FC197:String,FC178:String,FC177:String,FC199:String,FC179:String,FC190:String,FC192:String,FC191:String,FC194:String,FC193:String,FC203:String,FC205:String,FC185:String,FC184:String,FC187:String,FC186:String,FC189:String,FC200:String,FC188:String,FC202:String,FC201:String,FC181:String,FC180:String,FC183:String,FC182:String)

object WordCount {

  implicit val formats = org.json4s.DefaultFormats

  def main(args: Array[String]) {

    // set up the execution environment

    implicit lazy val formats = org.json4s.DefaultFormats

    // kafka properties

    val properties = new Properties()

    properties.setProperty("bootstrap.servers", "***.**.*.***:9093")

    properties.setProperty("zookeeper.connect", "***.**.*.***:2181")


    properties.setProperty("auto.offset.reset", "earliest")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

//    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val st = env

      .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema() , properties))

      .flatMap(raw => JsonMethods.parse(raw).toOption)

    val mapped = st.map(_.extract[CC])

  }

}

1 Answer

0 votes
by (41.4k points)

To solve this, write a MapFunction that reads the model when the job starts.

The MapFunction then stores the model as part of its operator state.

If a failure occurs, the model is restored automatically from the last checkpoint.

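import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public static void main(String[] args) throws Exception {

        // obtain the execution environment

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        DataStream<Value> input = ...; // read from Kafka for example

        // apply the model to every incoming record

        DataStream<Prediction> prediction = input.map(new Predictor());

        prediction.print();

        env.execute();

    }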

    public static class Predictor implements MapFunction<Value, Prediction>, CheckpointedFunction {

        private transient ListState<Model> modelState;

        private transient Model model;


        public Prediction map(Value value) throws Exception {

            return model.predict(value);

        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {

            // we don't have to do anything here because we assume the model to be constant

        }

        public void initializeState(FunctionInitializationContext context) throws Exception {

            ListStateDescriptor<Model> listStateDescriptor = new ListStateDescriptor<>("model", Model.class);

            modelState = context.getOperatorStateStore().getUnionListState(listStateDescriptor);

            if (context.isRestored()) {

                // restore the model from state

                model = modelState.get().iterator().next();

            } else {

                // read the model from somewhere, e.g. read from a file

                model = ...;

                // update the modelState so that it is checkpointed from now on

                modelState.add(model);

            }

        }

    }

    public static class Model {}

    public static class Value{}

    public static class Prediction{}
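
The answer leaves Model, Value and Prediction as empty placeholders, while map calls model.predict(value), so the Model type needs a predict method before the job compiles. As a rough sketch only, here is one way the three types could be filled in without any Flink dependency. The linear model, the dense feature vector, the comma-separated weight format, and the names LinearModelSketch and fromCsv are all assumptions made for illustration; none of them come from the question or from FlinkML.

```java
import java.util.Arrays;

// Hypothetical stand-ins for the empty Model, Value and Prediction stubs.
// A linear model is assumed purely for illustration.
final class LinearModelSketch {

    /** One input record: a dense feature vector (assumed representation). */
    static final class Value {
        final double[] features;

        Value(double[] features) {
            this.features = features;
        }
    }

    /** The model output for one record. */
    static final class Prediction {
        final double score;

        Prediction(double score) {
            this.score = score;
        }
    }

    /** Constant model: score = intercept + sum(weights[i] * features[i]). */
    static final class Model {
        private final double[] weights;
        private final double intercept;

        Model(double[] weights, double intercept) {
            this.weights = weights.clone();
            this.intercept = intercept;
        }

        /** Parses weights from one comma-separated line (hypothetical format). */
        static Model fromCsv(String line, double intercept) {
            double[] parsed = Arrays.stream(line.split(","))
                    .map(String::trim)
                    .mapToDouble(Double::parseDouble)
                    .toArray();
            return new Model(parsed, intercept);
        }

        Prediction predict(Value value) {
            if (value.features.length != weights.length) {
                throw new IllegalArgumentException(
                        "expected " + weights.length + " features, got " + value.features.length);
            }
            double sum = intercept;
            for (int i = 0; i < weights.length; i++) {
                sum += weights[i] * value.features[i];
            }
            return new Prediction(sum);
        }
    }

    public static void main(String[] args) {
        Model model = Model.fromCsv("0.5, 2.0", 1.0);
        Prediction p = model.predict(new Value(new double[] {2.0, 3.0}));
        System.out.println(p.score); // 1.0 + 0.5 * 2.0 + 2.0 * 3.0
    }
}
```

In the Predictor above, a call such as Model.fromCsv(...) would be one possible replacement for the elided "read the model from somewhere" step. How a trained FlinkML model is actually exported and loaded is not covered by this sketch.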

