Реализация паттерна saga на C#

Posted by VladymyrL on Sunday, January 14, 2018

Предисловие переводчика: Уже не помню, что сподвигло меня заинтересоваться этим паттерном, скорее всего это то что в какой-то момент мне пришлось столкнуться с javascript библиотекой redux-saga.
Знакомство началось с описания паттерна от Bernd Rücker и прекрасной презентации Caitie McCaffrey. После ознакомления с этими материалами у меня всё ещё оставались вопросы. Очень хотелось увидеть этот код “в живую” в виде какого либо PoC-a.

Вашему вниманию представлена статья Clemens Vasters-a, которая поможет разобраться уже в деталях с конкретной реализацией.

нужно всё переделать

Сегодня был оживленный день в тех уголках Twitter-а, где обсуждался паттерн Saga. Существует несколько .NET фреймворков, которые используют термин «сага» и которые представляют из себя реализацию конечного автомата или воркфлоу.

Проблема в том, что это всё не сага!

Что такое паттерн сага?

Сага - это шаблон управления сбоями. Saga появилась из осознания того, что долгоживущие транзакции (первоначально даже транзакции внутри базы данных), и особенно далеко распределенные транзакции, пересекающие границы местоположения и / или доверия, не могут быть обработаны с использованием классической модели ACID с двухфазной фиксацией и удерживанием блокировки на время работы (2PC). Вместо этого сага разбивает работу на отдельные транзакции, результаты которых могут быть каким-то образом отменены после того, как работа выполнена и совершена.

сага пример

На картинке показана простая сага в действии.

Если вы бронируете туристический маршрут, вам нужен автомобиль, отель и рейс. Если вы не можете получить все из них, поездку, вероятно, не стоит осуществлять. Также очень важно, что вы не можете привлечь всех этих поставщиков к распределенной транзакции ACID. Вместо этого у вас будет активности по бронированию прокатных автомобилей, номера отеля и билета на самолёт, которые знают, как выполнять и отменять бронирование. Действия группируются в составное задание (маршрутный лист), а это задание передается по цепочке действий. Если хотите, вы можете подписывать / шифровать элементы маршрутного листа, чтобы его можно было понять и использовать только предполагаемым получателем. Когда действие завершается, оно добавляет запись о завершении к маршрутному листу вместе с информацией о том, где может быть достигнута компенсационная операция (например, через очередь). Когда действие завершается неудачно, оно очищается локально, а затем отправляет прокрутку маршрута назад, сквозь все элементы списка к последнему адресу компенсации, чтобы отменить результат транзакции.

Если вы немного знакомы с путешествиями, вы также заметите, что я отсортировал эти шаги по риску. Резервирование арендованного автомобиля почти всегда удается, если вы забронируете заранее, потому что компания по прокату автомобилей может перемещать больше автомобилей на место, где появляется больший спрос. Резервирование отеля немного более рискованно, но вы обычно можете отказаться от бронирования без штрафа до 24 часов до прибывания. В стоимость авиабилетов часто входит лимитирование возврата средств, поэтому вы захотите сделать это в последнюю очередь.

Я создал Gist на Github, который можно запустить как консольное приложение. Он отображает эту модель в коде. Имейте в виду, что это макет, а не фреймворк. Я написал этот код менее чем за 90 минут, поэтому не ожидайте особой ре-юзабельности. Основная программа задаёт примерный маршрутный лист (все классы находятся в одном файле) и создает три полностью независимых «процесса» (хосты активности), каждый из которых несет ответственность за обработку определенного вида работы. «Процессы» связаны «сетью», и каждый вид деятельности имеет адрес для продвижения вперед а также адрес для работ по компенсации. Разрешение сети моделируется при помощи метода «send».

static ActivityHost[] processes;

static void Main(string[] args)
{
        var routingSlip = new RoutingSlip(new WorkItem[]
            {
                new WorkItem<ReserveCarActivity>(new WorkItemArguments),
                new WorkItem<ReserveHotelActivity>(new WorkItemArguments),
                new WorkItem<ReserveFlightActivity>(new WorkItemArguments)
            });

        // Представьте, что это полностью отдельные процессы с очередями между ними

        processes = new ActivityHost[]
                            {
                                new ActivityHost<ReserveCarActivity>(Send),
                                new ActivityHost<ReserveHotelActivity>(Send),
                                new ActivityHost<ReserveFlightActivity>(Send)
                            };
        // передать первый адрес

        Send(routingSlip.ProgressUri, routingSlip);
    }
    
static void Send(Uri uri, RoutingSlip routingSlip)
{
        // это фактически отправка по сети
        foreach (var process in processes)
        {

            if (process.AcceptMessage(uri, routingSlip))
            {
                break;
            }
        }
}

Каждая из этих операций выполняет этап резервирования и этап отмены. Вот для автомобилей:

class ReserveCarActivity : Activity
{
    static Random rnd = new Random(2);
    
    public override WorkLog DoWork(WorkItem workItem)
    {
        Console.WriteLine("Reserving car");
        var car = workItem.Arguments["vehicleType"];
        var reservationId = rnd.Next(100000);
        Console.WriteLine("Reserved car {0}", reservationId);
        return new WorkLog(this, new WorkResult { { "reservationId", reservationId } });
    }
    
    public override bool Compensate(WorkLog item, RoutingSlip routingSlip)
    {
        var reservationId = item.Result["reservationId"];
        Console.WriteLine("Cancelled car {0}", reservationId);
        return true;
    }
    
    public override Uri WorkItemQueueAddress
    {
        get { return new Uri("sb://./carReservations"); }
    }
    
    public override Uri CompensationQueueAddress
    {
        get { return new Uri("sb://./carCancellactions"); }
    }
}

Проход по последовательности операций происходит согласно маршрутному листу. Маршрутный лист является «сериализуемым» (в исходнике это не так, но в качестве примера этого достаточно), и это единственная составляющая информации, которая течет между совместными задачами. Центральная координация отсутствует. Вся работа является локальной на узлах, и как только узел завершен, он либо передает прокрутку маршрута вперед (при успешном завершении), либо назад (при сбое).

Для информации о продвижении вперед, маршрутный лист имеет очередь, а для элементов вовзрата она поддерживает стек. Список маршрутизации также занимается определением и вызовом того, что будет вызвано следующим при движении вперед и назад.

class RoutingSlip
{
    readonly Stack<WorkLog> completedWorkLogs = new Stack<WorkLog>();
    readonly Queue<WorkItem> nextWorkItem = new Queue<WorkItem>();
    
    public RoutingSlip()
    {
    }
    
    public RoutingSlip(IEnumerable<WorkItem> workItems)
    {
        foreach (var workItem in workItems)
        {
            this.nextWorkItem.Enqueue(workItem);
        }
    }
    
    public bool IsCompleted
    {
        get { return this.nextWorkItem.Count == 0; }
    }
    
    public bool IsInProgress
    {
        get { return this.completedWorkLogs.Count > 0; }
    }
    
    public bool ProcessNext()
    {
        if (this.IsCompleted)
        {
            throw new InvalidOperationException();
        }
    
        var currentItem = this.nextWorkItem.Dequeue();
        var activity = (Activity)Activator.CreateInstance(currentItem.ActivityType);
        try
        {
            var result = activity.DoWork(currentItem);
            if (result != null)
            {
                this.completedWorkLogs.Push(result);
                return true;
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Exception {0}", e.Message);
        }
        return false;
    }
    
    public Uri ProgressUri
    {
        get
        {
            if (IsCompleted)
            {
                return null;
            }
            else
            {
                return
                    ((Activity)Activator.CreateInstance(this.nextWorkItem.Peek().ActivityType)).
                        WorkItemQueueAddress;
            }
        }
    }
    
    public Uri CompensationUri
    {
        get
        {
            if (!IsInProgress)
            {
                return null;
            }
            else
            {
                return
                    ((Activity)Activator.CreateInstance(this.completedWorkLogs.Peek().ActivityType)).
                        CompensationQueueAddress;
            }
        }
    }
    
    public bool UndoLast()
    {
        if (!this.IsInProgress)
        {
            throw new InvalidOperationException();
        }
    
        var currentItem = this.completedWorkLogs.Pop();
        var activity = (Activity)Activator.CreateInstance(currentItem.ActivityType);
        try
        {
            return activity.Compensate(currentItem, this);
        }
        catch (Exception e)
        {
            Console.WriteLine("Exception {0}", e.Message);
            throw;
        }
    
    }
}

Локальная работа и принятие решений инкапсулируется в ActivityHost, который вызывает ProcessNext() у объекта маршрутного листа для определения следующего действия и вызывает его функцию DoWork() при движении вперед или определит последнюю выполненную операцию на обратном пути и вызовет функцию Compensate(). Опять же, здесь нет ничего централизованного; все, что работает, зависит от маршрутного листа, а три действия и их выполнение полностью не пересекаются.

abstract class ActivityHost
{
    Action<Uri, RoutingSlip> send;

    public ActivityHost(Action<Uri, RoutingSlip> send)
    {
        this.send = send;
    }

    public void ProcessForwardMessage(RoutingSlip routingSlip)
    {
        if (!routingSlip.IsCompleted)
        {
        // если текущий шаг будет успешным, продолжаем

        // в противном случае перейдём к пути назад

        if (routingSlip.ProcessNext())
        {
            // рекурсия оставлена для передачи контекста через сообщение

            // маршрутный лист может быть полностью сериализован и передадваться

            // между системами.

            this.send(routingSlip.ProgressUri, routingSlip);
        }
        else
        {
                // pass message to unwind message route
                this.send(routingSlip.CompensationUri, routingSlip);
        }
        }
    }

    public void ProcessBackwardMessage(RoutingSlip routingSlip)
    {
        if (routingSlip.IsInProgress)
        {
            // UndoLast может поместить новую работу в маршрутный лист

            // а также может вернуть false, чтобы вернуться к движению 

            //по направлению вперёд

            if (routingSlip.UndoLast())
            {
            // рекурсия оставлена для передачи контекста через сообщение

            // маршрутный лист может быть полностью сериализован и передадваться

            // между системами

                this.send(routingSlip.CompensationUri, routingSlip);
            }
            else
            {
                this.send(routingSlip.ProgressUri, routingSlip);
            }
        }
    }

    public abstract bool AcceptMessage(Uri uri, RoutingSlip routingSlip);
}

Вот это сага!

Картинка взята с www.commitstrip.com


comments powered by Disqus