In December, the eKuiper team continued to focus on developing new features for version 1.8.0. We refactored the format mechanism of external connections (source/sink), separating connection, format, and schema more clearly and adding support for custom formats. Building on the new format mechanism, we greatly improved the file source: it can now monitor the file system at regular intervals, handle files in various formats, and consume file system data in a streaming manner. Finally, we added import and export of complete data, including rules and configurations, to support node migration. In addition, we fixed a number of bugs and released the fixes in the 1.7.x versions.
The December release includes:
Connection format optimization and customization: Serialization and Schema
eKuiper connects to external systems through sources and sinks to read or write data. Taking source as an example, every source type reads data in two steps: connection and serialization. For an MQTT source, connection means connecting to the broker via the MQTT protocol, and serialization means parsing the payload of the received data into eKuiper's internal map format.
Connection and serialization
Previously, connection and serialization were usually implemented inside the source, so when users needed to parse a custom format, they still had to write a complete source plugin, even if the connection protocol was already supported (such as MQTT). In the new version, format and source type are further decoupled: users can customize the format, and any format can be combined with any connection type. For how to write a custom format, please refer to the format extension documentation.
For example, various payload formats can be defined when creating an MQTT-type stream. A stream using the default JSON format:
CREATE STREAM demo1() WITH (FORMAT="json", TYPE="mqtt", DATASOURCE="demo")
An MQTT stream can also use a custom format, in which case the data in the MQTT payload should be in that custom format:
CREATE STREAM demo1() WITH (FORMAT="custom", SCHEMAID="myFormat.myMessage", TYPE="mqtt", DATASOURCE="demo")
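Conceptually, a custom format only has to fulfil a decode/encode contract: turn the raw payload bytes into eKuiper's internal map, and back. Real eKuiper custom formats are compiled plugins (see the format extension documentation); the Python sketch below, including the converter name and the toy `key=value;` wire format, is purely hypothetical and only illustrates that contract.

```python
import json

# Hypothetical converter illustrating the decode/encode contract a custom
# format fulfils. The class name and the "key=value;key=value" wire format
# are invented for illustration; real custom formats are eKuiper plugins.
class MyFormatConverter:
    """Parses a toy 'key=value;key=value' payload into a map, and back."""

    def decode(self, payload: bytes) -> dict:
        result = {}
        for pair in payload.decode("utf-8").split(";"):
            if not pair:
                continue
            key, _, value = pair.partition("=")
            # Keep numbers numeric, the way a JSON decoder would.
            try:
                result[key] = json.loads(value)
            except json.JSONDecodeError:
                result[key] = value
        return result

    def encode(self, message: dict) -> bytes:
        return ";".join(f"{k}={v}" for k, v in message.items()).encode("utf-8")
```

With the stream above, every MQTT payload such as `b"temperature=21.5;device=sensor1"` would be decoded into a map like `{"temperature": 21.5, "device": "sensor1"}` before entering the rule pipeline.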
Previously, eKuiper already supported specifying the data structure when creating a stream. However, this approach has several problems:
Additional performance cost. The declared schema is not associated with the original schema of the data format, so after the data is decoded, an extra validation/conversion step is required; this step is performed dynamically via reflection and performs poorly. For example, when using a strongly-schematized format such as Protobuf, data decoded by Protobuf already conforms to the schema and should not need another conversion.
Cumbersome schema definition. The schema carried by the data format itself cannot be reused; it has to be configured again separately.
In the new version, the stream definition supports both a logical schema and a physical schema defined in the format. When parsing SQL, eKuiper automatically merges the physical and logical schemas to guide SQL validation and optimization. We also provide an API for external systems to obtain the actual inferred schema of a data stream.
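The merge can be pictured as combining two field maps. The sketch below is illustrative only: the real merge rules live inside eKuiper's SQL planner, and the assumption here that the physical schema (from the format, e.g. a .proto file) takes precedence over the logical schema (from the stream definition) on conflicting fields is mine, not the project's.

```python
def merge_schemas(logical: dict, physical: dict) -> dict:
    """Merge a logical schema (stream definition) with a physical schema
    (format, e.g. Protobuf).

    Illustrative sketch: we assume the physical schema wins on conflicts,
    since it describes how the data is actually encoded.
    """
    merged = dict(logical)
    merged.update(physical)
    return merged

# Logical schema declared in CREATE STREAM, physical schema from a .proto file.
logical = {"temperature": "float", "note": "string"}
physical = {"temperature": "double", "device": "string"}
```

Merging these two yields a single inferred schema containing `note` from the logical side, `device` from the physical side, and the physical type for `temperature`.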
In the new version, the supported formats are extended to the following types. Some formats have built-in serialization; others, such as Protobuf, can either use the built-in dynamic serialization or provide a static serialization plugin for better performance. In terms of schema support, some formats carry a schema, and custom formats can also provide their own schema implementation.
File source

The file source in previous versions was mainly used to create tables, and its support for stream processing was limited. In the new version, the file source can also be used as a stream, in which case the interval parameter usually needs to be set so that the file system is read periodically. Support for folders, multiple file formats, and more configuration items has also been added.
The supported file types in the new version are:
json: standard JSON array format file. If the file contains line-separated JSON strings, it should be defined with the lines type instead.
csv: supports comma-separated csv files, as well as custom delimiters.
lines: A file separated by lines. The decoding method for each line can be defined by the format parameter in the stream definition. For example, for a line-separated JSON string, file type should be set to lines and format should be set to JSON.
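The difference between the json and lines file types can be sketched as follows: json expects the whole file to be one standard JSON array, while lines decodes each line separately using the stream's format (JSON here). The helper names are mine; only the parsing behaviour mirrors the description above.

```python
import json

def read_json_file(text: str) -> list:
    """json file type: the whole file is a single JSON array of records."""
    return json.loads(text)

def read_lines_file(text: str) -> list:
    """lines file type with FORMAT="json": decode each non-empty line."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

# The same two records, in the two on-disk layouts:
array_file = '[{"id": 1}, {"id": 2}]'
lines_file = '{"id": 1}\n{"id": 2}\n'
```

Both layouts produce the same stream of records; they just declare it differently in the stream definition (TYPE and FORMAT options).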
The syntax for creating a data stream that reads csv files is as follows:
CREATE STREAM csvFileDemo () WITH (FORMAT="DELIMITED", DATASOURCE="abc.csv", TYPE="file", DELIMITER=",", CONF_KEY="csv")
Data import and export
The new version provides REST API and CLI interfaces for importing and exporting all the configuration (streams, tables, rules, plugins, source configurations, sink configurations, and schemas) of the current eKuiper instance. This enables quick configuration backup and migration to new eKuiper instances. The exported rule set is in plain-text JSON format, which is readable and can also be edited manually.
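Because the export is plain JSON text, it can be inspected or rewritten programmatically before being imported into another instance. The top-level keys and rule payloads below are an assumed layout purely for illustration; consult an actual export from your instance for the real structure.

```python
import json

# Assumed export layout (illustrative only): top-level sections keyed by
# resource type, each mapping names to their definitions.
exported = json.dumps({
    "streams": {"demo": 'CREATE STREAM demo() WITH (FORMAT="json", TYPE="mqtt")'},
    "rules": {"rule1": '{"sql": "SELECT * FROM demo", "actions": [{"log": {}}]}'},
})

def rename_rule(export_text: str, old: str, new: str) -> str:
    """Edit an exported configuration before importing it elsewhere,
    e.g. to avoid a name clash on the target instance."""
    config = json.loads(export_text)
    config["rules"][new] = config["rules"].pop(old)
    return json.dumps(config)
```

A migration script could apply edits like this between calling the export API on the old node and the import API on the new one.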
The REST API for exporting the configuration is as follows; through this API, all the configuration of the current node can be exported:
The REST API for importing the configuration is as follows; through it, an existing configuration can be imported into the target eKuiper instance:
If the imported configuration includes updates to native plugins or static schemas, you need to call the following API instead:
The status of the imported configuration can be viewed through the following API:
This month we will continue developing the remaining features of version 1.8.0, refactor the documentation, and advance the integration of the Flow Editor into eKuiper manager. Stay tuned.
Copyright notice: This article is original content from EMQ. Please attribute the source when reprinting.