
First, I want to say that I am very new to Scala and am having problems with basic conversion formats. I hope to get better at functional programming in the near future, so sorry if this is a dumb question.

Using Spark SQL, I run a query and get the results in a variable called "probesGroupby":

  val probesGroupby = sqlContext.sql("SELECT id_counter as id_counter, co_mac as co_mac, ts_timestamp as ts_timestamp, max(qt_rssi) as qt_rssi, count(*) as qt_tracks " +
                                     " FROM probes GROUP BY id_counter, co_mac, ts_timestamp")

Everything works up to this point. After this, I need to write the data into an InfluxDB database, and the API requires this format:

val probeRequest = Series("probeRequest",
  Array("id_counter","co_mac","time","qt_rssi","qt_tracks"),
  Array(
    Array(row(0), row(1), row(2), row(3), row(4)),
    Array(row(0), row(1), row(2), row(3), row(4)),
    Array(row(0), row(1), row(2), row(3), row(4)),
    ...
  )
)
assert(None == client.writeSeries(Array(probeRequest)))

How can I create the variable "probeRequest" in this format, so that it holds one Array of values for each Row returned by the query? I have tried a few things, but none of them seem to work :(

Thank you in advance,

2 Answers


You'll still have to wrap the result in a Series, but otherwise it is as simple as this:

probesGroupby.rdd.map(_.toSeq.toArray).collect()

Or, if you prefer a more explicit approach, you can use pattern matching:

import org.apache.spark.sql.Row

probesGroupby.rdd.map { case Row(idCounter, coMac, time, qtRssi, qtTracks) =>
  Array[Any](idCounter, coMac, time, qtRssi, qtTracks)
}.collect()
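To see that both snippets above perform the same per-row conversion, here is a minimal local sketch. It assumes only that spark-sql is on the classpath (no cluster or SparkSession is needed to build Row objects), uses made-up sample rows in the query's column order, and the object name RowToArrayDemo is hypothetical.

```scala
import org.apache.spark.sql.Row

object RowToArrayDemo {
  // Made-up rows standing in for the query result, in SELECT order:
  // id_counter, co_mac, ts_timestamp, qt_rssi, qt_tracks
  val rows: Array[Row] = Array(
    Row(1, "aa:bb:cc:dd:ee:ff", 1420070400000L, -42, 3L),
    Row(2, "11:22:33:44:55:66", 1420070460000L, -67, 1L)
  )

  // Variant 1: generic conversion, works for any number of columns
  val viaToSeq: Array[Array[Any]] = rows.map(_.toSeq.toArray)

  // Variant 2: explicit pattern match via Row.unapplySeq; throws a
  // scala.MatchError if a row does not have exactly five fields
  val viaPattern: Array[Array[Any]] = rows.map {
    case Row(idCounter, coMac, time, qtRssi, qtTracks) =>
      Array[Any](idCounter, coMac, time, qtRssi, qtTracks)
  }
}
```

The generic version is shorter, while the pattern-matching version documents the expected column layout and fails loudly if the query's shape changes.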

3 Comments

Thank you very much! Could you please tell me what the difference is between doing toSeq.toArray before and after collect()?
Well, the difference is fundamental. If you collect first and then map, as in the accepted answer, everything is processed sequentially on the driver. If you map first and collect afterwards, the arrays are created in parallel on the worker nodes and then sent to the driver.
Thank you very much for the explanation. I should work more on functional programming. Thanks
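The two orderings discussed in the comments above can be sketched side by side. This sketch assumes Spark 2.x or later (the SparkSession API) running in local mode, builds a small DataFrame from made-up rows in place of the real probes query, and uses the hypothetical object name MapThenCollectDemo. On a Spark 1.3+ DataFrame the same two expressions should apply, since DataFrame.rdd and collect() exist there too.

```scala
import org.apache.spark.sql.SparkSession

object MapThenCollectDemo {
  def run(): (Array[Array[Any]], Array[Array[Any]]) = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("map-vs-collect")
      .getOrCreate()
    try {
      import spark.implicits._

      // Made-up rows standing in for the grouped probes query result
      val probesGroupby = Seq(
        (1, "aa:bb:cc:dd:ee:ff", 1420070400000L, -42, 3L),
        (2, "11:22:33:44:55:66", 1420070460000L, -67, 1L)
      ).toDF("id_counter", "co_mac", "ts_timestamp", "qt_rssi", "qt_tracks")

      // Map first: each partition's Rows are converted where the partition
      // lives, then collect() ships the finished arrays to the driver.
      val mappedOnWorkers: Array[Array[Any]] =
        probesGroupby.rdd.map(_.toSeq.toArray).collect()

      // Collect first: every Row is shipped to the driver, which then
      // converts them one after another.
      val mappedOnDriver: Array[Array[Any]] =
        probesGroupby.collect().map(_.toSeq.toArray)

      (mappedOnWorkers, mappedOnDriver)
    } finally {
      spark.stop()
    }
  }
}
```

Both produce the same arrays; the difference is only where the conversion work runs. Either way, collect() still brings the whole result into driver memory, so it suits results small enough to write to InfluxDB in one batch.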

First, collect the data from the RDD into an Array on the driver, and then transform each individual Row. If you know the column types, you can use getInt, getLong, etc. instead of just get:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

val probesGroupby: RDD[Row] = ... // a DataFrame works too: its collect() also returns Array[Row]

val payload: Array[Array[Any]] = probesGroupby.collect().map { row =>
  val idCounter = row.get(0)
  val coMac = row.get(1)
  val time = row.get(2)
  val qtRssi = row.get(3)
  val qtTracks = row.get(4)
  Array[Any](idCounter, coMac, time, qtRssi, qtTracks)
  // or just: row.toSeq.toArray
}

val probeRequest = Series("probeRequest",
  Array("id_counter", "co_mac", "time", "qt_rssi", "qt_tracks"),
  payload
)
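A sketch of the typed-getter variant suggested above, written as a single map. The column types are assumptions: count(*) does come back as a Long in Spark SQL, but the types of id_counter, co_mac, ts_timestamp and qt_rssi depend on how the probes table is defined. The sample row is made up and the object name TypedPayloadDemo is hypothetical. A getter whose type does not match the stored value should fail with an exception instead of passing a wrongly typed value on to InfluxDB.

```scala
import org.apache.spark.sql.Row

object TypedPayloadDemo {
  // Convert one Row into an InfluxDB point using typed getters
  def toPoint(row: Row): Array[Any] = Array[Any](
    row.getInt(0),    // id_counter (assumed Int)
    row.getString(1), // co_mac (assumed String)
    row.getLong(2),   // ts_timestamp (assumed Long)
    row.getInt(3),    // qt_rssi = max(qt_rssi) (assumed Int)
    row.getLong(4)    // qt_tracks = count(*), a Long
  )

  // Made-up sample row standing in for probesGroupby.collect()
  val rows: Array[Row] = Array(
    Row(1, "aa:bb:cc:dd:ee:ff", 1420070400000L, -42, 3L)
  )

  val payload: Array[Array[Any]] = rows.map(toPoint)
}
```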

1 Comment

Thank you very much! In the end I did it like this (columns: id_counter, co_mac, ts_timestamp as Long, qt_rssi, qt_tracks): val influxData = new ArrayBuffer[Array[Any]](); probesGroupbySecond.collect().foreach(row => influxData += Array(row.getInt(0), row(1).toString, row.getLong(2), row.getInt(3), row.getLong(4))) But your way is much cleaner! =)
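The commenter's ArrayBuffer version and a map-based version produce the same points; this sketch shows both on made-up rows. It assumes spark-sql is on the classpath for Row, reuses the column types from the comment, and the object name BufferVsMapDemo is hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

object BufferVsMapDemo {
  // Made-up rows: id_counter, co_mac, ts_timestamp, qt_rssi, qt_tracks
  val collected: Array[Row] = Array(
    Row(1, "aa:bb:cc:dd:ee:ff", 1420070400000L, -42, 3L),
    Row(2, "11:22:33:44:55:66", 1420070460000L, -67, 1L)
  )

  // The commenter's approach: append to a mutable buffer inside foreach
  val influxData = new ArrayBuffer[Array[Any]]()
  collected.foreach { row =>
    influxData += Array[Any](row.getInt(0), row(1).toString, row.getLong(2), row.getInt(3), row.getLong(4))
  }

  // The same conversion as a single map: no mutable state, and the result
  // is already an Array[Array[Any]], the shape the Series call expects
  val viaMap: Array[Array[Any]] = collected.map { row =>
    Array[Any](row.getInt(0), row(1).toString, row.getLong(2), row.getInt(3), row.getLong(4))
  }
}
```

The map version avoids the separate buffer and the later toArray conversion, which is most of what makes it read more cleanly.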
