bboss-datatran v6.7.7, a data collection ETL tool, has been released. It supports data synchronization between Elasticsearch 8, earlier Elasticsearch versions, and OpenSearch.

  1. Added a lightweight but powerful big data metrics analysis and computation module. It supports both real-time and offline metric computation across multiple dimensions based on time windows, and it works with metric keys that have either a bounded or an unbounded number of dimensions. The computed results can easily be stored in a variety of databases, so enterprise-level big data analysis applications can be built at very low cost. To use it, add the following Maven dependency:

<dependency>
  <groupId>com.bbossgroups.plugins</groupId>
  <artifactId>bboss-datatran-metrics</artifactId>
  <version>6.7.7</version>
</dependency>

Usage examples:

https://gitee.com/bboss/bboss-elastic-tran/tree/master/bboss-datatran-core/src/test/java/org/frameworkset/tran/metrics
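To make the idea of time-window metric computation concrete, here is a minimal sketch of tumbling-window aggregation keyed by a dimension string. It is a generic illustration and does not use the bboss-datatran-metrics API. The class `WindowedCounter` and its methods are hypothetical. Real-time computation would feed records as they arrive. Offline computation would feed historical records using their own timestamps.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not the bboss-datatran-metrics API): tumbling
 * time-window aggregation keyed by (window start, dimension key).
 */
class WindowedCounter {
    private final long windowMillis;
    // window start (epoch millis) -> (dimension key -> aggregated value)
    private final Map<Long, Map<String, Long>> windows = new TreeMap<>();

    WindowedCounter(long windowMillis) {
        if (windowMillis <= 0) throw new IllegalArgumentException("windowMillis must be positive");
        this.windowMillis = windowMillis;
    }

    /** Adds value to the bucket of the window that contains timestampMillis. */
    void add(String key, long timestampMillis, long value) {
        long windowStart = timestampMillis - Math.floorMod(timestampMillis, windowMillis);
        windows.computeIfAbsent(windowStart, w -> new TreeMap<>())
               .merge(key, value, Long::sum);
    }

    /** Returns the aggregate for key in the window starting at windowStart, or 0 if absent. */
    long get(String key, long windowStart) {
        return windows.getOrDefault(windowStart, Map.of()).getOrDefault(key, 0L);
    }
}
```

A multi-dimensional key can simply be a composite string such as `"host=a|status=200"`. A bounded key space keeps the map small, while an unbounded one would need eviction of old windows, which this sketch omits.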

  2. Adjusted the structure of the incremental collection status table increment_tab by adding the following fields:

jobType varchar(500): stores the input plug-in type and cannot be empty. See the table below for the value that corresponds to each plug-in type. This field prevents jobs of different types from interfering with each other when they load their incremental status.

jobId varchar(500): stores the job ID set externally and may be empty. When several jobs that use the same type of input plug-in are started in the same process, a jobId must be specified for each job. Otherwise the jobs interfere with each other when loading their incremental status.

| Plug-in type | jobType | jobId |
|---|---|---|
| HttpInputDataTranPlugin | HttpInputDataTranPlugin | null |
| DBInputDataTranPlugin | DBInputDataTranPlugin | null |
| ElasticsearchInputDataTranPlugin | ElasticsearchInputDataTranPlugin | null |
| FileInputDataTranPlugin | FileInputDataTranPlugin | null |
| HBaseInputDatatranPlugin | HBaseInputDatatranPlugin | null |
| Kafka2InputDatatranPlugin | Kafka2InputDatatranPlugin | null |
| MongoDBInputDatatranPlugin | MongoDBInputDatatranPlugin | null |

Upgrade notes: before upgrading to 6.7.7, manually add the jobType and jobId columns to the increment_tab table. Then update the existing status records by filling in the correct jobType for the job's input plug-in type, and only then start the job. This lets the job continue working normally after the upgrade.
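The upgrade steps above can be scripted. The following is a hedged sketch that builds the SQL statements for one job. It assumes the status table holds records for a single job, for example a status database dedicated to that job. If several jobs share one table, each UPDATE needs a WHERE clause identifying that job's rows, and those columns are not described here. The DDL uses generic `ALTER TABLE ... ADD` syntax, which may need adjusting for your database. The helper class `IncrementTabMigration` is hypothetical.

```java
import java.util.List;

/**
 * Hypothetical helper (not part of bboss): builds upgrade statements for
 * increment_tab, assuming the table stores status records of a single job.
 */
final class IncrementTabMigration {
    private IncrementTabMigration() {}

    static List<String> statements(String jobType) {
        // jobType is inlined into SQL, so accept only plain class-name characters
        if (jobType == null || !jobType.matches("[A-Za-z0-9_]+")) {
            throw new IllegalArgumentException("jobType must be a non-empty plug-in class name");
        }
        return List.of(
                // columns are added as nullable because existing rows have no value yet
                "ALTER TABLE increment_tab ADD jobType varchar(500)",
                "ALTER TABLE increment_tab ADD jobId varchar(500)",
                // jobId stays NULL unless several jobs of the same plug-in type share one process
                "UPDATE increment_tab SET jobType = '" + jobType + "'");
    }
}
```

For example, `IncrementTabMigration.statements("DBInputDataTranPlugin")` yields the two column additions followed by an UPDATE that sets jobType to the value from the table above.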

  3. Optimized task status record management in the Kafka output plug-in. The Metrics analysis module now aggregates data transmission status over a specified time window, and the callback task's success method is then executed with taskMetrics, the statistics produced by that aggregation. A switch controls whether this pre-aggregation is enabled:

kafkaOutputConfig.setEnableMetricsAgg(true);//enable pre-aggregation
kafkaOutputConfig.setMetricsAggWindow(60);//statistics time window in seconds; default is 60
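The following sketch shows why pre-aggregation reduces callback overhead. Instead of invoking a success callback for every record, it accumulates counts and emits one summary per window. It is a generic illustration under stated assumptions, not how bboss implements `setEnableMetricsAgg`. `SendResultAggregator` and `WindowSummary` are hypothetical names, and `WindowSummary` merely stands in for taskMetrics.

```java
import java.util.function.Consumer;

/**
 * Hypothetical sketch: per-window pre-aggregation of send results, emitting
 * one summary per window instead of one callback per record.
 */
class SendResultAggregator {
    /** Aggregated statistics for one window (a stand-in for taskMetrics). */
    record WindowSummary(long windowStart, long success, long failed) {}

    private final long windowMillis;
    private final Consumer<WindowSummary> callback;
    private long windowStart = -1; // -1 means no open window
    private long success;
    private long failed;

    SendResultAggregator(long windowSeconds, Consumer<WindowSummary> callback) {
        if (windowSeconds <= 0) throw new IllegalArgumentException("windowSeconds must be positive");
        this.windowMillis = windowSeconds * 1000L;
        this.callback = callback;
    }

    /** Records one send result observed at nowMillis; closes the previous window if needed. */
    synchronized void onResult(boolean ok, long nowMillis) {
        long start = nowMillis - Math.floorMod(nowMillis, windowMillis);
        if (windowStart != -1 && start != windowStart) {
            flush();
        }
        windowStart = start;
        if (ok) success++; else failed++;
    }

    /** Emits the open window's summary, if any, and resets the counters. */
    synchronized void flush() {
        if (windowStart == -1) return;
        callback.accept(new WindowSummary(windowStart, success, failed));
        windowStart = -1;
        success = 0;
        failed = 0;
    }
}
```

In this sketch a window is only closed when a later result arrives or `flush()` is called. A real implementation would also flush on a timer and on shutdown.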

4. Improved interceptor support in the Kafka input plug-in. The plug-in periodically records statistics on the Kafka records it has consumed and calls the task interceptor's afterCall method to output the jobMetrics statistics. The statistics interval can be configured:


kafka2InputConfig.setMetricsInterval(300 * 1000L);//call the task interceptor every 300 seconds (the default)
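Here is a minimal sketch of interval-based reporting in the spirit of `setMetricsInterval`. It counts consumed records and, once the interval in milliseconds has elapsed, hands the count for that interval to an afterCall-style hook. The class `ConsumptionMetrics` is hypothetical, and a plain `LongConsumer` stands in for the real task interceptor. This is not bboss's Kafka2InputConfig internals.

```java
import java.util.function.LongConsumer;

/**
 * Hypothetical sketch: count consumed records and report the per-interval
 * total to an afterCall-style hook once intervalMillis has elapsed.
 */
class ConsumptionMetrics {
    private final long intervalMillis;
    private final LongConsumer afterCall; // receives records consumed since the last report
    private long lastReportMillis;
    private long consumedSinceReport;

    ConsumptionMetrics(long intervalMillis, long startMillis, LongConsumer afterCall) {
        if (intervalMillis <= 0) throw new IllegalArgumentException("intervalMillis must be positive");
        this.intervalMillis = intervalMillis;
        this.lastReportMillis = startMillis;
        this.afterCall = afterCall;
    }

    /** Adds a batch of consumed records and reports if the interval has elapsed. */
    synchronized void onRecords(int count, long nowMillis) {
        consumedSinceReport += count;
        if (nowMillis - lastReportMillis >= intervalMillis) {
            afterCall.accept(consumedSinceReport);
            consumedSinceReport = 0;
            lastReportMillis = nowMillis;
        }
    }
}
```

This sketch only checks the interval when records arrive. A production version would typically use a scheduled timer so that idle periods are reported too.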

5. Added field mapping to several plug-ins: the log file collection plug-in, the Excel collection plug-in, the log/Excel file output plug-in, and the Kafka input plug-in.

Field mapping configuration example for the file collection plug-in:

FileInputConfig fileInputConfig = new FileInputConfig();
FileConfig fileConfig = new FileConfig();
fileConfig.setFieldSplit(";");//field separator used to split each log record
//field mapping configuration; excelCellMapping supplies the field name, default value and data format
//position 0: date field
fileConfig.addDateCellMapping(0, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
//position 1: number field
fileConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
//position 2: field of the type given by cellType
fileConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());

Field mapping configuration example for the Kafka input plug-in:

Kafka2InputConfig kafka2InputConfig = new Kafka2InputConfig();

kafka2InputConfig.setFieldSplit(";");//field separator for Kafka records
//field mapping configuration
kafka2InputConfig.addDateCellMapping(0, //index in the field list produced by splitting the record, starting from 0
        excelCellMapping.getFieldName(), //name of the mapped field
        cellType, //field value type
        excelCellMapping.getDefaultValue(), //field default value
        excelCellMapping.getDataFormat());//field format: a date format or a number format

kafka2InputConfig.addNumberCellMapping(1, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue(), excelCellMapping.getDataFormat());
kafka2InputConfig.addCellMappingWithType(2, excelCellMapping.getFieldName(), cellType, excelCellMapping.getDefaultValue());

Valid cellType values:


public static final int CELL_BOOLEAN = 5;
public static final int CELL_DATE = 3;

public static final int CELL_NUMBER = 2;
public static final int CELL_NUMBER_INTEGER = 6;
public static final int CELL_NUMBER_LONG = 7;
public static final int CELL_NUMBER_FLOAT = 8;
public static final int CELL_NUMBER_SHORT = 9;
public static final int CELL_STRING = 1;
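The following sketch clarifies what index-based field mapping does with a delimited record. It splits the record on the configured separator, then converts each mapped position to the configured cell type, falling back to the default value when the field is empty. The type codes are taken from the list above. The class `FieldMapper` and its methods are hypothetical and are not bboss's implementation. Number format strings are ignored here, and dates are parsed with `SimpleDateFormat` in the JVM's default time zone.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical sketch of index-based field mapping (not bboss's implementation). */
class FieldMapper {
    // Type codes copied from the cellType list above
    static final int CELL_STRING = 1, CELL_NUMBER = 2, CELL_DATE = 3, CELL_BOOLEAN = 5,
            CELL_NUMBER_INTEGER = 6, CELL_NUMBER_LONG = 7, CELL_NUMBER_FLOAT = 8, CELL_NUMBER_SHORT = 9;

    private record Mapping(int index, String field, int cellType, Object defaultValue, String format) {}

    private final String separator;
    private final List<Mapping> mappings = new ArrayList<>();

    FieldMapper(String separator) { this.separator = separator; }

    /** Maps the split field at index to a named, typed field. */
    FieldMapper map(int index, String field, int cellType, Object defaultValue, String format) {
        mappings.add(new Mapping(index, field, cellType, defaultValue, format));
        return this;
    }

    /** Splits one record and converts every mapped field; empty or missing fields get the default. */
    Map<String, Object> apply(String line) {
        // Pattern.quote treats the separator literally; limit -1 keeps trailing empty fields
        String[] parts = line.split(Pattern.quote(separator), -1);
        Map<String, Object> out = new LinkedHashMap<>();
        for (Mapping m : mappings) {
            String raw = m.index() < parts.length ? parts[m.index()].trim() : "";
            out.put(m.field(), raw.isEmpty() ? m.defaultValue() : convert(raw, m));
        }
        return out;
    }

    private static Object convert(String raw, Mapping m) {
        switch (m.cellType()) {
            case CELL_DATE:
                try {
                    return new SimpleDateFormat(m.format()).parse(raw);
                } catch (ParseException e) {
                    throw new IllegalArgumentException("Bad date for " + m.field() + ": " + raw, e);
                }
            case CELL_NUMBER: return Double.valueOf(raw); // number format string ignored
            case CELL_NUMBER_INTEGER: return Integer.valueOf(raw);
            case CELL_NUMBER_LONG: return Long.valueOf(raw);
            case CELL_NUMBER_FLOAT: return Float.valueOf(raw);
            case CELL_NUMBER_SHORT: return Short.valueOf(raw);
            case CELL_BOOLEAN: return Boolean.valueOf(raw);
            default: return raw; // CELL_STRING and unknown codes stay as text
        }
    }
}
```

For example, with `;` as the separator, the record `2022-05-01;42;hello` mapped as date, integer and string yields a `Date`, the integer 42 and the text `hello`.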

6. Added a global JobContext for storing initialization data used by the job.

7. Changed the parameter type of the exception-handling methods in job and task interceptors from Exception to Throwable.
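The following sketch shows why a Throwable parameter matters. An `Error` thrown by a task, such as `StackOverflowError`, is not an `Exception`, so a handler typed to `Exception` would never receive it. The interface `TaskInterceptor`, its method `throwException`, and the class `TaskRunner` are hypothetical illustrations, not bboss's interceptor API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interceptor: receives any Throwable, including Errors. */
interface TaskInterceptor {
    void throwException(String taskName, Throwable t);
}

/** Hypothetical runner that notifies interceptors of every failure. */
class TaskRunner {
    private final List<TaskInterceptor> interceptors = new ArrayList<>();

    void addInterceptor(TaskInterceptor interceptor) { interceptors.add(interceptor); }

    void run(String taskName, Runnable task) {
        try {
            task.run();
        } catch (Throwable t) { // also catches Errors such as StackOverflowError
            for (TaskInterceptor interceptor : interceptors) {
                interceptor.throwException(taskName, t);
            }
            // This sketch swallows the throwable after notifying;
            // real code may need to rethrow fatal errors.
        }
    }
}
```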

For the full list of changes, see the commit history: https://gitee.com/bboss/bboss-elastic-tran/commits/master

Video tutorial on developing data synchronization jobs

https://www.bilibili.com/video/BV1xf4y1Z7xu

bboss example collection

https://esdoc.bbossgroups.com/#/bboss-datasyn-demo

Quick Start

https://esdoc.bbossgroups.com/#/quickstart

Developer community

https://www.bbossgroups.com/forum.html
