  Analysis of Storm's basic framework
  Add Date : 2017-04-13

Questions raised earlier:

1. In a topology we can specify the parallelism of each spout and bolt. When the topology is submitted, how does Storm automatically distribute the spouts and bolts across the servers, and how does it control the CPU, disk and other resources they use?

2. Storm builds a tuple tree for each message it processes. How does Storm track each message, how does it guarantee that messages are not lost, and how does its message (acking) mechanism work?

Part I, "How Storm guarantees at-least-once semantics",
answers the second question.

This article establishes the basic background: the basic framework from which Storm's stream-computing capability is built. It also partially answers the first question.

The relationship between worker, executor and task

A worker is a process.
An executor is a thread, the physical container in which tasks run.
A task is the logical abstraction of a spout, bolt or acker task.

The supervisor periodically fetches topology information (topologies), task assignment information (assignments) and various heartbeat information from zookeeper, and uses them as the basis for task allocation.

When the supervisor synchronizes, it starts new workers or shuts down old ones according to the new task assignments, performing load balancing.

A worker learns which other workers it should communicate with through periodically updated connections information.

When a worker starts, it launches one or more executor threads according to the tasks assigned to it. These threads each handle only a single topology.
If a new topology is submitted to the cluster, nimbus reassigns tasks; this is discussed later.

An executor thread is responsible for running the logic of one or more spouts or bolts; these spouts and bolts are also referred to as tasks.

How many workers and executors there are, and how many tasks each executor is responsible for, are jointly determined by the configuration and the parallelism hint specified for each component, but the numbers actually running are not necessarily equal to the specified values.

If the computed total number of executors exceeds the limit set by nimbus, the topology will not be executed.
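How a component's tasks are split across its executors can be sketched as follows (illustrative Python, not Storm's actual code; `partition_fixed` here is a re-implementation of the idea behind Storm's partition-fixed helper that appears in the Clojure code below):

```python
def partition_fixed(max_num_chunks, aseq):
    """Split aseq into at most max_num_chunks near-equal chunks, with the
    remainder spread over the first chunks -- the same idea as Storm's
    partition-fixed, which hands each executor its share of task ids.
    (Illustrative re-implementation, not Storm's code.)"""
    if max_num_chunks <= 0:
        return []
    base, rem = divmod(len(aseq), max_num_chunks)
    chunks, start = [], 0
    for i in range(max_num_chunks):
        size = base + (1 if i < rem else 0)
        chunks.append(list(aseq[start:start + size]))
        start += size
    return chunks

# A component with 5 tasks and a parallelism hint of 2 yields 2 executors;
# each executor id is the (first, last) task id of its chunk.
chunks = partition_fixed(2, [1, 2, 3, 4, 5])
executors = [(c[0], c[-1]) for c in chunks]
```

So with 5 tasks and 2 executors, one executor owns tasks 1-3 and the other tasks 4-5.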

The role of parallelism:

; compute the topology-id -> executors mapping for all topologies
(defn- compute-topology->executors [nimbus storm-ids]
  "Compute a topology-id -> executors map"
  (into {} (for [tid storm-ids]
             {tid (set (compute-executors nimbus tid))})))

; compute the executors of a single topology
(defn- compute-executors [nimbus storm-id]
  (let [conf (:conf nimbus)
        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
        component->executors (:component->executors storm-base)
        storm-conf (read-storm-conf conf storm-id)
        topology (read-storm-topology conf storm-id)
        task->component (storm-task-info topology storm-conf)]
    (->> (storm-task-info topology storm-conf)
         reverse-map                         ; component -> task ids
         (map-val sort)
         (join-maps component->executors)
         (map-val (partial apply partition-fixed))
         (mapcat second)
         (map to-executor-id))))

; compute the task-info of a topology
(defn storm-task-info
  "Returns map from task -> component id"
  [^StormTopology user-topology storm-conf]
  (->> (system-topology! storm-conf user-topology)
       all-components
       ;; get the number of tasks of each component
       (map-val (comp #(get % TOPOLOGY-TASKS) component-conf))
       (sort-by first)
       (mapcat (fn [[c num-tasks]] (repeat num-tasks c)))
       (map (fn [id comp] [id comp]) (iterate (comp int inc) (int 1)))
       (into {})))
The call chain from nimbus's task assignment into the code above:

; nimbus task assignment
mk-assignments
-> compute-new-topology->executor->node+port
-> compute-topology->executors
-> ...
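To make the effect of storm-task-info concrete, here is an illustrative Python sketch (not Storm's code) of what it computes: consecutive task ids, starting from 1, handed out to components sorted by component id.

```python
def storm_task_info(component_num_tasks):
    """Assign consecutive task ids (starting at 1) to components sorted by
    component id, returning a {task-id: component-id} map -- mirroring the
    Clojure storm-task-info above (illustrative only)."""
    task_to_component = {}
    next_id = 1
    for comp, num_tasks in sorted(component_num_tasks.items()):
        for _ in range(num_tasks):
            task_to_component[next_id] = comp
            next_id += 1
    return task_to_component

# e.g. a bolt with 3 tasks and a spout with 2 tasks (hypothetical component ids)
mapping = storm_task_info({"spout": 2, "bolt": 3})
```

Here "bolt" sorts before "spout", so the bolt owns tasks 1-3 and the spout tasks 4-5.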
Threading model and messaging system

When a worker starts, besides launching multiple executor threads it also launches several auxiliary threads responsible for messaging.

The worker subscribes to the transfer-queue to consume messages; messages are also published to the transfer-queue, for example when remote delivery is needed (the target bolt is in another process or on another node).

An executor publishes messages, such as emitted tuples, to its executor-send-queue, and consumes messages from its executor-receive-queue, for example to execute ack or fail logic.

The batch-transfer-worker-handler thread subscribes to the executor-send-queue, consumes its messages, and publishes them to the transfer-queue for the worker to consume.

The transfer-thread subscribes to the transfer-queue, consumes its messages, and is responsible for sending them over a socket to the port on the remote node.

The worker collects remote messages through its receive-thread and publishes each message locally to the executor-receive-queue of the executor specified in the message; the executor then consumes it from there as described above.

All of the queues above are Disruptor queues, a very efficient framework for inter-thread messaging.

So-called local delivery is message passing between executor threads within the same worker process.
So-called remote delivery is message passing between worker processes, possibly on different machines.
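A minimal single-threaded sketch of the send path described above (Python; plain deques stand in for the Disruptor queues, and all names are illustrative):

```python
from collections import deque

executor_send_queue = deque()  # executors publish emitted tuples here
transfer_queue = deque()       # batches bound for other workers

def emit(target_task, values):
    """An executor publishing an emitted tuple to its executor-send-queue."""
    executor_send_queue.append((target_task, values))

def batch_transfer_worker_handler():
    """Drain the executor-send-queue as one batch and publish the batch to
    the worker-level transfer-queue, like the handler thread described above."""
    batch = []
    while executor_send_queue:
        batch.append(executor_send_queue.popleft())
    if batch:
        transfer_queue.append(batch)
    return batch

emit(3, "hello")
emit(4, "world")
batch_transfer_worker_handler()
```

In the real worker each queue has its own consumer thread; the single-threaded version only shows the direction of flow.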

Task scheduling and load balancing

A slot in which nimbus can place a worker is called a worker-slot.

nimbus is the controlling core of the cluster, responsible for topology submission, health monitoring, load balancing, task reassignment and other work.
The task assignment information produced by nimbus contains the path of the topology code (local to nimbus), plus the tasks, executors and workers involved.
A worker is uniquely identified by node + port.

The supervisor is responsible for the actual synchronization of workers. A supervisor is called a node. So-called worker synchronization means responding to nimbus's task scheduling and assignment: creating, dispatching and destroying workers,
which includes downloading the topology code from nimbus to the local machine for task scheduling.

The task assignment information includes the task -> node + port mapping of tasks to workers, so a worker can use it to determine which remote machines it needs to communicate with.
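Given the task -> node + port map, the local-versus-remote decision from the previous section can be sketched as (illustrative Python, hypothetical names):

```python
def delivery_mode(target_task, task_to_node_port, my_node_port):
    """Local delivery if the target task runs in this worker (same node +
    port), remote otherwise -- a sketch of the decision, not Storm's API."""
    return "local" if task_to_node_port[target_task] == my_node_port else "remote"

# hypothetical assignment: task id -> (node, port) of the worker that runs it
assignment = {1: ("node1", 6700), 2: ("node1", 6701), 3: ("node2", 6700)}
```

A worker at ("node1", 6700) would deliver to task 1 locally, but route tuples for tasks 2 and 3 through its transfer-queue.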

Cluster state management

The state of the cluster is described by a storm-cluster-state object.
It offers many interfaces, for example:

Basic zookeeper-related operations, such as create-node, set-data, remove-node, get-children and so on.
Heartbeat interfaces, such as supervisor-heartbeat!, worker-heartbeat! and so on.
Heartbeat queries, such as executors-beats and the like.
Starting, updating and stopping a storm, such as update-storm! and so on.
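An in-memory toy stand-in illustrates what such an object manages (Python; a dict replaces zookeeper, the method names only echo the Clojure interface above, and the paths are hypothetical):

```python
class InMemoryClusterState:
    """Toy storm-cluster-state: paths map to data, the way zookeeper nodes
    would. Illustrative only -- not Storm's implementation."""
    def __init__(self):
        self.nodes = {}

    def set_data(self, path, data):
        self.nodes[path] = data

    def remove_node(self, path):
        self.nodes.pop(path, None)

    def get_children(self, path):
        """Names of the direct children of a path, like zookeeper's getChildren."""
        prefix = path.rstrip("/") + "/"
        return sorted({p[len(prefix):].split("/")[0]
                       for p in self.nodes if p.startswith(prefix)})

    def worker_heartbeat(self, storm_id, node, port, beat):
        """Analogue of worker-heartbeat!: record a worker's latest beat."""
        self.set_data(f"/workerbeats/{storm_id}/{node}-{port}", beat)

state = InMemoryClusterState()
state.worker_heartbeat("topo-1", "node1", 6700, {"time-secs": 1})
state.worker_heartbeat("topo-1", "node2", 6700, {"time-secs": 1})
```

Listing the children of a topology's heartbeat path then shows which workers are alive, which is exactly the kind of query nimbus makes.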

The basis of task scheduling

zookeeper is the core component for state synchronization and coordination across the whole cluster.

supervisor, worker, executor and other components periodically write heartbeats to zookeeper.

When a topology fails, or a new topology is submitted to the cluster, the topologies information is synchronized to zookeeper.

nimbus periodically monitors the task assignment information (assignments) on zookeeper, and synchronizes its reassignment plans back to zookeeper.

So nimbus reassigns tasks on the basis of heartbeats, the topologies information, and the task information already assigned.
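The heartbeat side of that decision can be caricatured as a timeout check over the collected beats (illustrative Python, not Storm's code; the timeout value and data shapes are hypothetical):

```python
def dead_executors(executor_beats, now_secs, timeout_secs):
    """Executors whose last heartbeat is older than the timeout are treated
    as dead; their tasks become candidates for reassignment by nimbus."""
    return sorted(e for e, last in executor_beats.items()
                  if now_secs - last > timeout_secs)

# executor id (first task, last task) -> time of its last heartbeat
beats = {(1, 3): 120, (4, 5): 40}
```

With the clock at 130 seconds and a 25-second timeout, executor (4, 5) has missed its heartbeats and would be rescheduled, while (1, 3) is still healthy.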

When task scheduling happens

As the state machine above shows, rebalance and do-rebalance (triggered, for example, by a web call) lead to mk-assignments, that is, task (re)assignment.

Meanwhile, the nimbus process, once started, periodically calls mk-assignments for load balancing and task assignment.

When a client submits a topology via storm jar ... topology, nimbus's submission functionality is invoked through the thrift interface; the storm is then started, which triggers a mk-assignments call.

The topology submission process

The process of submitting a topology:

In non-local mode, the client uploads the code to nimbus by calling the nimbus thrift interface, triggering the commit.

nimbus assigns tasks and synchronizes the assignment information to zookeeper.

The supervisor periodically fetches the assignments information; if the topology code is missing, it downloads the code from nimbus and, according to the task assignment information, synchronizes its workers.

According to the task assignment information, the worker starts multiple executor threads and instantiates the spout, bolt, acker and other components. It then waits for all connections (the network connections to the other workers' machines) to come up, at which point it enters the working state of the storm cluster.

Unless a kill of the topology is explicitly invoked, the spout, bolt and other components keep running.

The above sets out the basic framework of Storm, without touching the trident mechanism, and basically answers question 1.
  CopyRight 2002-2022 newfreesoft.com, All Rights Reserved.