Korrektur: Bug in dem Priority-Scheduler für Coroutinen im C++-Blog

Die in den letzten beiden Beiträgen vorgestellte Prioritäts-Scheduler für Coroutines hatten einen Fehler, der sich aber leicht beheben lässt.

In Pocket speichern vorlesen Druckansicht 4 Kommentare lesen

(Bild: Elena Abrazhevich/Shutterstock)

Lesezeit: 1 Min.
Von
  • Rainer Grimm

In meinen letzten beiden Blogbeiträgen habe ich einen Prioritäts-Scheduler für Coroutines vorgestellt. Der Code dazu hatte einen Fehler.

Modernes C++ – Rainer Grimm

Rainer Grimm ist seit vielen Jahren als Softwarearchitekt, Team- und Schulungsleiter tätig. Er schreibt gerne Artikel zu den Programmiersprachen C++, Python und Haskell, spricht aber auch gerne und häufig auf Fachkonferenzen. Auf seinem Blog Modernes C++ beschäftigt er sich intensiv mit seiner Leidenschaft C++.

So sieht der fehlerhafte Scheduler aus:

// priority_queueSchedulerPriority.cpp

#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <utility>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

using job = std::pair<int, std::coroutine_handle<>>;

template <typename Updater = std::identity,                         // (1)
          typename Comperator = std::ranges::less>                   
requires std::invocable<decltype(Updater()), int> &&                // (2)
         std::predicate<decltype(Comperator()), job, job>             
class Scheduler {

  std::priority_queue<job, std::vector<job>, Comperator> _prioTasks;

  public: 
    void emplace(int prio, std::coroutine_handle<> task) {
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      Updater upd = {};                                            // (3)
      while(!_prioTasks.empty()) {
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(upd(prio), task));          // (4)
        }
        else {
          task.destroy();
        }
      }
    }

};


Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  for (int i = 0; i <= 3; ++i ) { 
    std::cout << name << " execute " << i << "\n";                  // (5)
    co_await std::suspend_always();
  }
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;                                               // (6)

  scheduler1.emplace(0, createTask("TaskA").get_handle());
  scheduler1.emplace(1, createTask("  TaskB").get_handle());
  scheduler1.emplace(2, createTask("    TaskC").get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler<decltype([](int a) { return a - 1; })> scheduler2;        // (7)

  scheduler2.emplace(0, createTask("TaskA").get_handle());
  scheduler2.emplace(1, createTask("  TaskB").get_handle());
  scheduler2.emplace(2, createTask("    TaskC").get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

Das war die Ausgabe des Programms, die ich erhalten habe:

Christof Meerwald hat mit dem GCC eine andere Ausgabe erhalten. Vielen Dank für diesen Hinweis. Hier ist die GCC-Ausgabe mit aktivierter Optimierung.

Auch die Windows-Ausgabe war fehlerhaft:

Hier sind die entscheidenden Zeilen mit dem Fehler:

Task createTask(const std::string& name) {                  // (1)
  std::cout << name << " start\n";
  co_await std::suspend_always();
  for (int i = 0; i <= 3; ++i ) { 
    std::cout << name << " execute " << i << "\n";                  
    co_await std::suspend_always();
  }
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  Scheduler scheduler1;                                               

  scheduler1.emplace(0, createTask("TaskA").get_handle());    // (2)
  scheduler1.emplace(1, createTask("  TaskB").get_handle());  // (3)
  scheduler1.emplace(2, createTask("    TaskC").get_handle());// (4)

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler<decltype([](int a) { return a - 1; })> scheduler2;        

  scheduler2.emplace(0, createTask("TaskA").get_handle());    // (5)
  scheduler2.emplace(1, createTask("  TaskB").get_handle());  // (6)
  scheduler2.emplace(2, createTask("    TaskC").get_handle());// (7)

  scheduler2.schedule();

  std::cout << '\n';

}

Die Coroutine createTask nimmt ihren String als const lvalue-Referenz (1) an, aber ihre Argumente "TaskA" - "TaskC" sind rvalues (2 - 7). Es ist undefiniertes Verhalten, eine Referenz auf eine temporäre Variable zu verwenden. Die weiteren Scheduler priority_SchedulerSimplified und priority_queueSchedulerComparator in den Beiträgen "Programmiersprache C++: Ein Prioritäts-Scheduler für Coroutinen" und "Programmiersprache C++: Ein anspruchsvoller Prioritäts-Scheduler für Coroutinen" besitzen das gleiche Problem.

Die Behebung des Problems ist einfach. Entweder nimmt die Coroutine createTask ihr Argument by Value an (Task createTask(std::string name)) oder ihre Argumente werden zu lvalues. Hier ist der zweite Ansatz in (1) - (3):

// priority_queueSchedulerPriority.cpp

#include <concepts>
#include <coroutine>
#include <functional>
#include <iostream>
#include <queue>
#include <utility>


struct Task {

  struct promise_type {
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }

    Task get_return_object() { 
        return std::coroutine_handle<promise_type>::from_promise(*this); 
    }
    void return_void() {}
    void unhandled_exception() {}
  };

  Task(std::coroutine_handle<promise_type> handle): handle{handle}{}

  auto get_handle() { return handle; }

  std::coroutine_handle<promise_type> handle;
};

using job = std::pair<int, std::coroutine_handle<>>;

template <typename Updater = std::identity,                         
          typename Comperator = std::ranges::less>                   
requires std::invocable<decltype(Updater()), int> &&                
         std::predicate<decltype(Comperator()), job, job>             
class Scheduler {

  std::priority_queue<job, std::vector<job>, Comperator> _prioTasks;

  public: 
    void emplace(int prio, std::coroutine_handle<> task) {
      _prioTasks.push(std::make_pair(prio, task));
    }

    void schedule() {
      Updater upd = {};                                            
      while(!_prioTasks.empty()) {
        auto [prio, task] = _prioTasks.top();
        _prioTasks.pop();
        task.resume();

        if(!task.done()) { 
          _prioTasks.push(std::make_pair(upd(prio), task));          
        }
        else {
          task.destroy();
        }
      }
    }

};


Task createTask(const std::string& name) {
  std::cout << name << " start\n";
  co_await std::suspend_always();
  for (int i = 0; i <= 3; ++i ) { 
    std::cout << name << " execute " << i << "\n";                  
    co_await std::suspend_always();
  }
  co_await std::suspend_always();
  std::cout << name << " finish\n";
}


int main() {

  std::cout << '\n';

  std::string taskA = "TaskA";                    // (1)
  std::string taskB = "  TaskB";                  // (2)
  std::string taskC = "    TaskC";                // (3)

  Scheduler scheduler1;                                          

  scheduler1.emplace(0, createTask(taskA).get_handle());
  scheduler1.emplace(1, createTask(taskB).get_handle());
  scheduler1.emplace(2, createTask(taskC).get_handle());

  scheduler1.schedule();

  std::cout << '\n';

  Scheduler<decltype([](int a) { return a - 1; })> scheduler2;        

  scheduler2.emplace(0, createTask(taskA).get_handle());
  scheduler2.emplace(1, createTask(taskB).get_handle());
  scheduler2.emplace(2, createTask(taskC).get_handle());

  scheduler2.schedule();

  std::cout << '\n';

}

Coroutines bieten eine intuitive Möglichkeit, asynchronen Code zu schreiben. Mein nächster Beitrag wird ein Gastbeitrag von Ljubic Damir sein, der einen Single-Producer - Single-Consumer-Workflow auf Basis von Coroutines vorstellt. (rme)