外贸网站推广 雅虎问答有用吗,设计师培训资料,免费软件下载网站入口,宿迁房产网丫丫概述
SourceFunction:非并行数据源(并行度只能1) --接口
RichSourceFunction:多功能非并行数据源(并行度只能1) --类
ParallelSourceFunction:并行数据源(并行度能够1) --接口
RichParallelSourceFunction:多功能并行数据源(并行度能够1) --类 【建议使用的】
——…概述
SourceFunction:非并行数据源(并行度只能1) --接口
RichSourceFunction:多功能非并行数据源(并行度只能1) --类
ParallelSourceFunction:并行数据源(并行度能够1) --接口
RichParallelSourceFunction:多功能并行数据源(并行度能够1) --类 【建议使用的】
——Rich 字样代表富有在编程中富有代表可以调用的方法很多功能很全的意思。 基础案例
package com.bigdata.day02;//1、SourceFunction
// public class ZidingyiSource implements SourceFunctionStudent {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunctionStudent {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunctionStudent {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunctionStudent {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunctionStudent {// ctrl oprivate final Random random new Random();private boolean flag true;// 现在不用Overridepublic void open(Configuration parameters) throws Exception {System.out.println(实现一些资源的开启);}// 现在不用Overridepublic void close() throws Exception {System.out.println(实现一些资源的关闭);}Overridepublic void run(SourceContextStudent sourceContext) throws Exception {while (flag){String stu_id UUID.randomUUID().toString();String stu_name Student_stu_id;int stu_age random.nextInt(8)10;long stu_timestamp System.currentTimeMillis();Student student new Student(stu_id,stu_name,stu_age,stu_timestamp);sourceContext.collect(student);Thread.sleep(1000);}}// 具体什么时候 会调用还不知道Overridepublic void cancel() {flag false;System.out.println(停止运行);}
}//调用
public class ZiDingYi {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// add new DataStreamSourceStudent studentDataStreamSource env.addSource(new ZidingyiSource());int parallelism studentDataStreamSource.getParallelism();System.out.println(parallelism);// print之前与之后的并行度是不同的studentDataStreamSource.print().setParallelism(1);env.execute();}
}
cancelopenclose的调用时机
package com.bigdata.day02;import java.util.Objects;/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/public class ZiDingYi {public static void main(String[] args) throws Exception {// 在上面案例的基础上实现StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceStudent studentDataStreamSource env.addSource(new ZidingyiSource());// 此时就只会调用一次了studentDataStreamSource.setParallelism(1);// 此时打印也会有多个并行度8个cpustudentDataStreamSource.print();// 异步调用 此时会调用open方法JobExecutionResult execute env.execute();JobClient flink_job env.executeAsync(Flink Job);Thread.sleep(3000);// 此时会调用 cancel 和 close flink_job.cancel();}
} kafkaSource
package com.bigdata.day02;import java.util.Properties;public class KafkaSource {public static void main(String[] args) throws Exception{//envStreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// properties Properties properties new Properties();properties.setProperty(bootstrap.servers, bigdata01:9092);properties.setProperty(group.id, g1);// consumerFlinkKafkaConsumerString consumer new FlinkKafkaConsumerString(yhedu,new SimpleStringSchema(),properties);// sourceDataStreamSourceString dataStreamSource env.addSource(consumer);dataStreamSource.filter(new FilterFunctionString() {Overridepublic boolean filter(String s) throws Exception {return s.contains(success);}}).print();env.execute();}
}