  Mahout source code analysis: FP-Growth algorithm parallelization
     
  Add Date : 2018-11-21      
         
         
         
  FP-Growth is an algorithm commonly used in association analysis to mine frequent itemsets. Compared with the Apriori algorithm, FP-Growth represents the data as a prefix tree (the FP-tree), which reduces the number of scans over the transaction database, and mines frequent items by recursively generating conditional FP-trees. Reference [1] analyzes this process in detail. In practice, when the amount of data is large, the FP-tree generated by FP-Growth can be too large to fit in memory, and the frequent items to be mined can be too many to index. This article first analyzes how the FP-Growth algorithm can be parallelized, and then examines the source code of Mahout's parallel FP-Growth implementation.

1. Parallelizing FP-Growth

 There are many ways to parallelize FP-Growth. The first proposal to parallelize FP-Growth with MapReduce appears to come from Haoyuan Li et al. of Google Beijing Research (see reference [2]). They use three MapReduce jobs to parallelize FP-Growth, and the whole process can be divided into five steps:

Step 1: Sharding

To balance read and write performance across the cluster, the transaction database is divided into a number of segments (shards) that are stored across the nodes.

Step 2: Parallel Counting

Similar to WordCount, one MapReduce job computes the support of each item. Specifically, each mapper reads one or more shards of the transaction database from HDFS, so the mapper's input is ⟨key, value=Ti⟩, where Ti is one transaction of a shard. For each item aj in Ti, the mapper outputs ⟨key=aj, value=1⟩. When all mappers in the cluster have processed their data, all key-value pairs with key=aj are assigned to the same reducer, so the reducer's input is ⟨key=aj, value={1,1,...}⟩. The reducer simply sums the values once and outputs ⟨key=aj, value=sum⟩. The final result is a list of items sorted by descending support, called the F-List.
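The counting step can be sketched in a few lines of single-process Java. The class and method names below are made up for illustration; the real ParallelCountingMapper and ParallelCountingReducer are analyzed later in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Arrays;

// Single-process sketch of Step 2: each "mapper" emits <item, 1> once per
// distinct item in a transaction; the "reducer" sums the ones; sorting by
// descending count yields the F-List.
public class ParallelCountingSketch {

  public static List<Map.Entry<String, Long>> fList(List<List<String>> transactions) {
    Map<String, Long> counts = new HashMap<>();
    for (List<String> t : transactions) {
      // duplicates within one transaction count only once, hence the HashSet
      for (String item : new HashSet<>(t)) {
        counts.merge(item, 1L, Long::sum); // the "reduce" side: sum the emitted ones
      }
    }
    List<Map.Entry<String, Long>> fList = new ArrayList<>(counts.entrySet());
    fList.sort((a, b) -> Long.compare(b.getValue(), a.getValue())); // descending support
    return fList;
  }

  public static void main(String[] args) {
    List<List<String>> db = Arrays.asList(
        Arrays.asList("milk", "eggs", "bread", "chips"),
        Arrays.asList("milk", "bread"),
        Arrays.asList("eggs", "bread"));
    System.out.println(fList(db)); // bread comes first (support 3)
  }
}
```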
Step 3: Grouping Items

The items of the F-List are divided into Q groups, each with a unique group-id. The list of all items together with their corresponding group-ids is called the G-List.
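The grouping scheme Mahout actually uses (seen later in PFPGrowth.runPFPGrowth) can be sketched directly: items are identified by their position in the F-List, maxPerGroup is the ceiling of |F-List| / Q, and an item's group-id is its position divided by maxPerGroup. The class name here is hypothetical.

```java
// Sketch of Step 3: positional grouping of F-List items into Q groups.
public class GroupingSketch {

  // ceil(fListSize / numGroups), computed the same way as in PFPGrowth.runPFPGrowth
  public static int maxPerGroup(int fListSize, int numGroups) {
    int maxPerGroup = fListSize / numGroups;
    if (fListSize % numGroups != 0) {
      maxPerGroup++;
    }
    return maxPerGroup;
  }

  // an item's group-id is its F-List position divided by maxPerGroup
  public static int groupId(int itemPosition, int maxPerGroup) {
    return itemPosition / maxPerGroup;
  }

  public static void main(String[] args) {
    int perGroup = maxPerGroup(10, 3);        // 10 frequent items, Q = 3
    System.out.println(perGroup);             // 4
    System.out.println(groupId(0, perGroup)); // first item -> group 0
    System.out.println(groupId(9, perGroup)); // last item  -> group 2
  }
}
```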

Step 4: Parallel FP-Growth

This step is the key to parallelizing FP-Growth and also the part of the algorithm that is hardest to understand. It uses one MapReduce job. Each mapper reads a data shard generated in the first step, so the mapper's input is ⟨key, value=Ti⟩. Before processing these shards, the mapper reads the G-List produced in the third step. The G-List is really a HashMap whose keys are items and whose values are the corresponding group-ids; it generally does not take much space and can be kept in memory. The mapper scans Ti from the last item forward, i.e. from right to left: if the group-id of aL in the G-List is being seen for the first time, it outputs {a0, a1, ..., aL}; otherwise it outputs nothing for aL. Taking the example data, with a support threshold of 1 and Q = 3, the resulting G-List is:
Here, the third column is the group-id. If the mapper's input is {milk, eggs, bread, potato chips}, scanning starts from the last item, so ⟨group-id of potato chips, {milk, eggs, bread, potato chips}⟩ is output first. The next two items, bread and eggs, have the same group-id as potato chips, so nothing is output for them. The first item, milk, has a group-id that has not yet appeared, so ⟨group-id of milk, {milk}⟩ is output.

All data with the same group-id is pushed to the same reducer, so the reducer's input is ⟨key=group-id, value={{a0, a1, ..., aL}, ...}⟩. The reducer builds a local FP-tree and, just like the traditional FP-Growth algorithm, recursively builds conditional FP-trees and mines frequent patterns from them. Unlike traditional FP-Growth, the reducer does not output the mined frequent patterns directly; instead, it places them in a support-ordered heap of size K and then outputs the K frequent patterns with the highest support: ⟨key=group-id, value=Top K Frequent Patterns⟩.
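The group-dependent shard generation on the mapper side can be sketched as follows (class name hypothetical). Items are assumed to be already encoded as F-List positions and sorted ascending, i.e. by descending support; scanning from the right, the prefix {a0..aj} is emitted once per group-id, the first time that group is seen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of Step 4's map side: one emitted prefix per distinct group-id.
public class GroupDependentShards {

  public static Map<Integer, List<List<Integer>>> shards(int[] itemPositions, int maxPerGroup) {
    Map<Integer, List<List<Integer>>> out = new HashMap<>();
    Set<Integer> seenGroups = new HashSet<>();
    for (int j = itemPositions.length - 1; j >= 0; j--) {
      int groupId = itemPositions[j] / maxPerGroup;
      if (seenGroups.add(groupId)) {      // first time this group-id appears
        List<Integer> prefix = new ArrayList<>();
        for (int i = 0; i <= j; i++) {
          prefix.add(itemPositions[i]);   // emit the whole prefix a0..aj
        }
        out.computeIfAbsent(groupId, g -> new ArrayList<>()).add(prefix);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // transaction {0, 2, 5} with maxPerGroup = 2:
    // group 2 receives [0, 2, 5], group 1 receives [0, 2], group 0 receives [0]
    System.out.println(shards(new int[] {0, 2, 5}, 2));
  }
}
```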

Step 5: Aggregating

The Top K Frequent Patterns mined in the previous step already contain all frequent patterns, but since that MapReduce job partitioned the data by groupID, the frequent patterns for a given item may exist on several different reduce nodes. To merge all key-value pairs with key=item and present the results in an optimized form, the default key-sorting behavior of MapReduce can be used to post-process the mined patterns: each item of a Top K Frequent Patterns set is in turn used as a key and output together with the Top K Frequent Patterns that contain it. The output of each mapper is therefore ⟨key=item, value=Top K Frequent Patterns⟩; the reducer merges the outputs of all mappers and emits the final result ⟨key=item, value=Top K Frequent Patterns⟩.
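The per-item merge in Step 5 can be sketched with a size-bounded heap (names invented for illustration): candidate patterns for one item arrive from several groups, and only the K with the highest support survive.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of Step 5: keep the K best-supported patterns for one item.
public class AggregatingSketch {

  public static List<AbstractMap.SimpleEntry<List<String>, Long>> topK(
      List<AbstractMap.SimpleEntry<List<String>, Long>> patterns, int k) {
    // min-heap on support: the weakest pattern sits at the head
    PriorityQueue<AbstractMap.SimpleEntry<List<String>, Long>> heap =
        new PriorityQueue<>((a, b) -> Long.compare(a.getValue(), b.getValue()));
    for (AbstractMap.SimpleEntry<List<String>, Long> p : patterns) {
      heap.offer(p);
      if (heap.size() > k) {
        heap.poll();                      // drop the weakest pattern
      }
    }
    List<AbstractMap.SimpleEntry<List<String>, Long>> out = new ArrayList<>(heap);
    out.sort((a, b) -> Long.compare(b.getValue(), a.getValue())); // descending support
    return out;
  }

  public static void main(String[] args) {
    List<AbstractMap.SimpleEntry<List<String>, Long>> candidates = new ArrayList<>();
    candidates.add(new AbstractMap.SimpleEntry<>(Arrays.asList("bread"), 5L));
    candidates.add(new AbstractMap.SimpleEntry<>(Arrays.asList("bread", "milk"), 3L));
    candidates.add(new AbstractMap.SimpleEntry<>(Arrays.asList("bread", "eggs"), 2L));
    System.out.println(topK(candidates, 2)); // keeps the patterns with supports 5 and 3
  }
}
```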

 

2. Parallel FP-Growth source code analysis

Mahout provides implementations of several classic machine learning algorithms. Parallel FP-Growth was removed after Mahout 0.9, so this article analyzes the Parallel FP-Growth source code of Mahout 0.8.
FPGrowthDriver.java

FPGrowthDriver is the driver class of the FPGrowth algorithm and inherits from AbstractJob. Hadoop jobs are usually launched from the command line via the bin/hadoop script, passing in parameters. The GenericOptionsParser used by ToolRunner extracts these command-line parameters, and AbstractJob encapsulates addInputOption, addOutputOption, addOption, parseArguments and other methods that help parse them. The params object stores all parameters needed by the algorithm. Depending on the command-line parameters, FPGrowthDriver either calls the runFPGrowth method in the same file for sequential execution, or the runPFPGrowth method in PFPGrowth.java for parallel execution.

public final class FPGrowthDriver extends AbstractJob {

  private static final Logger log = LoggerFactory.getLogger(FPGrowthDriver.class);

  private FPGrowthDriver() {
  }

  public static void main(String[] args) throws Exception {
    // ToolRunner.run() uses a GenericOptionsParser internally;
    // GenericOptionsParser.getRemainingArgs() yields the application-specific
    // command-line arguments, after which ToolRunner.run() invokes FPGrowthDriver.run().
    ToolRunner.run(new Configuration(), new FPGrowthDriver(), args);
  }

  /**
   * Run TopK FPGrowth given the input file,
   */
  @Override
  public int run(String[] args) throws Exception {
    addInputOption();  // add the default input directory path
    addOutputOption(); // add the default output directory path

    addOption("minSupport", "s", "(Optional) The minimum number of times a co-occurrence must be present."
              + " Default Value: 3", "3"); // support threshold
    addOption("maxHeapSize", "k", "(Optional) Maximum Heap Size k, to denote the requirement to mine top K items."
              + " Default value: 50", "50"); // size of the support-ordered heap
    addOption("numGroups", "g", "(Optional) Number of groups the features should be divided in the map-reduce version."
              + " Does not work in sequential version Default Value: " + PFPGrowth.NUM_GROUPS_DEFAULT,
              Integer.toString(PFPGrowth.NUM_GROUPS_DEFAULT)); // number of groups Q
    addOption("splitterPattern", "regex", "Regular Expression pattern used to split given string transaction into"
              + " itemsets. Default value splits comma separated itemsets. Default Value:"
              + " \"[ ,\\t]*[,|\\t][ ,\\t]*\" ", "[ ,\t]*[,|\t][ ,\t]*"); // item separator
    addOption("numTreeCacheEntries", "tc", "(Optional) Number of entries in the tree cache to prevent duplicate"
              + " tree building. (Warning) a first level conditional FP-Tree might consume a lot of memory,"
              + " so keep this value small, but big enough to prevent duplicate tree building."
              + " Default Value: 5 Recommended Values: [5-10]", "5");
    addOption("method", "method", "Method of processing: sequential|mapreduce", "sequential"); // sequential or parallel execution
    addOption("encoding", "e", "(Optional) The file encoding. Default value: UTF-8", "UTF-8"); // input encoding
    addFlag("useFPG2", "2", "Use an alternate FPG implementation");

    // quit if the command-line arguments cannot be parsed
    if (parseArguments(args) == null) {
      return -1;
    }

    Parameters params = new Parameters();

    if (hasOption("minSupport")) {
      String minSupportString = getOption("minSupport");
      params.set("minSupport", minSupportString);
    }
    if (hasOption("maxHeapSize")) {
      String maxHeapSizeString = getOption("maxHeapSize");
      params.set("maxHeapSize", maxHeapSizeString);
    }
    if (hasOption("numGroups")) {
      String numGroupsString = getOption("numGroups");
      params.set("numGroups", numGroupsString);
    }

    if (hasOption("numTreeCacheEntries")) {
      String numTreeCacheString = getOption("numTreeCacheEntries");
      params.set("treeCacheSize", numTreeCacheString);
    }

    if (hasOption("splitterPattern")) {
      String patternString = getOption("splitterPattern");
      params.set("splitPattern", patternString);
    }

    String encoding = "UTF-8";
    if (hasOption("encoding")) {
      encoding = getOption("encoding");
    }
    params.set("encoding", encoding);

    if (hasOption("useFPG2")) {
      params.set(PFPGrowth.USE_FPG2, "true");
    }

    Path inputDir = getInputPath();
    Path outputDir = getOutputPath();

    params.set("input", inputDir.toString());
    params.set("output", outputDir.toString());

    String classificationMethod = getOption("method");
    if ("sequential".equalsIgnoreCase(classificationMethod)) {
      runFPGrowth(params);
    } else if ("mapreduce".equalsIgnoreCase(classificationMethod)) {
      Configuration conf = new Configuration();
      HadoopUtil.delete(conf, outputDir);
      PFPGrowth.runPFPGrowth(params);
    }

    return 0;
  }
PFPGrowth.java

PFPGrowth is the driver class of the parallel FP-Growth algorithm. runPFPGrowth(params) initializes a Configuration object and then calls runPFPGrowth(params, conf), which comprises the five key steps of parallelizing FP-Growth. startParallelCounting(params, conf) corresponds to Step 1 and Step 2: it counts the support of each item with a WordCount-like job, and its output is used by readFList() and saveFList() to generate and save the F-List. Next, the number of items per group is computed from the user-supplied NUM_GROUPS command-line parameter and stored in params. startParallelFPGrowth(params, conf) corresponds to Step 3 and Step 4, and startAggregating(params, conf) to Step 5.

public static void runPFPGrowth(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
           + "org.apache.hadoop.io.serializer.WritableSerialization");

  startParallelCounting(params, conf); // corresponds to Step 1 and Step 2

  // save feature list to dcache
  List<Pair<String,Long>> fList = readFList(params);
  saveFList(fList, params, conf);

  // set param to control group size in MR jobs
  int numGroups = params.getInt(NUM_GROUPS, NUM_GROUPS_DEFAULT);
  int maxPerGroup = fList.size() / numGroups;
  if (fList.size() % numGroups != 0) {
    maxPerGroup++;
  }
  params.set(MAX_PER_GROUP, Integer.toString(maxPerGroup));

  startParallelFPGrowth(params, conf); // corresponds to Step 3 and Step 4

  startAggregating(params, conf); // corresponds to Step 5
}
The startParallelCounting method initializes a Job object, which runs ParallelCountingMapper and ParallelCountingReducer to carry out the support counting.

/**
 * Count the frequencies of various features in parallel using Map/Reduce
 */
public static void startParallelCounting(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(PFP_PARAMETERS, params.toString());

  conf.set("mapred.compress.map.output", "true");
  conf.set("mapred.output.compression.type", "BLOCK");

  String input = params.get(INPUT);
  Job job = new Job(conf, "Parallel Counting Driver running over input: " + input);
  job.setJarByClass(PFPGrowth.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(LongWritable.class);

  FileInputFormat.addInputPath(job, new Path(input));
  Path outPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);
  FileOutputFormat.setOutputPath(job, outPath);

  HadoopUtil.delete(conf, outPath);

  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(ParallelCountingMapper.class);
  job.setCombinerClass(ParallelCountingReducer.class);
  job.setReducerClass(ParallelCountingReducer.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }
}
ParallelCountingMapper.java

The input to ParallelCountingMapper's map method is the byte offset of a row in the transaction database and the row itself. Items occurring more than once in a single input row are counted only once, which is why the split input data is stored in a HashSet. The output of the map method is ⟨key=item, value=1⟩.
public class ParallelCountingMapper extends Mapper<LongWritable,Text,Text,LongWritable> {

  private static final LongWritable ONE = new LongWritable(1);

  private Pattern splitter;

  @Override
  protected void map(LongWritable offset, Text input, Context context) throws IOException, InterruptedException {

    String[] items = splitter.split(input.toString());
    Set<String> uniqueItems = Sets.newHashSet(Arrays.asList(items));
    for (String item : uniqueItems) {
      if (item.trim().isEmpty()) {
        continue;
      }
      context.setStatus("Parallel Counting Mapper: " + item);
      context.write(new Text(item), ONE);
    }
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN, PFPGrowth.SPLITTER.toString()));
  }
}
ParallelCountingReducer.java

The input to ParallelCountingReducer's reduce method is ⟨key=item, value={1,1,...}⟩. All key-value pairs with key=item are assigned to one machine, which only needs to traverse and sum the values to determine the support of the item.

public class ParallelCountingReducer extends Reducer<Text,LongWritable,Text,LongWritable> {

  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,
                                                                                  InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      context.setStatus("Parallel Counting Reducer: " + key);
      sum += value.get();
    }
    context.setStatus("Parallel Counting Reducer: " + key + " => " + sum);
    context.write(key, new LongWritable(sum));
  }
}
PFPGrowth.java

The output path of ParallelCountingReducer can be obtained from the OUTPUT parameter in params. The readFList method uses several data structures. Pair implements the Comparable and Serializable interfaces; its data members first and second hold an item and that item's support. PriorityQueue is a heap backed by a balanced binary tree: if a Comparator is specified, PriorityQueue orders its elements according to it; otherwise it orders them by their own Comparable implementation. Parallel FP-Growth initializes the PriorityQueue with a Comparator that orders Pairs by their second element (the support) in descending order and, when supports are equal, by their first element (the item). A SequenceFileDirIterable is initialized to traverse the output of the previous MapReduce job; each Pair is added to the PriorityQueue and thereby kept in order. Finally, the elements are polled from the PriorityQueue one by one into fList. fList is therefore a list sorted by descending support.

/**
 * read the feature frequency List which is built at the end of the Parallel counting job
 *
 * @return Feature Frequency List
 */
public static List<Pair<String,Long>> readFList(Parameters params) {
  int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
  Configuration conf = new Configuration();

  Path parallelCountingPath = new Path(params.get(OUTPUT), PARALLEL_COUNTING);

  PriorityQueue<Pair<String,Long>> queue = new PriorityQueue<Pair<String,Long>>(11,
      new Comparator<Pair<String,Long>>() {
        @Override
        public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {
          int ret = o2.getSecond().compareTo(o1.getSecond());
          if (ret != 0) {
            return ret;
          }
          return o1.getFirst().compareTo(o2.getFirst());
        }
      });

  for (Pair<Text,LongWritable> record
       : new SequenceFileDirIterable<Text,LongWritable>(new Path(parallelCountingPath, FILE_PATTERN),
                                                        PathType.GLOB, null, null, true, conf)) {
    long value = record.getSecond().get();
    if (value >= minSupport) {
      queue.add(new Pair<String,Long>(record.getFirst().toString(), value));
    }
  }
  List<Pair<String,Long>> fList = Lists.newArrayList();
  while (!queue.isEmpty()) {
    fList.add(queue.poll());
  }
  return fList;
}
Once fList has been generated, the output of the previous MapReduce job is no longer needed, so the saveFList method first deletes those files and then writes flist to HDFS. For files stored on HDFS, DistributedCache provides a caching facility: before the computation starts, the files are copied from HDFS to the slave nodes.

/**
 * Serializes the fList and returns the string representation of the List
 */
public static void saveFList(Iterable<Pair<String,Long>> flist, Parameters params, Configuration conf)
  throws IOException {
  Path flistPath = new Path(params.get(OUTPUT), F_LIST);
  FileSystem fs = FileSystem.get(flistPath.toUri(), conf);
  flistPath = fs.makeQualified(flistPath);
  HadoopUtil.delete(conf, flistPath);
  SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath, Text.class, LongWritable.class);
  try {
    for (Pair<String,Long> pair : flist) {
      writer.append(new Text(pair.getFirst()), new LongWritable(pair.getSecond()));
    }
  } finally {
    writer.close();
  }
  DistributedCache.addCacheFile(flistPath.toUri(), conf);
}
The startParallelFPGrowth method initializes a Job object, which runs ParallelFPGrowthMapper and ParallelFPGrowthReducer to implement Step 3 and Step 4.

/**
 * Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features of group dependent shards
 */
public static void startParallelFPGrowth(Parameters params, Configuration conf)
  throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(PFP_PARAMETERS, params.toString());
  conf.set("mapred.compress.map.output", "true");
  conf.set("mapred.output.compression.type", "BLOCK");
  Path input = new Path(params.get(INPUT));
  Job job = new Job(conf, "PFP Growth Driver running over input" + input);
  job.setJarByClass(PFPGrowth.class);

  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(TransactionTree.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(TopKStringPatterns.class);

  FileInputFormat.addInputPath(job, input);
  Path outPath = new Path(params.get(OUTPUT), FPGROWTH);
  FileOutputFormat.setOutputPath(job, outPath);

  HadoopUtil.delete(conf, outPath);

  job.setInputFormatClass(TextInputFormat.class);
  job.setMapperClass(ParallelFPGrowthMapper.class);
  job.setCombinerClass(ParallelFPGrowthCombiner.class);
  job.setReducerClass(ParallelFPGrowthReducer.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }
}
ParallelFPGrowthMapper.java

The setup method of ParallelFPGrowthMapper runs before its map method and calls readFList. Note that this readFList takes different parameters from the one analyzed earlier; they are two entirely different methods. This readFList uses HadoopUtil.getCachedFiles(conf) to obtain the cached flist and stores it in fMap, with the item as key and the item's position in flist as value; for example, the first item of flist is stored in fMap as ⟨item, 0⟩. The position number is needed because fMap is later divided into Q groups. The input to the map method is a byte offset and one row of the transaction database, which is split according to the user-specified splitter. Non-frequent items are filtered out with fMap.containsKey(item): if the item is present in fList, its position number is added to itemSet; otherwise it is discarded. itemArr copies the data in itemSet and sorts it by ascending position number, i.e. by descending support. The for loop then traverses itemArr from the last element forward: if an element's groupID is not yet in groups, a TransactionTree is initialized holding itemArr[0], itemArr[1], ..., itemArr[j], and ⟨groupID, TransactionTree⟩ is emitted. Computing the groupID is simple: the position number is divided by maxPerGroup. TransactionTree implements the Writable and Iterable<Pair<IntArrayList,Long>> interfaces. When a TransactionTree is initialized here, the constructor arguments are stored in the data member List<Pair<IntArrayList,Long>> transactionSet; each Pair stores the list of position numbers and the count 1.

/**
 * maps each transaction to all unique items groups in the transaction. Mapper
 * outputs the group id as key and the transaction as value
 *
 */
public class ParallelFPGrowthMapper extends Mapper<LongWritable,Text,IntWritable,TransactionTree> {

  private final OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
  private Pattern splitter;
  private int maxPerGroup;
  private final IntWritable wGroupID = new IntWritable();

  @Override
  protected void map(LongWritable offset, Text input, Context context)
    throws IOException, InterruptedException {

    String[] items = splitter.split(input.toString());

    OpenIntHashSet itemSet = new OpenIntHashSet();

    for (String item : items) {
      if (fMap.containsKey(item) && !item.trim().isEmpty()) {
        itemSet.add(fMap.get(item));
      }
    }

    IntArrayList itemArr = new IntArrayList(itemSet.size());
    itemSet.keys(itemArr);
    itemArr.sort();

    OpenIntHashSet groups = new OpenIntHashSet();
    for (int j = itemArr.size() - 1; j >= 0; j--) {
      // generate group dependent shards
      int item = itemArr.get(j);
      int groupID = PFPGrowth.getGroup(item, maxPerGroup);

      if (!groups.contains(groupID)) {
        IntArrayList tempItems = new IntArrayList(j + 1);
        tempItems.addAllOfFromTo(itemArr, 0, j);
        context.setStatus("Parallel FPGrowth: Generating Group Dependent transactions for: " + item);
        wGroupID.set(groupID);
        context.write(wGroupID, new TransactionTree(tempItems, 1L));
      }
      groups.add(groupID);
    }

  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);

    int i = 0;
    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
      fMap.put(e.getFirst(), i++);
    }

    Parameters params =
      new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));

    splitter = Pattern.compile(params.get(PFPGrowth.SPLIT_PATTERN,
                                          PFPGrowth.SPLITTER.toString()));

    maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
  }
}
ParallelFPGrowthReducer.java

 The input to ParallelFPGrowthReducer is ⟨key=groupID, value={TransactionTree, ...}⟩. Its setup method reads the parameters into params and obtains the cached flist via PFPGrowth.readFList(conf); the frequent items are stored in featureReverseMap and their supports in freqList. As analyzed above, the TransactionTree output by ParallelFPGrowthMapper is really a List<Pair<IntArrayList,Long>> transactionSet. ParallelFPGrowthReducer initializes its own TransactionTree; although this is the same class as before, it is now a tree implemented over two-dimensional arrays. For reasons of length, the tree construction is not analyzed here. Assuming the tree has been built, cTree.generateFList traverses it and returns a Map frequencyList. The traversal itself works as follows: TransactionTree implements the Iterable<Pair<IntArrayList,Long>> interface and overrides the iterator method, and generateFList walks the whole tree through the iterator produced by that method. iterator returns a TransactionTreeIterator, which extends AbstractIterator<Pair<IntArrayList,Long>> and implements the traversal of the TransactionTree. localFList merges the results of generateFList and orders them by descending support. There are two ways to generate the frequent patterns: the user can choose to call FPGrowthIds.generateTopKFrequentPatterns or fpGrowth.generateTopKFrequentPatterns; this article analyzes the latter. The ParallelFPGrowthReducer class also contains an IteratorAdapter, a very classic application of the adapter design pattern that decouples two different iterator types. The output of ParallelFPGrowthReducer is ⟨key=item, value=TopKStringPatterns⟩.

/**
 * takes each group of transactions and runs Vanilla FPGrowth on it and
 * outputs the the Top K frequent Patterns for each group.
 *
 */
public final class ParallelFPGrowthReducer extends Reducer<IntWritable,TransactionTree,Text,TopKStringPatterns> {

  private final List<String> featureReverseMap = Lists.newArrayList();
  private final LongArrayList freqList = new LongArrayList();
  private int maxHeapSize = 50;
  private int minSupport = 3;
  private int numFeatures;
  private int maxPerGroup;
  private boolean useFP2;

  private static final class IteratorAdapter implements Iterator<Pair<List<Integer>,Long>> {
    private final Iterator<Pair<IntArrayList,Long>> innerIter;

    private IteratorAdapter(Iterator<Pair<IntArrayList,Long>> transactionIter) {
      innerIter = transactionIter;
    }

    @Override
    public boolean hasNext() {
      return innerIter.hasNext();
    }

    @Override
    public Pair<List<Integer>,Long> next() {
      Pair<IntArrayList,Long> innerNext = innerIter.next();
      return new Pair<List<Integer>,Long>(innerNext.getFirst().toList(), innerNext.getSecond());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  }

  @Override
  protected void reduce(IntWritable key, Iterable<TransactionTree> values, Context context) throws IOException {
    TransactionTree cTree = new TransactionTree();
    for (TransactionTree tr : values) {
      for (Pair<IntArrayList,Long> p : tr) {
        cTree.addPattern(p.getFirst(), p.getSecond());
      }
    }

    List<Pair<Integer,Long>> localFList = Lists.newArrayList();
    for (Entry<Integer,MutableLong> fItem : cTree.generateFList().entrySet()) {
      localFList.add(new Pair<Integer,Long>(fItem.getKey(), fItem.getValue().toLong()));
    }

    Collections.sort(localFList, new CountDescendingPairComparator<Integer,Long>());

    if (useFP2) {
      FPGrowthIds.generateTopKFrequentPatterns(
          cTree.iterator(),
          freqList,
          minSupport,
          maxHeapSize,
          PFPGrowth.getGroupMembers(key.get(), maxPerGroup, numFeatures),
          new IntegerStringOutputConverter(
              new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context),
              featureReverseMap),
          new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context));
    } else {
      FPGrowth<Integer> fpGrowth = new FPGrowth<Integer>();
      fpGrowth.generateTopKFrequentPatterns(
          new IteratorAdapter(cTree.iterator()),
          localFList,
          minSupport,
          maxHeapSize,
          Sets.newHashSet(PFPGrowth.getGroupMembers(key.get(),
                                                    maxPerGroup,
                                                    numFeatures).toList()),
          new IntegerStringOutputConverter(
              new ContextWriteOutputCollector<IntWritable,TransactionTree,Text,TopKStringPatterns>(context),
              featureReverseMap),
          new ContextStatusUpdater<IntWritable,TransactionTree,Text,TopKStringPatterns>(context));
    }
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {

    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));

    for (Pair<String,Long> e : PFPGrowth.readFList(context.getConfiguration())) {
      featureReverseMap.add(e.getFirst());
      freqList.add(e.getSecond());
    }

    maxHeapSize = Integer.valueOf(params.get(PFPGrowth.MAX_HEAPSIZE, "50"));
    minSupport = Integer.valueOf(params.get(PFPGrowth.MIN_SUPPORT, "3"));

    maxPerGroup = params.getInt(PFPGrowth.MAX_PER_GROUP, 0);
    numFeatures = featureReverseMap.size();
    useFP2 = "true".equals(params.get(PFPGrowth.USE_FPG2));
  }
TransactionTree.java

Before analyzing the fpGrowth.generateTopKFrequentPatterns method, consider the addPattern method used during tree construction. The code below shows TransactionTree's data members and the addPattern method. addPattern starts from the root node and compares the tree against myList. childWithAttribute returns the child of temp whose attribute equals attributeValue, or -1 if there is none. If there is none, addCountMode is set to false and the remaining nodes of myList are added to the tree; if there is, the child's support count is increased via addCount. The idea is exactly the same as tree construction in the traditional FP-Growth algorithm.

private int[] attribute;        // attribute (item) name of each node
private int[] childCount;       // number of children of each node
private int[][] nodeChildren;   // two-dimensional array recording the children of each node
private long[] nodeCount;       // support count of each node
private int nodes;
private boolean representedAsList; // true: represented as a List; false: represented as a tree
private List<Pair<IntArrayList,Long>> transactionSet;

public int addPattern(IntArrayList myList, long addCount) {
  int temp = ROOTNODEID;
  int ret = 0;
  boolean addCountMode = true;
  for (int idx = 0; idx < myList.size(); idx++) {
    int attributeValue = myList.get(idx);
    int child;
    if (addCountMode) {
      child = childWithAttribute(temp, attributeValue);
      if (child == -1) {
        addCountMode = false;
      } else {
        addCount(child, addCount);
        temp = child;
      }
    }
    if (!addCountMode) {
      child = createNode(temp, attributeValue, addCount);
      temp = child;
      ret++;
    }
  }
  return ret;
}
FPGrowth.java

Shaped generateTopKFrequentPatterns method parameters have transactionStream, frequencyList, minSupport, k, Collection < A > returnableFeatures, OutputCollector < A, List < Pair < List < A >, Long > > > output, Statusupdater updater. Wherein, transactionStream is based on the current key = groupID corresponding Pair < List < A >, Long > type values ​​established cTree, the first number is the position here Pair, the second term is 1; frequencyList is ParallelFPGrowthReducer in localFList , is the location of its first number, the second is the support; collection < A > returnableFeatures current key = position number set group-id included.

attributeIdMapping filters infrequent items out of transactionStream and assigns each frequent item a new id, stored as <key = position number, value = id>. reverseMapping is the inverse of attributeIdMapping. attributeFrequency records the support of each item, indexed by id. The method then iterates over the position numbers in returnableFeatures, filters out infrequent items, and records the remaining frequent items in returnFeatures. Finally the overloaded generateTopKFrequentPatterns is called to build the local FP-tree and header table (Header-Table) and to traverse the FP-tree, emitting frequent patterns. Reference [1] analyzes that process in detail, so it is not repeated here; note only that Mahout stores the FP-tree in array form.

/**
 * Generate Top K Frequent Patterns for every feature in returnableFeatures
 * given a stream of transactions and the minimum support
 *
 * @param transactionStream
 *          Iterator of transaction
 * @param frequencyList
 *          list of frequent features and their support value
 * @param minSupport
 *          minimum support of the transactions
 * @param k
 *          Number of top frequent patterns to keep
 * @param returnableFeatures
 *          set of features for which the frequent patterns are mined. If the
 *          set is empty or null, then top K patterns for every frequent item (an item
 *          whose support > minSupport) is generated
 * @param output
 *          The output collector to which the generated patterns are written
 * @throws IOException
 */
public final void generateTopKFrequentPatterns(Iterator<Pair<List<A>,Long>> transactionStream,
                                               Collection<Pair<A,Long>> frequencyList,
                                               long minSupport,
                                               int k,
                                               Collection<A> returnableFeatures,
                                               OutputCollector<A,List<Pair<List<A>,Long>>> output,
                                               StatusUpdater updater) throws IOException {

  Map<Integer,A> reverseMapping = Maps.newHashMap();
  Map<A,Integer> attributeIdMapping = Maps.newHashMap();

  int id = 0;
  for (Pair<A,Long> feature : frequencyList) {
    A attrib = feature.getFirst();
    Long frequency = feature.getSecond();
    if (frequency >= minSupport) {
      attributeIdMapping.put(attrib, id);
      reverseMapping.put(id++, attrib);
    }
  }

  long[] attributeFrequency = new long[attributeIdMapping.size()];
  for (Pair<A,Long> feature : frequencyList) {
    A attrib = feature.getFirst();
    Long frequency = feature.getSecond();
    if (frequency < minSupport) {
      break;
    }
    attributeFrequency[attributeIdMapping.get(attrib)] = frequency;
  }

  log.info("Number of unique items {}", frequencyList.size());

  Collection<Integer> returnFeatures = Sets.newHashSet();
  if (returnableFeatures != null && !returnableFeatures.isEmpty()) {
    for (A attrib : returnableFeatures) {
      if (attributeIdMapping.containsKey(attrib)) {
        returnFeatures.add(attributeIdMapping.get(attrib));
        log.info("Adding Pattern {} => {}", attrib, attributeIdMapping.get(attrib));
      }
    }
  } else {
    for (int j = 0; j < attributeIdMapping.size(); j++) {
      returnFeatures.add(j);
    }
  }

  log.info("Number of unique pruned items {}", attributeIdMapping.size());
  generateTopKFrequentPatterns(new TransactionIterator<A>(transactionStream, attributeIdMapping),
      attributeFrequency, minSupport, k, reverseMapping.size(), returnFeatures,
      new TopKPatternsOutputConverter<A>(output, reverseMapping), updater);
}
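The id-remapping step at the top of this method can be sketched in isolation. The sketch below is not Mahout code; IdRemapSketch and its use of String keys are hypothetical, but it shows the same idea: items whose support reaches minSupport receive dense new ids in list order, and all other items are dropped.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the id-remapping step: frequent items get dense new
// ids in frequency-list order; infrequent items are filtered out entirely.
public class IdRemapSketch {
    // frequencyList is assumed sorted by descending support, as Mahout's is.
    public static Map<String, Integer> buildIdMapping(
            List<Map.Entry<String, Long>> frequencyList, long minSupport) {
        Map<String, Integer> attributeIdMapping = new LinkedHashMap<>();
        int id = 0;
        for (Map.Entry<String, Long> feature : frequencyList) {
            if (feature.getValue() >= minSupport) {
                attributeIdMapping.put(feature.getKey(), id++);
            }
        }
        return attributeIdMapping;
    }
}
```

Because the list is sorted by descending support, frequent items receive small ids, which keeps the subsequent array-based FP-tree compact.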
The input of AggregatorMapper is <key = item, value = TopKStringPatterns>. TopKStringPatterns stores a list of Pair<List<String>, Long> entries, where each List<String> element records a frequent pattern for the key item and the Long element records that pattern's support.

/**
 * outputs the pattern for each item in the pattern, so that reducer can group them
 * and select the top K frequent patterns
 */
public class AggregatorMapper extends Mapper<Text,TopKStringPatterns,Text,TopKStringPatterns> {

  @Override
  protected void map(Text key, TopKStringPatterns values, Context context) throws IOException,
                                                                          InterruptedException {
    for (Pair<List<String>,Long> pattern : values.getPatterns()) {
      for (String item : pattern.getFirst()) {
        List<Pair<List<String>,Long>> patternSingularList = Lists.newArrayList();
        patternSingularList.add(pattern);
        context.setStatus("Aggregator Mapper: Grouping Patterns for " + item);
        context.write(new Text(item), new TopKStringPatterns(patternSingularList));
      }
    }
  }
}
AggregatorReducer gathers all patterns sharing the same item key, sorts them in descending order of support, and outputs the top K frequent patterns.

/**
 * groups all Frequent Patterns containing an item and outputs the top K patterns
 * containing that particular item
 */
public class AggregatorReducer extends Reducer<Text,TopKStringPatterns,Text,TopKStringPatterns> {

  private int maxHeapSize = 50;

  @Override
  protected void reduce(Text key, Iterable<TopKStringPatterns> values, Context context) throws IOException,
                                                                                       InterruptedException {
    TopKStringPatterns patterns = new TopKStringPatterns();
    for (TopKStringPatterns value : values) {
      context.setStatus("Aggregator Reducer: Selecting TopK patterns for: " + key);
      patterns = patterns.merge(value, maxHeapSize);
    }
    context.write(key, patterns);
  }

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Parameters params = new Parameters(context.getConfiguration().get("pfp.parameters", ""));
    maxHeapSize = Integer.valueOf(params.get("maxHeapSize", "50"));
  }
}
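The merge the reducer performs can be sketched without Mahout's TopKStringPatterns class. In this hypothetical TopKMergeSketch, each pattern is reduced to a {patternId, support} pair; Mahout's merge walks two already-sorted lists, whereas this sketch simply concatenates and re-sorts by descending support, which yields the same top-K result.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the reducer-side merge: combine two pattern lists
// and keep only the K patterns with the highest support.
public class TopKMergeSketch {
    // each long[] is {patternId, support}; patternId stands in for the pattern
    public static List<long[]> merge(List<long[]> a, List<long[]> b, int k) {
        List<long[]> all = new ArrayList<>(a);
        all.addAll(b);
        all.sort(Comparator.comparingLong((long[] p) -> p[1]).reversed());
        return all.subList(0, Math.min(k, all.size()));
    }
}
```

Capping the result at maxHeapSize (k) bounds both reducer memory and output size, no matter how many mappers contributed patterns for the item.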
3 Discussion

Parallelizing FP-Growth removes the performance bottleneck that the traditional algorithm hits on large data sets. Besides parallelization, there are other ways to optimize FP-Growth, such as load balancing among the parallel tasks, and representing frequent patterns more compactly with maximal frequent itemsets or closed frequent itemsets.

Maximal frequent itemsets
A maximal frequent itemset is a frequent itemset none of whose immediate supersets is frequent. The maximal frequent itemsets form a minimal collection from which all frequent itemsets can be derived; however, a maximal frequent itemset carries no information about the support of its subsets.

Closed frequent itemsets
An itemset is a closed frequent itemset if no immediate superset has the same support as it, and its support is greater than or equal to the minimum support threshold. Closed frequent itemsets provide a minimal representation of the frequent itemsets that loses no support information.
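The two definitions can be checked on a toy database. The sketch below (a hypothetical class MaximalClosedSketch, with items encoded as bits a=1, b=2, c=4) brute-forces support counts over the transactions {a,b,c}, {a,b}, {a,c} with minSupport = 2: {a,b} and {a,c} are maximal, while {a}, {a,b} and {a,c} are closed, so {a} is an example of a closed but non-maximal frequent itemset.

```java
// Hypothetical worked example of maximal vs. closed frequent itemsets.
// Items are bits: a=1, b=2, c=4; an itemset is a bitmask over {a,b,c}.
public class MaximalClosedSketch {
    static int[] db = {0b111, 0b011, 0b101}; // {a,b,c}, {a,b}, {a,c}
    static int minSupport = 2;

    static int support(int itemset) {
        int s = 0;
        for (int t : db) if ((t & itemset) == itemset) s++;
        return s;
    }

    static boolean frequent(int itemset) {
        return itemset != 0 && support(itemset) >= minSupport;
    }

    // maximal: frequent, and no immediate superset (one extra item) is frequent
    static boolean maximal(int itemset) {
        if (!frequent(itemset)) return false;
        for (int item = 1; item < 8; item <<= 1)
            if ((itemset & item) == 0 && frequent(itemset | item)) return false;
        return true;
    }

    // closed: frequent, and no immediate superset has the same support
    static boolean closed(int itemset) {
        if (!frequent(itemset)) return false;
        for (int item = 1; item < 8; item <<= 1)
            if ((itemset & item) == 0 && support(itemset | item) == support(itemset)) return false;
        return true;
    }
}
```

Here {b} is frequent but not closed, because its superset {a,b} has the same support of 2: every transaction containing b also contains a.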
     
         
         
         
  CopyRight 2002-2020 newfreesoft.com, All Rights Reserved.