
I have some use cases where I have small Parquet files in Hadoop, say 10-100 MB each. I would like to compact them so that the resulting files are at least, say, 100 MB or 200 MB.

The logic of my code is to:

* find the partition to compact, then get the data from that partition and load it into a dataframe
* save that dataframe to a temporary location with a small coalesce number
* load the data into the location of the Hive table
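For context, the table is assumed to have a single partition column, so each row returned by "show partitions" is a spec of the form col=value; here is a minimal sketch of the parsing the code below relies on (the value is made up):

// Illustrative only: a partition spec as returned by "show partitions"
val spec = "year=2020"
val Array(colName, colValue) = spec.split("=")
// colName ("year") is what the write and load statements need;
// the full spec ("year=2020") is what the where filter uses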

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// positional arguments
val tblName = args(0)                     // Hive table to compact
val explHdfs = args(1)                    // HDFS location of the table
val tmpHdfs = args(2)                     // temporary HDFS location for the compacted files
val numCoalesce = args(3).toInt           // number of output files to coalesce down to
val partitionCompact = args(4).toInt      // index of the partition to compact
val removeDuplicates = args(5).toBoolean  // whether to deduplicate rows while compacting

println(s"tbl $tblName")
println(s"expl hdfs $explHdfs")
println(s"temp hdfs $tmpHdfs")
println(s"num coalesce $numCoalesce")

val sparkConf = new SparkConf().setAppName("Spark Compaction Load in Path") //.setMaster("local[2]")
sparkConf.set("spark.hadoop.parquet.enable.summary-metadata", "false")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
val hiveContext = new HiveContext(sc)
hiveContext.setConf("hive.exec.dynamic.partition", "true")

hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

//set compression to gzip

hiveContext.setConf("parquet.compression", "GZIP")
val partitionsDf = hiveContext.sql(s"show partitions $tblName").toDF()
// TODO: handle tables with more than one partition column
partitionsDf.show(false)

val countPartitions = partitionsDf.count().toInt
println(s"number of partitions: $countPartitions")

// collect the partition specs; each row holds a spec string such as "col=value"
val partitionsArray = partitionsDf.collect()
for (i <- 0 until countPartitions) {
  println(s"partition $i: ${partitionsArray(i)(0)}")
}
// get the partition spec (e.g. "col=value") and the partition column name
val partitionSpec = partitionsArray(partitionCompact)(0).toString
val colPartitioned = partitionSpec.split("=")(0)

val explPartition = explHdfs + partitionSpec
println(s"expl partition $explPartition")

val dfFiltered =
  if (removeDuplicates) hiveContext.sql(s"select * from $tblName where $partitionSpec").distinct()
  else hiveContext.sql(s"select * from $tblName where $partitionSpec")

// save the compacted partition to the temporary location with a small number of files
dfFiltered.coalesce(numCoalesce).write.mode("overwrite").partitionBy(colPartitioned).parquet(tmpHdfs)
println("saved to temp location, now loading data in path")
hiveContext.sql(s"load data inpath '$tmpHdfs/$partitionSpec' overwrite into table $tblName partition ($colPartitioned)")
println("finished loading data, doing an msck repair table")
hiveContext.sql(s"msck repair table $tblName")

I would like some feedback, and I am specifically looking for ways to improve the technique or the performance of this compactor.
