Les Streams
Bonjour à toutes et à tous 😀
La boucle for
Lorsque j'ai découvert les boucles en Rust, je suis tombé sur cet étrange bout de code.
for i in 0..4
Lorsque l'exécute, on obtient
0
1
2
3
Le moi de l'époque n'a pensé qu'une chose : "Mais c'est quoi cette magie noire!! 😱"
Mais nous sommes en Rust et en Rust, presque tout est explicite. 🦀
Il suffit, de dérouler le fil. 🧶
Voici le vrai code qui se déroule:
let x = ???;
loop
Nous allons détailler ce que x
représente dans un instant.
Pour l'instant il faut comprendre que next
retourne un Option<i32>
.
Tant que l'on a pas atteint i = 3
, on obtient un Some(i)
. Puis un None
.
Ce que l'on peut redéfinir en
x.next // Some(0)
x.next // Some(1)
x.next // Some(2)
x.next // Some(3)
x.next // None
Bon maintenant que nous avons le cahier des charges du comportement voulu par notre x
, créons-le.
Nous savons qu'il se doit de poosséder un état interne.
Il possède également une méthode next() -> Option<i32>
.
Implémentons-la
A chaque fois que l'on appelle next()
, on vérifie le state
,
- si l'on est dans l'intervalle $[0; 3]$, on incrément le
state
et on renvoie unSome(i)
- si celui excède $3$, on renvoie
None
On peut alors utiliser notre super code
let mut x = X ;
loop
Qui nous affiche comme prévu
0
1
2
3
Victory ! 😎
Le trait Iterator
Du coup faisons une petite expérience 🧪
let mut x = X ;
for i in x
Nop ! 💥
error[E0277]: `X` is not an iterator
|
| for i in x {
| ^ `X` is not an iterator
|
= help: the trait `Iterator` is not implemented for `X`
Quand je vous disais que ça se passait en réalité comme ça je vous ai à demi menti.
Afin de normaliser les choses, Rust s'appuie de manière massive sur les traits
.
Ici le compilateur nous signale que X
n'est pas un itérateur. parce qu'il n'implémente pas le trait Iterator
.
Mais du coup cela signifie que la boucle for
peut se résumer à:
for i in #quelque chose qui implémente Iterator#
Implémentons Iterator
pour X
.
Voici la coquille vide de son implémentation
Ici deux choses nous intéresse:
La ligne
type Item =
on appelle ça un associated type en Rust, cela permet de définir un type présent dans toute l'implémentation du trait.
On le retrouve d'ailleurs dans
La deuxième chose intéressante c'est que
Ressemble très fortement à notre
Par conséquent on peut remplir par mimétisme le
type Item = i32
Et implémenter le reste:
Ce qui nous permet de l'utiliser dans notre boucle for
let x = X ;
for i in x
Cool 😎
Mais que se passe-t-il si on tente de redemander un tour de boucle ?
let x = X ;
for i in x
for i in x
Réponse une erreur, enfin plusieurs:
error[E0382]: use of moved value: `x`
--> src\main.rs:39:14
|
| let x = X { state: 0 };
| - move occurs because `x` has type `X`, which does not implement the `Copy` trait
| for i in x {
| - `x` moved due to this implicit call to `.into_iter()`
...
| for i in x {
| ^ value used here after move
|
note: `into_iter` takes ownership of the receiver `self`, which moves `x`
|
| fn into_iter(self) -> Self::IntoIter;
La première est que x
a été déplacé, et ce déplacement a été provoqué par for
qui implicitement (ouais c'est moche pour le coup 😫) écrit ce code
for i in x.into_iter
Sauf que into_iter
consomme ce que l'on lui donne à manger.
Donc cela signifie qu'avant même que la boucle for
ne débute ses appels à x.next()
. x
n'existe déjà plus ... 😭
Ainsi quand on tente de réappliquer le x.into_iter()
, nous n'avons plus de x
pour bosser!
Ce comportement bien que perturbant est très intéressant car il empêche d'utiliser un itérateur qui a déjà été utilisé et donc consommé.
La deuxième erreur c'est que l'on ne possède pas le trait Copy
sur X
, donc comme le compilo peut rien faire il déplace x
, mais s'il est Copy
alors il pourra faire quelque chose.
Pour implémenter Copy
, nous devons implémenter Clone
, nous passons par les derives pour nous simplifier le travail.
Ainsi, on peut maintenant réutiliser x
autant de fois que l'on désire.
let x = X ;
for i in x
for i in x
Comme le trait Copy
est appelé pour chacune des boucles comme ceci:
let x = X ;
let x_copy1 = x; // x_copy1 = X { state: 0 } -- copie de x en x_copy1
let iterator1 = x_copy1.into_iter; // x_copy1 est consommé par into_iter mais pas x
for i in iterator1
let x_copy2 = x; // x_copy2 = X { state: 0 } -- copie de x en x_copy2
let iterator2 = x_copy2.into_iter; // x_copy2 est consommé par into_iter mais pas x
for i in iterator2
Le x
n'est jamais modifié puisque copié en l'état et donc state = 0
pour chaque copies en début de boucle.
Ce qui donne:
0
1
2
3
0
1
2
3
Le trait IntoIterator
Il s'est passé un truc étrange tout à l'heure, le compilo a rajouté un x.into_iter()
et ça n'a pas bronché alors que ni X
, ni le trait Iterator
ne définit de méthode into_iter
.
Ceci c'est encore un coup des traits !!
En fait tout Iterator
implémente automatiquement IntoIterator
au travers de
Ceci est fourni dans le langage directement.
Du coup ici
x.into_iterator() <=> x
Cela ne change rien à notre vie mais c'est intéressant pour autre chose. ^^
Il existe également une implémentation pour Vec<T>
, et là on commence à causer sérieusement. 😀
for i in vec!
Ainsi le code réel deviens alors:
let iterator = .into_iterator;
for i in iterator
Et on revient en terrain connu.
En parlant de terrain connu, on peut maintenant presque deviner le fonctionnement de
for i in 0..4
On peut supposer que en fait c'est plus
let iterator = .into_iterator;
for i in iterator
Plus qu'à comprendre le 0..4
.
En Rust, ceci se nomme une Range, la notation du dessus n'est qu'un sucre syntaxique.
Donc 0..4 <=> Range { start: 0, end: 4 }
.
Un rapide coup d'oeil dans la doc nous montre que Range
implémente Iterator
et donc par conséquent IntoIterator
.
La boucle est bouclée! 🤣
Je vous laisse implémenter X pour avoir une borne de début et de fin si vous voulez.
Le trait Future
Rappelez-vous lorsque vous aviez 10 ans et que vous attiendiez le livreur arriver avec votre nouveau jeu.
Vous alliez tout les 5 min à la fenètre pour vérifier si le camion n'était pas au coin de la rue.
Deux solutions:
- il n'est pas encore là et vous reviendrez dans 5 min vérifier pour la 20 ème fois de la journée ⌛
- le camion est là !! vous vous précipitez pour ouvrir la porte 😀
En Rust on peut modéliser ces deux état via une énumération qui possède deux variantes.
Here
: le camion est là ! 😀NotYet
: il n'est pas encore arrivé ⏳
Et pour simuler notre moi passé qui va à la fenêtre on faire quelque chose comme ceci.
On va là aussi détailler ce qu'est x
.
let x = ???;
loop
Ce qu'il faut comprendre c'est que l'on désire que x
possède une fonction check_delivery
qui doit renvoyer un DeliveryState
.
;
Du coup on peut se créer cette structure qui correspond au cahier des charges du dessus.
;
Ce qui donne
let x = X;
loop
Si on lance ce code, nous allons obtenir:
Truck not here yet
Truck not here yet
Truck not here yet
...
Truck not here yet
Truck not here yet
Truck not here yet
Indéfininiment, oui, normal, vu que l'on a pas d'état, cela ne risque pas de bouger.
Créons nous un état un peu stupide, mais suffisant pour la démonstration.
Lorsque le compteur atteint $0$, le paquet est livré.
On retente notre chance avec la nouvelle implémentation
let mut x = X ;
loop
Cette fois-ci c'est mieux 😁
Truck not here yet
Truck not here yet
Truck not here yet
The truck is here 42
Bien-sûr, on se doute que l'on ne va pas devoir réimplémenter cette logique à chaque fois.
Tout comme notre implémentation naïve d'itérateur avait la version trait Iterator
.
Il existe également un trait qui se nomme Future
, il est fourni par la bibliothèque standard.
En voici son squelette:
Donc ici pas de grosse surprise
type Output
Permet de définir ce que l'on veut renvoyer lorsque que ce sera le moment.
Le type de retour est
Voyons ce qu'est Poll
.
Il s'agit d'une énumération.
Je ne vous fait pas l'affront de vous expliquer en quoi c'est un peu beacoup ressemblant à DeliveryState
. 😛
Seul petite amélioration, on peut mettre ce que l'on veut dans le Ready
et pas seulement un i32
.
Les paramètres de fonctions c'est un peu plus compliqué.
;
Le self
est un peu bizarre, nous sommes d'habitude habitué à voir &mut self
et c'est tout.
Ici le self
est de type Pin
.
Je ne vais pas expliquer ici en détail, mais ce qu'il faut en comprendre c'est que l'on est assuré de pouvoir modifier correctement self
grâce à ce type.
Le cx
représente le contexte d'exécution, on revient à lui plus tard 😀
Bon implémentons notre trait Future
pour X
.
let mut x = X ;
loop
Enfin, essayons, parce que pour le moment ce n'est pas brillant...
error[E0599]: no method named `poll` found for struct `X` in the current scope
|
| struct X {
| -------- method `poll` not found for this struct
...
| if let Poll::Ready(i) = x.poll() {
| ^^^^ method not found in `X`
|
= help: items from traits can only be used if the trait is implemented and in scope
= note: the following trait defines an item `poll`, perhaps you need to implement it:
candidate #1: `Future`
Mais, mais ... On a un trait Future
pour X
donc x.poll()
devrait exister, non ?
Ah non! on a implémenté
;
Pas
;
Donc le vrai variable qui possède poll()
c'est:
new
Et non x
tout seul.
On change:
let mut x = X ;
loop
Mieux mais pas top ...
error[E0061]: this method takes 1 argument but 0 arguments were supplied
|
| if let Poll::Ready(i) = Pin::new(&mut x).poll() {
| ^^^^-- an argument of type `&mut Context<'_>` is missing
|
note: method defined here
--> C:\Users\Noa\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib/rustlib/src/rust\library\core\src\future\future.rs:105:8
|
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
| ^^^^
help: provide the argument
|
| if let Poll::Ready(i) = Pin::new(&mut x).poll(/* &mut Context<'_> */) {
|
Ah oui c'est vrai que poll
prend un deuxième paramètre de type Context
.
Plus qu'à le créer 🙂
Sauf qu'il ne possède pas de constructeur ce type, et il nécessite un champ waker
de type Waker
.
Bon, du coup créons un Waker
, sauf que rebelote pas de constructeur non plus. la structure demande un RaWaker
.
On continue à suivre les miettes de pains ...
Un constructeur mais RawWakerVTable
, c'est quoi ça encore 😫
De toute façon on a descendu dans les entrailles donc pourquoi pas continuer ... 🥱
Allez ! de l'unsafe
, bon on ira pas plus loin 🤣
Du coup, nous sommes bloqué ?
Mais non, il y a toujours une porte de sortie.
La structure Waker
possède une méthode noop
qui a pour signature
noop
Elle est juste caché derrière un flag unstable. On peut débrayer ce comportement en rajoutant un attribut à notre fichier.
Ok, donc on a un Waker
, plus qu'à créer notre contexte et on est bon non ? ^^
Et comme Context
possède le bon goût d'avoir une méthode from_waker(&Waker)
.
Nous pouvons terminer de rajouter ce qu'il nous manque.
let mut x = X ;
// création du waker
let waker = noop;
// création du contexte
let mut cx = from_waker;
loop
Victoire
Truck not here yet
Truck not here yet
Truck not here yet
The truck is here 42
Mais en demi teinte, c'est passablement plus compliqué à écrire, et je n'ai pas du tout apprécié mon aventure "à la recheche du Context" perdu ... 🙄
Les Runtimes
Tout semble compliqué parce que nous ne sommes pas sensé faire tout cela.
En réalité, nous sommes sensé utiliser un Runtime qui va se charger de tout le travail pénible à notre place.
En Rust, il existe plein de runtimes asynchrones différents, et aucun fourni par la bibliothèque standard.
On va donc devoir en récupérer un. J'ai choisi de le faire avec la crate smol.
Mais ça aurait pu aussi être du tokio ou de l'async-std.
Bref, installons smol.
La crate smol fourni une fonction block_on qui prend une Future<Output=T>
.
Et ça tombe bien, X
implémente Future<Output=i32>
.
Modifions notre code pour utiliser smol
Comme la boucle semble avoir disparu, modfions un peu X::poll
pour faire apparaître le print du "not yet".
Bon on essaie 🙂
Truck not here yet
Une ligne, pas une de plus, le thread est bloqué...
Mais ça c'est parce que notre hypothèse de travail est fausse.
Si on ouvre le smol::block_on
, on peut le découper grosso modo en ce ceci
(je schématise c'est plus compliqué que cela, mais ça sera suffisant pour l'explication)
La deuxième pièce du puzzle est la méthode waker_fn
, elle créé un Waker
qui possède une méthode wake_by_ref()
.
Donc si l'on fait
let waker = waker_fn;
waker.wake_by_ref;
Cela affiche dans la console
dring ⏰ c'est l'heure de se réveiller
Or il est possible au travers du Context
d'atteindre le Waker
et par conséquent le wake_by_ref
.
cx.waker.wake_by_ref;
Comme le Context
est passé à la fonction poll(&mut cx)
.
Alors cela signifie que nous avons accès dans l'implémentation de Future
de X
à ce wake_by_ref
Qui a la possibilité d'appeler le unparker.unpark()
let waker = waker_fn;
et ainsi débloquer le thread
loop
Si on graphe temporellement, cela donne quelque chose comme cela.
sequenceDiagram activate Runtime Runtime->>Future: poll activate Future Future -->>Runtime : Poll::Pending Runtime -->> Runtime: park Future --)Runtime : wake_by_ref deactivate Future Runtime -->> Runtime: unpark deactivate Runtime Note left of Runtime: Fin du premier poll activate Runtime Runtime->>Future: poll activate Future Future -->>Runtime : Poll::Pending Runtime -->> Runtime: park Future --)Runtime : wake_by_ref deactivate Future Runtime -->> Runtime: unpark deactivate Runtime Note left of Runtime: Fin du second poll activate Runtime Runtime->>Future: poll activate Future Future -->>Runtime : Poll::Ready deactivate Future deactivate Runtime Note left of Runtime: Retour
Le Runtime va poll un première fois la future, se park en attente d'un unpark.
La future va lui déclencher son unpark au travers de wake_by_ref
. Et le Runtime re-poll et ainsi de suite, jusqu'au Poll::Ready qui coupe la boucle.
Récapitualtif de notre code:
On lance et joie !
Truck not here yet
Truck not here yet
Truck not here yet
The truck is here 42
Tout marche !! 😎
Finalement, ce n'est pas si compliqué ou fastidieux ^^
Des futures de futures
Enfin presque, il nous manque le fait que l'on attend 1 seconde entre chaque poll.
On va donc simuler ça.
Attention
Cette implémentation est très mauvaise, c'est simplement pour se donner une idée.
Pour bien faire il faudrait utiliser un Reactor et des timers, mais ça sera pour un prochain article.
C'est une future, on peut donc l'utiliser dans notre Runtime et la poll.
Ce qui affiche
Il s'est passé 5.0000061s
Nous avons bien une future qui se résout au bout de 5 secondes.
Du coup, on peut la poll dans un polling !!
Attention
Je répète : NE FAITES PAS CA c'est pour l'exemple.
Vous allez tuer vos performances !!!
le wake interne au timer n'est pas le bon et plein d'autres choses ne vont pas.
Excepté ce petit avertissement ^^
C'est quand même ce qu'on désire ^^
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 3.0014149s
The truck is here 42
Les sucres syntaxiques async/.await
Maintenant que l'on a globalement fait n'importe quoi, essayons de rentrer dans le rang. 😁
Petite expérience avant.
Voici une future qui finit au premier poll
;
Elle s'utilise ainsi:
Ce qui affiche
La valeur est 42
Mais si je vous disais que c'est totalement inutile ^^
Rust fourni un sucre syntaxique pour ce genre d'usage au travers du mot-clef async
.
Qui écrit dans la console
La valeur est 42
Cela fonction car aussi bien async {42}
que AlwaysReady(42)
implémente Future<Output=i32>
.
On peut même simplifier en:
Ok donc on peut faire implémenter le comportement de X
.
Alors oui mais c'est un peu rapide ...
Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 475.2µs
La valeur est 42
Pourtant le timer est bien là. 🙄
Oui mais une future qui n'est pas poll ne fait rien.
Pollons-la !
Oui mais avec quel contexte ?
Alors oui notre arnaque moldave marche ...
let waker = noop;
let mut cx = from_waker;
let mut timer = after;
loop
Et affiche bien ce qu'on désire
Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0007765s
La valeur est 42
Mais là aussi on sent que ce n'est pas élégant.
C'est pour cela que Rust fourni un dual à async
qui se nomme .await
.
C'est un sucre syntaxique qui s'utilise ainsi.
Bon là on parle !
Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0006858s
La valeur est 42
le mot-clef .await
s'utilise sur n'importe quelle Future
. Il a pour résultat d'écrire l'arnaque moldave mais en mieux 😂
je ne ferai aucun commentaire, ce n'est que pour votre culture.
C'est pas du vrai Rust mais du pseudo-rust, ça compile pas.
<expr>.await
devient
match into_future
Mais ça ressemble vaguement à ce qu'on a écrit. La récupération du Context en mieux.
Fonctions asynchrones
Mais on ne va tout de même pas écrire tout notre code dans un block? si ? Non biensûr que non.
On peut découper en fonction.
(Le découpage est un peu nul, mais c'est pour l'exemple ^^')
Si on essaie d'exécuter
error[E0728]: `await` is only allowed inside `async` functions and blocks
|
| fn iteration(counter: i32) -> bool {
| --------- this is not `async`
...
| Timer::after(1).await;
| ^^^^^ only allowed inside `async` functions and blocks
Rust nous dit que nous n'avons pas le droit d'utiliser .await
en dehors d'un block async
ou d'une fonction async
.
Tiens, tiens.
On peut donc rendre une fonction asynchrone. Essayons.
async
Il est bien gentil il nous mâche tout le boulot, même plus besoin de moi pour expliquer 😎
error[E0308]: mismatched types
|
| if iteration(counter) {
| ^^^^^^^^^^^^^^^^^^ expected `bool`, found future
|
note: calling an async function returns a future
--> src\main.rs:104:16
|
| if iteration(counter) {
| ^^^^^^^^^^^^^^^^^^
help: consider `await`ing on the `Future`
|
| if iteration(counter).await {
| ++++++
Bien Grand-Chef !
async
Pas mal !
Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0006166s
La valeur est 42
Du coup c'est aussi du sucre mais pourquoi ?
Prenons du plus simple.
async
Clairement c'est une Future
qui est renvoyée.
Donc égal à
D'où
async
// Equivalent à
On retrouve notre async
, on peut utiliser notre .await
. 😀
Itérer sur des futures
Tout ça commence à être très sympa, maintenant, nous allons exploiter la lazyness des futures à notre avantage.
Autrement dit, tant qu'une future n'est pas poll, elle ne fait rien, ce n'est qu'une boîte que l'on peut transporter et référencer.
On va donc s'en faire une collections et itérer dessus.
Et oui ! Tout le laïus interminable sur les itérateurs d'il y a plus de 1000 lignes c'était pas pour faire du remplissage. 😁
Donc première chose, nous allons créer une fonction delivery
qui nous renverra une Future
.
async
On modifie également la méthode itération
async
Notez que tout est en async fn
donc tout est Future
.
On peut donc mettre notre Future
dans un tableau et itéré dessus. 😀
On affiche bien
Truck not here yet
Truck not here yet
Truck not here yet
Truck not here yet
Il s'est passé 4.0006349s
La valeur est 42
Truck not here yet
Truck not here yet
Il s'est passé 2.0002937s
La valeur est 12
use Future; use Add; use Pin; use ; use ; async async
Donc on sait itérer mais il manque un dernier morceau, pour le moment nous avons une relation 1:1 entre le tableau et ce qu'on itère.
Nous allons voir comment créer une indirection.
Itérateur asynchrone
Cahier des charges
Voici le jeu de données que l'on possède. On va le transformer en un tableau de futures pour simuler une latence réseau par exemple.
let data = vec!;
Voici le résultat que l'on désire
let expected = ;
Attention ça va être un peu tordu, l'abstraction nécessite parfois de tordre la réalité ^^
Tout d'abord que voulons nous comme API haut-niveau ?
Dans l'idéal ceci :
Mais nous ne pourrons pas faire ça, rappellez vous, si l'on desugar cela donnerait:
let iterator = .into_iterator; // iterator = x
for sentence in iterator.await
Or cela nous oblige à implémenter à la fois Future
et Iterator
.
Mais du coup, le .await
n'est là que pour sortir l'iterateur, pas poller de la nouvelle données.
Bref on se retrouve coincé assez vite dans notre tentative de for
.
Bon et par un while
. On implémente Future
sur X
et fin.
Mais non 🙄
error[E0382]: use of moved value: `x`
|
| let mut x = X::new(data);
| ----- move occurs because `x` has type `X`, which does not implement the `Copy` trait
|
| while let Some(t) = x.await {}
| --------------------^------
| | |
| | `x` moved due to this await, in previous iteration of loop
| inside of this loop
|
note: `into_future` takes ownership of the receiver `self`, which moves `x`
|
| fn into_future(self) -> Self::IntoFuture;
Car derrière le rideau, .await
applle into_future
qui consomme x
,
Nous n'allons pas non plus pouvoir copier car les data
seront allouées et donc non-copiable.
Nous devons donc partir sur une double indirection.
Qu'il faut lire comme
loop
On va donc appelé x.next()
à chaque tour de boucle, on sait aussi que next_future
est une impl Future<Output=Option<String>>
vu que l'on veut des Option<String>
en bout de chaîne.
On sait également que next()
ne peut pas consommer x
sinon on retombe dans le même travers que into_future
.
Donc X
possède une fonction next
qui retourne quelque chose Y
qui implémente Future<Output=Option<String>>
.
Pour plus de lisibilité, nous allons appeler notre Y
une NextFuture
.
Il faudra également que NextFuture
connaisse le X
qu'il l'a créé.
Par conséquent la signature de X::next
est :
;
Nous pouvons en déterminer également le
type Output =
Et donc implémenter NextFuture
.
Alors c'est parti pour implémenter tout ce beau monde.
Avec une future fixe
D'abord X
.
On se créé un type custom.
type FutureInput = ;
Puis X
Puis on implémente le constructeur et next
Le <'_>
après le NextFuture
c'est pour pas s'ennuyer avec la lifetime, on laisse Rust bosser comme un grand.
Comme promis, NextFuture
prend une référence de X
mutable.
L'implémentation de la Future
appelle le poll
de la référence de X
.
Du coup, on implémente également Future
pour X
.
Et en exécutant,
on obtient
la clef vaut prefix_1_data1
la clef vaut prefix_1_data2
la clef vaut prefix_1_data3
la clef vaut prefix_2_data1
la clef vaut prefix_2_data2
la clef vaut prefix_2_data3
la clef vaut prefix_2_data4
la clef vaut prefix_3_data1
la clef vaut prefix_3_data2
la clef vaut prefix_3_data3
la clef vaut prefix_3_data4
Cool mais mais c'est pas très souple, nous avons seulement une future qui se résout tout de seul.
Nous, nous voulons du délai pour simuler le réseau.
On va procéder un remaniement ministériel.
Avec une future quelconque
Premièrement, nous nons créons une fonction asynchrone qui sera notre future avec délai.
use Timer;
async
Pour l'occasion, je remplace mon Timer
foireux par un vrai, celui fourni par async_io
au travers de smol
.
Je créé deux types pour me faciliter la vie
type BoxFuture<T> = ;
type FutureInput = ;
Le premier permet de ranger une future quelconque dans une boîte et de pouvoir la transbahuter sans y penser.
Le second est l'entré des données: (clef, valeur)
Je modifie X
pour refléter se nouveau type.
Je créé une fonction boxed
qui va transformer une Future
en BoxFuture
, le type est compliqué mais globalement, ça veut juste dire "c'est open bar, fait ce que tu veux, j'ai calé ta future en mémoire, je te file de quoi y accéder et je te promet qu'elle sera là quand tu en aura besoin".
Je modifie la fonction new
en conséquence, et en rajoutant un peu d'aléatoire.
cargo add rand
Le reste est identique
use Timer; use Rng; use Future; use Pin; use ; use ; // déclaration des types type BoxFuture<T> = ; type FutureInput = ; // future avec délai async // boxing de future /// Représente le retour de `X::next()` /// Itérateur asynchrone
Il s'est passé 185.0034ms
la clef vaut prefix_1_data1
Il s'est passé 1.7600028s
la clef vaut prefix_1_data2
Il s'est passé 1.2960022s
la clef vaut prefix_1_data3
Il s'est passé 1.2860019s
la clef vaut prefix_2_data1
Il s'est passé 331.0027ms
la clef vaut prefix_2_data2
Il s'est passé 406.0027ms
la clef vaut prefix_2_data3
Il s'est passé 1.7650035s
la clef vaut prefix_2_data4
Il s'est passé 688.0042ms
la clef vaut prefix_3_data1
Il s'est passé 1.5060066s
la clef vaut prefix_3_data2
Il s'est passé 1.8030035s
la clef vaut prefix_3_data3
Il s'est passé 124.0034ms
la clef vaut prefix_3_data4
Yeah ! Du délai ! 🤩
On simule du réseau pas stable ou de la DB lente ^^
Découpler le flux d'entré de la sortie
Bon c'est cool mais ce n'est pas ce qu'on désire.
Pour rappel on en entré
let data = vec!;
Et voici le résultat que l'on désire
let expected = ;
Or pour le moment, nous avons:
let result = vec!;
Nous allons donc retarder la sortie de la données et accumuler assez de résultat.
Pour accumuler nous allons utiliser le fait que les clefs sont toutes sous le modèle
prefix_<id>_data<n>
Accumuler sur ce prefix_<id>
et lorsque l'on passe au suivant relâcher la concaténation de ce qu'on a accumulé.
Pour le coup c'est vraiment de l'algoritmique, le seul point de vigilence est de bien renvoyer le dernier morceau d'accumulateur
use Timer; use Rng; use Future; use Pin; use ; use ; // déclaration des types type BoxFuture<T> = ; type FutureInput = ; // future avec délai async // boxing de future /// Représente le retour de `X::next()` /// Itérateur asynchrone
Ce qui nous donne
La clef prefix_1_data1 est arrivée en 1.1680078s
La clef prefix_1_data2 est arrivée en 1.6620032s
La clef prefix_1_data3 est arrivée en 152.0027ms
La clef prefix_2_data1 est arrivée en 1.3760026s
le message "jeu de données 1" est arrivée en 4.3585695s
La clef prefix_2_data2 est arrivée en 1.3740032s
La clef prefix_2_data3 est arrivée en 1.4890043s
La clef prefix_2_data4 est arrivée en 1.9170056s
La clef prefix_3_data1 est arrivée en 1.9830028s
le message "le ciel est gris" est arrivée en 6.7637874s
La clef prefix_3_data2 est arrivée en 310.0058ms
La clef prefix_3_data3 est arrivée en 587.0023ms
La clef prefix_3_data4 est arrivée en 527.0066ms
le message "L'asynchrone ce n'est pas si compliqué" est arrivée en 1.4246736s
Et là c'est vraiment le Champagne 🍾 🤩
Nous venons de créer un système qui est capable de retarder la création d'un flux avec des données qui elle même sont retardée de manière aléatoire.
Bref en un mot comme en 100, nous avons un Itérateur Asynchrone !!!
S'il reste des futures à poller cela donne quelque chose comme cela.
sequenceDiagram participant Future X ->> NextFuture : next() activate NextFuture Note left of X: création du NextFuture Runtime ->> NextFuture : poll() Note right of Runtime: on poll NextFuture NextFuture ->> X : poll() Note left of Future: On poll la première future activate Future X ->> Future : poll() Future -->> X : Poll::Pending Note left of X: La future n'est pas prête X -->> NextFuture : Poll::Pending NextFuture -->> Runtime : Poll::Pending Runtime ->> NextFuture : poll() Note right of Runtime: on poll NextFuture NextFuture ->> X : poll() X ->> Future : poll() Future -->> X : Poll::Ready(data) Note left of X: La future est prête X -->> NextFuture : Poll::Pending deactivate Future Note left of X: Mais X n'a pas assez accumulé NextFuture -->> Runtime : Poll::Pending Runtime ->> NextFuture : poll() Note right of Runtime: on poll NextFuture NextFuture ->> X : poll() Note left of Future: On passe à la future suivante activate Future X ->> Future : poll() Future -->> X : Poll::Ready(data) X -->> NextFuture : Poll::Ready(Some(data)) deactivate Future Note left of X: X a assez accumulé NextFuture -->> Runtime : Poll::Ready(Some(data)) Note left of Runtime: on relâche une donnée deactivate NextFuture
Lorsque l'on arrive au bout de l'itération cela donne
sequenceDiagram X ->> NextFuture : next() activate NextFuture Note left of X: création du NextFuture Runtime ->> NextFuture : poll() Note right of Runtime: on poll NextFuture NextFuture ->> X : poll() X -->> NextFuture : Poll::Ready(Some(data)) NextFuture -->> Runtime : Poll::Ready(Some(data)) Note left of X: Avec data deactivate NextFuture X ->> NextFuture : next() activate NextFuture Note left of X: nouvelle itération Runtime ->> NextFuture : poll() Note right of Runtime: on poll NextFuture NextFuture ->> X : poll() X -->> NextFuture : Poll::Ready(None) NextFuture -->> Runtime : Poll::Ready(None) Note left of X: Plus de data deactivate NextFuture
le trait Stream et l'écosystème asynchrone
Bon, on a bien rigolé à réinventer la roue, mais vous vous douter que tout ça existe déjà et n'est pas à réplémenter à chaque fois.
La première chose à comprendre c'est que même si la librairie ne fournie pas les outils, les choses sont déjà plus ou moins normailisées et n'attende que d'être officialisée.
Prenez par exemple la crate future_util, véritable couteau suisse des Futures.
La méthode boxed
que nous avons implémenté existe déjà.
Seulement elle est directement rattaché au trait FutureExt.
Trait qui est automatiquement implémenté pour toute future.
Ainsi on peut faire ceci
cargo add futures-lite
On utilise le type Boxed
de futures-utils
pub type Boxed<T> = ;
dans notre X
// Attention a importer le trait, sinon .boxed() n'existe pas !
use FutureExt;
Et du coup, nous pouvons nous séparer de notre fonction boxed
maison, et de notre BoxFuture
.
type BoxFuture<T> = ;
Bon et maintenant si je vous disais que le NextFuture
existe déjà ici.
Utilisons-le à la place
use NextFuture;
Oui mais on ne peut pas le créer, le champ est privé et il n'y a pas de constructeur.
error[E0451]: field `stream` of struct `NextFuture` is private
|
| NextFuture { stream: self }
| ^^^^^^^^^^^^ private field
Mais ça c'est parce que encore une fois, on ne travail pas comme il faut.
La vraie manière est d'implémenter le trait Stream.
Elle demande de définir une méthode poll_next
qui a l'exacte signature par repport à poll
.
Allons-y.
On implémente
poll_next
et paspoll
car leNextFuture
de la lib appellepoll_next
.
Ce qui nous donne finalement ce code
use Timer; use Boxed; use StreamExt; use ; use Rng; use Future; use Pin; use ; use ; // déclaration des types type FutureInput = ; // future avec délai async /// Itérateur asynchrone // on implémente Stream
Qui a pour affichage
La clef prefix_1_data1 est arrivée en 308.0041ms
La clef prefix_1_data2 est arrivée en 1.1420077s
La clef prefix_1_data3 est arrivée en 1.251002s
La clef prefix_2_data1 est arrivée en 1.1130028s
le message "jeu de données 1" est arrivée en 3.8145345s
La clef prefix_2_data2 est arrivée en 354.0151ms
La clef prefix_2_data3 est arrivée en 669.003ms
La clef prefix_2_data4 est arrivée en 439.0069ms
La clef prefix_3_data1 est arrivée en 682.0036ms
le message "le ciel est gris" est arrivée en 2.1447636s
La clef prefix_3_data2 est arrivée en 239.0026ms
La clef prefix_3_data3 est arrivée en 1.1480031s
La clef prefix_3_data4 est arrivée en 1.6000024s
le message "L'asynchrone ce n'est pas si compliqué" est arrivée en 2.9876462s
On a rien cassé 😁
API Haut niveau
Continuons à ne pas écrire du code, c'est assez reposant.
Nous allons nous créer un stream de nos futures.
unfold
Pour cela nous utilisons une fonction appelée unfold.
Celle-ci a pour signature
unfold
Nous, nous allons mettre ce que nous avion dans le
use ;
let data = from;
let rng = thread_rng;
// on donne le data et le rng comme état
let stream = unfold;
}
Pour l'utiliser, cela donne
use ;
Cool !
La clef prefix_1_data1 est arrivé en 1.9214232s
("prefix_1_data1", "jeu")
La clef prefix_1_data2 est arrivé en 251.7301ms
("prefix_1_data2", "de")
La clef prefix_1_data3 est arrivé en 1.9595308s
("prefix_1_data3", "données 1")
La clef prefix_2_data1 est arrivé en 1.9699186s
("prefix_2_data1", "le")
La clef prefix_2_data2 est arrivé en 1.6447015s
("prefix_2_data2", "ciel")
La clef prefix_2_data3 est arrivé en 202.1345ms
("prefix_2_data3", "est")
La clef prefix_2_data4 est arrivé en 897.8317ms
("prefix_2_data4", "gris")
La clef prefix_3_data1 est arrivé en 616.9607ms
("prefix_3_data1", "L'asynchrone")
La clef prefix_3_data2 est arrivé en 1.5978526s
("prefix_3_data2", "ce n'est")
La clef prefix_3_data3 est arrivé en 1.1858525s
("prefix_3_data3", "pas si")
La clef prefix_3_data4 est arrivé en 1.4951382s
("prefix_3_data4", "compliqué")
Mais ce n'est pas ce qu'on veut, du coup on continue dans l'exploration de l'ecosystème
async-stream
Pour finir, je veux vous montrer une lib très pratique async-stream
cargo add async-stream
Elle fourni une macro stream! qui permet de faire ce que l'on désire.
Le fonctionnement est simple.
Tout ce qu'on yield
est rajouté dans le stream.
async
Affiche
0
1
2
Comment cela marche ?
Et bien en fait, si on démonte la macro, ça donne ceci
// on créé un tuyau
let = channel;
// une partie est dans `AsyncStream`
let stream = new;
A chaque fois que l'on yield
on envoie dans un tuyau qui débouche sur AsyncStream
.
Et lui à l'autre bout dans son poll_next
.
Il attend alors rx
pour la prochaine valeur, jusqu'au None
.
Ainsi nous pouvons l'utiliser pour ce que l'on désire
use Timer;
use ;
use FutureExt;
use Rng;
use VecDeque;
use pin;
use ;
// déclaration des types
type FutureInput = ;
// future avec délai
async