Writable是value,WritableComparable<T>可是key也可是value,key是需要比较的,因为在到reduce时,将排序,而value中传递过去就可以了。
key,value对,常用数据类型列表,都是用于实现WritableComparable接口
Class |Description
BooleanWritable|Wrapper for a standard Boolean variable
ByteWritable|Wrapper for a single byte
DoubleWritable|Wrapper for a Double
FloatWritable |Wrapper for a Float
IntWritable |Wrapper for a Integer
LongWritable| Wrapper for a Long
Text| Wrapper to store text using the UTF8 format
NullWritable| Placeholder when the key or value is not needed
自定义数据类型实现WritableComparable,边排序用
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Edge implements WritableComparable<Edge> {
private String departureNode;
private String arrivalNode;
public String getDepartureNode() { return departureNode;}
@Override
public void readFields(DataInput in) throws IOException { //读入数据
departureNode = in.readUTF();
arrivalNode = in.readUTF();
}
@Override
public void write(DataOutput out) throws IOException { //写数据
out.writeUTF(departureNode);
out.writeUTF(arrivalNode);
}
@Override
public int compareTo(Edge o) {
return (departureNode.compareTo(o.departureNode) != 0) //定义数据排序
? departureNode.compareTo(o.departureNode)
: arrivalNode.compareTo(o.arrivalNode);
}
}
Mapper<K1,V1,K2,V2>:
形式:
void map(K1 key,
V1 value,
OutputCollector<K2,V2> output,
Reporter reporter
) throws IOException
一些有用的预定义实现
IdentityMapper<K,V> Implements Mapper<K,V,K,V> and maps inputs directly to outputs 直接输出
InverseMapper<K,V> Implements Mapper<K,V,V,K> and reverses the key/value pair 反转输出
RegexMapper<K> Implements Mapper<K,Text,Text,LongWritable> and generates a (match, 1) pair for every regular expression match 每匹配生成对
TokenCountMapper<K> Implements Mapper<K,Text,Text,LongWritable> and generates a (token, 1) pair when the input value is tokenized 分词
Reducer:
void reduce(K2 key,
Iterator<V2> values,
OutputCollector<K3,V3> output,
Reporter reporter
) throws IOException
有用预定义:
IdentityReducer<K,V> Implements Reducer<K,V,K,V> and maps inputs directly to outputs
LongSumReducer<K> Implements Reducer<K,LongWritable,K,LongWritable> and
determines the sum of all values corresponding to the given key 求和
Partitioner 重定向输出
如果复合key
(San Francisco, Los Angeles) Chuck Lam
(San Francisco, Dallas) James Warren
有时会出错,所以可自定义
public class EdgePartitioner implements Partitioner<Edge, Writable>
{
@Override
public int getPartition(Edge key, Writable value, int numPartitions) //生成0-1的hash
{
return new Long(key.getDepartureNode()).hashCode() % numPartitions;
}
@Override
public void confi gure(JobConf conf) { } //获取设置参
}
修改后的wordcount:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.TokenCountMapper;
import org.apache.hadoop.mapred.lib.LongSumReducer;
public class WordCount2 {
public static void main(String[] args) {
JobClient client = new JobClient();
JobConf conf = new JobConf(WordCount2.class);
FileInputFormat.addInputPath(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setMapperClass(TokenCountMapper.class); //hadoop自带的预定义实现就行了,简化!
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReducer.class);
client.setConf(conf);
try {
JobClient.runJob(conf);
} catch (Exception e) {
e.printStackTrace();
}
}
}
读和写
读和hdfs的分块密不可分,FSDataInputStream 不是用java.io.DataInputStream . FSDataInputStream需要支持随机读.
分割读取输入用的是InputFormat,TextInputFormat是其默认,其返回键是每行数据偏移量
TextInputFormat
Each line in the text fi les is a record. Key is the byte
offset of the line, and value is the content of the line.
key: LongWritable 偏移量
value: Text 行值
KeyValueTextInputFormat
Each line in the text fi les is a record. The fi rst separator
character divides each line. Everything before the
separator is the key, and everything after is the value.
The separator is set by the key.value.separator.in.input.
line property, and the default is the tab (\t) character. 默认是tab来分
key: Text 分割符前字符串
value: Text 分割符后字符串
例:
17:16:18 http://hadoop.apache.org/core/docs/r0.19.0/api/index.html
conf.setInputFormat(KeyValueTextInputFormat.class); 来设置
SequenceFileInputFormat<K,V> 序列化输入出是hadoop特有的二进制方式,多个连接的MapReduce这种方法最优化。
An InputFormat for reading in sequence fi les. Key and
value are user defi ned. Sequence fi le is a Hadoopspecifi
c compressed binary fi le format. It’s optimized for
passing data between the output of one MapReduce job
to the input of some other MapReduce job.
key: K (user defi ned)
value: V (user defi ned)
NLineInputFormat 不同在于一次读n行
Same as TextInputFormat, but each split is guaranteed
to have exactly N lines. The mapred.line.input.format.
linespermap property, which defaults to one, sets N.
key: LongWritable
value: Text
它们都是InputFormat的子类
自定义也可以,需要做两件事情:
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; //确定输出数据分片成Map任务
RecordReader<K, V> getRecordReader(InputSplit split, //Record对象,解成k,v
JobConf job,
Reporter reporter) throws IOException;
}
InputFormat实现了 getSplits() 方法,但保留 getRecordReader() 抽象给子类用
numSplits 分片数目,每个分片大于mapred.min.split.size,小于block,默认64M
inputFormat 的protected isSplitable(FileSystem fs, Path fi lename) 检查是否可以分片,默认true,如果分文件不分块就覆盖这个方法为false,如压缩文件不能分块,文件转换也需要
RecordReader自定义,他把块解析为记录,再解析为k,v对
其接口:
public interface RecordReader<K, V> {
boolean next(K key, V value) throws IOException;
K createKey();
V createValue();
long getPos() throws IOException;
public void close() throws IOException;
float getProgress() throws IOException;
}
很少去改这个,一般利hadoop的类如LineRecordReader,就行了。
我们来写个TimeUrlTextInputFormat 按时间类型来分,而不是KeyValueTextInputFormat按text来分
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {
public RecordReader<Text, URLWritable> getRecordReader( InputSplit input, JobConf job, Reporter reporter) throws IOException {
return new TimeUrlLineRecordReader(job, (FileSplit)input);
}
}
=======
public class URLWritable implements Writable {
protected URL url;
public URLWritable() { }
public URLWritable(URL url) {
this.url = url;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(url.toString());
}
public void readFields(DataInput in) throws IOException {
url = new URL(in.readUTF());
}
public void set(String s) throws MalformedURLException {
url = new URL(s);
}
}
实现六种方法,把text转为URLWritable
class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> {
private KeyValueLineRecordReader lineReader;
private Text lineKey, lineValue;
public TimeUrlLineRecordReader(JobConf job, FileSplit split) throws
IOException {
lineReader = new KeyValueLineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
public boolean next(Text key, URLWritable value) throws IOException {
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
key.set(lineKey);
value.set(lineValue.toString());
return true;
}
public Text createKey() {
return new Text(“”);
}
public URLWritable createValue() {
return new URLWritable();
}
public long getPos() throws IOException {
return lineReader.getPos();
}
public fl oat getProgress() throws IOException {
return lineReader.getProgress();
}
public void close() throws IOException {
lineReader.close();
}
}
OutputFormat
输出,写入自已文件中,无需分片,一般part-nnnnn nnnnn代表分区ID,自定义setOutputFormat ()从JobConf 对象
主要的OutputFormat类,
TextOutputFormat<K,V> //默认,tab分割,并转为toString(),分割符可在这里设置 mapred.textoutputformat.separator
Writes each record as a line of text. Keys and values
are written as strings and separated by a tab (\t)
character, which can be changed in the mapred.
textoutputformat.separator property.
SequenceFileOutputFormat<K,V>
Writes the key/value pairs in Hadoop’s proprietary
sequence fi le format. Works in conjunction with
SequenceFileInputFormat.
NullOutputFormat<K,V> Outputs nothing. //无输出,不是继承自OutputFormat,其它都是。
HDFS存储,MapReduce计算,主要针对数据密集型处理。
梦翔儿,学习 hadoop in action 笔记摘要