StreamInsight: Cannot serialize value of type X only if X is passed in a Task.Run() block


I have an IQStreamable object created by using DefineObservable of a Microsoft.ComplexEventProcessing.Application.

The code looks normal, but what I don't understand is why I get an exception when I pass the argument that Monitor receives from Task.Run() into DefineObservable.

However, when I use the property directly, without passing it into the method called inside Task.Run(), it works.

Exception

An unhandled exception of type 'System.InvalidOperationException' occurred in Microsoft.ComplexEventProcessing.Diagnostics.dll

Additional information: Cannot serialize value of type 'System.IObservable`1[ValueObjects.Price]'.

The Method

private void Monitor(IObservable<Price> priceObservable)
    {
        const string applicationName = "RealtimeMonitoring";

        Microsoft.ComplexEventProcessing.Application application = PriceObserver.Server.CreateApplication(applicationName);
        IQStreamable<Price> streamable = application
            // Works: the lambda reads the singleton property directly.
            //.DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable)
            // Throws: the lambda uses the method argument.
            .DefineObservable<Price>(() => priceObservable)
            .ToPointStreamable(price => PointEvent<Price>.CreateInsert(DateTime.Now, price), AdvanceTimeSettings.IncreasingStartTime);

        // Pass every event through unchanged.
        var standingQuery = from p in streamable select p;
        var sink = application.DefineObserver(() => new PriceObserver());

        using (standingQuery.Bind(sink).Run())
        {
            // some code...
        }
    }

The call:

Task.Run(() => Monitor(PriceRealtimeProvider.Instance.PriceObservable));

Question:

  1. Does StreamInsight serialize the observable object? If so, why?

  2. What is the difference between

.DefineObservable<Price>(() => priceObservable)

and

.DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable)

Why does using the argument cause the problem?

Answer by Vu Nguyen
  1. Yes, but I still don't know the design rationale behind it.
  2. For the call .DefineObservable<Price>(() => priceObservable), the argument lives in the application's memory, so it has to be serialized before it can be passed to the remote server, after which it would live in the StreamInsight server's memory. That serialization fails because the argument's type is an interface.

For the call .DefineObservable<Price>(() => PriceRealtimeProvider.Instance.PriceObservable), my guess is that it is treated as a delegate call, so Instance.PriceObservable is not evaluated until the StreamInsight server runs the code. By then everything is already in the StreamInsight server's memory, so no serialization is needed.

In sum, serialization doesn't happen for the second call.
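
To illustrate the difference, here is a minimal sketch that uses only System.Linq.Expressions, with no StreamInsight involved. It assumes, as far as I can tell, that DefineObservable receives the lambda as an expression tree rather than as a compiled delegate. Provider, CaptureDemo and DescribeRoot are hypothetical names made up for this example. The sketch shows that a lambda using a method argument embeds the captured value in the tree as a constant, while a lambda that goes through a static property only contains a reference to that member.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical stand-in for PriceRealtimeProvider; not a StreamInsight type.
public sealed class Provider
{
    public static Provider Instance { get; } = new Provider();
    public IObservable<int> Source { get; }
}

public static class CaptureDemo
{
    // Walks a member-access chain (a.b.c) down to its root and reports
    // what kind of expression node the chain starts from.
    public static string DescribeRoot<T>(Expression<Func<T>> expr)
    {
        Expression node = expr.Body;
        while (node is MemberExpression m && m.Expression != null)
            node = m.Expression;

        if (node is ConstantExpression) return "captured";  // a live object sits inside the tree
        if (node is MemberExpression) return "static";      // only a member reference, no object
        return "other";
    }

    public static string DescribeArgument(IObservable<int> source)
    {
        // 'source' is hoisted into a compiler-generated closure object,
        // which shows up in the tree as a ConstantExpression.
        return DescribeRoot(() => source);
    }

    public static void Main()
    {
        Console.WriteLine(DescribeRoot(() => Provider.Instance.Source)); // prints "static"
        Console.WriteLine(DescribeArgument(null));                       // prints "captured"
    }
}
```

If the assumption holds, the "captured" case is the one where a live object has to travel with the tree, which would match the serialization error in the question. The "static" case carries no object at all, only the instruction to read the property wherever the tree is eventually evaluated.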

I'm open to correction.