Akka.net Executing long-running actions inside an actor’s Receive method

169 views Asked by At

I'm newbie in Akka.net framework

want to execute job long-running operation inside Receive method then my actors will be unable to process any messages Plus how to cancel (terminate) running job , including system messages, until that operation finishes. And if it’s possible that the operation will never finish, it’s possible to deadlock my actor.

Expected behavior: once click on cancel button should should stop current execution for loop Plus ui must be reliable to start new job as current code run job in Task with cancellation

Current Result behavior: it won't cancel current execution for loop and cannot start new job while a current job executing

here JobActor.cs

public class JobActor : ReceiveActor, IWithUnboundedStash
    {
        private readonly IActorRef supervisor_;
        private readonly List<KeyValuePair<int, IJob>> jobs_;
        private IJobManagerFactory _jobFactory;
        private Stopwatch _timer;
        private ILoggingAdapter _loggingAdapter;
        private Task _runningTask;
        private CancellationTokenSource _cancel;

        public IStash Stash { get; set; }


        public JobActor(IActorRef supervisor, IJobManagerFactory jobFactory)
        {
            _jobFactory = jobFactory;
            _cancel = new CancellationTokenSource();
            supervisor_ = supervisor;
            jobs_ = new List<KeyValuePair<int, IJob>>();
            _loggingAdapter = Context.GetLogger();
            _timer = new Stopwatch();
            Ready();

        }

        private void Ready()
        {
            Receive<Run>(run =>
            {
                var self = Self; // closure
                _runningTask = Task.Run(() =>
                {
                    // ... work
                    var job = _jobFactory.CreateJob(run);
                    job.Started += () =>
                    {
                        _timer.Start();
                        supervisor_.Tell(new Started(job.Id));
                        _loggingAdapter.Info($"Job {job.Id} started.");
                    };
                    job.Progressed += (percent) =>
                    {
                        supervisor_.Tell(new Progress(job.Id, Convert.ToInt32(_timer.Elapsed.TotalSeconds)));
                        _loggingAdapter.Debug($"Job {job.Id} is now {percent}% complete.");
                    };

                    job.StatusChanged += status =>
                    {
                        supervisor_.Tell(new StatusChanged(job.Id, job.Status));
                        _loggingAdapter.Info($"Job {job.Id} has changed status to {job.Status}.");
                    };
                    job.Finished += () =>
                    {
                        _timer.Stop();
                        supervisor_.Tell(new Progress(job.Id, Convert.ToInt32(_timer.Elapsed.TotalSeconds)));
                        supervisor_.Tell(new Finished(job.Id));
                        _loggingAdapter.Info($"Job {job.Id} finished.");
                    };
                    var jobStore = new KeyValuePair<int, IJob>(job.Id, job);
                    _loggingAdapter.Info($"Job {job.Id} starting.");
                    jobs_.Add(jobStore);
                    job.Run();


                }, _cancel.Token).ContinueWith(x =>
                {
                    if (x.IsCanceled || x.IsFaulted)
                        new Cancel(run.Id);
                    new Finished(run.Id);
                }, TaskContinuationOptions.ExecuteSynchronously)
                .PipeTo(self);

                // switch behavior
                Become(Working);
            });

            Receive<Cancel>(cancel =>
            {
                _timer.Stop();
                IJob job = jobs_.FirstOrDefault(x => x.Key == cancel.Id).Value;
                _loggingAdapter.Info($"Canceling job {cancel.Id}.");
                job?.Cancel();

            });
            
        }

        private void Working()
        {
            Receive<Cancel>(cancel =>
            {
                _cancel.Cancel(); // cancel work
                BecomeReady();
            });
            Receive<Failed>(f => BecomeReady());
            Receive<Finished>(f => BecomeReady());
            ReceiveAny(o => Stash.Stash());
        }

        private void BecomeReady()
        {
            _cancel = new CancellationTokenSource();
            Stash.UnstashAll();
            Become(Ready);
        }
    }

here JobManager.cs

 public class JobManager : IJob
    {
        private readonly Timer timer_;
        private int percentCounter_;
        private JobStatus status_;
        public JobManager(Run run)
        {
            Id = run.Id;

            Status = JobStatus.New;
            timer_ = new Timer(1000) { AutoReset = true };
            timer_.Elapsed += (sender, eventargs) =>
            {
                Progressed?.Invoke(percentCounter_);
            };
        }


        public int Id { get; }

        public JobStatus Status
        {
            get { return status_; }
            set
            {
                status_ = value;
                StatusChanged?.Invoke(status_);
            }
        }

        public event Action<int> Progressed;
        public event Action Finished;
        public event Action Started;

        public event Action<JobStatus> StatusChanged;
        public void Run()
        {
            timer_.Start();
            Started?.Invoke();
            Status = JobStatus.Running;
            Console.ForegroundColor = ConsoleColor.Green;
            for (int i = 0; i < 100000; i++)
            {
                Console.WriteLine(i);
            }
        }

        public void Cancel()
        {
            timer_.Stop();
            timer_.Close();
            Status = JobStatus.Canceled;
        }
    }

0

There are 0 answers