当前位置: 首页 > news >正文

成都网站制作软件全国企业信用公示系统查询

成都网站制作软件,全国企业信用公示系统查询,网站内容的作用,代理网站地址背景 在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到写hudi真实数据以及写hudi元数据,这篇文章来说一下具体的实现 写hudi真实数据 这里的操作就是在HoodieFlinkWriteClient.upsert方法: public …背景 在之前的文章中Apache Hudi初探(二)(与flink的结合)–flink写hudi的操作(JobManager端的提交操作) 有说到写hudi数据会涉及到写hudi真实数据以及写hudi元数据,这篇文章来说一下具体的实现 写hudi真实数据 这里的操作就是在HoodieFlinkWriteClient.upsert方法: public ListWriteStatus upsert(ListHoodieRecordT records, String instantTime) {HoodieTableT, ListHoodieRecordT, ListHoodieKey, ListWriteStatus table initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));table.validateUpsertSchema();preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());final HoodieWriteHandle?, ?, ?, ? writeHandle getOrCreateWriteHandle(records.get(0), getConfig(),instantTime, table, records.listIterator());HoodieWriteMetadataListWriteStatus result ((HoodieFlinkTableT) table).upsert(context, writeHandle, instantTime, records);if (result.getIndexLookupDuration().isPresent()) {metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());}return postWrite(result, instantTime, table);}initTable 初始化HoodieFlinkTablepreWrite 在这里几乎没什么操作getOrCreateWriteHandle 创建一个写文件的handle(假如这里创建的是FlinkMergeAndReplaceHandle)这里会记录已有的文件路径后续FlinkMergeHelper.runMerge会从这里读取数 注意该构造函数中的init方法会创建一个ExternalSpillableMap类型的map来存储即将插入的记录这在后续upsert中会用到HoodieFlinkTable.upsert 这里进行真正的upsert操作会调用FlinkUpsertDeltaCommitActionExecutor.execute,最终会调用到BaseFlinkCommitActionExecutor.execute,从而调用到FlinkMergeHelper.newInstance().runMerge public void runMerge(HoodieTableT, ListHoodieRecordT, ListHoodieKey, ListWriteStatus table,..) {final boolean externalSchemaTransformation table.getConfig().shouldUseExternalSchemaTransformation();HoodieBaseFile baseFile mergeHandle.baseFileForMerge();if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {readSchema baseFileReader.getSchema();gWriter new GenericDatumWriter(readSchema);gReader new GenericDatumReader(readSchema, mergeHandle.getWriterSchemaWithMetaFields());} else {gReader null;gWriter null;readSchema mergeHandle.getWriterSchemaWithMetaFields();}wrapper new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer(readerIterator),Option.of(new UpdateHandler(mergeHandle)), record - {if (!externalSchemaTransformation) {return record;}return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);});wrapper.execute();。。。mergeHandle.close();} externalSchemaTransformation 这里有hoodie.avro.schema.external.transformation配置(默认是false)用来把在之前schame下的数据转换为新的schema下的数据wrapper.execute() 这里会最终调用到upsertHandle.write(record)也就是UpdateHandler.consumeOneRecord方法被调用的地方 public void write(GenericRecord oldRecord) {...if (keyToNewRecords.containsKey(key)) {if (combinedAvroRecord.isPresent() combinedAvroRecord.get().equals(IGNORE_RECORD)) {copyOldRecord true;} else if (writeUpdateRecord(hoodieRecord, oldRecord, combinedAvroRecord)) {copyOldRecord false;}writtenRecordKeys.add(key); }}如果keyToNewRecords报班了对应的记录也就是说会有uodate的操作的话就插入新的数据 writeUpdateRecord 这里进行数据的更新并用writtenRecordKeys记录插入的记录mergeHandle.close() public ListWriteStatus close() {writeIncomingRecords();...}...protected void writeIncomingRecords() throws IOException {// write out any pending records (this can happen when inserts are turned into updates)IteratorHoodieRecordT newRecordsItr (keyToNewRecords instanceof ExternalSpillableMap)? ((ExternalSpillableMap)keyToNewRecords).iterator() : keyToNewRecords.values().iterator();while (newRecordsItr.hasNext()) {HoodieRecordT hoodieRecord newRecordsItr.next();if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {writeInsertRecord(hoodieRecord);}}}这里的writeIncomingRecords会判断如果writtenRecordKeys没有包含该记录的话就直接插入数据而不是更新 总结一下upsert的关键点: mergeHandle.close()才是真正的写数据(insert)的时候在初始化handle的时候会把记录传导writtenRecordKeys中(在HoodieMergeHandle中的init方法)mergeHandle的write() 方法会在写入数据的时候如果发现有新的数据则会写入新的数据(update)写hudi元数据 这里的操作是StreamWriteOperatorCoordinator.notifyCheckpointComplete方法 public void notifyCheckpointComplete(long checkpointId) {...final boolean committed commitInstant(this.instant, checkpointId);... }... private boolean commitInstant(String instant, long checkpointId){...doCommit(instant, writeResults);... }... private void doCommit(String instant, ListWriteStatus writeResults) {// commit or rollbacklong totalErrorRecords writeResults.stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);long totalRecords writeResults.stream().map(WriteStatus::getTotalRecords).reduce(Long::sum).orElse(0L);boolean hasErrors totalErrorRecords 0;if (!hasErrors || this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {HashMapString, String checkpointCommitMetadata new HashMap();if (hasErrors) {LOG.warn(Some records failed to merge but forcing commit since commitOnErrors set to true. Errors/Total totalErrorRecords / totalRecords);}final MapString, ListString partitionToReplacedFileIds tableState.isOverwrite? writeClient.getPartitionToReplacedFileIds(tableState.operationType, writeResults): Collections.emptyMap();boolean success writeClient.commit(instant, writeResults, Option.of(checkpointCommitMetadata),tableState.commitAction, partitionToReplacedFileIds);if (success) {reset();this.ckpMetadata.commitInstant(instant);LOG.info(Commit instant [{}] success!, instant);} else {throw new HoodieException(String.format(Commit instant [%s] failed!, instant));}} else {LOG.error(Error when writing. Errors/Total totalErrorRecords / totalRecords);LOG.error(The first 100 error messages);writeResults.stream().filter(WriteStatus::hasErrors).limit(100).forEach(ws - {LOG.error(Global error for partition path {} and fileID {}: {},ws.getGlobalError(), ws.getPartitionPath(), ws.getFileId());if (ws.getErrors().size() 0) {ws.getErrors().forEach((key, value) - LOG.trace(Error for key: key and value value));}});// Rolls back instantwriteClient.rollback(instant);throw new HoodieException(String.format(Commit instant [%s] failed and rolled back !, instant));} } 主要在commitInstant涉及动的方法doCommit(instant, writeResults) 如果说没有错误发生的话就继续下一步 这里的提交过程和spark中一样具体参考Apache Hudi初探(五)(与spark的结合) 其他 在flink和spark中新写入的文件是在哪里分配对一个的fieldId: //Flink中 BucketAssignFunction 中processRecord getNewRecordLocation 分配新的 fieldId//Spark中 BaseSparkCommitActionExecutor 中execute方法 中 handleUpsertPartition 涉及到的UpsertPartitioner getBucketInfo方法 其中UpsertPartitioner构造函数中 assignInserts 方法涉及到分配新的 fieldId
http://www.sczhlp.com/news/201723/

相关文章:

  • 石家庄开发网站河北高阳做网站的
  • 纪检监察网站建设的意义用那个程序做网站收录好
  • 深圳企业网站建设费用8469网站
  • 怎么给公司做网站网站建设湖南
  • 网站开发的形式有关于建设人才网站的竞争对手分析
  • 苏州工业园区建设主管部门网站怎么进入广告联盟看广告赚钱
  • 网站建设后台程序用什么语言沈阳做网站好的
  • 昆明铁路局建设工程网站手机单页网站
  • php网站开发学习网站图片上传不上去是什么情况
  • 石家庄英文网站建设官方网站链接如何做
  • 做淘客网站需要什么咸阳今天的新消息
  • 延庆青岛网站建设邯郸在哪个省
  • 网站美工设计什么是平衡scratch编程软件
  • 上海企业网站建设报价河南省南阳市建设局网站
  • 专业的聊城做网站费用产业园门户网站建设方案
  • 成都响应式网站建设解析网站咋做的
  • 山东外贸网站推广辽宁建设工程信息网开标大厅我的项目中没有显示
  • 学校网站内容网站上传空间的ip地址
  • 购物平台大全宁波做网站seo
  • 做签名的网站网站服务器怎么维护
  • 题解:P8019 [ONTAK2015] OR-XOR
  • DP 思维好题(转载)
  • 黄景行电脑软件
  • 开源许可协议 gpl vs mit?
  • 兰州市城乡建设局网站苏州有啥好玩的地方
  • 用户体验做的好的网站网站模板 兼容ie8
  • 哪些网站做的比较好外国做挂的网站是多少钱
  • 中国十佳网站建设公司企业网站制作官网
  • PHP关于简单企业网站开发过程简介校园网站建设的参考文献
  • 开封网站建设培训学校网站建设数据库系统