Attach Sink to Overlapping Windows in StreamInsight


I'm porting some Reactive Extensions (Rx) queries to StreamInsight and have run into an issue with an overlapping-window query.

I have a source set up on my StreamInsight server, and I'm trying to write an overlapping-window query like this:

var source = streamInsightServer.GetObservable<EventPattern<MyEventArg>>("EventSource");

// 1-second windows, with a new window opening every 250 ms
var query = source.Window(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(250));

Here source is IQbservable<EventPattern<MyEventArg>>, so query is IQbservable<IObservable<EventPattern<MyEventArg>>>.
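One detail worth checking in window arguments like these is which TimeSpan overload is in play. The four-argument constructor takes days, hours, minutes, and seconds, so a 250 ms shift needs the five-argument form or TimeSpan.FromMilliseconds. A minimal check follows; the class and field names are hypothetical and only illustrate the overloads:

```csharp
using System;

public static class TimeSpanCheck
{
    // Four arguments: days, hours, minutes, seconds. This is 250 seconds.
    public static readonly TimeSpan FourArgs = new TimeSpan(0, 0, 0, 250);

    // Five arguments: days, hours, minutes, seconds, milliseconds. This is 250 ms.
    public static readonly TimeSpan FiveArgs = new TimeSpan(0, 0, 0, 0, 250);

    // Factory method that states the unit in its name. Also 250 ms.
    public static readonly TimeSpan Factory = TimeSpan.FromMilliseconds(250);
}
```

The factory methods are usually the easier form to read in window definitions, since the unit is visible at the call site.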

With Rx, the observer was attached as follows:

_observer = query.Subscribe(evts =>
{
    evts.Count().Subscribe(c =>
    {
        // push output here
    });
});

How can I attach an observer to retrieve the equivalent output from StreamInsight?

There are 2 answers

SignalRichard On

Okay, I've managed to create two sinks that produce the same output as the Rx subscriptions:

var query = source.Window(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(250));
query.Deploy("windowQuery");

// Inner sink: receives the event count for each window.
var innerSink = applicationStatus.DefineObserver(() => Observer.Create<int>(c => /*output here*/));
innerSink.Deploy("innerSink");

// Outer sink: for each window, counts its events and binds the count to the inner sink.
var sink = applicationStatus.DefineObserver(() =>
    Observer.Create<IObservable<EventPattern<MyEventArg>>>(
        e => e.Count().AsQbservable().Bind(innerSink).Run().Void()));
sink.Deploy("mySink");

ProcessBinding = query.Bind(sink).Run("processBindingName");

Note that .Void() is simply an extension method that does nothing and returns void, which causes the expression to be interpreted as an Action instead of a Func, matching the required signature. Also, the bindings created by running the inner sink are NOT disposed of and accumulate; they can be seen in the StreamInsight Event Flow Debugger.
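The post does not show the .Void() helper itself. A minimal sketch, assuming it is a generic extension method that discards its receiver, might look like the following. VoidExtensions, VoidDemo, Start, and MakeHandler are hypothetical names used only for illustration:

```csharp
using System;

// Hypothetical helper: the post does not show how .Void() is implemented.
public static class VoidExtensions
{
    // Accepts any value and discards it, so the call expression has type void.
    public static void Void<T>(this T ignored)
    {
    }
}

public static class VoidDemo
{
    public static int Calls;

    // Stand-in for a call such as Run() that returns a handle the caller ignores.
    public static int Start(int x)
    {
        Calls++;
        return x * 2;
    }

    // The lambda body ends in .Void(), so it yields no value and fits Action<int>.
    public static Action<int> MakeHandler()
    {
        return x => Start(x).Void();
    }
}
```

In ordinary C# a lambda whose body is a method call already converts to Action<T>, so this helper only makes the void result explicit; the sketch does not reproduce whatever overload resolution made it necessary inside DefineObserver.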

While this achieves the same results, I'm not sure it qualifies as a good solution, given the extension-method hack and the undisposed inner process bindings. I'm still looking for a way to write this without these issues, if anyone knows one!

TXPower275 On

If your goal is to share a source of data, then you need to use .With().

So your process should be:

var process = query.Bind(innerSink).With(query.Bind(sink)).Run("processBindingName");

When you are done with the process, or want to shut down the application, you should only need to call .Dispose() on it.

As far as your windowing goes, you should use a HoppingWindow like this:

var query = from win in source.HoppingWindow(TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(250))
            select win.Count();
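To make the counting concrete, here is a plain C# sketch, independent of StreamInsight, that counts point events per 1-second window hopping every 250 ms. It assumes windows start at non-negative multiples of the hop size and are half-open, [start, start + size); StreamInsight's actual window alignment and output timing may differ. HoppingWindowSketch and CountPerWindow are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HoppingWindowSketch
{
    // Counts events per window. Timestamps are non-negative offsets from an arbitrary origin.
    // Returns (window start, count) pairs ordered by start; empty windows are omitted.
    public static List<KeyValuePair<TimeSpan, int>> CountPerWindow(
        IEnumerable<TimeSpan> timestamps, TimeSpan size, TimeSpan hop)
    {
        var counts = new SortedDictionary<long, int>();
        foreach (var t in timestamps)
        {
            // Latest window start at or before t.
            long last = t.Ticks / hop.Ticks * hop.Ticks;

            // Walk back through every earlier window that still covers t.
            for (long start = last; start > t.Ticks - size.Ticks && start >= 0; start -= hop.Ticks)
            {
                int current;
                counts.TryGetValue(start, out current);
                counts[start] = current + 1;
            }
        }

        return counts
            .Select(kv => new KeyValuePair<TimeSpan, int>(TimeSpan.FromTicks(kv.Key), kv.Value))
            .ToList();
    }
}
```

With events at 100 ms, 300 ms, and 1100 ms, the windows starting at 0 ms and 250 ms each hold two events, and those starting at 500 ms, 750 ms, and 1000 ms hold one each, which shows how a single event is counted in several overlapping windows.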