I have an IQStreamable object created by calling DefineObservable on a Microsoft.ComplexEventProcessing.Application.
The code looks normal, but what I don't understand is why I get an exception when I use an argument passed in from Task.Run() inside DefineObservable.
However, when I use the property directly instead of passing it into the method called inside Task.Run(), it works.
Exception
An unhandled exception of type 'System.InvalidOperationException' occurred in Microsoft.ComplexEventProcessing.Diagnostics.dll
Additional information: Cannot serialize value of type 'System.IObservable`1[ValueObjects.Price]'.
The Method
private void Monitor(IObservable<Price> priceObservable)
{
    const string applicationName = "RealtimeMonitoring";
    Microsoft.ComplexEventProcessing.Application application = PriceObserver.Server.CreateApplication(applicationName);
    IQStreamable<Price> streamable = application
        //.DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable) // this version works
        .DefineObservable<Price>(() => priceObservable) // this version throws the exception above
        .ToPointStreamable(price => PointEvent<Price>.CreateInsert(DateTime.Now, price), AdvanceTimeSettings.IncreasingStartTime);
    var standingQuery = from price in streamable select price;
    var sink = application.DefineObserver(() => new PriceObserver());
    using (standingQuery.Bind(sink).Run())
    {
        // some code...
    }
}
The call:
Task.Run(() => Monitor(PriceRealtimeProvider.Instance.PriceObservable));
Question:
Does StreamInsight serialize the observable object? And if so, why?
What is the difference between this:
.DefineObservable<Price>(() => priceObservable)
and this:
.DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable)
Why does using the argument cause the problem?
For this call:
.DefineObservable<Price>(() => priceObservable)
the argument lives in the application's memory, so it has to be serialized in order to be passed to the remote server. After that, the argument would live in the StreamInsight server's memory. The argument cannot be serialized because its type is an interface (IObservable<Price>).
For this call:
.DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable)
my guess is that it is treated as a delegate call, so Instance.PriceObservable is not evaluated until the StreamInsight server runs the code. By then, everything is already in the StreamInsight server's memory, so there is no need for serialization.
In sum, serialization doesn't happen for the second call.
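The difference between the two calls can be sketched with plain System.Linq.Expressions, without StreamInsight. This assumes DefineObservable receives its lambda as an expression tree rather than a compiled delegate, which is what the serialization error suggests. Provider and EmptyObservable are hypothetical stand-ins, not the real PriceRealtimeProvider or Price types. The sketch only inspects how the compiler represents each lambda: a captured argument shows up as a constant closure object holding a live value, while a static property shows up as a member access with no value attached.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical stand-in for an observable source; never subscribed to here.
sealed class EmptyObservable : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        throw new NotSupportedException();
    }
}

// Hypothetical stand-in for PriceRealtimeProvider.Instance.PriceObservable.
static class Provider
{
    public static IObservable<int> PriceObservable { get; } = new EmptyObservable();
}

static class ExpressionDemo
{
    // Reports what the lambda body reads from.
    public static string Describe(Expression<Func<IObservable<int>>> lambda)
    {
        var member = (MemberExpression)lambda.Body;

        // A static member access has no instance expression: nothing but
        // metadata (type and member name) is stored in the tree.
        if (member.Expression == null) return "static member access";

        // A captured parameter or local is a field read on a compiler-generated
        // closure object, embedded in the tree as a constant value.
        if (member.Expression is ConstantExpression) return "constant closure";

        return member.Expression.NodeType.ToString();
    }

    public static string DescribeArgument(IObservable<int> priceObservable)
    {
        return Describe(() => priceObservable);
    }

    public static string DescribeStatic()
    {
        return Describe(() => Provider.PriceObservable);
    }

    public static void Main()
    {
        Console.WriteLine(DescribeArgument(new EmptyObservable())); // constant closure
        Console.WriteLine(DescribeStatic());                        // static member access
    }
}
```

If that assumption holds, the first lambda carries an actual IObservable instance that would have to be serialized along with the tree, while the second carries only a reference that can be resolved later on the server side.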
I'm open for correction.