Environment: Flume 1.6.0, Spark 2.1.0
Note: be sure to put all of the dependencies listed below on Flume's classpath, otherwise the Flume agent will not run correctly. (I have hit this pitfall before.)
(i) Custom sink JAR: groupId = org.apache.spark, artifactId = spark-streaming-flume-sink_2.11, version = 2.1.0
(ii) Scala library JAR: groupId = org.scala-lang, artifactId = scala-library, version = 2.11.7
(iii) Commons Lang 3 JAR: groupId = org.apache.commons, artifactId = commons-lang3, version = 3.5
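For reference, the same coordinates can be written as sbt dependency declarations. This is only a minimal sketch assuming an sbt build (the project in this post is built with Maven); with the pull-based sink these three JARs still have to end up on Flume's classpath, typically by copying them into Flume's lib/ directory, not just into the application build.

libraryDependencies ++= Seq(
  "org.apache.spark"   % "spark-streaming-flume-sink_2.11" % "2.1.0",
  "org.scala-lang"     % "scala-library"                   % "2.11.7",
  "org.apache.commons" % "commons-lang3"                   % "3.5"
)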
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
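To trace the data flow in this configuration: the netcat source listens on hadoop:44444, events are buffered in the memory channel, and the SparkSink serves them on hadoop:41414, which is the address the Spark Streaming job below will poll.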
package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Flume integration, approach 2 (pull-based).
 */
object FlumePullWordCount_product_server {
  def main(args: Array[String]): Unit = {
    // For production use: hostname and port are passed in as arguments.
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount_product <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    val sparkConf = new SparkConf()
    //.setMaster("local[2]").setAppName("FlumePullWordCount_product")  // uncomment for local testing
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Pull events from Flume's SparkSink and run a word count on each batch.
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
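A note on the design choice: with this pull-based approach, Flume only pushes events as far as its own SparkSink, and Spark Streaming pulls them using a transactional receiver. Spark's Flume integration guide recommends this setup over the push-based approach for stronger reliability and fault-tolerance guarantees.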
bin/flume-ng agent \
  --name simple-agent \
  --conf conf \
  --conf-file conf/flume_pull_streaming.conf \
  -Dflume.root.logger=INFO,console
telnet hadoop 44444
bin/spark-submit \
  --class Spark.FlumePullWordCount_product_server \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 \
  /opt/datas/lib/scalaProjectMaven.jar \
  hadoop 41414
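The --packages option tells spark-submit to resolve org.apache.spark:spark-streaming-flume_2.11:2.1.0 (and its transitive dependencies) from Maven repositories at submission time, so the connector does not need to be bundled into scalaProjectMaven.jar. The trailing arguments hadoop 41414 are the hostname and port of the SparkSink configured above.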
OK
s d f s
OK
sd fd f
OK
(Result: success!)
Reposted from: http://utygi.baihongyu.com/