Spark: Reading Multiple HBase Tables into One RDD

Add Date : 2018-11-21
Environment: Spark 1.5.0, HBase 1.0.0.

Scenario: the HBase data is stored in one table per day, and the data for an arbitrary range of days needs to be merged into a single RDD for subsequent computation.

Attempt 1: Look for an API that reads multiple tables in one pass. The closest thing found was something called MultiTableInputFormat, which is used to good effect in MapReduce, but no corresponding method for reading HBase into an RDD turned up.
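For reference, this is roughly how MultiTableInputFormat is driven on the MapReduce side, sketched here in Scala: each Scan carries its target table name as an attribute, and TableMapReduceUtil configures MultiTableInputFormat when handed a list of Scans. The table names, the identity mapper, and the null output are illustrative assumptions, not from the original write-up.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{IdentityTableMapper, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
import scala.collection.JavaConverters._

object MultiTableScanJob {
  def main(args: Array[String]): Unit = {
    // One Scan per table; MultiTableInputFormat finds the target table
    // through a per-Scan attribute rather than a single config key
    val scans = Seq("behaviour_test_20160118", "behaviour_test_20160119").map { t =>
      val scan = new Scan()
      scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(t))
      scan
    }
    val job = Job.getInstance(HBaseConfiguration.create(), "multi-table scan")
    // The List[Scan] overload wires up MultiTableInputFormat internally
    TableMapReduceUtil.initTableMapperJob(scans.asJava, classOf[IdentityTableMapper],
      classOf[ImmutableBytesWritable], classOf[Result], job)
    job.setNumReduceTasks(0) // map-only pass-through job
    job.setOutputFormatClass(classOf[NullOutputFormat[ImmutableBytesWritable, Result]])
    job.waitForCompletion(true)
  }
}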

Attempt 2: Generate one RDD per table, then merge them pairwise with union. The logic is as follows:

var totalRDD = xxx    // read the first table
for (date <- xxx) {   // loop over the remaining tables
  val sRDD = xxx      // read one table
  totalRDD.union(sRDD) // intended to fold it into totalRDD
}

When this code ran on the cluster, the unioned totalRDD did not hold the right result; doing it this way with a var simply does not work.
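In hindsight the cause is clear: RDDs are immutable, so union returns a new RDD instead of modifying the one it is called on, and the loop above throws that return value away. Here is a minimal runnable sketch of the reassignment that would have worked, with small integer RDDs standing in for the per-table HBase RDDs and sc an existing SparkContext:

// Stand-in RDDs; in the real job each would come from one HBase table
var totalRDD = sc.parallelize(Seq(1, 2))
for (sRDD <- Seq(sc.parallelize(Seq(3)), sc.parallelize(Seq(4)))) {
  totalRDD = totalRDD.union(sRDD) // reassign: union returns a NEW RDD
}
println(totalRDD.collect().toSeq) // the elements 1, 2, 3, 4

Even with the reassignment, chaining union pairwise builds a deeply nested lineage of two-way unions, while SparkContext.union merges the whole collection in one step, which is exactly what the next attempt does.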

Attempt 3: The same idea as attempt 2, but collect the per-table RDDs and merge them all at once with SparkContext.union. The logic is as follows:

var rddSet: xxx = Set()   // create a set to hold the per-table RDDs
dateSet.foreach(date => { // put the RDD of every table into the set
  val sRDD = xxx
  rddSet += sRDD
})
val totalRDD = sc.union(rddSet.toSeq) // merge all RDDs in the set

The complete code is as follows:

import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.mutable.Set

/**
  * Date-handling utility
  */
object Htime {
  /**
    * Builds the list of dates between a start date and an end date
    * For example, for the start and end dates 20160118 and 20160120, the date list is (20160118, 20160119, 20160120)
    *
    * @param sDate start date
    * @param eDate end date
    * @return the set of dates
    */
  def getDateSet(sDate: String, eDate: String): Set[String] = {
    // The date list to be generated
    var dateSet: Set[String] = Set()

    // Custom date format
    val sdf = new SimpleDateFormat("yyyyMMdd")

    // Convert the start and end dates into milliseconds, using the format above
    val sDate_ms = sdf.parse(sDate).getTime
    val eDate_ms = sdf.parse(eDate).getTime

    // Number of milliseconds in one day, used as the step of the iteration below
    val day_ms = 24 * 60 * 60 * 1000

    // Loop to generate the date list
    var tm = sDate_ms
    while (tm <= eDate_ms) {
      val dateStr = sdf.format(tm)
      dateSet += dateStr
      tm = tm + day_ms
    }

    // Return the date list
    dateSet
  }
}
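// A quick illustration of the helper above (hypothetical REPL check,
// restating the example from the scaladoc):
//   Htime.getDateSet("20160118", "20160120")
//   => Set("20160118", "20160119", "20160120")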

/**
  * Reads behavior data from HBase and computes the crowd classification
  */
object Classify {
  /**
    * @param args command-line arguments; the first is the start date of the behavior data, the second the end date, e.g. 20160118
    */
  def main(args: Array[String]) {
    // Exactly 2 command-line arguments are required
    if (args.length != 2) {
      System.err.println("Wrong number of arguments")
      System.err.println("Usage: Classify <start_date> <end_date>")
      System.exit(1)
    }

    // Start and end dates of the behavior data, taken from the command line
    val startDate = args(0)
    val endDate = args(1)

    // Build the date list from the start and end dates
    // For example, for the start and end dates 20160118 and 20160120, the date list is (20160118, 20160119, 20160120)
    val dateSet = Htime.getDateSet(startDate, endDate)

    // Spark context
    val sparkConf = new SparkConf().setAppName("Classify")
    val sc = new SparkContext(sparkConf)

    // Initialize the HBase configuration
    val conf = HBaseConfiguration.create()

    // Read each table in the date list into its own RDD, collect them in a Set,
    // then merge them into a single RDD with SparkContext.union()
    var rddSet: Set[RDD[(ImmutableBytesWritable, Result)]] = Set()
    dateSet.foreach(date => {
      conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // set the table name
      val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rddSet += bRdd
    })

    val behavRdd = sc.union(rddSet.toSeq)

    behavRdd.collect().foreach(println)
  }
}
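One detail worth noting in the loop above: the same HBaseConfiguration object is mutated before every newAPIHadoopRDD call. Spark copies and broadcasts the configuration when each RDD is created, so the code evidently works as written, but giving each table its own configuration is cheap insurance against shared mutable state. A sketch of just that loop, under the same imports as the full listing:

// One fresh HBaseConfiguration per table, so no RDD can observe a
// later mutation of a shared configuration object
var rddSet: Set[RDD[(ImmutableBytesWritable, Result)]] = Set()
dateSet.foreach { date =>
  val tableConf = HBaseConfiguration.create()
  tableConf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date)
  rddSet += sc.newAPIHadoopRDD(tableConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
}
val behavRdd = sc.union(rddSet.toSeq)

When submitting, the HBase client jars and hbase-site.xml must be visible to Spark (for example via spark-submit's --jars and --files options), or TableInputFormat will not be able to reach the cluster.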