Environment: Spark 1.5.0, HBase 1.0.0.
Scenario: the HBase table data is stored by day (one table per day), and the data for an arbitrary time range needs to be merged into a single RDD for subsequent computation.
Attempt 1: Look for an API that reads several tables in one call. The closest thing found is called MultiTableInputFormat, which works well in MapReduce,
but no corresponding way was found to read HBase into an RDD.
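For context, MultiTableInputFormat is driven by a list of Scan objects, each tagged with the name of the table it should read, which is why it slots so naturally into a MapReduce job. Below is a rough sketch of that MapReduce-side wiring, assuming a placeholder mapper class MyMapper and the same per-day table naming used later in this post:
import java.util.{ArrayList => JArrayList}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil, TableMapper}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// placeholder mapper; the real per-row logic would go in map()
class MyMapper extends TableMapper[ImmutableBytesWritable, Result]

val job = Job.getInstance(HBaseConfiguration.create())
val scans = new JArrayList[Scan]()
for (date <- Seq("20160118", "20160119", "20160120")) {
  val scan = new Scan()
  // each Scan carries the table it targets as a scan attribute
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("behaviour_test_" + date))
  scans.add(scan)
}
// this overload of initTableMapperJob configures MultiTableInputFormat under the hood
TableMapReduceUtil.initTableMapperJob(scans, classOf[MyMapper],
  classOf[ImmutableBytesWritable], classOf[Result], job)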
Attempt 2: Build one RDD per table, then merge them with union. The code logic is as follows:
var totalRDD = xxx  // read the first table
for {               // loop: read each remaining table and merge it into totalRDD
  val sRDD = xxx
  totalRDD.union(sRDD)
}
When this code was run on the cluster, the unioned totalRDD did not hold the right result; doing it this way with a var really does not work.
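One thing worth making explicit here (my note, not the original author's diagnosis): RDDs are immutable, so RDD.union returns a brand-new RDD and never modifies the one it is called on. The loop above discards that return value, so totalRDD never actually grows:
// union does not change totalRDD in place; its result must be captured
val merged = totalRDD.union(sRDD)
// an accumulating loop would therefore have to reassign the var:
// totalRDD = totalRDD.union(sRDD)
Whether reassigning inside the loop would have fixed Attempt 2 is left open here; Attempt 3 sidesteps the question by collecting the RDDs first.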
Attempt 3: Same idea as Attempt 2, but use SparkContext.union to merge all of the RDDs in a single call. The code logic is as follows:
var rddSet: xxx = Set()                // create a collection to hold the RDDs
dateSet.foreach(date => {              // put the RDD of every table into the collection
  val sRDD = xxx
  rddSet += sRDD
})
val totalRDD = sc.union(rddSet.toSeq)  // merge all of the RDDs in the collection
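As a side note (not in the original post), the mutable Set plus foreach can also be written functionally, mapping each date straight to its RDD and unioning the resulting sequence in one go:
// build one RDD per date, then merge them all with a single union
val totalRDD = sc.union(dateSet.toSeq.map { date =>
  xxx  // read the HBase table for this date, as in the loop above
})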
The complete code is as follows:
import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.mutable.Set
/**
 * Date handling utilities
 */
object Htime {
  /**
   * Build the list of dates between a start date and an end date
   * For example, if the start and end dates are 20160118 and 20160120, the date list is (20160118, 20160119, 20160120)
   *
   * @param sDate start date
   * @param eDate end date
   * @return list of dates
   */
  def getDateSet(sDate: String, eDate: String): Set[String] = {
    // The set of dates to be generated
    var dateSet: Set[String] = Set()
    // Date format used for parsing and formatting
    val sdf = new SimpleDateFormat("yyyyMMdd")
    // Convert the start and end dates to milliseconds using the format above
    val sDate_ms = sdf.parse(sDate).getTime
    val eDate_ms = sdf.parse(eDate).getTime
    // Number of milliseconds in a day, used as the iteration step
    val day_ms = 24 * 60 * 60 * 1000
    // Loop to generate the list of dates
    var tm = sDate_ms
    while (tm <= eDate_ms) {
      val dateStr = sdf.format(tm)
      dateSet += dateStr
      tm = tm + day_ms
    }
    // Return the date list
    dateSet
  }
}
/**
 * Read behavior data from HBase and compute the crowd classification
 */
object Classify {
  /**
   * @param args command line arguments: the first is the start date of the behavior data, the second the end date, e.g. 20160118
   */
  def main(args: Array[String]) {
    // There must be exactly 2 command line arguments
    if (args.length != 2) {
      System.err.println("Wrong number of arguments")
System.err.println ( "Usage: Classify ")
      System.exit(1)
    }
    // Get the start and end dates of the behavior data from the command line arguments
    val startDate = args(0)
    val endDate = args(1)
    // Build the date list from the start and end dates
    // For example, if the start and end dates are 20160118 and 20160120, the date list is (20160118, 20160119, 20160120)
    val dateSet = Htime.getDateSet(startDate, endDate)
    // Spark context
    val sparkConf = new SparkConf().setAppName("Classify")
    val sc = new SparkContext(sparkConf)
    // Initialize the HBase configuration
    val conf = HBaseConfiguration.create()
    // Read one RDD per date in the list into a Set, then merge them into a single RDD with SparkContext.union()
    var rddSet: Set[RDD[(ImmutableBytesWritable, Result)]] = Set()
    dateSet.foreach(date => {
      conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date)  // set the table name
      val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rddSet += bRdd
    })
    val behavRdd = sc.union(rddSet.toSeq)
    behavRdd.collect().foreach(println)
  }
}
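One caveat on the last line (my note, not the author's): collect() ships every merged row back to the driver, which is fine for a quick check against small test tables but will not scale to production-sized data. A lighter sanity check over the same behavRdd might be:
// count the merged rows without pulling them all onto the driver
println("merged row count: " + behavRdd.count())
// or inspect just a small sample
behavRdd.take(10).foreach(println)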