Flink Connector
Swim provides a Flink connector for writing data from Flink jobs to Swim servers.
For more information on Flink connectors see the Flink documentation.
Dependency
The Swim Flink connector can be built from source, instructions can be found on GitHub.
Swim Sink
The connector provides a Flink sink that writes data to a Swim server.
To create a SwimSink
use the builder, type parameter being the type of objects to be written to the Swim server.
SwimSink.<SomeObject>builder()
.setHostUri("hostUri")
.setNodeUri(obj -> "/object/" + obj.id)
.setLaneUri("laneUri")
.setRecordValueMolder(obj -> Form.forClass(SomeObject.class).mold(obj).toValue())
.build()
The destination Swim endpoint, consisting of hostUri
, nodeUri
and laneUri
, must be set with the corresponding methods shown above.
These methods accept a String
or a function mapping the source data type to a String
.
The other mandatory field, the RecordValueMolder
, defines a method of casting the source data type to a Swim Value
type.
Swim Form
provides a general method for molding an object into a Value
, as above, but structure of messages can be customized (see Forms).
Local Example
Here we demonstrate a full example where Flink User
events from some data source are streamed to specific user nodes on a Swim server.
//imports...
public class SwimSinkExampleFlinkJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<User> randomUserDataSource = env.addSource(new RandomUserDataSource());
randomUserDataSource.sinkTo(SwimSink.<User>builder()
.setHostUri("warp://localhost:9001")
.setNodeUri(user -> "/user/" + user.id)
.setLaneUri("addEvent")
.setRecordValueMolder(user -> Form.forClass(User.class).mold(user).toValue())
.build()
);
env.execute();
}
static class User {
final int id;
final double longitude;
final double latitude;
final double score;
public User(int id, double longitude, double latitude, double score) {
this.id = id;
this.longitude = longitude;
this.latitude = latitude;
this.score = score;
}
}
}