I've managed to plug the GCP PubSub dependency into the Flink StateFun JAR and then build the Docker image.
I've added the following to the pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-gcp-pubsub</artifactId>
  <version>1.16.0</version>
  <scope>test</scope>
</dependency>
It's not clear to me how I should now specify my PubSub ingress and egress in the module.yaml that we use with the StateFun image.
https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/
For example, for Kafka you use:
kind: io.statefun.kafka.v1/egress
spec:
  id: com.example/my-egress
  address: kafka-broker:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeout: 15min
I can see that the official connectors have a Kind constant in the Java code, which you use to reference the connector within your module.yaml. However, I can't see in the docs how to reference Flink connectors that you plug into the StateFun image yourself.
GCP PubSub is not officially supported as a standard StateFun IO component (only Kafka and Kinesis are, for now); however, you can come up with your own custom ingress/egress connector relatively easily. Unfortunately, you won't be able to add a new YAML-based config item, as the module configurators for Kafka and Kinesis seem to be hard-coded in the runtime. You'll have to do your configuration in your code.
Looking at the source/ingress example, your goal is going to be to provide the new FlinkSource<>(), which is an org.apache.flink.streaming.api.functions.source.SourceFunction. You could declare it thus:
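As a minimal sketch of such a declaration, the module below builds a PubSubSource from the flink-connector-gcp-pubsub dependency and binds it as an ingress through SourceFunctionSpec. The class name, the identifiers, the project and subscription names, and the String payload type are all placeholders assumed for illustration. If your functions are remote, the messages would likely need to be TypedValue rather than String. The nested CustomRouter is a hypothetical router of the kind described in the next paragraph.

```java
import java.util.Map;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

public class PubSubIngressModule implements StatefulFunctionModule {

  // Hypothetical identifier: namespace and name are placeholders.
  static final IngressIdentifier<String> INGRESS =
      new IngressIdentifier<>(String.class, "com.example", "pubsub-ingress");

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // The PubSub source takes the place of the FlinkSource placeholder.
    binder.bindIngress(new SourceFunctionSpec<>(INGRESS, pubSubSource()));
    binder.bindIngressRouter(INGRESS, new CustomRouter());
  }

  private static SourceFunction<String> pubSubSource() {
    try {
      return PubSubSource.newBuilder()
          .withDeserializationSchema(new SimpleStringSchema())
          .withProjectName("my-gcp-project")        // placeholder project
          .withSubscriptionName("my-subscription")  // placeholder subscription
          .build();
    } catch (Exception e) {
      // build() can fail, for example while loading credentials.
      throw new IllegalStateException("Could not create the PubSub source", e);
    }
  }

  // Hypothetical router: sends every message to one function type and uses
  // the message text itself as the function instance id.
  static final class CustomRouter implements Router<String> {
    private static final FunctionType TARGET =
        new FunctionType("com.example", "my-function");

    @Override
    public void route(String message, Downstream<String> downstream) {
      downstream.forward(TARGET, message, message);
    }
  }
}
```

In a real router you would normally derive the instance id from a key inside the message rather than from the whole payload.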
You'll also have to come up with a new CustomRouter(), to determine which function instance should handle an event initially. You can take inspiration from here:
The same goes for the sink/egress, except that there is no router to provide. There, the new FlinkSink<>() placeholder is replaced by the PubSub sink, which is what you would then use in the egress spec.
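A sketch of the egress side, under the same assumptions as for the ingress: the class name, identifier, project and topic names, and the String payload type are placeholders, not values taken from the question. It builds a PubSubSink from the connector and hands it to SinkFunctionSpec.

```java
import java.util.Map;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

public class PubSubEgressModule implements StatefulFunctionModule {

  // Hypothetical identifier: functions address the egress through it.
  static final EgressIdentifier<String> EGRESS =
      new EgressIdentifier<>("com.example", "pubsub-egress", String.class);

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // The PubSub sink takes the place of the FlinkSink placeholder.
    binder.bindEgress(new SinkFunctionSpec<>(EGRESS, pubSubSink()));
  }

  private static SinkFunction<String> pubSubSink() {
    try {
      return PubSubSink.newBuilder()
          .withSerializationSchema(new SimpleStringSchema())
          .withProjectName("my-gcp-project")  // placeholder project
          .withTopicName("my-topic")          // placeholder topic
          .build();
    } catch (Exception e) {
      // build() can fail, for example while loading credentials.
      throw new IllegalStateException("Could not create the PubSub sink", e);
    }
  }
}
```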
You'll also have to make your Module known to the runtime using a file mentioning your Module in the META-INF/services directory of your jar, like so:
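For illustration, assuming a Maven layout and a hypothetical module class com.example.MyModule, the service file would be named after the StatefulFunctionModule interface and contain one fully qualified class name per line:

```
# src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
com.example.MyModule
```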
Alternatively, if you prefer annotations, you can use Google AutoService like so:
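A minimal sketch, assuming the com.google.auto.service:auto-service dependency is on the compile classpath and using a hypothetical module class MyModule; the annotation generates the META-INF/services entry at build time:

```java
import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

// AutoService writes the service file for StatefulFunctionModule during compilation.
@AutoService(StatefulFunctionModule.class)
public class MyModule implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // Bind your ingresses, routers, and egresses here.
  }
}
```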
I hope it helps!