In questo tutorial imparerai a usare Hadoop con gli esempi di MapReduce. I dati di input utilizzati sono SalesJan2009.csv. Contiene informazioni relative alle vendite come nome del prodotto, prezzo, modalità di pagamento, città, paese del cliente, ecc. L'obiettivo è scoprire il numero di prodotti venduti in ogni paese.
In questo tutorial imparerai-
- Primo programma Hadoop MapReduce
- Spiegazione della classe SalesMapper
- Spiegazione della classe SalesCountryReducer
- Spiegazione della classe SalesCountryDriver
Primo programma Hadoop MapReduce
Ora, in questo tutorial di MapReduce, creeremo il nostro primo programma Java MapReduce:
Assicurati di aver installato Hadoop. Prima di iniziare con il processo vero e proprio, cambia utente in "hduser" (id utilizzato durante la configurazione di Hadoop, puoi passare all'id utente utilizzato durante la configurazione della programmazione di Hadoop).
su - hduser_
Passo 1)
Crea una nuova directory con il nome MapReduceTutorial come mostrato nell'esempio MapReduce di seguito
sudo mkdir MapReduceTutorial
Concedi i permessi
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
SalesCountryReducer.java
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
SalesCountryDriver.java
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Scarica i file qui
Controlla i permessi dei file di tutti questi file
e se mancano le autorizzazioni di "lettura", concedi le stesse
Passo 2)
Esporta classpath come mostrato nell'esempio di Hadoop seguente
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Passaggio 3)
Compila i file Java (questi file sono presenti nella directory Final-MapReduceHandsOn ). I suoi file di classe verranno inseriti nella directory del pacchetto
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Questo avviso può essere tranquillamente ignorato.
Questa compilazione creerà una directory in una directory corrente denominata con il nome del pacchetto specificato nel file sorgente java (cioè SalesCountry nel nostro caso) e vi inserirà tutti i file di classe compilati.
Passaggio 4)
Crea un nuovo file Manifest.txt
sudo gedit Manifest.txt
aggiungi le seguenti righe,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver è il nome della classe principale. Tieni presente che devi premere il tasto Invio alla fine di questa riga.
Passaggio 5)
Crea un file Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Verifica che il file jar sia stato creato
Passaggio 6)
Avvia Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Passaggio 7)
Copia il file SalesJan2009.csv in ~ / inputMapReduce
Ora usa il comando seguente per copiare ~ / inputMapReduce su HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Possiamo tranquillamente ignorare questo avvertimento.
Verifica se un file viene effettivamente copiato o meno.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Passaggio 8)
Eseguire il lavoro MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Questo creerà una directory di output denominata mapreduce_output_sales su HDFS. Il contenuto di questa directory sarà un file contenente le vendite di prodotti per paese.
Passaggio 9)
Il risultato può essere visualizzato tramite l'interfaccia di comando come,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
I risultati possono essere visualizzati anche tramite un'interfaccia web come-
Apri r in un browser web.
Ora seleziona "Sfoglia il filesystem" e vai a / mapreduce_output_sales
Apri part-r-00000
Spiegazione della classe SalesMapper
In questa sezione, comprenderemo l'implementazione della classe SalesMapper .
1. Iniziamo specificando un nome di pacchetto per la nostra classe. SalesCountry è un nome del nostro pacchetto. Si noti che l'output della compilazione, SalesMapper.class , andrà in una directory denominata con questo nome di pacchetto: SalesCountry .
Successivamente, importiamo pacchetti di librerie.
L'istantanea sottostante mostra un'implementazione della classe SalesMapper-
Spiegazione del codice di esempio:
1. Definizione della classe SalesMapper-
la classe pubblica SalesMapper estende MapReduceBase implementa Mapper
Ogni classe mapper deve essere estesa dalla classe MapReduceBase e deve implementare l' interfaccia Mapper .
2. Definizione della funzione "mappa"
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
La parte principale della classe Mapper è un metodo 'map ()' che accetta quattro argomenti.
Ad ogni chiamata al metodo "map ()" , viene passata una coppia chiave-valore ( "chiave" e "valore" in questo codice).
Il metodo 'map ()' inizia dividendo il testo di input ricevuto come argomento. Usa il tokenizer per dividere queste righe in parole.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Qui, "," viene utilizzato come delimitatore.
Successivamente, viene formata una coppia utilizzando un record al 7 ° indice dell'array "SingleCountryData" e un valore "1" .
output.collect (new Text (SingleCountryData [7]), one);
Stiamo scegliendo il record al 7 ° indice perché abbiamo bisogno dei dati del Paese e si trova al 7 ° indice nell'array "SingleCountryData" .
Si prega di notare che i nostri dati di input sono nel formato seguente (dove il Paese è al 7 ° indice, con 0 come indice iniziale) -
Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitude, Longitude
Un output di mapper è di nuovo una coppia chiave-valore che viene emessa utilizzando il metodo "collect ()" di "OutputCollector" .
Spiegazione della classe SalesCountryReducer
In questa sezione, comprenderemo l'implementazione della classe SalesCountryReducer .
1. Iniziamo specificando un nome del pacchetto per la nostra classe. SalesCountry è un nome del pacchetto out. Si noti che l'output della compilazione, SalesCountryReducer.class , andrà in una directory denominata con questo nome di pacchetto: SalesCountry .
Successivamente, importiamo pacchetti di librerie.
L'istantanea sottostante mostra un'implementazione della classe SalesCountryReducer-
Spiegazione del codice:
1. Definizione della classe SalesCountryReducer-
public class SalesCountryReducer estende MapReduceBase implementa Reducer
In questo caso, i primi due tipi di dati, "Testo" e "IntWritable", sono il tipo di dati del valore-chiave di input per il riduttore.
L'output del mappatore è nel formato
Gli ultimi due tipi di dati, "Testo" e "IntWritable", sono tipi di dati di output generati dal riduttore sotto forma di coppia chiave-valore.
Ogni classe Reducer deve essere estesa dalla classe MapReduceBase e deve implementare l' interfaccia Reducer .
2. Definizione della funzione "riduci"
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
Un input per il metodo reduce () è una chiave con un elenco di più valori.
Ad esempio, nel nostro caso, sarà-
Questo è dato al riduttore come
Quindi, per accettare argomenti di questa forma, vengono utilizzati i primi due tipi di dati, vale a dire Testo e Iteratore
L'argomento successivo è di tipo OutputCollector
Il metodo reduce () inizia copiando il valore della chiave e inizializzando il conteggio della frequenza su 0.
Chiave di testo = t_key; int frequencyForCountry = 0;
Quindi, utilizzando il ciclo " while" , iteriamo l'elenco dei valori associati alla chiave e calcoliamo la frequenza finale sommando tutti i valori.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Ora, inviamo il risultato al raccoglitore di output sotto forma di chiave e conteggio della frequenza ottenuto .
Di seguito il codice fa questo-
output.collect(key, new IntWritable(frequencyForCountry));
Spiegazione della classe SalesCountryDriver
In questa sezione, comprenderemo l'implementazione della classe SalesCountryDriver
1. Iniziamo specificando un nome di pacchetto per la nostra classe. SalesCountry è un nome del pacchetto out. Si noti che l'output della compilazione, SalesCountryDriver.class , andrà nella directory denominata con questo nome di pacchetto: SalesCountry .
Ecco una riga che specifica il nome del pacchetto seguito dal codice per importare i pacchetti della libreria.
2. Definire una classe driver che creerà un nuovo lavoro client, un oggetto di configurazione e pubblicizzerà le classi Mapper e Reducer.
La classe driver è responsabile dell'impostazione del nostro lavoro MapReduce da eseguire in Hadoop. In questa classe, specifichiamo il nome del lavoro, il tipo di dati di input / output e i nomi delle classi mapper e riduttore .
3. Nello snippet di codice sottostante, impostiamo le directory di input e output che vengono utilizzate rispettivamente per consumare il set di dati di input e produrre output.
arg [0] e arg [1] sono gli argomenti della riga di comando passati con un comando dato in MapReduce pratico, cioè
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Avviare il nostro lavoro
Di seguito il codice avvia l'esecuzione del lavoro MapReduce-
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}