
Input JSON:

{"studentName": "abc","mailId": "[email protected]","class" : 7,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"}, {"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "[email protected]","class" : 8,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"English","score":70,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "[email protected]","class" : 9,"newSub" : "Environment","grade" : "A","score"  : 95,"scoreBoard" : [{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Chemistry","score":72,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|class|grade|mailId       |newSub     |score|scoreBoard                                                                                      |studentName|
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+
|7    |A    |[email protected]|Environment|95   |[[A,90,Math], [A,82,Science], [A,80,History], [B,75,Hindi], [A,80,English], [A,80,Geography]]   |abc        |
|8    |A    |[email protected]|Environment|95   |[[A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [B,70,English], [A,87,Biology]]   |xyz        |
|9    |A    |[email protected]|Environment|95   |[[A,91,Math], [B,77,Physics], [B,72,Chemistry], [A,95,Computer], [A,82,English], [B,76,Biology]]|efg        |
+-----+-----+-------------+-----------+-----+------------------------------------------------------------------------------------------------+-----------+

Processing I want:

  1. Add newSub's entry to the scoreBoard list (reading the subject, score, and grade from the student's row: newSub, score, grade).

  2. Sort the entries by score and remove the entry with the lowest score from the scoreBoard list.
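The two steps can be modelled on plain Scala collections before bringing Spark in. This is a minimal sketch; ScoreEntry, ScoreBoardSketch, and addAndTrim are hypothetical names, and when several entries tie for the lowest score the sketch assumes the last of them is the one dropped. The expected output below keeps the original subject order rather than sorting it, so the sketch offers both variants through sortDesc.

```scala
// Hypothetical model of one scoreBoard element; field names mirror the JSON
final case class ScoreEntry(subject: String, score: Long, grade: String)

object ScoreBoardSketch {
  /** Step 1: put the new subject in front of the existing entries.
    * Step 2: drop one lowest-scoring entry (the last one if several tie).
    * With sortDesc = true the result is ordered by score, highest first;
    * otherwise the original order is kept.
    */
  def addAndTrim(board: Seq[ScoreEntry], added: ScoreEntry, sortDesc: Boolean): Seq[ScoreEntry] = {
    val all = added +: board
    if (sortDesc) {
      // sortBy is stable, so tied scores keep their original relative order
      all.sortBy(_.score)(Ordering[Long].reverse).dropRight(1)
    } else {
      val lowest = all.map(_.score).min
      val idx = all.lastIndexWhere(_.score == lowest)
      all.patch(idx, Nil, 1) // remove exactly one element at idx
    }
  }
}
```

With sortDesc = false, student xyz ends up as Environment, Math, Physics, Chemistry, Hindi, Biology, which matches the expected output below.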

Expected output:

{"studentName": "abc","mailId": "[email protected]","class" : 7,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Science","score":82,"grade":"A"},{"subject":"History","score":80,"grade":"A"},{"subject":"English","score":80,"grade":"A"},{"subject":"Geography","score":80,"grade":"A"}]}
{"studentName": "xyz","mailId": "[email protected]","class" : 8,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":90,"grade":"A"},{"subject":"Physics","score":85,"grade":"A"},{"subject":"Chemistry","score":80,"grade":"A"},{"subject":"Hindi","score":75,"grade":"B"},{"subject":"Biology","score":87,"grade":"A"}]}
{"studentName": "efg","mailId": "[email protected]","class" : 9,"scoreBoard" : [{"subject":"Environment","score":95,"grade":"A"},{"subject":"Math","score":91,"grade":"A"},{"subject":"Physics","score":77,"grade":"B"},{"subject":"Computer","score":95,"grade":"A"},{"subject":"English","score":82,"grade":"A"},{"subject":"Biology","score":76,"grade":"B"}]}

+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|class|mailId       |scoreBoard                                                                                         |studentName|
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+
|7    |[email protected]|[[A,95,Environment], [A,90,Math], [A,82,Science], [A,80,History], [A,80,English], [A,80,Geography]]|abc        |
|8    |[email protected]|[[A,95,Environment], [A,90,Math], [A,85,Physics], [A,80,Chemistry], [B,75,Hindi], [A,87,Biology]]  |xyz        |
|9    |[email protected]|[[A,95,Environment], [A,91,Math], [B,77,Physics], [A,95,Computer], [A,82,English], [B,76,Biology]] |efg        |
+-----+-------------+---------------------------------------------------------------------------------------------------+-----------+

What I tried:

1st way: a UDF, but sorting and deleting JSON entries from the scoreBoard column inside a UDF is challenging.

2nd way: explode the scoreBoard column, which gives 6 rows per student, one per subject. The challenge here is processing the data group-wise: how to add a new row for the new subject, sort each student's subject scores, and delete one row.
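The second way (explode, then work group-wise) can be expressed with window functions instead of custom grouping code. This is a minimal sketch, assuming the schema Spark infers from the JSON above and that studentName uniquely identifies a student; ExplodeRegroupSketch and process are hypothetical names. It adds one extra row per student for newSub, numbers each student's rows by score with row_number, drops the last one, and rebuilds the array with collect_list.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object ExplodeRegroupSketch {
  // Assumes the inferred schema: class, grade, mailId, newSub, score,
  // scoreBoard: array<struct<grade, score, subject>>, studentName
  def process(df: DataFrame): DataFrame = {
    val keys = Seq(col("studentName"), col("mailId"), col("class"))

    // One row per existing scoreBoard entry
    val existing = df
      .select((keys :+ explode(col("scoreBoard")).as("e")): _*)
      .select((keys ++ Seq(col("e.grade").as("grade"), col("e.score").as("score"), col("e.subject").as("subject"))): _*)

    // One extra row per student for the new subject, taken from the row-level columns
    val added = df.select((keys ++ Seq(col("grade"), col("score"), col("newSub").as("subject"))): _*)

    val byStudent = Window.partitionBy("studentName")
    val trimmed = existing.unionByName(added)
      // rn = 1 is the highest score in the student's group (ties are broken arbitrarily)
      .withColumn("rn", row_number().over(byStudent.orderBy(col("score").desc)))
      .withColumn("n", count(lit(1)).over(byStudent))
      // Keep everything except the lowest-ranked row of each group
      .filter(col("rn") < col("n"))

    // Rebuild one row per student; collect_list does not guarantee element order
    trimmed
      .groupBy(keys: _*)
      .agg(collect_list(struct(col("grade"), col("score"), col("subject"))).as("scoreBoard"))
  }
}
```

If the array must come back sorted, the collected list needs an explicit sort afterwards, because the order produced by collect_list after a shuffle is not guaranteed.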

I need help choosing an approach. Is there a new or different, more efficient way to do the same processing? Thanks!

2 Answers

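import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}
import org.apache.spark.sql.types._
import ss.implicits._

// Element type of scoreBoard as Spark infers it from the JSON (fields in alphabetical order)
val schema = ArrayType(StructType(Array(
  StructField("grade", StringType, true),
  StructField("score", LongType, true),
  StructField("subject", StringType, true))), true)

// Append the new subject's struct to the existing scoreBoard array
def addValue = udf((array: Seq[Row], newval: Row) => array :+ newval, schema)

// Sort by score (highest first) and drop the last, i.e. lowest-scoring, entry
def sortAndRemove = udf((array: Seq[Row]) =>
  array.sortBy(x => x.getAs[Long]("score"))(Ordering[Long].reverse).dropRight(1), schema)

// df is the DataFrame read from the input JSON
val df2 = df.withColumn("map_col", struct(col("grade"), col("score"), col("newSub").as("subject")))
  .withColumn("scoreBoard", sortAndRemove(addValue(col("scoreBoard"), col("map_col"))))
df2.select("scoreBoard").show(false)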

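This is a UDF approach, where ss is the SparkSession and df is the DataFrame read from the input JSON. On Spark 2.4 and above, addValue can be replaced with the built-in array_union (or concat, which accepts arrays from 2.4 on and, unlike array_union, keeps duplicate elements).

The code above works on Spark 2.x. On Spark 3.0 and later, the untyped udf(f, dataType) form used here fails by default unless spark.sql.legacy.allowUntypedScalaUDF is set to true.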
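On Spark 3.x, where the untyped udf(f, dataType) form is disabled by default, a typed UDF can do the same job. This is a minimal sketch, assuming the inferred schema from the question; Entry, TypedUdfSketch, and addSortAndRemove are hypothetical names. Spark passes the array<struct> column to the function as Seq[Row] and derives the result schema from the case class, so no explicit ArrayType is needed.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical output element; defined at top level so Spark can derive its schema
case class Entry(grade: String, score: Long, subject: String)

object TypedUdfSketch {
  // Typed UDF: the return type array<struct<grade,score,subject>> comes from Seq[Entry]
  val addSortAndRemove = udf { (board: Seq[Row], grade: String, score: Long, newSub: String) =>
    val existing = board.map(r =>
      Entry(r.getAs[String]("grade"), r.getAs[Long]("score"), r.getAs[String]("subject")))
    // Add the new subject, sort by score (highest first), drop the lowest entry
    (Entry(grade, score, newSub) +: existing)
      .sortBy(_.score)(Ordering[Long].reverse)
      .dropRight(1)
  }

  // Replace scoreBoard and drop the row-level columns that were merged into it
  def process(df: DataFrame): DataFrame =
    df.withColumn("scoreBoard", addSortAndRemove(col("scoreBoard"), col("grade"), col("score"), col("newSub")))
      .drop("newSub", "grade", "score")
}
```

Working with a case class inside the function also makes the sort key and the added element type-checked, instead of relying on a hand-written schema matching the Row layout.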


4 Comments

What if the schema also has an epoch-day attribute alongside grade, score, and subject? For example: {"studentName": "abc","mailId": "[email protected]","class" : 7,"newSub" : "Environment","grade" : "A","score" : 95,"currentDay" : "18358","scoreBoard" : [{"subject":"Math","score":90,"grade":"A","day": "18340"},{"subject":"Science","score":82,"grade":"A","day": "18354"},{"subject":"History","score":80,"grade":"A","day": "18355"},{"subject":"Environment","score":95,"grade":"A","day": "18356"}, {"subject":"English","score":80,"grade":"A","day": "18356"},{"subject":"Geography","score":80,"grade":"A","day": "18356"}]}

+-----+----------+-----+-------------+-----------+-----+---------------------------------------------------------------------------------------------------------------------------------------+-----------+
|class|currentDay|grade|mailId       |newSub     |score|scoreBoard                                                                                                                             |studentName|
+-----+----------+-----+-------------+-----------+-----+---------------------------------------------------------------------------------------------------------------------------------------+-----------+
|7    |18358     |A    |[email protected]|Environment|95   |[[18340,A,90,Math], [18354,A,82,Science], [18355,A,80,History], [18356,A,95,Environment], [18356,A,80,English], [18356,A,80,Geography]]|abc        |
+-----+----------+-----+-------------+-----------+-----+---------------------------------------------------------------------------------------------------------------------------------------+-----------+

1. If the new subject is already in scoreBoard with the same score (e.g. Environment has 95 both in the newSub/score columns and in scoreBoard), I don't want to add it again.
2. I want to compare every entry's day with the currentDay column and delete the scoreBoard entries that are more than 10 days older than currentDay (there may be one, two, n, or no old entries to delete). Here, the entry with day = 18340 is old and should be deleted.

Is this possible? How can I achieve this using a UDF? Thanks in advance.
You can modify the sort-and-remove UDF to handle this.
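The core logic of such a modified UDF can be written as a plain function first. This is a minimal sketch under explicit assumptions: an entry counts as old when currentDay - day > 10, days are the epoch-day strings shown in the comment, and the new subject is skipped when an entry with the same subject and score is already present. DayEntry, FollowUpSketch, update, and maxAgeDays are hypothetical names; the function could be wrapped in a typed UDF in the same way as the sort-and-remove logic.

```scala
// Hypothetical element type for the follow-up schema; day is an epoch-day string in the JSON
final case class DayEntry(subject: String, score: Long, grade: String, day: String)

object FollowUpSketch {
  /** Assumption: "old" means currentDay - day > maxAgeDays.
    * Assumption: the new subject is skipped if the same subject/score pair already exists.
    */
  def update(board: Seq[DayEntry], added: DayEntry, currentDay: String, maxAgeDays: Int = 10): Seq[DayEntry] = {
    val today = currentDay.toInt
    // Rule 2: drop every old entry (there may be none, one, or many)
    val recent = board.filter(e => today - e.day.toInt <= maxAgeDays)
    // Rule 1: add the new subject only if it is not already there with the same score
    val alreadyThere = recent.exists(e => e.subject == added.subject && e.score == added.score)
    if (alreadyThere) recent else added +: recent
  }
}
```

For the sample row, Math (day 18340, 18 days before 18358) is removed and Environment is not added a second time.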

This approach uses Spark DataFrames/Datasets and Spark SQL.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}

object ProcessingList {
  val spark = SparkSession
    .builder()
    .appName("ProcessingList")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "4") // Change to a more reasonable default number of partitions for our data
    .config("spark.app.id", "ProcessingList") // To silence Metrics warning
    .getOrCreate()

  val sc = spark.sparkContext

  val sqlContext = spark.sqlContext

  val input = "/home/cloudera/files/tests/list_processing.json"

  // One row per (student, scoreBoard entry); the sb* fields come from the scoreBoard struct
  case class Student(cl: Long, grade: String, mail: String, ns: String, score: Long,
                     sbGrade: String, sbScore: Long, sbSubject: String, name: String)

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)

    try {
      import spark.implicits._

      // Inferred JSON columns are alphabetical: class(0), grade(1), mailId(2), newSub(3),
      // score(4), scoreBoard(5), studentName(6); scoreBoard structs are grade(0), score(1), subject(2)
      val studentTest = sqlContext
        .read
        .json(input)
        .flatMap(r => r.getSeq[Row](5).map(sq => Student(
          r.getLong(0), r.getString(1), r.getString(2), r.getString(3), r.getLong(4),
          sq.getString(0), sq.getLong(1), sq.getString(2), r.getString(6))))
        .cache()

      studentTest.show(truncate = false)

      studentTest.createOrReplaceTempView("student_test")

      // Rank each student's subjects by score, highest first
      sqlContext
        .sql(
          """
            |SELECT cl, grade, mail, ns, score,
            |RANK() OVER(PARTITION BY name ORDER BY sbScore DESC) AS points,
            |sbGrade, sbScore, sbSubject, name
            |FROM student_test
            |ORDER BY cl, points
            |""".stripMargin)
        .show(truncate = false)

      // To have the opportunity to view the web console of Spark: http://localhost:4041/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

and the resulting output:

+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|cl |grade|mail         |ns         |score|points|sbGrade|sbScore|sbSubject|name|
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
|7  |A    |[email protected]|Environment|95   |1     |A      |90     |Math     |abc |
|7  |A    |[email protected]|Environment|95   |2     |A      |82     |Science  |abc |
|7  |A    |[email protected]|Environment|95   |3     |A      |80     |History  |abc |
|7  |A    |[email protected]|Environment|95   |3     |A      |80     |English  |abc |
|7  |A    |[email protected]|Environment|95   |3     |A      |80     |Geography|abc |
|7  |A    |[email protected]|Environment|95   |6     |B      |75     |Hindi    |abc |
|8  |A    |[email protected]|Environment|95   |1     |A      |90     |Math     |xyz |
|8  |A    |[email protected]|Environment|95   |2     |A      |87     |Biology  |xyz |
|8  |A    |[email protected]|Environment|95   |3     |A      |85     |Physics  |xyz |
|8  |A    |[email protected]|Environment|95   |4     |A      |80     |Chemistry|xyz |
|8  |A    |[email protected]|Environment|95   |5     |B      |75     |Hindi    |xyz |
|8  |A    |[email protected]|Environment|95   |6     |B      |70     |English  |xyz |
|9  |A    |[email protected]|Environment|95   |1     |A      |95     |Computer |efg |
|9  |A    |[email protected]|Environment|95   |2     |A      |91     |Math     |efg |
|9  |A    |[email protected]|Environment|95   |3     |A      |82     |English  |efg |
|9  |A    |[email protected]|Environment|95   |4     |B      |77     |Physics  |efg |
|9  |A    |[email protected]|Environment|95   |5     |B      |76     |Biology  |efg |
|9  |A    |[email protected]|Environment|95   |6     |B      |72     |Chemistry|efg |
+---+-----+-------------+-----------+-----+------+-------+-------+---------+----+
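The ranking above only numbers each student's existing subjects; it does not yet add the new subject or drop the lowest one. Continuing in Spark SQL against the same student_test view, a minimal sketch could look like the following. SqlFinishSketch and finish are hypothetical names, and it assumes name identifies a student. It unions one row per student built from ns/score/grade, numbers the rows with ROW_NUMBER, and keeps all but the last.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SqlFinishSketch {
  // Assumes the student_test view from the answer above (one row per student/subject)
  def finish(spark: SparkSession): DataFrame =
    spark.sql(
      """
        |WITH all_rows AS (
        |  SELECT cl, mail, name, sbGrade, sbScore, sbSubject FROM student_test
        |  UNION ALL
        |  -- one extra row per student for the new subject
        |  SELECT DISTINCT cl, mail, name, grade AS sbGrade, score AS sbScore, ns AS sbSubject FROM student_test
        |), ranked AS (
        |  SELECT *,
        |    ROW_NUMBER() OVER (PARTITION BY name ORDER BY sbScore DESC) AS rn,
        |    COUNT(*) OVER (PARTITION BY name) AS n
        |  FROM all_rows
        |)
        |SELECT cl, mail, name, sbGrade, sbScore, sbSubject
        |FROM ranked
        |WHERE rn < n
        |ORDER BY cl, sbScore DESC
        |""".stripMargin)
}
```

The result still has one row per subject; a groupBy on cl, mail, and name with collect_list over a struct of the sb* columns would fold it back into a scoreBoard array.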
