083月

详解mrjob输入输出协议(Protocols) – Never-Giveup的博客

率先,包含正式的证明。,与写东西小窥测来包含。。更不隐瞒的的心甘情愿的可以适用于最新的文档和源码()

输出输出协议

正式的关联:https://pythonhosted.org/mrjob/guides/writing-mrjobs.html#protocols

MRPoice推测缠住录音都是由断线分派的八位位组。。它运用protocols序列和去序列化这些八位位组。每个job(作业)缠住输出。protocol,东西输出procotol内地procotol
东西procotol具有read()方法和write()方法。该 read()方法元气八位位组替换为Python女朋友的键布置(键和值)对。。该write()方法元气一对Python女朋友(表现由键和值结合的键布置对)替换为。

输出procotol用于读取八位位组到第东西八位位组。mapper(或增电子剂,条件在第一步中不运用映照器。输出procotol以八位位组花样输出最初一步输出到输出证明。。内地procotol将一步的输出替换为下一步的输出。,条件伪造有东西外面的移动。
可以布置要运用的作业。procotol,如次所示:

classMyMRJob(mrjob.job.MRJob):
    INPUT_PROTOCOL = mrjob.protocol.RawValueProtocol
    INTERNAL_PROTOCOL = mrjob.protocol.JSONProtocol
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol

默许输出协议是RawValueProtocol,它只读取每不育系作为STR。。(行不能的有嵌上断线。,因MRJob它会剪下它。 )。这么,默许使适应下,在第一步伪造中,您可以瞥见输出的每不育系都是CONV。, 行的键布置对。

默许输出和内地协议都是JSONProtocol,它读取和写信由选择能力卡分派的JSON字母串。。(默许使适应下),Hadoop 流运用选择能力卡在排序录音时分离行击中要害键布置。。)

条件你的头当然啦疼,你可以这样地想。:RawValueProtocol当您想里德或写信原始倒转术行时运用。。JSONProtocol当您想读取或写信键布置对(j的键布置对)时。

小心: Hadoop 流不运用JSON或MRPoice协议。。它只经过比力第东西选择能力卡前的字母串来小群行。。

可以经过检查(https://pythonhosted.org/mrjob/protocols.html#module-) 领会MRPoice内置协议的十分列表。。

小注:

  • [1] 有经历的巨蜥可能会小心到,Python击中要害STR 2是八位位组串。,但在大蟒蛇中 3是Unicode。。这是批改的。!RawValueProtocol是两个差数协议的别号。,这静止你的Python版本。。
  • [2] JSONProtocol是四种差数意识到的别号。 我们家尝试运用(更快)UJSON库,条件引起的话。,条件无,试试Rabijson或SimJeason库。。在刊登于头版无的使适应下。, 运用Python内置的JSON模块。
录音流生动的建议

让我们家从多移动的分派中重行谛视我们家的建议。。它有两个移动。,并运用纯倒转术证明作为输出。

classMRMostUsedWord(MRJob):defsteps(self):return[
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)]

第一步呼叫mapper_get_words()作用:

defmapper_get_words(self, _, line):for word in WORD_RE.芬德尔(line):yield(word.lower(),1)

因输出协议是RawValueProtocol,坩埚常常None ,该值将是行的倒转术。
作用保持正中的键(键),并现场恢复到每个单词的每行, 1)花样元组。因内地协议是JSONProtocol,输出被序列化为JSON。。序列化部件被写信STDUT(规范输出),经过制表符和完毕符与行打断划分。,如次所示:

"mrjob"1"is"1"a"1"python"1

下东西是combinerreducer

defcombiner_count_words(self, word, counts):yield(word,sum(counts))defreducer_count_words(self, word, counts):yieldNone,(sum(counts), word)

在这两种使适应下,八位位组去世JSONProtocol序列化为键布置对(Word), 计数) ,以异样看待的方法将输出序列化为JSON(因两个都是尾随的)。它演出像第东西映照器的输出。,但算是总结如次:

"mrjob"31"is"2"a"2"Python"1

最初一步是制动器。

defreducer_find_max_word(self, _, word_count_pairs):yieldmax(word_count_pairs)

因大约移动的缠住输出都有异样看待的电键(无),这么,东西分派将接球缠住行。。异样,JSONProtocol将处置反序列化并增大参量给reducer_find_max_word()

输出协议亦JSONProtocol,因而终极的输出将是

31"mrjob"

我们家走完了!但当然啦丑。 无必要写电键(电键)。因而我们家运用JSONValueProtocol协议,因而我们家只瞥见JSON编码的值。:

classMRMostUsedWord(MRJob):

    OUTPUT_PROTOCOL = JSONValueProtocol

现时我们家理应从事examples/源编码异样看待的编码。让我们家试着运转它。-q 预防调试日记输出:

$ python mr_most_used_word.py README.txt -q
"mrjob"

Hooray!

为你的家作业分派一份协议。

通常使适应下,只需设置东西或多个类属性那就够了。 INPUT_PROTOCOL, INTERNAL_PROTOCOL然后 OUTPUT_PROTOCOL:

classBasicProtocolJob(MRJob):
    INPUT_PROTOCOL = RawValueProtocol
    
    INTERNAL_PROTOCOL = PickleProtocol
    
    OUTPUT_PROTOCOL = JSONProtocol

条件你需求更复杂的行动,你可以重写。 input_protocol(), internal_protocol()或许 OutPuxTebug()方法现场恢复协议女朋友生动的建议。。这是命令行选择能力文档的示例。:

classCommandLineProtocolJob(MRJob):defconfigure_options(self):super(CommandLineProtocolJob, self).configure_options()
        self.add_passthrough_option(''--output-format'', default=原始, choices=[原始,''json''],help="Specify the output format of the job")defoutput_protocol(self):if self.options.output_format ==''json'':return JSONValueProtocol()elif self.options.output_format ==原始:return RawValueProtocol()

最初,条件需求运用完整差数的协议分派意向,它可以相交PICKIX协议:

classWhatIsThisIDontEvenProtocolJob(MRJob):defpick_protocols(self, step_num, step_type):return random.choice([Protocololol, ROFLcol, Trolltocol, Locotorp])
笔墨自定义协议

协议是一种懂得。里德(自己), 线)笔墨(自己), key, 付出代价)方法的女朋友。read()方法同意东西。bytestring并现场恢复2元组的解码女朋友。,write()方法获取电键和值,并现场恢复要现场恢复到Hadoop的八位位组。 流或输出。

协议不用担忧添加或剪下断线。 这是由MRJob不自觉动作处置的。。

这是MRJOB的JSON协议的预先消化版本。:

import json


classJSONProtocol(object):defread(self, line):
        k_str, v_str = line.split(''\t'',1)return json.loads(k_str), json.loads(v_str)defwrite(self, key, value):return''%s\t%s''%(json.dumps(key), json.dumps(value))

还可以经过缓存/序列化来明显借款功能。。详细请检查检查(https://pythonhosted.org/mrjob/protocols.html#module-) 示例源编码 。

我的教育演示

  1. 编码
from mrjob.job import MRJob

classMRWordCount(MRJob):defmapper(self, key, line):print(''key---'',key)for word in line.split():yield(word,1)defreducer(self, word, counts):yield(word,sum(counts))if __name__ ==''__main__'':
   MRWordCount.run()
  1. 输出倒转术输出
jack be nimble
jack be quick
jack jumped over the candlestick
  1. 运转
    有别于运转映照器RawValueProtocol是方式处置的
    python3 demo.py --mapper /home/hadoop/HadoopWithPython-master/resources/input.txt
  2. 输出
key--- None
"jack"	1
"be"	1
"nimble"	1
key--- None
"jack"	1
"be"	1
"quick"	1
key--- None
"jack"	1
"jumped"	1
"over"	1
"the"	1
"candlestick"	1

发表评论

电子邮件地址不会被公开。 必填项已用*标注