
ru.ivi.opensource:flink-clickhouse-sink
Flink sink for ClickHouse database. Powered by Async Http Client. High-performance library for loading data to ClickHouse.
It has two triggers for loading data: by timeout and by buffer size.
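The two triggers can be sketched conceptually like this. This is not the library's internal code; the class and method names are hypothetical, and it only illustrates the "flush on buffer size or on timeout, whichever comes first" behavior:

```java
import java.util.ArrayList;
import java.util.List;

public class FlushTriggerSketch {
    private final int maxBufferSize;
    private final long timeoutMillis;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlush = System.currentTimeMillis();
    private int flushes = 0;

    public FlushTriggerSketch(int maxBufferSize, long timeoutMillis) {
        this.maxBufferSize = maxBufferSize;
        this.timeoutMillis = timeoutMillis;
    }

    public void add(String csvRecord) {
        buffer.add(csvRecord);
        // trigger 1: buffer is full; trigger 2: timeout elapsed since last flush
        if (buffer.size() >= maxBufferSize
                || System.currentTimeMillis() - lastFlush >= timeoutMillis) {
            flush();
        }
    }

    private void flush() {
        // in the real sink this batch would be sent to ClickHouse over HTTP
        buffer.clear();
        lastFlush = System.currentTimeMillis();
        flushes++;
    }

    public int flushCount() {
        return flushes;
    }

    public static void main(String[] args) {
        FlushTriggerSketch sketch = new FlushTriggerSketch(2, 60_000L);
        sketch.add("('a', 1)");
        sketch.add("('b', 2)"); // buffer reaches size 2 -> flush
        sketch.add("('c', 3)");
        System.out.println(sketch.flushCount()); // prints 1
    }
}
```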
| flink | flink-clickhouse-sink |
|-------|-----------------------|
| 1.3.* | 1.0.0 |
| 1.9.* | 1.3.4 |
| 1.9.* | 1.4.* |
<dependency>
    <groupId>ru.ivi.opensource</groupId>
    <artifactId>flink-clickhouse-sink</artifactId>
    <version>1.4.0</version>
</dependency>
The flink-clickhouse-sink uses two groups of configuration properties: common properties, and per-sink properties for each sink in your operator chain.
The common part (global for the job):
clickhouse.sink.num-writers - number of writers that build and send requests,
clickhouse.sink.queue-max-capacity - max capacity (in batches) of the blank queue,
clickhouse.sink.timeout-sec - timeout for loading data,
clickhouse.sink.retries - max number of retries,
clickhouse.sink.failed-records-path - path for failed records,
clickhouse.sink.ignoring-clickhouse-sending-exception-enabled - required boolean parameter that controls whether a ClickHouse sending exception is raised in the main thread (false) or ignored (true).
If ignoring-clickhouse-sending-exception-enabled is true, an exception during ClickHouse sending is ignored and the failed data automatically goes to disk.
If ignoring-clickhouse-sending-exception-enabled is false, the ClickHouse sending exception is thrown in the "main" thread (the thread that called ClickHouseSink::invoke) and the data also goes to disk.
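As a sketch, the common properties might be filled in like this. The keys are the property names listed above; the values are purely illustrative starting points, not recommendations, and should be tuned for your workload:

```java
import java.util.HashMap;
import java.util.Map;

public class CommonSinkParams {
    // Illustrative values only; tune for your own workload.
    public static Map<String, String> build() {
        Map<String, String> p = new HashMap<>();
        p.put("clickhouse.sink.num-writers", "2");
        p.put("clickhouse.sink.queue-max-capacity", "10");
        p.put("clickhouse.sink.timeout-sec", "1");
        p.put("clickhouse.sink.retries", "3");
        p.put("clickhouse.sink.failed-records-path", "/tmp/clickhouse-failed-records");
        p.put("clickhouse.sink.ignoring-clickhouse-sending-exception-enabled", "true");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(CommonSinkParams.build().size()); // prints 6
    }
}
```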
The sink part (used per sink in the chain):
clickhouse.sink.target-table - target table in ClickHouse,
clickhouse.sink.max-buffer-size - buffer size (records per batch).
You have to add the global parameters to the Flink environment:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Map<String, String> globalParameters = new HashMap<>();
// ClickHouse cluster properties
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, ...);
// sink common
globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, ...);
globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, ...);
globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, ...);
globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, ...);
globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, ...);
globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, ...);
// set global parameters
ParameterTool parameters = ParameterTool.fromMap(globalParameters);
environment.getConfig().setGlobalJobParameters(parameters);
The main thing: the clickhouse-sink works with events as strings in ClickHouse insert format (CSV-like). You have to convert your event to CSV format, as in a usual database insert.
For example, you have event-pojo:
class A {
    public final String str;
    public final int integer;

    public A(String str, int i) {
        this.str = str;
        this.integer = i;
    }
}
You have to implement a converter to CSV, using
public interface ClickHouseSinkConverter<T> {
...
}
Example: the pojo above can be converted like this:
import ru.ivi.opensource.flinkclickhousesink.ClickHouseSinkConverter;

public class YourEventConverter implements ClickHouseSinkConverter<A> {

    @Override
    public String convert(A record) {
        StringBuilder builder = new StringBuilder();
        builder.append("(");

        // add record.str
        builder.append("'");
        builder.append(record.str);
        builder.append("', ");

        // add record.integer
        builder.append(String.valueOf(record.integer));
        builder.append(" )");
        return builder.toString();
    }
}
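Stripped of the Flink-specific interface, the same conversion logic can be checked in isolation (the class name here is hypothetical, not part of the library):

```java
public class ConverterSketch {
    // Same string-building logic as YourEventConverter.convert, framework-free.
    public static String convert(String str, int integer) {
        StringBuilder builder = new StringBuilder();
        builder.append("(");
        builder.append("'");
        builder.append(str);
        builder.append("', ");
        builder.append(String.valueOf(integer));
        builder.append(" )");
        return builder.toString();
    }

    public static void main(String[] args) {
        // A("foo", 42) becomes one ClickHouse VALUES tuple
        System.out.println(convert("foo", 42)); // prints ('foo', 42 )
    }
}
```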
Then add your sink to the chain like this:
// create table props for sink
Properties props = new Properties();
props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "your_table");
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");
// converter
YourEventConverter converter = new YourEventConverter();
// build chain
DataStream<YourEvent> dataStream = ...;
dataStream.addSink(new ClickHouseSink(props, converter))
.name("your_table ClickHouse sink");