Spark Streaming (11): Advanced Data Source, Flume Pull Mode (Production)
Published: 2019-05-27


1. Environment

(1) Production environment

Flume 1.6.0

Spark 2.1.0

(2) Download the required dependencies

Note: make sure all of these JARs end up on Flume's classpath, otherwise the Flume agent will fail at runtime. (I ran into this pitfall myself.) See the sketch after the list below.

(i) Custom sink JAR: groupId = org.apache.spark, artifactId = spark-streaming-flume-sink_2.11, version = 2.1.0
(ii) Scala library JAR: groupId = org.scala-lang, artifactId = scala-library, version = 2.11.7
(iii) Commons Lang 3 JAR: groupId = org.apache.commons, artifactId = commons-lang3, version = 3.5
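A minimal sketch of staging these JARs, assuming $FLUME_HOME points at the Flume 1.6.0 install and the JARs have already been downloaded to the current directory (file names are illustrative):

# Copy the three dependency JARs into Flume's lib directory,
# which is on the agent's classpath by default
cp spark-streaming-flume-sink_2.11-2.1.0.jar $FLUME_HOME/lib/
cp scala-library-2.11.7.jar $FLUME_HOME/lib/
cp commons-lang3-3.5.jar $FLUME_HOME/lib/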

 

2. Flume configuration file: flume_pull_streaming.conf

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

3. Scala code

package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming integration with Flume, approach 2: pull-based
  */
object FlumePullWordCount_product_server {
  def main(args: Array[String]): Unit = {
    // For production use: host and port are passed in at submit time
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount_product_server <hostname> <port>")
      System.exit(1)
    }
    val Array(hostname, port) = args

    val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePullWordCount_product")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Pull events from Flume's SparkSink instead of having Flume push them
    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    flumeStream.map(x => new String(x.event.getBody.array()).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

4. Testing

(1) Package the code into a JAR
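The JAR name used in step (5) (scalaProjectMaven.jar) suggests a Maven build; a minimal sketch under that assumption, with the scala-maven-plugin configured so the Scala sources get compiled:

# Build the project (assumes a standard Maven layout)
mvn clean package
# Stage the artifact at the path spark-submit expects in step (5)
cp target/scalaProjectMaven.jar /opt/datas/lib/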

(2) Start Flume

bin/flume-ng agent \
  --name simple-agent \
  --conf conf \
  --conf-file conf/flume_pull_streaming.conf \
  -Dflume.root.logger=INFO,console
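Once the agent is up, the SparkSink should be listening on port 41414. One way to sanity-check this before submitting the Spark job (assuming netstat is available on the host):

# Verify that the SparkSink has bound its port
netstat -tln | grep 41414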

(3) Start telnet

telnet hadoop 44444

(4) Start HDFS (the job will throw errors if HDFS is not running)
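A minimal sketch, assuming a standard Hadoop install with $HADOOP_HOME set:

# Start the HDFS daemons (NameNode, DataNode, SecondaryNameNode)
$HADOOP_HOME/sbin/start-dfs.sh
# Confirm the daemons are up
jps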

(5) Submit the Spark job

bin/spark-submit \
  --class Spark.FlumePullWordCount_product_server \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 \
  /opt/datas/lib/scalaProjectMaven.jar \
  hadoop 41414
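Note that --packages resolves the driver-side connector from Maven Central at submit time, so it needs network access. On an offline host, roughly the same effect can be had with --jars and pre-downloaded JARs (paths are illustrative, and the connector's transitive dependencies, such as flume-ng-sdk, would also need to be listed):

# Offline alternative to --packages: supply pre-downloaded JARs directly
bin/spark-submit \
  --class Spark.FlumePullWordCount_product_server \
  --master local[2] \
  --jars /opt/datas/lib/spark-streaming-flume_2.11-2.1.0.jar \
  /opt/datas/lib/scalaProjectMaven.jar \
  hadoop 41414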

(6) Enter test input via telnet

OK
s d f s
OK
sd  fd f
OK
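On the Spark side, each 5-second batch prints word counts for whatever arrived in that window. For the first input line above, the console output looks roughly like this (the timestamp and batch boundaries are illustrative):

-------------------------------------------
Time: 1558942305000 ms
-------------------------------------------
(s,2)
(d,1)
(f,1)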

(Result: success!)

