Spark: Reading Multiple HBase Tables into a Single RDD
     
Add Date: 2018-11-21
         
         
         
Environment: Spark 1.5.0, HBase 1.0.0.

Scenario: HBase data is stored in one table per day, and the data for an arbitrary date range needs to be merged into a single RDD for subsequent computation.

Attempt 1: Look for an API that reads several tables in one shot. The closest match is something called MultiTableInputFormat, which is used to good effect in MapReduce,

but no RDD-based method for reading HBase this way turned up.
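
For reference, this is roughly how MultiTableInputFormat is wired up on the MapReduce side, where each Scan carries its target table name as an attribute; a minimal sketch against the HBase 1.0 API, in which MyMapper stands for a hypothetical TableMapper subclass:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import scala.collection.JavaConverters._

val job = Job.getInstance(HBaseConfiguration.create())

// One Scan per table: MultiTableInputFormat reads the target table name
// from the SCAN_ATTRIBUTES_TABLE_NAME attribute attached to each Scan
val scans = Seq("behaviour_test_20160118", "behaviour_test_20160119").map { table =>
  val scan = new Scan()
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(table))
  scan
}

// The List[Scan] overload of initTableMapperJob configures MultiTableInputFormat
TableMapReduceUtil.initTableMapperJob(scans.asJava, classOf[MyMapper],
  classOf[ImmutableBytesWritable], classOf[Result], job)

This configures a MapReduce job rather than producing an RDD, which is exactly the limitation described above.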

Attempt 2: Generate one RDD per table, then merge them with union. The logic is as follows:

var totalRDD = xxx // read the first table
for { // loop over the remaining tables, merging each one into totalRDD
  val sRDD = xxx
  totalRDD.union(sRDD) // note: the result of union is discarded here
}

When this code ran on the cluster, the unioned totalRDD did not hold the right result; accumulating into a var this way simply does not work.
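
The underlying cause is that RDDs are immutable: union does not splice sRDD into totalRDD in place, it returns a brand-new RDD, so the result has to be rebound to the variable. A minimal sketch of the rebinding form, where readTable is a hypothetical helper wrapping the per-table read:

var totalRDD = readTable(dateSet.head) // hypothetical helper around sc.newAPIHadoopRDD
for (date <- dateSet.tail) {
  val sRDD = readTable(date)
  totalRDD = totalRDD.union(sRDD) // rebind: union returns a new RDD
}

Even in this form, chaining union pairwise nests one UnionRDD per table, so merging the whole collection with a single SparkContext.union, as in attempt 3, is the cleaner approach.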

Attempt 3: The same idea as attempt 2, but use SparkContext.union to merge all of the RDDs in a single call. The logic is as follows:

var rddSet: xxx = Set() // create the collection that will hold the per-table RDDs
dateSet.foreach(date => { // build one RDD per table and add it to the collection
  val sRDD = xxx
  rddSet += sRDD
})
val totalRDD = sc.union(rddSet.toSeq) // merge all of the collected RDDs

The complete code is as follows:

import java.text.SimpleDateFormat
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.mutable.Set

/**
  * Date-handling utilities
  */
object Htime {
  /**
    * Build the list of dates between a start date and an end date
    * For example, for start and end dates 20160118 and 20160120,
    * the date list is (20160118, 20160119, 20160120)
    *
    * @param sDate start date
    * @param eDate end date
    * @return the set of dates
    */
  def getDateSet(sDate: String, eDate: String): Set[String] = {
    // The set of dates to be generated
    var dateSet: Set[String] = Set()

    // Date format used for parsing and formatting
    val sdf = new SimpleDateFormat("yyyyMMdd")

    // Convert the start and end dates to milliseconds using the format above
    val sDate_ms = sdf.parse(sDate).getTime
    val eDate_ms = sdf.parse(eDate).getTime

    // Number of milliseconds in a day, used as the iteration step
    val day_ms = 24 * 60 * 60 * 1000

    // Loop to generate the date list
    var tm = sDate_ms
    while (tm <= eDate_ms) {
      val dateStr = sdf.format(tm)
      dateSet += dateStr
      tm = tm + day_ms
    }

    // Return the set of dates
    dateSet
  }
}
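
A quick sanity check of the helper, matching the example in its doc comment:

Htime.getDateSet("20160118", "20160120")
// => Set(20160118, 20160119, 20160120)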

/**
  * Read behavioral data from HBase and classify the crowd
  */
object Classify {
  /**
    * @param args command-line arguments: the first is the start date of the
    *             behavioral data, the second the end date, e.g. 20160118
    */
  def main(args: Array[String]) {
    // Exactly 2 command-line arguments are required
    if (args.length != 2) {
      System.err.println("Wrong number of arguments")
      System.err.println("Usage: Classify <start_date> <end_date>")
      System.exit(1)
    }

    // Start and end dates of the behavioral data, from the command line
    val startDate = args(0)
    val endDate = args(1)

    // Build the date list from the start and end dates
    // For example, for start and end dates 20160118 and 20160120,
    // the date list is (20160118, 20160119, 20160120)
    val dateSet = Htime.getDateSet(startDate, endDate)

    // Spark context
    val sparkConf = new SparkConf().setAppName("Classify")
    val sc = new SparkContext(sparkConf)

    // Initialize the HBase configuration
    val conf = HBaseConfiguration.create()

    // Read one RDD per date in the list into a Set, then merge them
    // into a single RDD with SparkContext.union()
    var rddSet: Set[RDD[(ImmutableBytesWritable, Result)]] = Set()
    dateSet.foreach(date => {
      conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // set the table name
      val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rddSet += bRdd
    })

    val behavRdd = sc.union(rddSet.toSeq)

    behavRdd.collect().foreach(println)
  }
}
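
To run the job end to end, submit it with spark-submit; a sketch in which the jar name, master, and HBase library path are placeholders for your own build and cluster:

spark-submit --class Classify \
  --master yarn-client \
  --jars $(echo /path/to/hbase/lib/*.jar | tr ' ' ',') \
  classify-assembly.jar 20160118 20160120

Note that collect() ships the entire merged RDD back to the driver, so the final println pass is only sensible as a smoke test on small tables.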
     
         
         
         