mapreduce-customwrapper is a Virtual DataPort custom wrapper for running map and reduce operations using Hadoop.
The custom wrapper connects to the Hadoop machine via SSH, executes a MapReduce job and reads the results from HDFS.
MapReduce is a programming model that uses parallelism to handle large data sets in distributed systems. The model provides fault tolerance by re-scheduling failed tasks and also takes I/O locality into account to reduce network traffic. The process consists of:
The model is inspired by the map and reduce functions commonly used in functional programming. However, the key contributions of the MapReduce framework are not the map and reduce functions themselves, but:
MapReduce has multiple implementations, Apache Hadoop being one of the most popular.
MapReduce job
The MapReduce custom wrapper connects via SSH to the machine where Hadoop is running and executes a MapReduce job by running the following command from the shell:
$ hadoop jar <jar> job-parameters
When the job finishes, the wrapper reads the output stored in an HDFS directory.
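As an illustration of this last step, the sketch below reads (Text, IntWritable) pairs from a SequenceFile stored in HDFS using the Hadoop Java API. The NameNode URI and the output path are hypothetical placeholders; the wrapper performs the equivalent reading internally, so this is only a reference sketch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadJobOutputExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical NameNode URI (Hadoop 2.x style); replace with the real one
        conf.set("fs.defaultFS", "hdfs://hadoop-master:8020");
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical output file produced by the MapReduce job
        Path path = new Path("/user/denodo/denodo_output/part-r-00000");

        // Read (Text, IntWritable) pairs from the SequenceFile
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            Text key = new Text();
            IntWritable value = new IntWritable();
            while (reader.next(key, value)) {
                System.out.println(key + "\t" + value);
            }
        } finally {
            reader.close();
        }
    }
}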
Base views created from the MapReduceSSHWrapper need the following mandatory parameters:
And six optional parameters:
There are also parameters that are mutually exclusive:
and
Exactly one of these two parameters must be provided when using AvroFileOutput.
MapReduceSSHWrapper base view edition
View schema
The execution of the view returns the results of the wordmean MapReduce job:
SELECT * FROM mapreduce_ds_job
View results
The classes provided for the base view parameters Key class and Value class should implement the org.apache.hadoop.io.Writable interface. The Value class parameter also supports arrays, e.g. org.apache.hadoop.io.IntWritable[] represents an ArrayWritable of IntWritable.
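For instance, a job whose values are arrays of integers would typically wrap them in an ArrayWritable subclass like the hypothetical one below, and the corresponding base view would declare org.apache.hadoop.io.IntWritable[] as its Value class.

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;

// Hypothetical value type for a job that emits arrays of integers.
// A base view over such a job would use org.apache.hadoop.io.IntWritable[]
// as the "Value class" parameter.
public class IntArrayWritable extends ArrayWritable {

    public IntArrayWritable() {
        super(IntWritable.class);
    }

    public IntArrayWritable(IntWritable[] values) {
        super(IntWritable.class, values);
    }
}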
The Writable classes supported by the custom wrapper are:
The custom wrapper executes a MapReduce job by connecting to the Hadoop machine via SSH and running the command “hadoop jar…”. This may seem a complicated way to execute jobs, because the Hadoop API allows jobs to be submitted and run programmatically.
When using the Hadoop API:
Whereas when using SSH:
Consequently, SSH is the preferred approach, as it avoids the Denodo Platform having dependencies on the MapReduce jobs.
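For reference, executing a command such as hadoop jar over SSH from Java can be sketched with a library like JSch. The snippet below only illustrates the SSH approach under that assumption (host, user, password and jar path are hypothetical); it is not the wrapper's actual implementation.

import java.io.InputStream;

import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

public class SshHadoopJarExample {

    public static void main(String[] args) throws Exception {
        // Hypothetical connection details
        JSch jsch = new JSch();
        Session session = jsch.getSession("cloudera", "hadoop-master", 22);
        session.setPassword("secret");
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();

        // Run the MapReduce job exactly as it would be run from the shell
        ChannelExec channel = (ChannelExec) session.openChannel("exec");
        channel.setCommand("hadoop jar /home/cloudera/wordmean.jar input output");
        InputStream out = channel.getInputStream();
        channel.connect();

        // Echo the job's console output until the command finishes
        byte[] buffer = new byte[1024];
        while (true) {
            while (out.available() > 0) {
                int read = out.read(buffer);
                if (read < 0) {
                    break;
                }
                System.out.print(new String(buffer, 0, read));
            }
            if (channel.isClosed()) {
                break;
            }
            Thread.sleep(500);
        }

        channel.disconnect();
        session.disconnect();
    }
}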
The MapReduce custom wrapper is synchronous: it invokes a MapReduce job, waits until the job has finished and then reads the output from HDFS. For relatively fast MapReduce jobs synchronous execution is desirable. However, in situations where a MapReduce job is expected to take a large amount of time, asynchronous execution should be preferred.
Denodo Connect provides two components for achieving asynchronous execution of MapReduce jobs:
These are some of the issues to consider when developing a MapReduce job for this custom wrapper:
Below is the skeleton of the main class of a MapReduce job.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class MapReduceDriverExample {

    public static void main(String[] args) throws Exception {

        String input = args[0];
        String output = args[1];

        Job job = new Job();
        job.setJarByClass(MapReduceDriverExample.class);
        job.setJobName("example-" + System.nanoTime());

        // Input and output are in HDFS
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.setMapperClass(Mapper1.class);
        job.setReducerClass(Reducer1.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
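The Mapper1 and Reducer1 classes referenced by the skeleton are not shown there; a hypothetical word-count style pair matching the (Text, IntWritable) types configured in the driver could look like this (each class in its own source file, in the same package as the driver):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper1.java - hypothetical mapper: emits (word, 1) for every word in the input
public class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}

// Reducer1.java - hypothetical reducer: sums the counts for each word
public class Reducer1 extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}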
The configuration required for accessing a Hadoop cluster with Kerberos enabled is the same as the one needed to execute MapReduce jobs; additionally, the user must supply the Kerberos credentials.
The Kerberos parameters are:
The MapReduce custom wrapper provides three ways of accessing a kerberized Hadoop cluster:
In this case only the Kerberos enabled parameter should be checked. The MapReduce wrapper would use the Kerberos ticket to authenticate itself against the Hadoop cluster.
In all three scenarios the krb5.conf file should be present in the file system. Below is an example of the Kerberos configuration file:
[libdefaults]
  renew_lifetime = 7d
  forwardable = true
  default_realm = EXAMPLE.COM
  ticket_lifetime = 24h
  dns_lookup_realm = false
  dns_lookup_kdc = false

[domain_realm]
  sandbox.hortonworks.com = EXAMPLE.COM
  cloudera = CLOUDERA

[realms]
  EXAMPLE.COM = {
    admin_server = sandbox.hortonworks.com
    kdc = sandbox.hortonworks.com
  }
  CLOUDERA = {
    kdc = quickstart.cloudera
    admin_server = quickstart.cloudera
    max_renewable_life = 7d 0h 0m 0s
    default_principal_flags = +renewable
  }

[logging]
  default = FILE:/var/log/krb5kdc.log
  admin_server = FILE:/var/log/kadmind.log
  kdc = FILE:/var/log/krb5kdc.log
The algorithm to locate the krb5.conf file is the following:
There is one exception: if you are planning to create MapReduce views that use the same Key Distribution Center and the same realm, the Kerberos Distribution Center parameter can be provided instead of having the krb5.conf file in the file system.
View edition
Note: As the wrapper connects to the cluster via SSH and executes the MapReduce job with the command hadoop jar, the local user account must have a valid Kerberos ticket before running the MapReduce wrapper. Tickets can be requested with the kinit command.
Symptom
Error message: “Host Details : local host is: "<your domain/your IP>"; destination host is: "<hadoop IP>":<hadoop port>”.
Resolution
It is a version mismatch problem: the Hadoop version of the server is older than the one distributed in the custom wrapper artifact denodo-mapreduce-customwrapper-<version>-xxx-jar-with-dependencies.
To solve the problem, use the custom wrapper artifact denodo-mapreduce-customwrapper-<version>-xxx instead and copy the Hadoop server libraries to the $DENODO_PLATFORM_HOME/extensions/thirdparty/lib directory.
Symptom
Error message: “Server IPC version X cannot communicate with client version Y”.
Resolution
It is a version mismatch problem: the Hadoop version of the server is newer than the one distributed in the custom wrapper artifact denodo-mapreduce-customwrapper-<version>-xxx-jar-with-dependencies.
To solve the problem, use the custom wrapper artifact denodo-mapreduce-customwrapper-<version>-xxx instead and copy the Hadoop server libraries to the $DENODO_PLATFORM_HOME/extensions/thirdparty/lib directory.
Symptom
Error message: “denodo_output_XXX/_logs is not a file”.
Resolution
Job logs refer to the events and configuration for a completed job. They are retained to provide useful information for the user who ran the job. These job history files are stored on the local filesystem of the jobtracker, in a history subdirectory of the logs directory.
A second copy is also stored for the user in the _logs/history subdirectory of the job’s output directory. This is the directory the custom wrapper is complaining about because it expects only output files.
By setting the property hadoop.job.history.user.location to none, either in the mapred-site.xml config file or in the MapReduce job configuration, no user job history is saved in the job’s output directory.
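For example, the property can be set programmatically in the job driver (a minimal sketch with a hypothetical class name; the same value can equally be set in mapred-site.xml):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class NoUserJobHistoryExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Prevent the _logs/history copy from being written to the job's output directory
        conf.set("hadoop.job.history.user.location", "none");
        Job job = new Job(conf);
        // ... configure mapper, reducer, input and output paths as in the driver skeleton above ...
    }
}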
Symptom
Error message: “No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)”.
Resolution
As the wrapper connects to the cluster via SSH and executes the MapReduce job with the command hadoop jar, the local user account must have a valid Kerberos ticket before running the MapReduce wrapper. Tickets can be requested with the kinit command.
Symptom
Error message: “SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]”.
Resolution
You are trying to connect to a Kerberos-enabled Hadoop cluster, so you should configure the custom wrapper accordingly. See the Secure cluster with Kerberos section for configuring Kerberos in this custom wrapper.
Symptom
Error message: “Cannot get Kerberos service ticket: KrbException: Server not found in Kerberos database (7)”.
Resolution
Check that nslookup returns the fully qualified hostname of the KDC. If it does not, modify the /etc/hosts file of the client machine so that the KDC entry has the form "IP address fully.qualified.hostname alias".