In December, the eKuiper team continued to focus on developing new features for version 1.8.0. We refactored the format mechanism of the external connections (source/sink), separating connection, format, and schema more cleanly and allowing formats to be customized. Building on the new format mechanism, we greatly improved the file source: it now supports regularly monitoring the file system, handles files in various formats, and consumes file system data in a streaming manner. Finally, we added an import/export function for complete data, including rules and configurations, to support node migration. In addition, we fixed a number of bugs and released the fixes in 1.7.x patch versions.

The December release includes:

Connection format optimization and customization: Serialization and Schema

eKuiper connects to external systems through sources and sinks to read or write data. Taking a source as an example, each type of source goes through two steps when reading data: connection and serialization (decoding). For an MQTT source, connection means connecting to the broker via the MQTT protocol, and serialization means parsing the payload of the received data into eKuiper's internal map format.

Connection and serialization

Previously, connection and serialization were usually implemented inside the source, so when users needed to parse a custom format, they still had to write a complete source plug-in even if the connection protocol was already supported (such as MQTT). In the new version, format and source type are further separated: users can customize formats, and any format can be combined with any connection type. For details on writing a custom format, please refer to the format extension documentation.

For example, various payload formats can be defined when creating an MQTT-type stream. Using the default JSON format:

CREATE STREAM demo1() WITH (FORMAT="json", TYPE="mqtt", DATASOURCE="demo")

An MQTT-type stream can also use a custom format; the data in the MQTT payload is then expected to be in that custom format:

CREATE STREAM demo1() WITH (FORMAT="custom", SCHEMAID="myFormat.myMessage", TYPE="mqtt", DATASOURCE="demo")


Previously, eKuiper supported specifying the data structure (schema) when creating a stream. However, this approach had several problems:

  • Additional performance overhead. The stream schema was not associated with the original format schema of the data, so after the data was decoded an extra validation/conversion pass was required; this process was performed dynamically via reflection and was slow. For a strongly-typed format such as Protobuf, the decoded data already conforms to the schema and should not need conversion.

  • Cumbersome schema definition. The schema embedded in the data format itself could not be reused; a separate definition was required.

In the new version, the stream definition supports both a logical schema and a physical schema defined by the format. When parsing SQL, eKuiper automatically merges the physical and logical schemas to guide SQL validation and optimization. We also provide an API for external systems to obtain the actual inferred schema of a data stream:

GET /streams/{streamName}/schema
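For example, assuming an eKuiper instance on the default REST port 9081 and a stream named demo1 (both illustrative), the inferred schema could be fetched with curl:

```shell
# Fetch the merged (logical + physical) inferred schema of stream "demo1".
# Host, port, and stream name are assumptions for illustration;
# 9081 is eKuiper's default REST API port.
curl -s -X GET "http://127.0.0.1:9081/streams/demo1/schema"
```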

Format list

In the new version, the list of supported formats has been extended. Some formats include built-in serialization; some formats, such as Protobuf, can either use the built-in dynamic serialization or provide a static serialization plug-in for better performance. In terms of schema support, some formats carry a schema, and custom formats can also provide their own schema implementations.

File source

The file source in previous versions was mainly used to create tables, and its support for stream processing was incomplete. In the new version, the file source can also be used as a stream, which usually requires setting the interval parameter so that the source polls for updates periodically. At the same time, support for folders, multiple file formats, and more configuration items has been added.

The supported file types in the new version are:

  • json: standard JSON array format file. If the file contains line-separated JSON strings, define it as the lines type instead.

  • csv: comma-separated files; custom delimiters are also supported.

  • lines: a file separated by lines. The decoding of each line can be defined by the format parameter in the stream definition. For example, for line-separated JSON strings, set the file type to lines and the format to json.

Create a data stream for reading csv files; the syntax is similar to the following (the file name is illustrative, and csv data is decoded via the DELIMITED format):

CREATE STREAM csvDemo () WITH (FORMAT="DELIMITED", DATASOURCE="test.csv", TYPE="file", DELIMITER=",")
Data import and export

The new version provides REST API and CLI interfaces for importing and exporting all configurations (streams, tables, rules, plug-ins, source configurations, sink configurations, schemas) of the current eKuiper instance. This allows quick configuration backup or migration to a new eKuiper instance. The exported rule set is in plain-text JSON format, which is readable and can also be edited manually.

The REST endpoint for exporting the configuration is as follows; through this API, all configurations of the current node can be exported:

GET /data/export

The REST endpoint for importing the configuration is as follows; through it, an existing configuration can be imported into a target eKuiper instance:

POST /data/import

If the imported configuration includes updates to native plug-ins or static schemas, the following endpoint needs to be called instead, which stops the node so that the updates can take effect:

POST /data/import?stop=1

The status of the imported configuration can be viewed through the following endpoint:

GET /data/import/status
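Putting these endpoints together, a node migration might look like the following sketch. The host names are illustrative, 9081 is the default REST port, and the exact request payload accepted by the import endpoint should be checked against the eKuiper REST documentation:

```shell
# 1. Export everything from the source instance into a JSON rule set file.
curl -s -X GET "http://source-node:9081/data/export" -o ekuiper_backup.json

# 2. Import the rule set into the target instance
#    (assuming the API accepts the exported JSON as the request body).
curl -s -X POST "http://target-node:9081/data/import" \
     -H "Content-Type: application/json" \
     -d @ekuiper_backup.json

# 3. Check the import status on the target instance.
curl -s -X GET "http://target-node:9081/data/import/status"
```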

Coming soon

This month, we will continue to develop the remaining features of version 1.8.0, refactor the documentation, and advance the integration of the Flow Editor into eKuiper manager. Stay tuned.

Copyright statement: this article is an original piece by EMQ; please credit the source when reprinting.

Original link:

