
My dataset looks like this:

KEY |Event_Type | metric | Time
001 |event1     | 10     | 2016-05-01 10:50:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

I want to get all the keys that satisfy this condition:

"SUM of metric for a specific event type" > threshold within a 5-minute window.

This appears to me to be a perfect candidate for sliding window functions.

How can I do this with Spark SQL?

Thank you.

1 Answer


For Spark >= 2.0

You can use the window function (not to be confused with analytic window functions). Depending on the variant, it assigns each timestamp to one or more, potentially overlapping, buckets:

import org.apache.spark.sql.functions.window

df.groupBy($"KEY", window($"Time".cast("timestamp"), "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window                                       |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
// +---+---------------------------------------------+-----------+
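
Since the goal is the keys whose metric sum exceeds a threshold per event type, you can aggregate and then filter. A minimal sketch, assuming a hypothetical threshold of 30 (not part of the original question); the three-argument form of window gives overlapping (sliding) buckets instead of tumbling ones:

import org.apache.spark.sql.functions.{window, sum}

val threshold = 30  // hypothetical value, adjust as needed

// 5-minute buckets per KEY and Event_Type, keeping only the groups
// whose summed metric exceeds the threshold.
val overThreshold = df
  .groupBy($"KEY", $"Event_Type", window($"Time".cast("timestamp"), "5 minutes"))
  .agg(sum($"metric").as("total"))
  .where($"total" > threshold)

overThreshold.select($"KEY").distinct.show()

// For overlapping (sliding) windows, add a slide duration, e.g. 5-minute
// windows starting every minute:
// window($"Time".cast("timestamp"), "5 minutes", "1 minute")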

For Spark < 2.0

Here is an example:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")
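
Building on that DataFrame, here is a rough sketch of a rolling 5-minute sum with window functions (which in Spark < 2.0 need a HiveContext); the unix_time and roll_sum column names and the threshold value are illustrative assumptions, not part of the original question:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{unix_timestamp, sum}

// Convert the Time string to epoch seconds so that rangeBetween can
// express "the previous 5 minutes" as the numeric range -300..0.
val withTs = df.withColumn("unix_time", unix_timestamp($"Time"))

// For each KEY/Event_Type, sum the metric over rows whose timestamp falls
// in the 5 minutes up to and including the current row.
val w = Window
  .partitionBy($"KEY", $"Event_Type")
  .orderBy($"unix_time")
  .rangeBetween(-300, 0)

val rolling = withTs.withColumn("roll_sum", sum($"metric").over(w))

// Keep the keys whose rolling sum ever exceeds the threshold.
val threshold = 30  // hypothetical value
rolling.where($"roll_sum" > threshold).select($"KEY").distinct.show()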

Hope this answer will help you!
