Spark: Reading Multiple HBase Tables into One RDD

Add Date: 2018-11-21
Environment: Spark 1.5.0, HBase 1.0.0.

Scenario: HBase data is stored in one table per day, and the data for an arbitrary range of days must be merged into a single RDD for subsequent computation.

Attempt 1: Look for an API that reads multiple tables in one call. The closest match is MultiTableInputFormat, which works well in MapReduce, but no ready-made method was found for reading HBase tables into an RDD with it.
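For context, MultiTableInputFormat is driven by a list of Scan objects, each tagged with its target table name. Since it is an ordinary Hadoop InputFormat, it could in principle be handed to sc.newAPIHadoopRDD as well; the following is an untested sketch of that idea, assuming the public HBase 1.0 client API (Scan.SCAN_ATTRIBUTES_TABLE_NAME, MultiTableInputFormat.SCANS, TableMapReduceUtil.convertScanToString). It is not the route this article takes:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{MultiTableInputFormat, TableMapReduceUtil}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("MultiTableSketch"))
val conf = HBaseConfiguration.create()

// Serialize one Scan per table; each Scan carries its table name as an attribute.
val scans = Seq("behaviour_test_20160118", "behaviour_test_20160119").map { table =>
  val scan = new Scan()
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(table))
  TableMapReduceUtil.convertScanToString(scan)
}
conf.setStrings(MultiTableInputFormat.SCANS, scans: _*)

// MultiTableInputFormat is a regular InputFormat, so newAPIHadoopRDD accepts it
// just like the single-table TableInputFormat used later in this article.
val rdd = sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])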

Attempt 2: Generate one RDD per table, then merge them with union. The code logic is as follows:

var totalRDD = xxx // read the first table
for { // loop: read each remaining table and union it into totalRDD
  val sRDD = xxx
  totalRDD.union(sRDD) // note: the RDD returned by union is discarded here
}

When this code ran on the cluster, totalRDD did not contain the unioned result: RDD.union returns a new RDD rather than modifying the one it is called on, so the var never picks up the merged data.
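For reference, the iterative approach can in principle be made to work by reassigning the var on every pass, since union returns a new RDD. A minimal sketch using the same TableInputFormat pattern and behaviour_test_<date> table naming as the article's final code:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("UnionVarSketch"))

// Read one day's table into an RDD, as in the article's final code.
def readTable(date: String): RDD[(ImmutableBytesWritable, Result)] = {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date)
  sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
}

val dates = Seq("20160118", "20160119", "20160120")
var totalRDD = readTable(dates.head)
for (date <- dates.tail) {
  totalRDD = totalRDD.union(readTable(date)) // reassign: union returns a new RDD
}

Even then, chaining pairwise unions builds an increasingly nested lineage; SparkContext.union, used in Attempt 3 below, merges all of the RDDs in a single call.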

Attempt 3: Same idea as Attempt 2, but use SparkContext.union to merge all of the RDDs in one call. The code logic is as follows:

var rddSet: xxx = Set() // create a set to hold the RDDs
dateSet.foreach(date => { // read each table's RDD into the set
  val sRDD = xxx
  rddSet += sRDD
})
val totalRDD = sc.union(rddSet.toSeq) // merge all the RDDs at once

The complete code is as follows:

import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.mutable.Set

/**
  * Date/time helper.
  */
object Htime {
  /**
    * Build the list of dates between a start and an end date.
    * For example, for start/end dates 20160118 and 20160120,
    * the date list is (20160118, 20160119, 20160120).
    *
    * @param sDate start date
    * @param eDate end date
    * @return the list of dates
    */
  def getDateSet(sDate: String, eDate: String): Set[String] = {
    // The set of dates to be generated
    var dateSet: Set[String] = Set()

    // Custom date format
    val sdf = new SimpleDateFormat("yyyyMMdd")

    // Convert the start and end dates to milliseconds using the format above
    val sDate_ms = sdf.parse(sDate).getTime
    val eDate_ms = sdf.parse(eDate).getTime

    // Number of milliseconds in a day, for the iteration below
    val day_ms = 24 * 60 * 60 * 1000

    // Loop to generate the date list
    var tm = sDate_ms
    while (tm <= eDate_ms) {
      val dateStr = sdf.format(tm)
      dateSet += dateStr
      tm = tm + day_ms
    }

    // Return the date list
    dateSet
  }
}
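A quick sanity check of the helper (a hypothetical spark-shell session; the expected result follows the example in the doc comment, though mutable.Set does not guarantee element order):

scala> Htime.getDateSet("20160118", "20160120")
res0: scala.collection.mutable.Set[String] = Set(20160118, 20160119, 20160120)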

/**
  * Read behavioural data from HBase and classify the crowd.
  */
object Classify {
  /**
    * @param args command-line arguments: the first is the start date of the
    *             behavioural data, the second the end date, e.g. 20160118
    */
  def main(args: Array[String]) {
    // Exactly 2 command-line arguments are required
    if (args.length != 2) {
      System.err.println("Wrong number of arguments")
      System.err.println("Usage: Classify <start_date> <end_date>")
      System.exit(1)
    }

    // Start and end dates of the behavioural data, from the command line
    val startDate = args(0)
    val endDate = args(1)

    // Build the date list from the start and end dates
    // For example, for 20160118 and 20160120 the list is (20160118, 20160119, 20160120)
    val dateSet = Htime.getDateSet(startDate, endDate)

    // Spark context
    val sparkConf = new SparkConf().setAppName("Classify")
    val sc = new SparkContext(sparkConf)

    // Initialize the HBase configuration
    val conf = HBaseConfiguration.create()

    // Read each date's table into an RDD, collect the RDDs in a Set,
    // then merge them into one RDD with SparkContext.union()
    var rddSet: Set[RDD[(ImmutableBytesWritable, Result)]] = Set()
    dateSet.foreach(date => {
      conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // set the table name
      val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rddSet += bRdd
    })

    val behavRdd = sc.union(rddSet.toSeq)

    behavRdd.collect().foreach(println)
  }
}
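To run the job, submit the Classify class with the start and end dates as arguments. A hedged example invocation (the jar name is hypothetical, and the HBase client jars must be available on the driver and executor classpaths):

spark-submit --class Classify --master yarn-client classify.jar 20160118 20160120

Note that behavRdd.collect().foreach(println) prints raw Result objects. To pull actual cell values out, one would typically map over the pairs; a minimal sketch continuing from behavRdd above (the column family "cf" and qualifier "col" are hypothetical placeholders, and org.apache.hadoop.hbase.util.Bytes must also be imported):

val rows = behavRdd.map { case (rowKey, result) =>
  // getValue returns the latest cell for the given family/qualifier, or null
  val value = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col"))
  (Bytes.toString(rowKey.get()), Option(value).map(Bytes.toString).orNull)
}
rows.take(10).foreach(println)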
     
         
         
         