https://lafor.ge/feed.xml

Les Streams

2023-08-04

Bonjour à toutes et à tous 😀

La boucle for

Lorsque j'ai découvert les boucles en Rust, je suis tombé sur cet étrange bout de code.

for i in 0..4 {
    println!("{}", i)
} 

Lorsque l'exécute, on obtient

0                                                            
1                                                            
2                                                            
3                                                            

Le moi de l'époque n'a pensé qu'une chose : "Mais c'est quoi cette magie noire!! 😱"

Mais nous sommes en Rust et en Rust, presque tout est explicite. 🦀

Il suffit, de dérouler le fil. 🧶

Voici le vrai code qui se déroule:

let x = ???;
loop {
    if let Some(i) = x.next() {
        println!("{}", i)
    } else {
        break
    }
}

Nous allons détailler ce que x représente dans un instant.

Pour l'instant il faut comprendre que next retourne un Option<i32>.

Tant que l'on a pas atteint i = 3, on obtient un Some(i). Puis un None.

Ce que l'on peut redéfinir en

x.next() // Some(0)
x.next() // Some(1)
x.next() // Some(2)
x.next() // Some(3)
x.next() // None

Bon maintenant que nous avons le cahier des charges du comportement voulu par notre x, créons-le.

Nous savons qu'il se doit de poosséder un état interne.

struct X {
    state: i32
}

Il possède également une méthode next() -> Option<i32>.

Implémentons-la

impl X {
    fn next(& mut self) {
        if self.state < 4 {
            let i = self.state;
            self.state +=1;
            Some(i)
        } else {
            None
        }
    }
}

A chaque fois que l'on appelle next(), on vérifie le state,

  • si l'on est dans l'intervalle $[0; 3]$, on incrément le state et on renvoie un Some(i)
  • si celui excède $3$, on renvoie None

On peut alors utiliser notre super code

let mut x = X { state: 0 };
loop {
    if let Some(i) = x.next() {
        println!("{}", i)
    } else {
        break;
    }
}

Qui nous affiche comme prévu

0
1
2
3

Victory ! 😎

Le trait Iterator

Du coup faisons une petite expérience 🧪

let mut x = X { state: 0 };
for i in x {
    println!("{i}")
}

Nop ! 💥

error[E0277]: `X` is not an iterator
   |
   |     for i in x {
   |              ^ `X` is not an iterator
   |
   = help: the trait `Iterator` is not implemented for `X`

Quand je vous disais que ça se passait en réalité comme ça je vous ai à demi menti.

Afin de normaliser les choses, Rust s'appuie de manière massive sur les traits.

Ici le compilateur nous signale que X n'est pas un itérateur. parce qu'il n'implémente pas le trait Iterator.

Mais du coup cela signifie que la boucle for peut se résumer à:

for i in #quelque chose qui implémente Iterator# {

}

Implémentons Iterator pour X.

Voici la coquille vide de son implémentation

impl Iterator for X {
    type Item = ();

    fn next(&mut self) -> Option<Self::Item> {
        todo!()
    }
}

Ici deux choses nous intéresse:

La ligne

type Item = ()

on appelle ça un associated type en Rust, cela permet de définir un type présent dans toute l'implémentation du trait.

On le retrouve d'ailleurs dans

 Option<Self::Item>

La deuxième chose intéressante c'est que

fn next(&mut self) -> Option<Self::Item> {}

Ressemble très fortement à notre

fn next(&mut self) -> Option<i32> {}

Par conséquent on peut remplir par mimétisme le

type Item = i32

Et implémenter le reste:

impl Iterator for X {
    type Item = i32;

    fn next(&mut self) -> Option<Self::Item> {
        if self.state < 4 {
            let i = self.state;
            self.state +=1;
            Some(i)
        } else {
            None
        }
    }
}

Ce qui nous permet de l'utiliser dans notre boucle for

let x = X { state: 0 };
for i in x {
    println!("{i}")
}

Cool 😎

Mais que se passe-t-il si on tente de redemander un tour de boucle ?

let x = X { state: 0 };
for i in x {
    println!("{i}")
}

for i in x {
    println!("{i}")
}

Réponse une erreur, enfin plusieurs:

error[E0382]: use of moved value: `x`
   --> src\main.rs:39:14
    |
    |     let x = X { state: 0 };
    |         - move occurs because `x` has type `X`, which does not implement the `Copy` trait
    |     for i in x {
    |              - `x` moved due to this implicit call to `.into_iter()`
...
    |     for i in x {
    |              ^ value used here after move
    |
note: `into_iter` takes ownership of the receiver `self`, which moves `x`
    |
    |     fn into_iter(self) -> Self::IntoIter;

La première est que x a été déplacé, et ce déplacement a été provoqué par for qui implicitement (ouais c'est moche pour le coup 😫) écrit ce code

for i in x.into_iter() {}

Sauf que into_iter consomme ce que l'on lui donne à manger.

fn into_iter(self)

Donc cela signifie qu'avant même que la boucle for ne débute ses appels à x.next(). x n'existe déjà plus ... 😭

Ainsi quand on tente de réappliquer le x.into_iter(), nous n'avons plus de x pour bosser!

Ce comportement bien que perturbant est très intéressant car il empêche d'utiliser un itérateur qui a déjà été utilisé et donc consommé.

La deuxième erreur c'est que l'on ne possède pas le trait Copy sur X, donc comme le compilo peut rien faire il déplace x, mais s'il est Copy alors il pourra faire quelque chose.

Pour implémenter Copy, nous devons implémenter Clone, nous passons par les derives pour nous simplifier le travail.

#[derive(Copy, Clone)]
struct X {
    state: i32,
}

Ainsi, on peut maintenant réutiliser x autant de fois que l'on désire.

let x = X { state: 0 };
for i in x {
    println!("{i}")
}

for i in x {
    println!("{i}")
}

Comme le trait Copy est appelé pour chacune des boucles comme ceci:

let x = X { state: 0 };

let x_copy1 = x; // x_copy1 = X { state: 0 } -- copie de x en x_copy1
let iterator1 = x_copy1.into_iter(); // x_copy1 est consommé par into_iter mais pas x
for i in iterator1 {
    println!("{i}")
}

let x_copy2 = x; // x_copy2 = X { state: 0 } -- copie de x en x_copy2
let iterator2 = x_copy2.into_iter(); // x_copy2 est consommé par into_iter mais pas x
for i in iterator2 {
    println!("{i}")
}

Le x n'est jamais modifié puisque copié en l'état et donc state = 0 pour chaque copies en début de boucle.

Ce qui donne:

0
1
2
3
0
1
2
3

Le trait IntoIterator

Il s'est passé un truc étrange tout à l'heure, le compilo a rajouté un x.into_iter() et ça n'a pas bronché alors que ni X, ni le trait Iterator ne définit de méthode into_iter.

Ceci c'est encore un coup des traits !!

En fait tout Iterator implémente automatiquement IntoIterator au travers de

impl<I: Iterator> IntoIterator for I {
    type Item = I::Item;
    type IntoIter = I;

    #[inline]
    fn into_iter(self) -> I {
        self
    }
}

Ceci est fourni dans le langage directement.

Du coup ici

x.into_iterator() <=> x

Cela ne change rien à notre vie mais c'est intéressant pour autre chose. ^^

Il existe également une implémentation pour Vec<T>, et là on commence à causer sérieusement. 😀

for i in vec![0, 1, 2, 3] {

}

Ainsi le code réel deviens alors:

let iterator = (vec![0, 1, 2, 3]).into_iterator();
for i in iterator {

}

Et on revient en terrain connu.

En parlant de terrain connu, on peut maintenant presque deviner le fonctionnement de

for i in 0..4 {
    println!("{}", i)
} 

On peut supposer que en fait c'est plus

let iterator = (0..4).into_iterator();
for i in iterator {
    println!("{}", i)
} 

Plus qu'à comprendre le 0..4.

En Rust, ceci se nomme une Range, la notation du dessus n'est qu'un sucre syntaxique.

Donc 0..4 <=> Range { start: 0, end: 4 }.

Un rapide coup d'oeil dans la doc nous montre que Range implémente Iterator et donc par conséquent IntoIterator.

La boucle est bouclée! 🤣

Je vous laisse implémenter X pour avoir une borne de début et de fin si vous voulez.

Le trait Future

Rappelez-vous lorsque vous aviez 10 ans et que vous attiendiez le livreur arriver avec votre nouveau jeu.

Vous alliez tout les 5 min à la fenètre pour vérifier si le camion n'était pas au coin de la rue.

Deux solutions:

  • il n'est pas encore là et vous reviendrez dans 5 min vérifier pour la 20 ème fois de la journée ⌛
  • le camion est là !! vous vous précipitez pour ouvrir la porte 😀

En Rust on peut modéliser ces deux état via une énumération qui possède deux variantes.

  • Here : le camion est là ! 😀
  • NotYet : il n'est pas encore arrivé ⏳
enum DeliveryState {
    Here(i32)
    NotYet
}

Et pour simuler notre moi passé qui va à la fenêtre on faire quelque chose comme ceci.

On va là aussi détailler ce qu'est x.

let x = ???;
loop {
    if let DeliveryState::Here(i) = x.check_delivery() {
        println!("The truck is here {i}");
        break
    }
    // on attend 1 seconde avant de réessayer
    println!("Truck not yet here");
    sleep(Duration::from_secs(1))
}

Ce qu'il faut comprendre c'est que l'on désire que x possède une fonction check_delivery qui doit renvoyer un DeliveryState.

fn delivery(&self) -> DeliveryState;

Du coup on peut se créer cette structure qui correspond au cahier des charges du dessus.

struct X;

impl X {
    fn check_delivery(&self) -> DeliveryState {
        DeliveryState::NotYet
    }
}

Ce qui donne

    let x = X;
    loop {
        if let DeliveryState::Here(i) = x.check_delivery() {
            println!("The truck is here {i}");
            break;
        }
        // on attend 1 seconde avant de réessayer
        println!("Truck not yet here");
        sleep(Duration::from_secs(1))
    }

Si on lance ce code, nous allons obtenir:

Truck not here yet
Truck not here yet
Truck not here yet
...
Truck not here yet
Truck not here yet
Truck not here yet

Indéfininiment, oui, normal, vu que l'on a pas d'état, cela ne risque pas de bouger.

Créons nous un état un peu stupide, mais suffisant pour la démonstration.

Lorsque le compteur atteint $0$, le paquet est livré.

struct X { counter: i32, data: i32 }

impl X {
    fn check_delivery(&mut self) -> DeliveryState {
        self.counter -= 1;
        if self.counter == 0 {
            DeliveryState::Here(self.data)
        } else {
            DeliveryState::NotYet
        }

    }
}

On retente notre chance avec la nouvelle implémentation

let mut x = X {
    data: 42,
    counter: 4,
};

loop {
    if let DeliveryState::Here(i) = x.check_delivery() {
        println!("The truck is here {i}");
        break;
    }
    // on attend 1 seconde avant de réessayer
    println!("Truck not here yet");
    sleep(Duration::from_secs(1))
}

Cette fois-ci c'est mieux 😁

Truck not here yet
Truck not here yet
Truck not here yet
The truck is here 42

Bien-sûr, on se doute que l'on ne va pas devoir réimplémenter cette logique à chaque fois.

Tout comme notre implémentation naïve d'itérateur avait la version trait Iterator.

Il existe également un trait qui se nomme Future, il est fourni par la bibliothèque standard.

En voici son squelette:

impl Future for X {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        todo!()
    }
}

Donc ici pas de grosse surprise

type Output

Permet de définir ce que l'on veut renvoyer lorsque que ce sera le moment.

Le type de retour est

Poll<Self::Output>

Voyons ce qu'est Poll.

Il s'agit d'une énumération.

pub enum Poll<T> {
    Ready(T),
    Pending,
}

Je ne vous fait pas l'affront de vous expliquer en quoi c'est un peu beacoup ressemblant à DeliveryState. 😛

Seul petite amélioration, on peut mettre ce que l'on veut dans le Ready et pas seulement un i32.

Les paramètres de fonctions c'est un peu plus compliqué.

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>);

Le self est un peu bizarre, nous sommes d'habitude habitué à voir &mut self et c'est tout.

Ici le self est de type Pin.

Je ne vais pas expliquer ici en détail, mais ce qu'il faut en comprendre c'est que l'on est assuré de pouvoir modifier correctement self grâce à ce type.

Le cx représente le contexte d'exécution, on revient à lui plus tard 😀

Bon implémentons notre trait Future pour X.

let mut x = X {
    data: 42,
    counter: 4,
};

loop {
    if let Poll::Ready(i) = x.poll() {
        println!("The truck is here {i}");
        break;
    }
    // on attend 1 seconde avant de réessayer
    println!("Truck not here yet");
    sleep(Duration::from_secs(1))
}

Enfin, essayons, parce que pour le moment ce n'est pas brillant...

error[E0599]: no method named `poll` found for struct `X` in the current scope
   | 
   | struct X {
   | -------- method `poll` not found for this struct
...
   |         if let Poll::Ready(i) = x.poll() {
   |                                   ^^^^ method not found in `X`
   |
   = help: items from traits can only be used if the trait is implemented and in scope
   = note: the following trait defines an item `poll`, perhaps you need to implement it:
           candidate #1: `Future`

Mais, mais ... On a un trait Future pour X donc x.poll() devrait exister, non ?

Ah non! on a implémenté

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>);

Pas

fn poll(&mut self, cx: &mut Context<'_>);

Donc le vrai variable qui possède poll() c'est:

Pin::new(&mut x)

Et non x tout seul.

On change:

let mut x = X {
   data: 42,
   counter: 4,
};

loop {
   if let Poll::Ready(i) = Pin::new(&mut x).poll() {
       println!("The truck is here {i}");
       break;
   }
   // on attend 1 seconde avant de réessayer
   println!("Truck not here yet");
   sleep(Duration::from_secs(1))
}

Mieux mais pas top ...

error[E0061]: this method takes 1 argument but 0 arguments were supplied
   |
   |         if let Poll::Ready(i) = Pin::new(&mut x).poll() {
   |                                                  ^^^^-- an argument of type `&mut Context<'_>` is missing
   |
note: method defined here
  --> C:\Users\Noa\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\future\future.rs:105:8
   |
   |     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
   |        ^^^^
help: provide the argument
   |
   |         if let Poll::Ready(i) = Pin::new(&mut x).poll(/* &mut Context<'_> */) {
   |    

Ah oui c'est vrai que poll prend un deuxième paramètre de type Context.

Plus qu'à le créer 🙂

Sauf qu'il ne possède pas de constructeur ce type, et il nécessite un champ waker de type Waker.

struct Context<'a> {
    waker: &'a Waker,

Bon, du coup créons un Waker, sauf que rebelote pas de constructeur non plus. la structure demande un RaWaker.

struct Waker {
    waker: RawWaker,
}

On continue à suivre les miettes de pains ...

Un constructeur mais RawWakerVTable, c'est quoi ça encore 😫

impl RawWaker {
    fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker {...}
}

De toute façon on a descendu dans les entrailles donc pourquoi pas continuer ... 🥱

impl RawWakerVTable {
    fn new(
            clone: unsafe fn(*const ()) -> RawWaker,
            wake: unsafe fn(*const ()),
            wake_by_ref: unsafe fn(*const ()),
            drop: unsafe fn(*const ()),
        ) -> Self {
            Self { clone, wake, wake_by_ref, drop }
    }
}

Allez ! de l'unsafe, bon on ira pas plus loin 🤣

Du coup, nous sommes bloqué ?

Mais non, il y a toujours une porte de sortie.

La structure Waker possède une méthode noop qui a pour signature

fn Waker::noop() -> Waker

Elle est juste caché derrière un flag unstable. On peut débrayer ce comportement en rajoutant un attribut à notre fichier.

#![feature(noop_waker)]

Ok, donc on a un Waker, plus qu'à créer notre contexte et on est bon non ? ^^

Et comme Context possède le bon goût d'avoir une méthode from_waker(&Waker).

Nous pouvons terminer de rajouter ce qu'il nous manque.

let mut x = X {
    data: 42,
    counter: 4,
};

// création du waker
let waker = Waker::noop();
// création du contexte
let mut cx = Context::from_waker(&waker);

loop {
    if let Poll::Ready(i) = Pin::new(&mut x).poll(&mut cx) {
        println!("The truck is here {i}");
        break;
    }
    // on attend 1 seconde avant de réessayer
    println!("Truck not here yet");
    sleep(Duration::from_secs(1))
}

Victoire

Truck not here yet
Truck not here yet
Truck not here yet
The truck is here 42

Mais en demi teinte, c'est passablement plus compliqué à écrire, et je n'ai pas du tout apprécié mon aventure "à la recheche du Context" perdu ... 🙄

Les Runtimes

Tout semble compliqué parce que nous ne sommes pas sensé faire tout cela.

En réalité, nous sommes sensé utiliser un Runtime qui va se charger de tout le travail pénible à notre place.

En Rust, il existe plein de runtimes asynchrones différents, et aucun fourni par la bibliothèque standard.

On va donc devoir en récupérer un. J'ai choisi de le faire avec la crate smol.

Mais ça aurait pu aussi être du tokio ou de l'async-std.

Bref, installons smol.

cargo add smol

La crate smol fourni une fonction block_on qui prend une Future<Output=T>.

Et ça tombe bien, X implémente Future<Output=i32>.

Modifions notre code pour utiliser smol

fn main() {
    let i = smol::block_on(X {
            data: 42,
            counter: 4,
        });
    println!("The truck is here {i}");
}

Comme la boucle semble avoir disparu, modfions un peu X::poll pour faire apparaître le print du "not yet".

impl Future for X {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.counter -= 1;
        if self.counter == 0 {
            Poll::Ready(self.data)
        } else {
            println!("Truck not here yet");
            Poll::Pending
        }
    }
}

Bon on essaie 🙂

Truck not here yet

Une ligne, pas une de plus, le thread est bloqué...

Mais ça c'est parce que notre hypothèse de travail est fausse.

Si on ouvre le smol::block_on, on peut le découper grosso modo en ce ceci

(je schématise c'est plus compliqué que cela, mais ça sera suffisant pour l'explication)

fn block_on(future: Future<Output=T>) -> T {
    // On créé un tuyaux qui lie unparker à parker
    let (parker, unparker) = channel();

    // On créé un waker
    let waker = waker_fn(|| {
        // On envoie un message à parker
        unparker.unpark()
    });

    // on créé le contexte
    let cx = &mut Context::from_waker(&waker);

    loop {
        if let Poll::Ready(i) = Pin::new(&mut future).poll(cx) {
            return i;
        }

        // Met en pause le thread ⏸
        // Attend que unparker lui envoie un signal
        parker.park()
    }

}

La deuxième pièce du puzzle est la méthode waker_fn, elle créé un Waker qui possède une méthode wake_by_ref().

Donc si l'on fait

let waker = waker_fn(|| println!("dring ⏰ c'est l'heure de se réveiller"));
waker.wake_by_ref();

Cela affiche dans la console

dring ⏰ c'est l'heure de se réveiller

Or il est possible au travers du Context d'atteindre le Waker et par conséquent le wake_by_ref.

cx.waker().wake_by_ref();

Comme le Context est passé à la fonction poll(&mut cx).

Alors cela signifie que nous avons accès dans l'implémentation de Future de X à ce wake_by_ref

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.counter -= 1;
        if self.counter == 0 {
            Poll::Ready(self.data)
        } else {
            println!("Truck not here yet");
            // On averti le runtime que de la donnée est déjà disponible et 
            // qu'il peut relancer poll() immédiatement
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

Qui a la possibilité d'appeler le unparker.unpark()

let waker = waker_fn(|| {
    // On envoie un message à parker
    unparker.unpark()
});

et ainsi débloquer le thread

    loop {
        if let Poll::Ready(i) = Pin::new(&mut future).poll(cx) {
            return i;
        }

        // Un signal est reçu ⚡
        parker.park()
        // le thread n'est plus en pause 
        // la boucle peut reprendre ▶
    }

Si on graphe temporellement, cela donne quelque chose comme cela.

  sequenceDiagram


        activate Runtime
            Runtime->>Future: poll
            activate Future
            Future -->>Runtime : Poll::Pending
            Runtime -->> Runtime: park
            Future --)Runtime : wake_by_ref
            deactivate Future
            Runtime -->> Runtime: unpark
        deactivate Runtime 

        Note left of Runtime: Fin du premier poll

        activate Runtime
            Runtime->>Future: poll
            activate Future
            Future -->>Runtime : Poll::Pending
            Runtime -->> Runtime: park
            Future --)Runtime : wake_by_ref
            deactivate Future
            Runtime -->> Runtime: unpark
        deactivate Runtime 

        Note left of Runtime: Fin du second poll

        activate Runtime
            Runtime->>Future: poll
            activate Future
            Future -->>Runtime : Poll::Ready
            deactivate Future
        deactivate Runtime 
  
    Note left of Runtime: Retour

Le Runtime va poll un première fois la future, se park en attente d'un unpark.

La future va lui déclencher son unpark au travers de wake_by_ref. Et le Runtime re-poll et ainsi de suite, jusqu'au Poll::Ready qui coupe la boucle.

Récapitualtif de notre code:

struct X {
    counter: i32,
    data: i32,
}

impl Future for X {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.counter -= 1;
        if self.counter == 0 {
            Poll::Ready(self.data)
        } else {
            println!("Truck not here yet");
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

fn main() {
    let i = smol::block_on(X {
            data: 42,
            counter: 4,
        });
        println!("The truck is here {i}");
}

On lance et joie !

Truck not here yet
Truck not here yet
Truck not here yet
The truck is here 42

Tout marche !! 😎

Finalement, ce n'est pas si compliqué ou fastidieux ^^

Des futures de futures

Enfin presque, il nous manque le fait que l'on attend 1 seconde entre chaque poll.

On va donc simuler ça.

Attention

Cette implémentation est très mauvaise, c'est simplement pour se donner une idée.

Pour bien faire il faudrait utiliser un Reactor et des timers, mais ça sera pour un prochain article.

struct Timer {
    target: Instant,
}

impl Timer {
    fn after(delay: u64) -> Self {
        Self {
            // timestamp actuel + "delay" secondes
            target: Instant::now().add(Duration::from_secs(delay)),
        }
    }
}

impl Future for Timer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // timestamp actuel
        let last = Instant::now();

        // si le temps est écoulé
        if last >= self.target {
            return Poll::Ready(());
        }

        // sinon on réveille le Runtime
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

C'est une future, on peut donc l'utiliser dans notre Runtime et la poll.

fn main() {
    let now = Instant::now();
    smol::block_on(Timer::after(5));
    // elasped() calcul le temps passé depuis la création de `now`
    println!("Il s'est passé {:?}", now.elapsed());
}

Ce qui affiche

Il s'est passé 5.0000061s

Nous avons bien une future qui se résout au bout de 5 secondes.

Du coup, on peut la poll dans un polling !!

impl Future for X {
    type Output = i32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.counter -= 1;
        if self.counter == 0 {
            Poll::Ready(self.data)
        } else {
            println!("Truck not here yet");

            // on créé le timer
            let mut timer = Timer::after(1);

            // on le poll de manière horrible !!
            // Nous ne sommes pas sensé être ici
            loop {
                if let Poll::Ready(_) = Pin::new(&mut timer).poll(cx) {
                    break;
                }
            }

            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

Attention

Je répète : NE FAITES PAS CA c'est pour l'exemple.

Vous allez tuer vos performances !!!

le wake interne au timer n'est pas le bon et plein d'autres choses ne vont pas.

Excepté ce petit avertissement ^^

fn main() {
    let now = Instant::now();
    let i = smol::block_on(X {
        counter: 4,
        data: 42,
    });
    // elasped() calcul le temps passé depuis la création de `now`
    println!("Il s'est passé {:?}", now.elapsed());
    println!("The truck is here {i}");
}

C'est quand même ce qu'on désire ^^

Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 3.0014149s
The truck is here 42

Les sucres syntaxiques async/.await

Maintenant que l'on a globalement fait n'importe quoi, essayons de rentrer dans le rang. 😁

Petite expérience avant.

Voici une future qui finit au premier poll

struct AlwaysReady<T>(T);

impl<T: Clone> Future for AlwaysReady<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Ready(self.0.clone())
    }
}

Elle s'utilise ainsi:

fn main() {
    let i = smol::block_on(AlwaysReady(42));
    println!("La valeur est {i}");
}

Ce qui affiche

La valeur est 42

Mais si je vous disais que c'est totalement inutile ^^

Rust fourni un sucre syntaxique pour ce genre d'usage au travers du mot-clef async.

fn main() {
    let future = async {
        42
    };

    let i = smol::block_on(future);
    println!("La valeur est {i}");
}

Qui écrit dans la console

La valeur est 42

Cela fonction car aussi bien async {42} que AlwaysReady(42) implémente Future<Output=i32>.

On peut même simplifier en:

fn main() {
    let i = smol::block_on(async { 42 });
    println!("La valeur est {i}");
}

Ok donc on peut faire implémenter le comportement de X.

fn main() {
    let now = Instant::now();
    let i = smol::block_on(async { 
        let mut counter = 0;
        loop {
            counter += 1;
            if counter > 4 {
                break 42;
            }
            Timer::after(1);
            println!("Truck not here yet");
        }
     });
    println!("Il s'est passé {:?}", now.elapsed());
    println!("La valeur est {i}");
}

Alors oui mais c'est un peu rapide ...

Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 475.2µs
La valeur est 42

Pourtant le timer est bien là. 🙄

Oui mais une future qui n'est pas poll ne fait rien.

Pollons-la !

fn main() {
    let now = Instant::now();
    let i = smol::block_on(async {
        let mut counter = 0;
        loop {
            counter += 1;
            if counter > 4 {
                break 42;
            }
            let mut timer = Timer::after(1);
            loop {
                // on a pas de contexte à fournir à poll()
                if Pin::new(&mut timer).poll(???).is_ready() {
                    break
                }
            }
            println!("Truck not here yet");
        }
    });
    println!("Il s'est passé {:?}", now.elapsed());
    println!("La valeur est {i}");
}

Oui mais avec quel contexte ?

Alors oui notre arnaque moldave marche ...

let waker = Waker::noop();
let mut cx = Context::from_waker(&waker);
let mut timer = Timer::after(1);
loop {
    if Pin::new(&mut timer).poll(&mut cx).is_ready() {
        break;
    }
}

Et affiche bien ce qu'on désire

Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0007765s
La valeur est 42

Mais là aussi on sent que ce n'est pas élégant.

C'est pour cela que Rust fourni un dual à async qui se nomme .await.

C'est un sucre syntaxique qui s'utilise ainsi.

fn main() {
    let now = Instant::now();
    let i = smol::block_on(async { 
        let mut counter = 0;
        loop {
            counter += 1;
            if counter > 4 {
                break 42;
            }
            // on poll la future via la syntaxe `.await`
            Timer::after(1).await;
            println!("Truck not here yet");
        }
     });
    println!("Il s'est passé {:?}", now.elapsed());
    println!("La valeur est {i}");
}

Bon là on parle !

Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0006858s
La valeur est 42

le mot-clef .await s'utilise sur n'importe quelle Future. Il a pour résultat d'écrire l'arnaque moldave mais en mieux 😂

je ne ferai aucun commentaire, ce n'est que pour votre culture.

C'est pas du vrai Rust mais du pseudo-rust, ça compile pas.

<expr>.await

devient

match ::std::future::IntoFuture::into_future(<expr>) {
    mut __awaitee => loop {
        match unsafe { ::std::future::Future::poll(
            <::std::pin::Pin>::new_unchecked(&mut __awaitee),
            ::std::future::get_context(task_context),
        ) } {
            ::std::task::Poll::Ready(result) => break result,
            ::std::task::Poll::Pending => {}
        }
        task_context = yield ();
    }
}

Mais ça ressemble vaguement à ce qu'on a écrit. La récupération du Context en mieux.

source

Fonctions asynchrones

Mais on ne va tout de même pas écrire tout notre code dans un block? si ? Non biensûr que non.

On peut découper en fonction.

fn iteration(counter: i32) -> bool {
    if counter > 4 {
        return true
    }
    Timer::after(1).await;
    println!("Truck not here yet");
    false
}

fn main() {
    let now = Instant::now();
    let i = smol::block_on(async { 
        let mut counter = 0;
        loop {
            counter += 1;
            if iteration(counter) {
                break 42;
            }
            
        }
     });
    println!("Il s'est passé {:?}", now.elapsed());
    println!("La valeur est {i}");
}

(Le découpage est un peu nul, mais c'est pour l'exemple ^^')

Si on essaie d'exécuter

error[E0728]: `await` is only allowed inside `async` functions and blocks
   |
   | fn iteration(counter: i32) -> bool {
   |    --------- this is not `async`
...
   |     Timer::after(1).await;
   |                     ^^^^^ only allowed inside `async` functions and blocks

Rust nous dit que nous n'avons pas le droit d'utiliser .await en dehors d'un block async ou d'une fonction async.

Tiens, tiens.

On peut donc rendre une fonction asynchrone. Essayons.

async fn iteration(counter: i32) -> bool {
    if counter > 4 {
        return true
    }
    Timer::after(1).await;
    println!("Truck not here yet");
    false
}

fn main() {
    let now = Instant::now();
    let i = smol::block_on(async { 
        let mut counter = 0;
        loop {
            counter += 1;
            if iteration(counter) {
                break 42;
            }
            
        }
     });
    println!("Il s'est passé {:?}", now.elapsed());
    println!("La valeur est {i}");
}

Il est bien gentil il nous mâche tout le boulot, même plus besoin de moi pour expliquer 😎

error[E0308]: mismatched types
    |
    |             if iteration(counter) {
    |                ^^^^^^^^^^^^^^^^^^ expected `bool`, found future
    |
note: calling an async function returns a future
   --> src\main.rs:104:16
    |
    |             if iteration(counter) {
    |                ^^^^^^^^^^^^^^^^^^
help: consider `await`ing on the `Future`
    |
    |             if iteration(counter).await {
    |                                  ++++++

Bien Grand-Chef !

async fn iteration(counter: i32) -> bool {
    if counter > 4 {
        return true
    }
    Timer::after(1).await;
    println!("Truck not here yet");
    false
}

fn main() {
    let now = Instant::now();
    let i = smol::block_on(async { 
        let mut counter = 0;
        loop {
            counter += 1;
            // on poll la future
            if iteration(counter).await {
                break 42;
            }
            
        }
     });
    println!("Il s'est passé {:?}", now.elapsed());
    println!("La valeur est {i}");
}

Pas mal !

Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0006166s
La valeur est 42

Du coup c'est aussi du sucre mais pourquoi ?

Prenons du plus simple.

async fn f() -> i32 {
    42
}

Clairement c'est une Future qui est renvoyée.

fn main() {
    let i = smol::block_on(f());
    println!("La valeur est {i}"); // affiche La valeur est 42
}

Donc égal à


fn g() -> impl Future<Output = i32> {
    async { 42 }
}

fn main() {
    let i = smol::block_on(g());
    println!("La valeur est {i}"); // affiche La valeur est 42
}

D'où

async fn iteration(counter: i32) -> bool {
    if counter > 4 {
        return true
    }
    Timer::after(1).await;
    println!("Truck not here yet");
    false
}

// Equivalent à

fn iteration(counter: i32) -> impl Future<Output=bool> {
    async {        
        if counter > 4 {
            return true
        }
        Timer::after(1).await;
        println!("Truck not here yet");
        false
    }
}

On retrouve notre async, on peut utiliser notre .await. 😀

Itérer sur des futures

Tout ça commence à être très sympa, maintenant, nous allons exploiter la lazyness des futures à notre avantage.

Autrement dit, tant qu'une future n'est pas poll, elle ne fait rien, ce n'est qu'une boîte que l'on peut transporter et référencer.

On va donc s'en faire une collections et itérer dessus.

Et oui ! Tout le laïus interminable sur les itérateurs d'il y a plus de 1000 lignes c'était pas pour faire du remplissage. 😁

Donc première chose, nous allons créer une fonction delivery qui nous renverra une Future.

async fn delivery(delay: i32, value: i32) -> i32 {
    let mut counter = 0;
    let now = Instant::now();
    loop {
        counter += 1;
        if iteration(counter, delay).await {
            println!("Il s'est passé {:?}", now.elapsed());
            break value;
        }
    }
}

On modifie également la méthode itération

async fn iteration(counter: i32, target: i32) -> bool {
    if counter > target {
        return true;
    }
    Timer::after(1).await;
    println!("Truck not here yet");
    false
}

Notez que tout est en async fn donc tout est Future.

On peut donc mettre notre Future dans un tableau et itéré dessus. 😀

fn main() {
    // on créer un tableau de future
    let deliveries = vec![
        delivery(4, 42), 
        delivery(2, 12)
    ];

    smol::block_on(async {
        // on boucle dessus
        for delivery_process in deliveries {
            // on poll la future
            println!("La valeur est {}", delivery_process.await);
            println!()
        }
    });
}

On affiche bien

Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0006349s
La valeur est 42

Truck not here yet
Truck not here yet
Il s'est passé 2.0002937s
La valeur est 12
use std::future::Future;
use std::ops::Add;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Timer {
    target: Instant,
}

impl Timer {
    fn after(delay: u64) -> Self {
        Self {
            target: Instant::now().add(Duration::from_secs(delay)),
        }
    }
}

impl Future for Timer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let last = Instant::now();

        if last >= self.target {
            return Poll::Ready(());
        }

        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

async fn iteration(counter: i32, target: i32) -> bool {
    if counter > target {
        return true;
    }
    Timer::after(1).await;
    println!("Truck not here yet");
    false
}

async fn delivery(delay: i32, value: i32) -> i32 {
    let mut counter = 0;
    let now = Instant::now();
    loop {
        counter += 1;
        if iteration(counter, delay).await {
            println!("Il s'est passé {:?}", now.elapsed());
            break value;
        }
    }
}

fn main() {
    // on créer un tableau de future
    let deliveries = vec![
        delivery(4, 42), 
        delivery(2, 12)
    ];

    smol::block_on(async {
        // on boucle dessus
        for delivery_process in deliveries {
            // on poll la future
            println!("La valeur est {}", delivery_process.await);
            println!()
        }
    });
}

Donc on sait itérer mais il manque un dernier morceau, pour le moment nous avons une relation 1:1 entre le tableau et ce qu'on itère.

Nous allons voir comment créer une indirection.

Itérateur asynchrone

Cahier des charges

Voici le jeu de données que l'on possède. On va le transformer en un tableau de futures pour simuler une latence réseau par exemple.

let data = vec![
    ("prefix_1_data1", "jeu"),
    ("prefix_1_data2", "de"),
    ("prefix_1_data3", "données 1"),
    ("prefix_2_data1", "le"),
    ("prefix_2_data2", "ciel"),
    ("prefix_2_data3", "est"),
    ("prefix_2_data4", "gris"),
    ("prefix_3_data1", "L'asynchrone"),
    ("prefix_3_data2", "ce n'est"),
    ("prefix_3_data3", "pas si"),
    ("prefix_3_data4", "compliqué"),
];

Voici le résultat que l'on désire

let expected = [
    "jeu de données 1", 
    "le ciel est gris", 
    "L'asynchrone ce n'est pas si compliqué"
];

Attention ça va être un peu tordu, l'abstraction nécessite parfois de tordre la réalité ^^

Tout d'abord que voulons nous comme API haut-niveau ?

Dans l'idéal ceci :

fn main() {

    let data = vec![....]; // Un tableau de futures

    smol::block_on(async {
        let mut x = X::new(data); 

        for sentence in x.await {
            // ...
        }

    });
}

Mais nous ne pourrons pas faire ça, rappellez vous, si l'on desugar cela donnerait:

let iterator = (x).into_iterator(); // iterator = x
for sentence in iterator.await {}

Or cela nous oblige à implémenter à la fois Future et Iterator.

Mais du coup, le .await n'est là que pour sortir l'iterateur, pas poller de la nouvelle données.

Bref on se retrouve coincé assez vite dans notre tentative de for.

Bon et par un while. On implémente Future sur X et fin.

fn main() {

    let data = vec![....]; // Un tableau de futures

    smol::block_on(async {
        let mut x = X::new(data); 

        while let Some(sentence) = x.await {

        } 

    });
}

Mais non 🙄

error[E0382]: use of moved value: `x`
    |
    |         let mut x = X::new(data);
    |             ----- move occurs because `x` has type `X`, which does not implement the `Copy` trait
    |
    |         while let Some(t) = x.await {}
    |         --------------------^------
    |         |                     |
    |         |                     `x` moved due to this await, in previous iteration of loop
    |         inside of this loop
    |
note: `into_future` takes ownership of the receiver `self`, which moves `x`
    |
    |     fn into_future(self) -> Self::IntoFuture;

Car derrière le rideau, .await applle into_future qui consomme x,

Nous n'allons pas non plus pouvoir copier car les data seront allouées et donc non-copiable.

Nous devons donc partir sur une double indirection.

fn main() {

    let data = vec![....];

    smol::block_on(async {
        let mut x = X::new(data); 

        while let Some(sentence) = x.next().await {

        } 

    });
}

Qu'il faut lire comme

loop {
    let next_future = x.next();
    if let Some(sentence) = next_future.await {

    } else {
        break
    }
}

On va donc appelé x.next() à chaque tour de boucle, on sait aussi que next_future est une impl Future<Output=Option<String>> vu que l'on veut des Option<String> en bout de chaîne.

On sait également que next() ne peut pas consommer x sinon on retombe dans le même travers que into_future.

Donc X possède une fonction next qui retourne quelque chose Y qui implémente Future<Output=Option<String>>.

Pour plus de lisibilité, nous allons appeler notre Y une NextFuture.

Il faudra également que NextFuture connaisse le X qu'il l'a créé.

Par conséquent la signature de X::next est :

fn next(&mut self) -> NextFuture;

Nous pouvons en déterminer également le

type Output = Option<String>

Et donc implémenter NextFuture.

Alors c'est parti pour implémenter tout ce beau monde.

Avec une future fixe

D'abord X.

On se créé un type custom.

type FutureInput = AlwaysReady<(String, String)>;

Puis X

struct X {
    data: Vec<FutureInput>,
    current: Option<FutureInput>,
}

Puis on implémente le constructeur et next

impl X {
    fn new(data: Vec<(&str, &str)>) -> Self {
        let data = data
            .into_iter()
            // on créé un tableau de futures de nos données
            .map(|(x, y)| AlwaysReady((x.to_string(), y.to_string())))
            // on inverse parce que l'on va pop par la fin, or on veut le début
            .rev()
            .collect();

        Self {
            data,
            current: None,
        }
    }

    fn next(&mut self) -> NextFuture<'_> {
        NextFuture { x: self }
    }
}

Le <'_> après le NextFuture c'est pour pas s'ennuyer avec la lifetime, on laisse Rust bosser comme un grand.

Comme promis, NextFuture prend une référence de X mutable.

struct NextFuture<'a> {
    x: &'a mut X,
}

L'implémentation de la Future appelle le poll de la référence de X.

impl<'a> Future for NextFuture<'a> {
    type Output = Option<String>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.x).poll(cx)
    }
}

Du coup, on implémente également Future pour X.

impl Future for X {
    // peut-être un nouvel élément
    type Output = Option<String>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // on initialise le poll à venir
        if self.current.is_none() {
            self.current = self.data.pop();
        }

        match &mut self.current {
            // s'il n'y a plus rien a poll on envoie None
            None => Poll::Ready(None),
            // si on peut poll, on poll
            Some(ref mut current_future) => match Pin::new(current_future).poll(cx) {
                // si les données son disponible
                Poll::Ready(data) => {
                    // on réinitialise le current poll
                    self.current = None;
                    // on renvoie les données
                    Poll::Ready(Some(data.0))
                }
                // sinon on demande gentiement de revenir plus tard
                Poll::Pending => {
                    // on réveille le runtime
                    cx.waker().wake_by_ref();
                    // on lui explique que ce n'est pas pour tout de suite les données
                    Poll::Pending
                },
            },
        }
    }
}

Et en exécutant,

fn main() {
    let data = vec![
        ("prefix_1_data1", "jeu"),
        ("prefix_1_data2", "de"),
        ("prefix_1_data3", "données 1"),
        ("prefix_2_data1", "le"),
        ("prefix_2_data2", "ciel"),
        ("prefix_2_data3", "est"),
        ("prefix_2_data4", "gris"),
        ("prefix_3_data1", "L'asynchrone"),
        ("prefix_3_data2", "ce n'est"),
        ("prefix_3_data3", "pas si"),
        ("prefix_3_data4", "compliqué"),
    ];

    smol::block_on(async move {
        let mut x = X::new(data);

        while let Some(key) = x.next().await {
            println!("la clef vaut {key}");
        }
    })
}

on obtient

la clef vaut prefix_1_data1
la clef vaut prefix_1_data2
la clef vaut prefix_1_data3
la clef vaut prefix_2_data1
la clef vaut prefix_2_data2
la clef vaut prefix_2_data3
la clef vaut prefix_2_data4
la clef vaut prefix_3_data1
la clef vaut prefix_3_data2
la clef vaut prefix_3_data3
la clef vaut prefix_3_data4

Cool mais mais c'est pas très souple, nous avons seulement une future qui se résout tout de seul.

Nous, nous voulons du délai pour simuler le réseau.

On va procéder un remaniement ministériel.

Avec une future quelconque

Premièrement, nous nons créons une fonction asynchrone qui sera notre future avec délai.

use async_io::Timer;

async fn delivery<T>(delay: u64, value: T) -> T {
    let now = Instant::now();
    Timer::after(Duration::from_millis(delay)).await;
    println!("Il s'est passé {:?}", now.elapsed());
    value
}

Pour l'occasion, je remplace mon Timer foireux par un vrai, celui fourni par async_io au travers de smol.

Je créé deux types pour me faciliter la vie

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
type FutureInput = (String, String);

Le premier permet de ranger une future quelconque dans une boîte et de pouvoir la transbahuter sans y penser.

Le second est l'entré des données: (clef, valeur)

Je modifie X pour refléter se nouveau type.

struct X {
    data: Vec<BoxFuture<FutureInput>>,
    current: Option<BoxFuture<FutureInput>>,
}

Je créé une fonction boxed qui va transformer une Future en BoxFuture, le type est compliqué mais globalement, ça veut juste dire "c'est open bar, fait ce que tu veux, j'ai calé ta future en mémoire, je te file de quoi y accéder et je te promet qu'elle sera là quand tu en aura besoin".

fn boxed<'a, U, T: Future<Output = U>>(value: T) -> Pin<Box<dyn Future<Output = U> + Send + 'a>>
where
    T: Sized + Send + 'a,
{
    Box::pin(value)
}

Je modifie la fonction new en conséquence, et en rajoutant un peu d'aléatoire.

cargo add rand
impl X {
    fn new(data: Vec<(&str, &str)>) -> Self {
        let mut rng = rand::thread_rng();
        let data = data
            .into_iter()
            .map(|(x, y)| {
                // on génère le délai
                let delay = rng.gen_range(100..2000);
                // on créé la future
                let fut = delivery(delay, (x.to_string(), y.to_string()));
                // on box la future
                boxed(fut)
            })
            .rev()
            .collect();

        Self {
            data,
            current: None,
        }
    }
}

Le reste est identique

use async_io::Timer;
use rand::Rng;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// déclaration des types
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
type FutureInput = (String, String);

// future avec délai
async fn delivery<T>(delay: u64, value: T) -> T {
    let now = Instant::now();
    Timer::after(Duration::from_millis(delay)).await;
    println!("Il s'est passé {:?}", now.elapsed());
    value
}

// boxing de future
fn boxed<'a, U, T: Future<Output = U>>(value: T) -> Pin<Box<dyn Future<Output = U> + Send + 'a>>
where
    T: Sized + Send + 'a,
{
    Box::pin(value)
}

/// Représente le retour de `X::next()`
struct NextFuture<'a> {
    x: &'a mut X,
}

impl<'a> Future for NextFuture<'a> {
    type Output = Option<String>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.x).poll(cx)
    }
}

/// Itérateur asynchrone
struct X {
    /// Futures à itérer
    data: Vec<BoxFuture<FutureInput>>,
    /// Futur actuellement itérée
    current: Option<BoxFuture<FutureInput>>,
}

impl X {
    fn new(data: Vec<(&str, &str)>) -> Self {
        let mut rng = rand::thread_rng();
        let data = data
            .into_iter()
            .map(|(x, y)| {
                // on génère le délai
                let delay = rng.gen_range(100..2000);
                // on créé la future
                let fut = delivery(delay, (x.to_string(), y.to_string()));
                // on box la future
                boxed(fut)
            })
            .rev()
            .collect();

        Self {
            data,
            current: None,
        }
    }

    /// Retourne une instance de `NextFuture`
    fn next(&mut self) -> NextFuture<'_> {
        NextFuture { x: self }
    }
}

impl Future for X {
    type Output = Option<String>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // on définit la future à poller s'il n'y en a pas
        if self.current.is_none() {
            // en popant le dernier élément de notre
            // tableau de futures
            self.current = self.data.pop();
        }
        
        // on vérifie s'il a toujours quelque chose à poller
        match &mut self.current {
            // si non on indique que c'était le dernier élément
            None => Poll::Ready(None),
            // si oui, on poll la futur
            Some(ref mut current_future) => match Pin::new(current_future).poll(cx) {
                // si la futur est complétée
                Poll::Ready(data) => {
                    // on réinitialise la future à poller
                    self.current = None;
                    // on renvoie les données
                    Poll::Ready(Some(data.0))
                }
                // sinon
                Poll::Pending => {
                    // on réveille le runtime 
                    cx.waker().wake_by_ref();
                    // on lui dit que ce n'est pas encore prêt
                    Poll::Pending
                }
            },
        }
    }
}

fn main() {
    let data = vec![
        ("prefix_1_data1", "jeu"),
        ("prefix_1_data2", "de"),
        ("prefix_1_data3", "données 1"),
        ("prefix_2_data1", "le"),
        ("prefix_2_data2", "ciel"),
        ("prefix_2_data3", "est"),
        ("prefix_2_data4", "gris"),
        ("prefix_3_data1", "L'asynchrone"),
        ("prefix_3_data2", "ce n'est"),
        ("prefix_3_data3", "pas si"),
        ("prefix_3_data4", "compliqué"),
    ];

    smol::block_on(async move {
        let mut x = X::new(data);

        while let Some(key) = x.next().await {
            println!("la clef vaut {key}");
            println!()
        }
    })
}

Il s'est passé 185.0034ms
la clef vaut prefix_1_data1

Il s'est passé 1.7600028s
la clef vaut prefix_1_data2

Il s'est passé 1.2960022s
la clef vaut prefix_1_data3

Il s'est passé 1.2860019s
la clef vaut prefix_2_data1

Il s'est passé 331.0027ms
la clef vaut prefix_2_data2

Il s'est passé 406.0027ms
la clef vaut prefix_2_data3

Il s'est passé 1.7650035s
la clef vaut prefix_2_data4

Il s'est passé 688.0042ms
la clef vaut prefix_3_data1

Il s'est passé 1.5060066s
la clef vaut prefix_3_data2

Il s'est passé 1.8030035s
la clef vaut prefix_3_data3

Il s'est passé 124.0034ms
la clef vaut prefix_3_data4

Yeah ! Du délai ! 🤩

On simule du réseau pas stable ou de la DB lente ^^

Découpler le flux d'entré de la sortie

Bon c'est cool mais ce n'est pas ce qu'on désire.

Pour rappel on en entré

let data = vec![
    ("prefix_1_data1", "jeu"),
    ("prefix_1_data2", "de"),
    ("prefix_1_data3", "données 1"),
    ("prefix_2_data1", "le"),
    ("prefix_2_data2", "ciel"),
    ("prefix_2_data3", "est"),
    ("prefix_2_data4", "gris"),
    ("prefix_3_data1", "L'asynchrone"),
    ("prefix_3_data2", "ce n'est"),
    ("prefix_3_data3", "pas si"),
    ("prefix_3_data4", "compliqué"),
];

Et voici le résultat que l'on désire

let expected = [
    "jeu de données 1", 
    "le ciel est gris", 
    "L'asynchrone ce n'est pas si compliqué"
];

Or pour le moment, nous avons:

let result = vec![
    "prefix_1_data1",
    "prefix_1_data2",
    "prefix_1_data3",
    "prefix_2_data1",
    "prefix_2_data2",
    "prefix_2_data3",
    "prefix_2_data4",
    "prefix_3_data1",
    "prefix_3_data2",
    "prefix_3_data3",
    "prefix_3_data4",
];

Nous allons donc retarder la sortie de la données et accumuler assez de résultat.

Pour accumuler nous allons utiliser le fait que les clefs sont toutes sous le modèle

prefix_<id>_data<n>

Accumuler sur ce prefix_<id> et lorsque l'on passe au suivant relâcher la concaténation de ce qu'on a accumulé.

Pour le coup c'est vraiment de l'algoritmique, le seul point de vigilence est de bien renvoyer le dernier morceau d'accumulateur

use async_io::Timer;
use rand::Rng;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// déclaration des types
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
type FutureInput = (String, String);

// future avec délai
async fn delivery(delay: u64, value: FutureInput) -> FutureInput {
    let now = Instant::now();
    Timer::after(Duration::from_millis(delay)).await;
    println!("La clef {} est arrivée en  {:?}", value.0, now.elapsed());
    value
}

// boxing de future
fn boxed<'a, U, T: Future<Output = U>>(value: T) -> Pin<Box<dyn Future<Output = U> + Send + 'a>>
where
    T: Sized + Send + 'a,
{
    Box::pin(value)
}

/// Représente le retour de `X::next()`
struct NextFuture<'a> {
    x: &'a mut X,
}

impl<'a> Future for NextFuture<'a> {
    type Output = Option<String>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.x).poll(cx)
    }
}

/// Itérateur asynchrone
struct X {
    /// Futures à itérer
    data: Vec<BoxFuture<FutureInput>>,
    /// Futur actuellement itérée
    current_futur: Option<BoxFuture<FutureInput>>,
    /// Prefix courant
    prefix: Option<String>,
    /// Accumulateur
    accumulator: Vec<String>,
}

impl X {
    fn new(data: Vec<(&str, &str)>) -> Self {
        let mut rng = rand::thread_rng();
        let data = data
            .into_iter()
            .map(|(x, y)| {
                // on génère le délai
                let delay = rng.gen_range(100..2000);
                // on créé la future
                let fut = delivery(delay, (x.to_string(), y.to_string()));
                // on box la future
                boxed(fut)
            })
            .rev()
            .collect();

        Self {
            data,
            current_futur: None,
            prefix: None,
            accumulator: vec![],
        }
    }

    /// Retourne une instance de `NextFuture`
    fn next(&mut self) -> NextFuture<'_> {
        NextFuture { x: self }
    }
}

impl Future for X {
    type Output = Option<String>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // on définit la future à poller s'il n'y en a pas
        if self.current_futur.is_none() {
            // en popant le dernier élément de notre
            // tableau de futures
            self.current_futur = self.data.pop();
        }

        // on vérifie s'il a toujours quelque chose à poller
        match &mut self.current_futur {
            // si non on indique que c'était le dernier élément
            None => {
                // on vérifie si l'accumulateur n'est pas vide
                if !self.accumulator.is_empty() {
                    // on draine l'accumulateur
                    let data = self.accumulator.drain(..).collect::<Vec<String>>();
                    // on renvoie les données
                    return Poll::Ready(Some(data.join(" ")));
                }
                // au prochain poll l'itération sera terminée
                Poll::Ready(None)
            }
            // si oui, on poll la futur
            Some(ref mut current_future) => match Pin::new(current_future).poll(cx) {
                // si la futur est complétée
                Poll::Ready((key, value)) => {
                    // on split sur la clef
                    let key = key.split('_').skip(1).take(1).collect::<String>();

                    // on vérifie si un préfix existe déjà
                    if self.prefix.is_none() {
                        // sinon on le créé
                        self.prefix = Some(key.clone())
                    }

                    // on défini un tableau pour un potentiel renvoie de données
                    let mut ready_data = vec![];

                    if let Some(ref actual_prefix) = self.prefix {
                        // Le prefix a changé nous pouvons relâcher une valeur
                        if &key != actual_prefix {
                            // on vide l'accumulateur
                            ready_data = self.accumulator.drain(..).collect();
                            // on initialise au prochain prefix
                            self.prefix = Some(key);
                        }
                    }
                    // on accumule le résultat
                    self.accumulator.push(value);

                    // on réinitialise la future à poller
                    self.current_futur = None;

                    // on vérifie si des données sont prête à être release
                    if ready_data.is_empty() {
                        // on réveille le runtime
                        cx.waker().wake_by_ref();
                        // on lui dit que ce n'est pas encore prêt
                        Poll::Pending
                    } else {
                        // on fusionne le tableau
                        let data = ready_data.join(" ");
                        // on renvoie les données
                        Poll::Ready(Some(data))
                    }
                }
                // sinon
                Poll::Pending => {
                    // on réveille le runtime
                    cx.waker().wake_by_ref();
                    // on lui dit que ce n'est pas encore prêt
                    Poll::Pending
                }
            },
        }
    }
}

fn main() {
    let data = vec![
        ("prefix_1_data1", "jeu"),
        ("prefix_1_data2", "de"),
        ("prefix_1_data3", "données 1"),
        ("prefix_2_data1", "le"),
        ("prefix_2_data2", "ciel"),
        ("prefix_2_data3", "est"),
        ("prefix_2_data4", "gris"),
        ("prefix_3_data1", "L'asynchrone"),
        ("prefix_3_data2", "ce n'est"),
        ("prefix_3_data3", "pas si"),
        ("prefix_3_data4", "compliqué"),
    ];

    smol::block_on(async move {
        let mut x = X::new(data);
        let mut now = Instant::now();
        while let Some(message) = x.next().await {
            println!("le message \"{message}\" est arrivée en {:?}", now.elapsed());
            now = Instant::now();
            println!()
        }
    })
}

Ce qui nous donne

La clef prefix_1_data1 est arrivée en  1.1680078s
La clef prefix_1_data2 est arrivée en  1.6620032s
La clef prefix_1_data3 est arrivée en  152.0027ms
La clef prefix_2_data1 est arrivée en  1.3760026s
le message "jeu de données 1" est arrivée en 4.3585695s

La clef prefix_2_data2 est arrivée en  1.3740032s
La clef prefix_2_data3 est arrivée en  1.4890043s
La clef prefix_2_data4 est arrivée en  1.9170056s
La clef prefix_3_data1 est arrivée en  1.9830028s
le message "le ciel est gris" est arrivée en 6.7637874s

La clef prefix_3_data2 est arrivée en  310.0058ms
La clef prefix_3_data3 est arrivée en  587.0023ms
La clef prefix_3_data4 est arrivée en  527.0066ms
le message "L'asynchrone ce n'est pas si compliqué" est arrivée en 1.4246736s

Et là c'est vraiment le Champagne 🍾 🤩

Nous venons de créer un système qui est capable de retarder la création d'un flux avec des données qui elle même sont retardée de manière aléatoire.

Bref en un mot comme en 100, nous avons un Itérateur Asynchrone !!!

S'il reste des futures à poller cela donne quelque chose comme cela.

  sequenceDiagram

participant Future

X ->> NextFuture : next()

activate NextFuture
    Note left of X: création du NextFuture

    Runtime ->> NextFuture : poll()
    Note right of Runtime: on poll NextFuture

    NextFuture ->> X : poll() 

    Note left of Future: On poll la première future
    activate Future
        X ->> Future : poll()

        Future -->> X : Poll::Pending
        Note left of X: La future n'est pas prête

        X -->> NextFuture : Poll::Pending 
        NextFuture -->> Runtime : Poll::Pending 

        Runtime ->> NextFuture : poll()
        Note right of Runtime: on poll NextFuture
        NextFuture ->> X : poll()

        X ->> Future : poll()
        Future -->> X : Poll::Ready(data)
        Note left of X: La future est prête

        X -->> NextFuture : Poll::Pending 
    deactivate Future

    Note left of X: Mais X n'a pas assez accumulé
    NextFuture -->> Runtime : Poll::Pending 

    Runtime ->> NextFuture : poll()
    Note right of Runtime: on poll NextFuture
    NextFuture ->> X : poll()

    Note left of Future: On passe à la future suivante
    activate Future
    X ->> Future : poll()
    Future -->> X : Poll::Ready(data)

    X -->> NextFuture : Poll::Ready(Some(data)) 
    deactivate Future
    Note left of X: X a assez accumulé
    NextFuture -->> Runtime : Poll::Ready(Some(data)) 
    Note left of Runtime: on relâche une donnée

deactivate NextFuture

Lorsque l'on arrive au bout de l'itération cela donne

  sequenceDiagram


X ->> NextFuture : next()

activate NextFuture
    Note left of X: création du NextFuture

    Runtime ->> NextFuture : poll()
    Note right of Runtime: on poll NextFuture

    NextFuture ->> X : poll()    

    X -->> NextFuture : Poll::Ready(Some(data))
    NextFuture -->> Runtime : Poll::Ready(Some(data)) 
    Note left of X: Avec data

deactivate NextFuture


X ->> NextFuture : next()
activate NextFuture

    Note left of X: nouvelle itération
    Runtime ->> NextFuture : poll()
    Note right of Runtime: on poll NextFuture

    NextFuture ->> X : poll()    

    X -->> NextFuture : Poll::Ready(None)
    NextFuture -->> Runtime : Poll::Ready(None) 
    Note left of X: Plus de data

deactivate NextFuture

le trait Stream et l'écosystème asynchrone

Bon, on a bien rigolé à réinventer la roue, mais vous vous douter que tout ça existe déjà et n'est pas à réplémenter à chaque fois.

La première chose à comprendre c'est que même si la librairie ne fournie pas les outils, les choses sont déjà plus ou moins normailisées et n'attende que d'être officialisée.

Prenez par exemple la crate future_util, véritable couteau suisse des Futures.

La méthode boxed que nous avons implémenté existe déjà.

fn boxed<'a, U, T: Future<Output = U>>(value: T) -> Pin<Box<dyn Future<Output = U> + Send + 'a>>
where
    T: Sized + Send + 'a,
{
    Box::pin(value)
}

Seulement elle est directement rattaché au trait FutureExt.

Trait qui est automatiquement implémenté pour toute future.

impl<F: Future + ?Sized> FutureExt for F {}

Ainsi on peut faire ceci

cargo add futures-lite

On utilise le type Boxed de futures-utils

pub type Boxed<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

dans notre X

// Attention a importer le trait, sinon .boxed() n'existe pas !
use futures_lite::FutureExt;

struct X {
    /// Futures à itérer
    data: Vec<Boxed<FutureInput>>,
    /// Futur actuellement itérée
    current_futur: Option<Boxed<FutureInput>>,
    /// Prefix courant
    prefix: Option<String>,
    /// Accumulateur
    accumulator: Vec<String>,
}

impl X {
    fn new(data: Vec<(&str, &str)>) -> Self {
        let mut rng = rand::thread_rng();
        let data = data
            .into_iter()
            .map(|(x, y)| {
                // on génère le délai
                let delay = rng.gen_range(100..2000);
                // on créé la future
                let fut = delivery(delay, (x.to_string(), y.to_string()));
                // on box la future avec le trait FutureExt::boxed
                fut.boxed()
            })
            .rev()
            .collect();

        Self {
            data,
            current_futur: None,
            prefix: None,
            accumulator: vec![],
        }
    }
}

Et du coup, nous pouvons nous séparer de notre fonction boxed maison, et de notre BoxFuture.

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

Bon et maintenant si je vous disais que le NextFuture existe déjà ici.

Utilisons-le à la place

use futures_lite::stream::NextFuture;

impl X {
    /// Retourne une instance de `NextFuture`
    fn next(&mut self) -> NextFuture<'_, Self> {
        NextFuture { stream: self }
    }
}

Oui mais on ne peut pas le créer, le champ est privé et il n'y a pas de constructeur.

error[E0451]: field `stream` of struct `NextFuture` is private
   |
   |         NextFuture { stream: self }
   |                      ^^^^^^^^^^^^ private field

Mais ça c'est parce que encore une fois, on ne travail pas comme il faut.

La vraie manière est d'implémenter le trait Stream.

Elle demande de définir une méthode poll_next qui a l'exacte signature par repport à poll.

impl Future for X {
    type Output = Option<String>;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {}
}

impl Stream for X {
    type Item = String;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {}
}

Allons-y.

impl Stream for X {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // on définit la future à poller s'il n'y en a pas
        if self.current_futur.is_none() {
            // en popant le dernier élément de notre
            // tableau de futures
            self.current_futur = self.data.pop();
        }

        // on vérifie s'il a toujours quelque chose à poller
        match &mut self.current_futur {
            // si non on indique que c'était le dernier élément
            None => {
                // on vérifie si l'accumulateur n'est pas vide
                if !self.accumulator.is_empty() {
                    // on draine l'accumulateur
                    let data = self.accumulator.drain(..).collect::<Vec<String>>();
                    // on renvoie les données
                    return Poll::Ready(Some(data.join(" ")));
                }
                // au prochain poll l'itération sera terminée
                Poll::Ready(None)
            }
            // si oui, on poll la futur
            Some(ref mut current_future) => match Pin::new(current_future).poll(cx) {
                // si la futur est complétée
                Poll::Ready((key, value)) => {
                    // on split sur la clef
                    let key = key.split('_').skip(1).take(1).collect::<String>();

                    // on vérifie si un préfix existe déjà
                    if self.prefix.is_none() {
                        // sinon on le créé
                        self.prefix = Some(key.clone())
                    }

                    // on défini un tableau pour un potentiel renvoie de données
                    let mut ready_data = vec![];

                    if let Some(ref actual_prefix) = self.prefix {
                        // Le prefix a changé nous pouvons relâcher une valeur
                        if &key != actual_prefix {
                            // on vide l'accumulateur
                            ready_data = self.accumulator.drain(..).collect();
                            // on initialise au prochain prefix
                            self.prefix = Some(key);
                        }
                    }
                    // on accumule le résultat
                    self.accumulator.push(value);

                    // on réinitialise la future à poller
                    self.current_futur = None;

                    // on vérifie si des données sont prête à être release
                    if ready_data.is_empty() {
                        // on réveille le runtime
                        cx.waker().wake_by_ref();
                        // on lui dit que ce n'est pas encore prêt
                        Poll::Pending
                    } else {
                        // on fusionne le tableau
                        let data = ready_data.join(" ");
                        // on renvoie les données
                        Poll::Ready(Some(data))
                    }
                }
                // sinon
                Poll::Pending => {
                    // on réveille le runtime
                    cx.waker().wake_by_ref();
                    // on lui dit que ce n'est pas encore prêt
                    Poll::Pending
                }
            },
        }
    }
}

On implémente poll_next et pas poll car le NextFuture de la lib appelle poll_next.

Ce qui nous donne finalement ce code

use async_io::Timer;
use futures_lite::future::Boxed;
use futures_lite::stream::StreamExt;
use futures_lite::{FutureExt, Stream};
use rand::Rng;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

// déclaration des types
type FutureInput = (String, String);

// future avec délai
async fn delivery(delay: u64, value: FutureInput) -> FutureInput {
    let now = Instant::now();
    Timer::after(Duration::from_millis(delay)).await;
    println!("La clef {} est arrivée en  {:?}", value.0, now.elapsed());
    value
}

/// Itérateur asynchrone
struct X {
    /// Futures à itérer
    data: Vec<Boxed<FutureInput>>,
    /// Futur actuellement itérée
    current_futur: Option<Boxed<FutureInput>>,
    /// Prefix courant
    prefix: Option<String>,
    /// Accumulateur
    accumulator: Vec<String>,
}

impl X {
    fn new(data: Vec<(&str, &str)>) -> Self {
        let mut rng = rand::thread_rng();
        let data = data
            .into_iter()
            .map(|(x, y)| {
                // on génère le délai
                let delay = rng.gen_range(100..2000);
                // on créé la future
                let fut = delivery(delay, (x.to_string(), y.to_string()));
                // on box la future
                fut.boxed()
            })
            .rev()
            .collect();

        Self {
            data,
            current_futur: None,
            prefix: None,
            accumulator: vec![],
        }
    }
}

// on implémente Stream
impl Stream for X {
    // pas besoin de l'option elle est dans le type de retour de poll_next
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // on définit la future à poller s'il n'y en a pas
        if self.current_futur.is_none() {
            // en popant le dernier élément de notre
            // tableau de futures
            self.current_futur = self.data.pop();
        }

        // on vérifie s'il a toujours quelque chose à poller
        match &mut self.current_futur {
            // si non on indique que c'était le dernier élément
            None => {
                // on vérifie si l'accumulateur n'est pas vide
                if !self.accumulator.is_empty() {
                    // on draine l'accumulateur
                    let data = self.accumulator.drain(..).collect::<Vec<String>>();
                    // on renvoie les données
                    return Poll::Ready(Some(data.join(" ")));
                }
                // au prochain poll l'itération sera terminée
                Poll::Ready(None)
            }
            // si oui, on poll la futur
            Some(ref mut current_future) => match Pin::new(current_future).poll(cx) {
                // si la futur est complétée
                Poll::Ready((key, value)) => {
                    // on split sur la clef
                    let key = key.split('_').skip(1).take(1).collect::<String>();

                    // on vérifie si un préfix existe déjà
                    if self.prefix.is_none() {
                        // sinon on le créé
                        self.prefix = Some(key.clone())
                    }

                    // on défini un tableau pour un potentiel renvoie de données
                    let mut ready_data = vec![];

                    if let Some(ref actual_prefix) = self.prefix {
                        // Le prefix a changé nous pouvons relâcher une valeur
                        if &key != actual_prefix {
                            // on vide l'accumulateur
                            ready_data = self.accumulator.drain(..).collect();
                            // on initialise au prochain prefix
                            self.prefix = Some(key);
                        }
                    }
                    // on accumule le résultat
                    self.accumulator.push(value);

                    // on réinitialise la future à poller
                    self.current_futur = None;

                    // on vérifie si des données sont prête à être release
                    if ready_data.is_empty() {
                        // on réveille le runtime
                        cx.waker().wake_by_ref();
                        // on lui dit que ce n'est pas encore prêt
                        Poll::Pending
                    } else {
                        // on fusionne le tableau
                        let data = ready_data.join(" ");
                        // on renvoie les données
                        Poll::Ready(Some(data))
                    }
                }
                // sinon
                Poll::Pending => {
                    // on réveille le runtime
                    cx.waker().wake_by_ref();
                    // on lui dit que ce n'est pas encore prêt
                    Poll::Pending
                }
            },
        }
    }
}

fn main() {
    let data = vec![
        ("prefix_1_data1", "jeu"),
        ("prefix_1_data2", "de"),
        ("prefix_1_data3", "données 1"),
        ("prefix_2_data1", "le"),
        ("prefix_2_data2", "ciel"),
        ("prefix_2_data3", "est"),
        ("prefix_2_data4", "gris"),
        ("prefix_3_data1", "L'asynchrone"),
        ("prefix_3_data2", "ce n'est"),
        ("prefix_3_data3", "pas si"),
        ("prefix_3_data4", "compliqué"),
    ];

    smol::block_on(async move {
        let mut x = X::new(data);
        let mut now = Instant::now();
        while let Some(message) = x.next().await {
            println!("le message \"{message}\" est arrivée en {:?}", now.elapsed());
            now = Instant::now();
            println!()
        }
    })
}

Qui a pour affichage

La clef prefix_1_data1 est arrivée en  308.0041ms
La clef prefix_1_data2 est arrivée en  1.1420077s
La clef prefix_1_data3 est arrivée en  1.251002s
La clef prefix_2_data1 est arrivée en  1.1130028s
le message "jeu de données 1" est arrivée en 3.8145345s

La clef prefix_2_data2 est arrivée en  354.0151ms
La clef prefix_2_data3 est arrivée en  669.003ms
La clef prefix_2_data4 est arrivée en  439.0069ms
La clef prefix_3_data1 est arrivée en  682.0036ms
le message "le ciel est gris" est arrivée en 2.1447636s

La clef prefix_3_data2 est arrivée en  239.0026ms
La clef prefix_3_data3 est arrivée en  1.1480031s
La clef prefix_3_data4 est arrivée en  1.6000024s
le message "L'asynchrone ce n'est pas si compliqué" est arrivée en 2.9876462s

On a rien cassé 😁

API Haut niveau

Continuons à ne pas écrire du code, c'est assez reposant.

Nous allons nous créer un stream de nos futures.

unfold

Pour cela nous utilisons une fonction appelée unfold.

Celle-ci a pour signature

unfold(state, |mut state| async move {
    // on fait des trucs

    // on renvoit soit le prochain état avec la valeur généré
    Some(data_courrante, next_state)
    // soit on finit l'itération
})

Nous, nous allons mettre ce que nous avion dans le

use futures_lite::stream::{unfold, StreamExt};
fn main()
    // on utilise VecDeque pour pouvoir récupérer le premier élément 
    let data = VecDeque::from([...]);

    let rng = rand::thread_rng();

    // on donne le data et le rng comme état
    let stream = unfold((data, rng), |(mut data, mut rng)| async move {
        // on récupère le premier élément
        data.pop_front()
            .map(|(key, value)| {
                let delay = rng.gen_range(100..2000);
                // on créé la future
                let fut = delivery(delay, (key.to_string(), value.to_string()));
                // on box la future
                let boxed_fut = fut.boxed();
                (boxed_fut, (data, rng))
            })
            // ou on renvoit none
            .or(None)
    });
}

Pour l'utiliser, cela donne

use std::pin::{pin, Pin};

fn main() {
    // créé un Box::pin de notre stream
    let mut stream = pin!(stream);

    smol::block_on(async move {
        while let Some(future) = stream.next().await {
            let result = future.await;
            println!("{:?}", result)
        }
    })
}

Cool !

La clef prefix_1_data1 est arrivé en  1.9214232s
("prefix_1_data1", "jeu")
La clef prefix_1_data2 est arrivé en  251.7301ms
("prefix_1_data2", "de")
La clef prefix_1_data3 est arrivé en  1.9595308s
("prefix_1_data3", "données 1")
La clef prefix_2_data1 est arrivé en  1.9699186s
("prefix_2_data1", "le")
La clef prefix_2_data2 est arrivé en  1.6447015s
("prefix_2_data2", "ciel")
La clef prefix_2_data3 est arrivé en  202.1345ms
("prefix_2_data3", "est")
La clef prefix_2_data4 est arrivé en  897.8317ms
("prefix_2_data4", "gris")
La clef prefix_3_data1 est arrivé en  616.9607ms
("prefix_3_data1", "L'asynchrone")
La clef prefix_3_data2 est arrivé en  1.5978526s
("prefix_3_data2", "ce n'est")
La clef prefix_3_data3 est arrivé en  1.1858525s
("prefix_3_data3", "pas si")
La clef prefix_3_data4 est arrivé en  1.4951382s
("prefix_3_data4", "compliqué")

Mais ce n'est pas ce qu'on veut, du coup on continue dans l'exploration de l'ecosystème

async-stream

Pour finir, je veux vous montrer une lib très pratique async-stream

cargo add async-stream

Elle fourni une macro stream! qui permet de faire ce que l'on désire.

Le fonctionnement est simple.

Tout ce qu'on yield est rajouté dans le stream.

async fn f() { 
    let stream = async_stream::stream! {
        for i in 0..3 {
            yield i;
        }
    };

    let mut stream = pin!(stream); // needed for iteration

    while let Some(value) = stream.next().await {
        println!("got {}", value); 
    }
}

Affiche

0
1
2

Comment cela marche ?

Et bien en fait, si on démonte la macro, ça donne ceci

// on créé un tuyau
let (rx, tx) = channel();
// une partie est dans `AsyncStream`
let stream = AsyncStream::new(rx, async move {
    for i in 0..3 {
            // L'autre sert à envoyer des données
            tx.send(Some(i)).await;
    }
    tx.send(None)
});

A chaque fois que l'on yield on envoie dans un tuyau qui débouche sur AsyncStream.

Et lui à l'autre bout dans son poll_next.

Il attend alors rx pour la prochaine valeur, jusqu'au None.

Ainsi nous pouvons l'utiliser pour ce que l'on désire

use async_io::Timer;
use futures_lite::stream::{unfold, StreamExt};
use futures_lite::FutureExt;
use rand::Rng;
use std::collections::VecDeque;
use std::pin::pin;
use std::time::{Duration, Instant};

// déclaration des types
type FutureInput = (String, String);

// future avec délai
async fn delivery(delay: u64, value: FutureInput) -> FutureInput {
    let now = Instant::now();
    Timer::after(Duration::from_millis(delay)).await;
    println!("La clef {} est arrivé en  {:?}", value.0, now.elapsed());
    value
}

fn main() {
    let data = VecDeque::from([
        ("prefix_1_data1", "jeu"),
        ("prefix_1_data2", "de"),
        ("prefix_1_data3", "données 1"),
        ("prefix_2_data1", "le"),
        ("prefix_2_data2", "ciel"),
        ("prefix_2_data3", "est"),
        ("prefix_2_data4", "gris"),
        ("prefix_3_data1", "L'asynchrone"),
        ("prefix_3_data2", "ce n'est"),
        ("prefix_3_data3", "pas si"),
        ("prefix_3_data4", "compliqué"),
    ]);

    let rng = rand::thread_rng();

    let stream = unfold((data, rng), |(mut data, mut rng)| async move {
        data.pop_front()
            .map(|(key, value)| {
                let delay = rng.gen_range(100..2000);
                // on créé la future
                let fut = delivery(delay, (key.to_string(), value.to_string()));
                // on box la future
                let boxed_fut = fut.boxed();
                //on retourne la valeur suivante et notre futur dans sa boîte
                (boxed_fut, (data, rng))
            })
            .or(None)
    });

    let mut stream = pin!(stream);

    let stream = async_stream::stream! {
        let mut accumulator = vec![];
        let mut prefix = None;
        while let Some(message_futur) = stream.next().await {
            let (key, value) = message_futur.await;
            // on split sur la clef
            let key = key.split('_').skip(1).take(1).collect::<String>();

            // on vérifie si un préfix existe déjà
            if prefix.is_none() {
                // sinon on le créé
                prefix = Some(key.clone())
            }

            // on défini un tableau pour un potentiel renvoie de données
            let mut ready_data = vec![];

            if let Some(ref actual_prefix) = prefix {
                // Le prefix a changé nous pouvons relâcher une valeur
                if &key != actual_prefix {
                    // on vide l'accumulateur
                    ready_data = accumulator.drain(..).collect();
                    // on initialise au prochain prefix
                    prefix = Some(key);
                }
            }
            // on accumule le résultat
            accumulator.push(value);

            if !ready_data.is_empty() {
                yield ready_data.join(" ")
            }
        }

        // on oublie pas la dernière donnée
        if !accumulator.is_empty() {
            yield accumulator.join(" ")
        }
    };

    let mut stream = pin!(stream);

    smol::block_on(async move {
        let mut now = Instant::now();
        while let Some(message) = stream.next().await {
            println!(
                "le message \"{message}\" est arrivée en {:?}",
                now.elapsed()
            );
            now = Instant::now();
            println!()
        }
    })
}

Adieu les implémentations, vous faites du Rust asynchrone !

Conclusion

J'en ai pas vraiment, article beaucoup trop long, mais ça on a l'habitude ^^

J'espère qu'il vous a plu.

A la prochaine ❤️

avatar

Auteur: Akanoa

Je découvre, j'apprends, je comprends et j'explique ce que j'ai compris dans ce blog.

Ce travail est sous licence CC BY-NC-SA 4.0.