https://lafor.ge/feed.xml

Debugguer avec la télemétrie en Rust

2022-07-15

Bonjour à toutes et tous 😀

Un court article sur ma frustration du moment. Je bosse actuellement avec de l’asynchrone et du parrallélisme.

C’est très cool quand ça marche mais c’est une galère sans nom à débugguer quand ça veut pas. 🙄

Et quand je dis ça veut pas, je veux dire, l’app bloque sans messages de log parce que vous êtes dans un test d’inté par exemple (non c’est pas du tout du vécu 😡).

Aujourd’hui, je vous rédige ce billet pour vous montrer de quelle manière l’on peut débugguer ce genre de situations.

Installer les outils

Nous allons utiliser un projet en Go qui s’appelle Jaeger (rien à voir avoir avec les Titans 😋) , ceci est un outil de tracing permettant de visualiser l’éxécution d’un programme asynchrone.

Comme pré-requis, je considère que vous avez Docker d’installé et de fonctionnel sur votre machine.

Ainsi que Rust 1.62+ également installé et de fonctionnel.

Lancez jaeger.

docker run -d --name jaeger \
    -p6831:6831/udp \
    -p6832:6832/udp \
    -p16686:16686 \
    -p14268:14268 \
    jaegertracing/all-in-one:latest

Si vous vous rendez sur localhost:16686. Vous devriez vous retrouver devant une interface comme celle ci:

home de jaeger

Nous allons aussi avoir besoin d’un projet en Rust.

cargo new lab-telementry
cd lab-telementry
cargo add opentelemetry
cargo add opentelemetry-jaeger
cargo add tracing
cargo add tracing-opentelemetry
cargo add tracing-subscriber

Ou dans le Cargo.toml

[dependencies]
opentelemetry = "0.17.0"
opentelemetry-jaeger = "0.16.0"
tracing = "0.1.35"
tracing-opentelemetry = "0.17.4"
tracing-subscriber = "0.3.14"

Les concepts de base

Tracer et Layer

Avant de savoir courir il faut apprendre à marcher.

Notre cerveau primitif, n’étant vraiment pas câblé pour l’asynchrone, nous allons expliquer les concepts de bases dans un environnement synchrone.

Pour cela dans le main.rs, nous allons créer un petit code de test.

Nous allons d’abord déclarer notre Tracer jaeger.

let tracer_jaeger = opentelemetry_jaeger::new_agent_pipeline()
    .with_service_name("lab-telemetry")
    .install_simple()
    .unwrap()

Nous lui donnons le nom de service lab-telemetry. C’est en quelque sorte l’identifiant projet dans Jaeger, en effet on peut faire le tracing d’une multitude de projet à la fois. Il faut bien quelque choses pour les distinguer.

Ensuite nous allons déclarer un Layer OpenTelemetry avec notre Tracer jaeger.

let opentelemetry_layer = tracing_opentelemetry::layer()
    .with_tracer(tracer_jaeger);

OpenTelemetry est une initiative d’uniformisation des standards. Son but est de rendre compatibles l’ensemble des outils de l’écosystème de l’observabilité déjà existants.

Puis nous allons enregister notre Layer dans un Registry.

tracing_subscriber::registry()
    .with(opentelemetry_layer)
    .init();

Le rôle du Registry va être de capturer les Span et de les distribuer aux différent Layers qui ont souscrit à ce Registry. Dans notre cas le opentelemetry_layer.

En parlant de Span créons en une.

let _ = span!(tracing::Level::TRACE, "my span");

Une Span est une portion bornée temporellement dans l’éxécution d’un programme, qui a une date de début et une date de fin.

Entre ces deux bornes temporelles, il va se dérouler des évènements.

Nous allons aussi forcer les Tracer à envoyer toutes les Spans qu’ils connaissent avant de terminer le programme.

shutdown_tracer_provider();
Code Rust
use opentelemetry::global::shutdown_tracer_provider;
use tracing::span;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("lab-telemetry")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer_jaeger);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    // Declare a span
    let _ = span!(tracing::Level::TRACE, "my span");

    // Force all Span to be sent
    shutdown_tracer_provider();
}

Si vous allez sur localhost:16686. En sélectionnant le service lab-telemetry et ensuite en appuyant sur le bouton Find Traces.

Vous devriez désormais voir la span dans le volet de droite, ici entouré en rouge.

home de jaeger du service lab-telemetry

Cela signifie que notre système est opérationnel. Il envoie des Span à Jaeger.

  • Nous avons un Tracer jaeger qui est capable de transformer des Spans en dans un format de messages que Jaeger est capable de comprendre.
  • Ce Tracer est inclue dans un Layer OpenTelemetry qui sert de couche de compatibilité au reste de la télémétrie.
  • Ce Layer est enregistré dans un Registry qui écoute les spans de notre éxécution de code.
  • Les Spans sont des intervalles de temps dans l’éxécution de notre code.

Les spans

Je vous dis depuis tout à l’heure qu’une Span est un intervalle de temps. Mais il ne semble pas y avoir ni de début ni de fin à notre Span.

Déclarons le début de notre Span ainsi que sa fin.

    let guard = span!(Level::TRACE, "my_span bound").entered();
    sleep(Duration::from_secs(2));
    guard.exit();

home de jaeger du service lab-telemetry avec une span de 2 secondes

Nous avons bien une Span de 2 secondes.

Ce qui est intéressant, c’est que les Spans peuvent elles-même contenir d’autres Spans

let parent = span!(Level::TRACE, "parent").entered();

    sleep(Duration::from_secs(2));

    let child = span!(Level::TRACE, "child").entered();
        sleep(Duration::from_secs(1));
    child.exit();

parent.exit();

Dans Jaeger cela nous donne bien une Span de 3 secondes.

home de jaeger du service lab-telemetry avec une span parent

Mais détail supplémentaire, cette entrée possède 2 spans.

Cliquez sur 2 Spans.

flamechart de la span parent

Nous voyons bien l’imbrication des deux Spans. Le parent dure 3 secondes, l’enfant démarre deux secondes après le début du parent et dure 1 seconde.

Ce que vous voyez là ce nomme un flamechart et constitue le nerf de la guerre de notre article.

Il permet de visualiser temporellement les Spans de notre application et ainsi comprendre les agencements des appels lors de l’éxécution du programme.

Pour le moment tout est synchrone donc simple à comprendre mais ça va se corser très vite. 😇

Le code actualisé.

Code Rust
use opentelemetry::global::shutdown_tracer_provider;
use std::thread::sleep;
use std::time::Duration;
use tracing::{span, Level};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("lab-telemetry")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer_jaeger);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    // Declare a span
    let parent = span!(Level::TRACE, "parent").entered();

    sleep(Duration::from_secs(2));

    let child = span!(Level::TRACE, "child").entered();
    sleep(Duration::from_secs(1));
    child.exit();

    parent.exit();

    // Force all Span to be sent
    shutdown_tracer_provider();
}

Les logs

Nous sommes désormais dans la capacité de déterminer quand telles où telles Span commencent et se terminent et même de les imbriquer.

C’est bien de savoir le Quand mais le Quoi c’est encore mieux.

C’est à ce stade que la notion de logs est introduite.

Elle va nous permettre de définir des messages rattacher à la Span.

let parent = span!(Level::TRACE, "parent").entered();
    info!("Début de la span parent");

    debug!("Nous allons attendre 2 secondes...");
    sleep(Duration::from_secs(2));
    debug!("Fin du délai d'attente de 2 secondes");

    let child = span!(Level::TRACE, "child").entered();
        info!("Début de la span child");
        debug!("Nous allons attendre 2 secondes...");
        sleep(Duration::from_secs(1));
        debug!("Fin du délai d'attente de 2 secondes");
        info!("Fin de la span child");
    child.exit();
    info!("Fin de la span parent");
parent.exit();

Dans Jaeger.

Si vous cliquez sur les deux textes encadrés en rouge à gauche, puis sur Logs. Vous devriez visualiser ceci:

flamechart de la span parent avec logs

Ce qui est encadré en bleu se nomme un évènement ou Event. Une même Span peut contenir de zéro à plusieurs événements.

Ici, nos deux Spans possèdent chacune 4 Events.

Il est également possible de les ouvrir pour avoir encore plus de détails.

détails des logs d'une span

Ici, nous retrouvons le corps du message ainsi que le niveau de log, le numéro de ligne et le fichier qui a émit cet Event.

Ce qui est intéressant c’est que ces Event peuvent-être variabilisés et enrichis.

let id = 42;
let parent = span!(Level::TRACE, "parent").entered();
  info!(request.id = id, "Début de la span parent");

détails des logs d'une span avec variable

Et même avec des données complexes comme des structures.

#[derive(Debug)]
struct Request {
    id: u8,
    path: &'static str,
    method: &'static str,
}

impl Display for Request {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Je suis une requête")
    }
}

let request = Request {
    id: 42,
    path: "/home",
    method: "GET",
};

let parent = span!(Level::TRACE, "parent").entered();
info!(
    request.id = request.id,
    request_display = %request,
    request_debug = ?request,
    "Début de la span parent"
);
  • Nous pouvons accéder au champ request.id.
  • Nous pouvons appeler le trait Dispay de Request via le sigil %.
  • Nous pouvons également appeler le trait Debug de Request via le sigil ?.

Sur Jaeger cela nous donne:

détails des logs d'une span avec tag venant d'une structure

Les champs qui sont rattachés à un Event sont des Tags. Un Tag est une entité indexée qui peut être recherchée dans Jaeger et dans tout autre plateforme d’analyse de télémétrie.

Il en va de même pour notre tag personnalisé request.id.

recherche par tag.

Et là on s’aperçoit très vite que si l’on balade un identifiant dans toute nos spans on va pouvoir avoir une vision d’ensemble extrêmement précise de ce qui se passe dans notre système pour un identifiant donné.

  • Nous avons un Tracer jaeger qui est capable de transformer des Span en dans un format de messages que Jaeger est capable de comprendre.
  • Ce Tracer est compris dans un Layer OpenTelemetry qui sert de couche de compatibilité au reste de la télémétrie.
  • Ce Layer est enregistré dans un Registry qui écoute les spans de notre éxécution de code.
  • Les Spans sont des intervalles de temps dans l’éxécution de notre code.
  • Les Spans peuvent avoir des Events
  • Les Events sont des points datés dans l’éxécution d’une Span
  • Un Event peut avoir des Tags
  • Les Tags sont des clefs-valeurs qui sont indexés et donc qui peuvent être recherchés au sein d’une plateforme d’analyse de télémétrie.
Avec ça, nous avons tout le vocabulaire nécessaire! 😀

Le code de cette partie:

Code Rust
use opentelemetry::global::shutdown_tracer_provider;
use std::fmt::{Display, Formatter};
use std::thread::sleep;
use std::time::Duration;
use tracing::{debug, info, span, Level};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("lab-telemetry")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer_jaeger);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    #[derive(Debug)]
    struct Request {
        id: u8,
        path: &'static str,
        method: &'static str,
    }

    impl Display for Request {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            write!(f, "Je suis une requête")
        }
    }

    let request = Request {
        id: 42,
        path: "/home",
        method: "GET",
    };

    let parent = span!(Level::TRACE, "parent").entered();
    info!(
        request.id = request.id,
        request_display = %request,
        request_debug = ?request,
        "Début de la span parent"
    );

    debug!("Nous allons attendre 2 secondes...");
    sleep(Duration::from_secs(2));
    debug!("Fin du délai d'attente de 2 secondes");

    let child = span!(Level::TRACE, "child").entered();
    info!("Début de la span child");
    debug!("Nous allons attendre 2 secondes...");
    sleep(Duration::from_secs(1));
    debug!("Fin du délai d'attente de 2 secondes");
    info!("Fin de la span child");
    child.exit();
    info!("Fin de la span parent");
    parent.exit();

    // Force all Span to be sent
    shutdown_tracer_provider();
}

Instrumentation des fonctions

Pour le moment notre système fonctionne bien dans notre méthode main, mais comme tout programme un peu conséquent il faudra invariablement le modulariser.

Et donc le découper en fonction.

Nous allons en écrire deux et réutiliser notre structure Request:

#[derive(Debug)]
struct Request {
    id: u8,
    path: &'static str,
    method: &'static str,
    token: &'static str,
}

fn handle_request(request: Request) {
    if auth(request) {
        // ...
    } else {
        // ...
    }
}

fn auth(request: Request) -> bool {
    request.token == "blabla"
}

Nous allons maintenant rajouter des spans, des events et des tags à nos fonctions.

fn handle_request(request: Request) {
    let span = trace_span!("handle_request").entered();
    if auth(&request) {
        info!(request = ?request, id = request.id, "Request authenticated")
    } else {
        error!(request = ?request, id = request.id, "Request forbidden")
    }
    span.exit();
}

fn auth(request: &Request) -> bool {
    let span = trace_span!("auth").entered();
    // Emulate call for db
    sleep(Duration::from_millis(500));
    let value = request.token == "blabla";
    span.exit();
    value
}

Et dans notre main

let request = Request {
    id: 42,
    path: "/home",
    method: "GET",
    token: "blabla",
};

let request2 = Request {
    id: 43,
    path: "/home",
    method: "GET",
    token: "bleubleu",
};

let span = span!(Level::TRACE, "main").entered();
handle_request(request);
handle_request(request2);
span.exit();

Sur jaeger, cela nous donne ceci:

gérer plusieurs appels de fonctions.

On y voit une span différente pour chaque appel à handle_request.

En ouvrant les détails nous pouvons voir le token en cause.

détails d'un Tag.

Maintenant nous allons simplifier l’écriture en utilisant un attribut de fonction.

Cet attribut est #[tracing::instrument].

#[tracing::instrument(name = "main")]
fn handle_requests() {
    let request = Request {
        id: 42,
        path: "/home",
        method: "GET",
        token: "blabla",
    };

    let request2 = Request {
        id: 43,
        path: "/home",
        method: "GET",
        token: "bleubleu",
    };

    handle_request(request);
    handle_request(request2);
}

#[tracing::instrument]
fn handle_request(request: Request) {
    if auth(&request) {
        info!(request = ?request, id = request.id, "Request authenticated")
    } else {
        error!(request = ?request, id = request.id, "Request forbidden")
    }
}

#[tracing::instrument]
fn auth(request: &Request) -> bool {
    // Emulate call for db
    sleep(Duration::from_millis(500));
    request.token == "blabla"
}
#[tracing::instrument(name = "main")]

Permet de redéfinir le nom de la span; par défaut il s’agit du nom de la fonction décorée.

Il devient alors facile d’outiller les fonctions et d’en rajouter un peu partout.

tag ajouté automatiquement par l'attribut instrument

En bonus, on gagne également le tag request qui provient des paramètres de notre appel à handle_request. Et ce, gratuitement.

Code de la partie.

Code Rust
use opentelemetry::global::shutdown_tracer_provider;

use std::thread::sleep;
use std::time::Duration;
use tracing::{error, info, span, trace_span, Level};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[derive(Debug)]
struct Request {
    id: u8,
    path: &'static str,
    method: &'static str,
    token: &'static str,
}

fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("lab-telemetry")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer_jaeger);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    handle_requests();

    // Force all Span to be sent
    shutdown_tracer_provider();
}

#[tracing::instrument(name = "main")]
fn handle_requests() {
    let request = Request {
        id: 42,
        path: "/home",
        method: "GET",
        token: "blabla",
    };

    let request2 = Request {
        id: 43,
        path: "/home",
        method: "GET",
        token: "bleubleu",
    };

    handle_request(request);
    handle_request(request2);
}

#[tracing::instrument]
fn handle_request(request: Request) {
    if auth(&request) {
        info!(request = ?request, id = request.id, "Request authenticated")
    } else {
        error!(request = ?request, id = request.id, "Request forbidden")
    }
}

#[tracing::instrument]
fn auth(request: &Request) -> bool {
    // Emulate call for db
    sleep(Duration::from_millis(500));
    request.token == "blabla"
}

L’asynchrone

Nous allons rendre asynchrone et parallélisé le code du dessus.

Avant tout installons tokio.

cargo add tokio --features macros --features rt --features rt-multi-thread

Ce qui nous donne dans le Cargo.toml

tokio = { version = "1.20.0", features = ["macros", "rt", "rt-multi-thread"] }

Puis, il faut que l’on fasse évoluer notre méthode handling_requests.

Elle devient asynchrone

#[tracing::instrument(name = "main")]
async fn handle_requests() {
    // corps de la méthode
}

Puis nous allons modifier notre fonction handle_requests.

#[tracing::instrument(name = "main")]
async fn handle_requests() {
    let request = Request {
        id: 42,
        path: "/home",
        method: "GET",
        token: "blabla",
    };

    let request2 = Request {
        id: 43,
        path: "/home",
        method: "GET",
        token: "bleubleu",
    };

    let span = Span::current();

    let t1 = tokio::spawn(handle_request(request));

    let span = Span::current();
    let t2 = tokio::spawn(
        async {
            thread::sleep(Duration::from_millis(100));
            handle_request(request2).await
        }
    );

    let _ = join!(t1, t2);
}

Pour que l’asynchrone fonctionne il lui faut un reactor, ici ce sera tokio.

#[tokio::main]
async fn main() {
    // ...
}

Étant donné que handling_requests est désormais asynchrone, il faut la poll pour qu’elle fasse quelque chose.

handle_requests().await;

Le code complet

Code Rust
use opentelemetry::global::shutdown_tracer_provider;
use std::thread;

use std::thread::sleep;
use std::time::Duration;
use tokio::join;
use tracing::{error, info, Span};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[derive(Debug)]
struct Request {
    id: u8,
    path: &'static str,
    method: &'static str,
    token: &'static str,
}

#[tokio::main]
async fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("lab-telemetry")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer_jaeger);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    handle_requests().await;

    // Force all Span to be sent
    shutdown_tracer_provider();
}

#[tracing::instrument(name = "main")]
async fn handle_requests() {
    let request = Request {
        id: 42,
        path: "/home",
        method: "GET",
        token: "blabla",
    };

    let request2 = Request {
        id: 43,
        path: "/home",
        method: "GET",
        token: "bleubleu",
    };

    let t1 = tokio::spawn(handle_request(request));
    
    let t2 = tokio::spawn(async {
        // Je décale l'exécution de 100 ms pour un rendu plus visuel
        thread::sleep(Duration::from_millis(100));
        handle_request(request2).await
    });

    let _ = join!(t1, t2);
}

#[tracing::instrument]
async fn handle_request(request: Request) {
    if auth(&request) {
        info!(request = ?request, id = request.id, "Request authenticated")
    } else {
        error!(request = ?request, id = request.id, "Request forbidden")
    }
}

#[tracing::instrument]
fn auth(request: &Request) -> bool {
    // Emulate call for db
    sleep(Duration::from_millis(500));
    request.token == "blabla"
}
Si l’on éxécute le code tel quel nous allons observer que nos spans sont découpés en 3.

En effet, les différentes fonctions évoluant dans des threads différents les contextes et donc les root spans sont aussi différents.

Ce qui nous donne le résultat suivant:

les spans sont coupés en trois car 3 threads

Pour réconciller ces threads nous allons devoir opérer quelques changements.

La crate tracing nous permet de facilement attacher une span à un thread et plus particulèrement à la futur éxécutée.

let future = async {
    
};

tokio::spawn(future.instrument(info_span!("ma span")))

Cet appel à instrument va venir accrocher la span “ma span” à l’éxécution de la futur ainsi sera repésenté par “ma span” dans Jaeger.

Faites le test. 😛

On peut donc en extrapole quelque chose comme:

let fut1 = handle_request(request);
let fut2 = async {
    thread::sleep(Duration::from_millis(100));
    handle_request(request2).await
};

let t1 = tokio::spawn(fut1.instrument(info_span!("handle request success")));
let t2 = tokio::spawn(fut2.instrument(info_span!("handle request fail")));

let _ = join!(t1, t2);

Ce qui nous donne le résultat suivant sur Jaeger

les spans des threads sont regroupées et wrappées

Et si l’on désire ne avoir de wrapping on peut également écrire ceci:

let t1 = tokio::spawn(fut1.instrument(Span::current()));
let t2 = tokio::spawn(fut2.instrument(Span::current()));

Span::current() permet de récupérer la span du thread parent.

les spans des threads sont regroupées sans wrapping

Et voilà nous sommes en mesure de pouvoir comprendre d’un coup d’oeil ce qu’il se passe temporellement lors de l’éxécution en parallèle de plusieurs threads.

Le code final

Code Rust
use opentelemetry::global::shutdown_tracer_provider;
use std::thread;

use std::thread::sleep;
use std::time::Duration;
use tokio::join;
use tracing::{error, info, Instrument, Span};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[derive(Debug)]
struct Request {
    id: u8,
    path: &'static str,
    method: &'static str,
    token: &'static str,
}

#[tokio::main]
async fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("lab-telemetry")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer_jaeger);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    handle_requests().await;

    // Force all Span to be sent
    shutdown_tracer_provider();
}

#[tracing::instrument(name = "main")]
async fn handle_requests() {
    let request = Request {
        id: 42,
        path: "/home",
        method: "GET",
        token: "blabla",
    };

    let request2 = Request {
        id: 43,
        path: "/home",
        method: "GET",
        token: "bleubleu",
    };

    let fut1 = handle_request(request);
    let fut2 = async {
        thread::sleep(Duration::from_millis(100));
        handle_request(request2).await
    };

    let t1 = tokio::spawn(fut1.instrument(Span::current()));
    let t2 = tokio::spawn(fut2.instrument(Span::current()));

    let _ = join!(t1, t2);
}

#[tracing::instrument]
async fn handle_request(request: Request) {
    if auth(&request) {
        info!(request = ?request, id = request.id, "Request authenticated")
    } else {
        error!(request = ?request, id = request.id, "Request forbidden")
    }
}

#[tracing::instrument]
fn auth(request: &Request) -> bool {
    // Emulate call for db
    sleep(Duration::from_millis(500));
    request.token == "blabla"
}

Problèmes pouvant être rencontrés

La span n’est jamais envoyée.

Parfois nous trouvons dans des situations où une span devrait apparaître mais ne le fait jamais.

Pour comprendre cette situation, prenons le code suivant:

use opentelemetry::global::shutdown_tracer_provider;
use std::thread::sleep;
use std::time::Duration;
use tracing::info;

use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("infinite")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer_jaeger)
        .with_threads(true);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    for_loop();
    infinite_loop();

    // Force all Span to be sent
    shutdown_tracer_provider();
}

#[tracing::instrument]
fn infinite_loop() {
    info!("Début de la boucle");
    let mut i = 0;
    loop {
        info!(iteration = i, "Itération");
        sleep(Duration::from_millis(1));
        i += 1;
    }
    info!("Fin de la boucle")
}

#[tracing::instrument]
fn for_loop() {
    info!("Début de la boucle");
    for i in 1..10 {
        info!(iteration = i, "Itération");
        sleep(Duration::from_millis(1));
    }
    info!("Fin de la boucle")
}

Si l’on éxécute celui-ci nous allons avoir le résultat suivant:

seul la span for-loop est envoyée

Contenant les différents events des itérations.

détails de la span for_loop

Par contre, aucune trace de la span infinite_loop.

Une span étant bornée temporellement, si celle-ci ne finit jamais, elle ne sera jamais envoyé au Tracer et par conséquent les Events qui y sont rattachés non plus.

Pour nous permettre de visualiser le comportement de notre fonctions infinite_loop. Nous allons devoir créer une span qui est bornée dans le temps et donc transmissible au Tracer.

Et finalement affichable dans Jaeger.

Pour ce faire nous allon utiliser une méthode qui se nomme in_scope.

info_span!("ma span").in_scope(||{
    info!("Dans ma span");
})

Tous les évènements qui sont déclenchés dans in_scope seront rattachés à ma span et envoyés sur Jaeger comme tel.

Remarque

Le code

info_span!("ma span").in_scope(||{
    info!("Dans ma span");
})

Est équivalent à

let span = info_span!("ma span");
{
    let _enter = span.enter();
    info!("Dans ma span");
}
drop(span);

Si drop(span) n’est pas appelé et que vous vous trouvez dans une boucle infinie.

Votre span ma span ne sera jamais envoyée!

Nous pouvons réécrire notre fonction infinite_loop ainsi:

#[tracing::instrument]
fn infinite_loop() {
    info_span!("infinite_loop").in_scope(|| {
        info!(flag = "start", "Début de la boucle");
    });

    let mut i = 0;
    loop {
        info_span!("infinite_loop").in_scope(|| {
            info!(iteration = i, "Itération");
        });

        sleep(Duration::from_millis(1));
        i += 1;
    }
    info!("Fin de la boucle")
}

Sur Jaeger nous avons désormais une span infinite_loop! 😎

détails de la span infinite_loop

Et en ouvrant son détails on peut voir que toutes les spans ayant le même nom sont regroupés sous la forme d’un bel escalier de spans.

détails de la span infinite_loop

Nous avons notre flag=start tout en haut et les iteration=N ensuite.

D’ailleurs nous pouvons créer le même genre de retour pour notre for_loop, cela permettra de visualiser le temps pris par chaque itération.

#[tracing::instrument]
fn for_loop() {
    info!("Début de la boucle");
    for i in 1..10 {
        info_span!("for_loop").in_scope(|| {
            info!(iteration = i, "Itération");
        });
        sleep(Duration::from_millis(1));
    }
    info!("Fin de la boucle")
}

Cette fois-ci sur Jaeger notre span for_loop devient:

détails de la span for_loop après utilisation de in_scope

On peut y voir les temps passés dans les in_scope de chaque itération.

Et si l’on ouvre les détails cela nous donne:

détails des sous spans de for_loop

Nous avons bien nos events de début et de chaque itération.

Nous avons en prime l’event de fin car cette span est finie.

Nous pouvons également le réaliser en asynchrone.

Code Rust
use opentelemetry::global::shutdown_tracer_provider;
use std::thread::sleep;
use std::time::Duration;
use tokio::{join, spawn};
use tracing::{info, info_span};

use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("infinite")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer_jaeger)
        .with_threads(true);

    // Register Layers
    tracing_subscriber::registry()
        .with(opentelemetry_layer)
        .init();

    let fut1 = spawn(for_loop());
    let fut2 = spawn(infinite_loop());

    let _ = join!(fut1, fut2);

    // Force all Span to be sent
    shutdown_tracer_provider();
}

#[tracing::instrument]
async fn infinite_loop() {
    info_span!("infinite_loop").in_scope(|| {
        info!(flag = "start", "Début de la boucle");
    });

    let mut i = 0;
    loop {
        info_span!("infinite_loop").in_scope(|| {
            info!(iteration = i, "Itération");
        });

        sleep(Duration::from_millis(1));
        i += 1;
    }
    info!("Fin de la boucle")
}

#[tracing::instrument]
async fn for_loop() {
    info!("Début de la boucle");
    for i in 1..10 {
        info_span!("for_loop").in_scope(|| {
            info!(iteration = i, "Itération");
        });
        sleep(Duration::from_millis(1));
    }
    info!("Fin de la boucle")
}
Et obtenir le même résultat dans Jaeger.

Mon Jaeger crash !

Dans certaines condition il se peut que votre Jeager crash, et qu’avant il prenne un nombre exponentiel de mémoire.

Ce cas, je l’ai observé lorsqu’un trop grand nombre de span sont envoyées en très peu de temps.

Vous risquez de vous retrouver avec ce genre de message d’erreur:

jaeger qui a crash

On peut aisément reproduire ce comportement grâce à ce code

fn main() {


    // Déclaration du tracing
    
    info_span!("will crash").in_scope(|| {
        for i in 0..1000000 {
            dummy(i)
        }
    });
}

#[tracing::instrument]
fn dummy(index: u64) {
    info!("I was here {}", index)
}

Celui-ci va engorger le système et finir par faire crasher le système de télémétrie.

Heureusement il est possible de régler ce soucis en configurant la manière dont les spans sont traitées.

En effet, il est possible d’affecter un niveau de log à nos spans.

#[tracing::instrument(level = "debug")]
fn dummy(index: u64) {
    debug!("I was here {}", index)
}

Par contre en l’état le système ne filtrera rien, il faut rajouter un filtre qui aura pour effet de venir conditionner le traitement ou non de notre span.

Pour en bénéficier, il faut activer une feature

cargo add tracing-subscriber --features env-filter

Ou via le Cargo.toml

tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
    // Register Layers
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(opentelemetry_layer)
        .init();

La 3ème ligne, indique d’utiliser la variable d’environnement standard RUST_LOG.

Ce qui signifie que l’on peut venir piloter les spans au travers d’elle.

RUST_LOG=info cargo run

Pour les besoin du test et pour des raisons de visibilité, nous allons réduire le nombre d’itérations à 10.

fn main() {


    // Déclaration du tracing
    
    info_span!("won't crash").in_scope(|| {
        for i in 0..10 {
            dummy(i)
        }
    });
}

Avec ce réglage vous allez voir dans Jaeger ceci:

aucune span de la méthode dummy n'est envoyé

Aucune span ne sera produite.

Par contre en éxécutant

RUST_LOG=debugcargo run

Vous verrez

les spans de la méthode dummy sont envoyées

Nous pouvons donc régler le degré de finesse de nos spans en fonction de nos besoins.

Que ce soit de la télémétrie de production ou de développement et donc du debug.

Récapitulatif de code:

Code Rust
use futures::future::join_all;
use futures::{Stream, StreamExt};
use opentelemetry::global::shutdown_tracer_provider;
use opentelemetry::runtime::Tokio;
use tracing::{debug, info_span};

use tracing_subscriber::EnvFilter;

use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("too much spans")
        .install_batch(Tokio)
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer_jaeger)
        .with_threads(true);

    // Register Layers
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(opentelemetry_layer)
        .init();

    info_span!("won't crash").in_scope(|| {
        for i in 0..10 {
            dummy(i)
        }
    });

    shutdown_tracer_provider();
}

#[tracing::instrument(level = "debug")]
fn dummy(index: u64) {
    debug!("I was here {}", index)
}

Un petit exemple concret

Nous allons réaliser un exemple qui va récapituler les différents concepts et les mettre en action dans un cas concret.

Le système que je vous propose sera un serveur web qui ira chercher des données sur un serveur distant. Et qui mettra en cache dans une base de données sqlite ces réponses de requêtes avant de servir le client.

schéma général de l'application d'exemple

Disclaimer

Ici le but n’est pas de créer quelque chose qui doit aller en prod, les décisions d’architectures peuvent être qualifier au mieux de discutables. 😅

Pour cela nous allons avoir besoin d’un serveur distant, nous allons utiliser la plateforme JSON placeholder qui est conçu pour cet usage.

Et plus particulièrement son service users. Qui permet de générer des données utilisateurs.

Si l’on interroge ainsi https://jsonplaceholder.typicode.com/users/1. Nous obtenons un utilisateur en particulier.

{
  "id": 1,
  "name": "Leanne Graham",
  "username": "Bret",
  "email": "Sincere@april.biz",
  "address": {
    "street": "Kulas Light",
    "suite": "Apt. 556",
    "city": "Gwenborough",
    "zipcode": "92998-3874",
    "geo": {
      "lat": "-37.3159",
      "lng": "81.1496"
    }
  },
  "phone": "1-770-736-8031 x56442",
  "website": "hildegard.org",
  "company": {
    "name": "Romaguera-Crona",
    "catchPhrase": "Multi-layered client-server neural-net",
    "bs": "harness real-time e-markets"
  }
}

Mais attention au retour de cette API, en effet tout id utilisateur supérieur à 10 renvoit un json vide mais un code de retout HTTP 200.

Afin de pouvoir manipuler ce JSON nous allons utiliser la lib serde_json qui nous impose de créer une structure pour accueillir les données.

On l’installe

cargo add serde --features derive
cargo add serde serde_json
#[derive(Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
    phone: String,
    website: String,
}

Ensuite pour des raisons de rapidité de mise en oeuvre, je vais utiliser un serveur actix-web.

On l’installe

cargo add actix-web

Voici le main

#[actix_web::main]
async fn main() -> std::io::Result<()> {

    HttpServer::new(|| App::new().service(get_user))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}

Il déclare un serveur en écoute locale sur le port 8080, et possède un service get_user.

Que nous allons déclarer dès maintenant.

#[get("/user/{id}")]
async fn get_user(id: web::Path<u32>) -> HttpResponse {
    let id = id.into_inner();

    if id > 10 {
        return HttpResponse::BadRequest().json(json!({
            "message" : "User id can't exceed 10"
        }));
    }

    let user = Cache::hit(id).await;

    match user {
        Ok(user) => HttpResponse::Ok().json(user),
        Err(err) => HttpResponse::NotFound().json(json!({
           "code": 404,
            "message": err.to_string(),
        })),
    }
}

Cette méthode renvoit une BadRequest si l’id est supérieur à 10. Dans le cas contraire il utilise le cache pour récupérer l’utilisateur associer à l’id demandé.

Puis formate la réponse en erreur ou non, sous forme de JSON.

Parlons en du cache.

Celui-ci est constitué d’une base de données sqlite en local.

À chaque requête on vient ouvrir cette BDD et effectuer une opération de SELECT sur l’ID en question.

Code Rust
struct Cache;

impl Cache {

    fn init() -> Result<Connection> {
        let connection = sqlite::open("db.sqlite")?;
        connection.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER , value TEXT )")?;

        Ok(connection)
    }

    fn set(user: &User) -> Result<()> {
        let connection = Self::init()?;
        let mut statement = connection.prepare("INSERT INTO users VALUES(?, ?)")?;
        let user_string = json!(user).to_string();
        statement.bind(1, user.id as i64)?;
        statement.bind(2, user_string.as_str())?;

        statement.next()?;

        Ok(())
    }

    async fn hit(key: u32) -> Result<Option<User>> {
        let connection = Self::init()?;

        let mut statement = connection.prepare("SELECT * FROM users WHERE id = ?")?;
        statement.bind(1, key as i64)?;

        match statement.next().unwrap() {
            State::Row => {
                let user_string = statement.read::<String>(1)?;
                let user = serde_json::from_str::<User>(&user_string)?;

                Ok(Some(user))
            }
            State::Done => {
                let user = fetch_user(key).await?;
                Self::set(&user)?;

                Ok(Some(user))
            }
        }
    }
}
Lors de l’opération de hit du cache, si celui-ci échoue car la valeur n’est pas encore disponible dans le cache. Celui-ci viens appeler une méthode fetch_user qui réalise l’appel sur le réseau.
async fn fetch_user(id: u32) -> reqwest::Result<User> {
    let url = format!("https://jsonplaceholder.typicode.com/users/{}", id);

    let response = reqwest::get(url).await?;
    let user = response.json::<User>().await?;

    Ok(user)
}

Comme nous utilisons un ensemble de crates différentes leurs erreurs ne vont pas être compatibles. Pour éviter ce souci, nous créons une énumération pour unifier les erreurs.

Nous utilisons aussi la crate thiserror qui permet de facilement créer les méthode de display de chacune des erreurs.

Code Rust
#[derive(Error, Debug)]
enum CustomError {
    #[error("sqlite errors {0:?}")]
    Sqlite(sqlite::Error),
    #[error("reqwest errors")]
    Reqwest(reqwest::Error),
    #[error("json errors")]
    Json(serde_json::Error),
}

impl From<sqlite::Error> for CustomError {
    fn from(err: sqlite::Error) -> Self {
        CustomError::Sqlite(err)
    }
}

impl From<serde_json::Error> for CustomError {
    fn from(err: serde_json::Error) -> Self {
        CustomError::Json(err)
    }
}

impl From<reqwest::Error> for CustomError {
    fn from(err: reqwest::Error) -> Self {
        CustomError::Reqwest(err)
    }
}

type Result<T> = std::result::Result<T, CustomError>;

Un petit récap:

Code Rust
use actix_web::{get, web, App, HttpResponse, HttpServer};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlite::{Connection, State};
use thiserror::Error;

#[derive(Error, Debug)]
enum CustomError {
    #[error("sqlite errors {0:?}")]
    Sqlite(sqlite::Error),
    #[error("reqwest errors")]
    Reqwest(reqwest::Error),
    #[error("json errors")]
    Json(serde_json::Error),
}

impl From<sqlite::Error> for CustomError {
    fn from(err: sqlite::Error) -> Self {
        CustomError::Sqlite(err)
    }
}

impl From<serde_json::Error> for CustomError {
    fn from(err: serde_json::Error) -> Self {
        CustomError::Json(err)
    }
}

impl From<reqwest::Error> for CustomError {
    fn from(err: reqwest::Error) -> Self {
        CustomError::Reqwest(err)
    }
}

type Result<T> = std::result::Result<T, CustomError>;

#[derive(Deserialize, Serialize)]
struct User {
    id: u32,
    name: String,
    email: String,
    phone: String,
    website: String,
}

struct Cache;

impl Cache {

    fn init() -> Result<Connection> {
        let connection = sqlite::open("db.sqlite")?;
        connection.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER , value TEXT )")?;

        Ok(connection)
    }

    fn set(user: &User) -> Result<()> {
        let connection = Self::init()?;
        let mut statement = connection.prepare("INSERT INTO users VALUES(?, ?)")?;
        let user_string = json!(user).to_string();
        statement.bind(1, user.id as i64)?;
        statement.bind(2, user_string.as_str())?;

        statement.next()?;

        Ok(())
    }

    async fn hit(key: u32) -> Result<Option<User>> {
        let connection = Self::init()?;

        let mut statement = connection.prepare("SELECT * FROM users WHERE id = ?")?;
        statement.bind(1, key as i64)?;

        match statement.next().unwrap() {
            State::Row => {
                let user_string = statement.read::<String>(1)?;
                let user = serde_json::from_str::<User>(&user_string)?;

                Ok(Some(user))
            }
            State::Done => {
                let user = fetch_user(key).await?;
                Self::set(&user)?;

                Ok(Some(user))
            }
        }
    }
}

#[tracing::instrument]
async fn fetch_user(id: u32) -> Result<User> {
    let url = format!("https://jsonplaceholder.typicode.com/users/{}", id);

    let response = reqwest::get(url).await?;
    let response_string = response.text().await?;
    let user = serde_json::from_str(response_string.as_str())?;

    Ok(user)
}

#[get("/user/{id}")]
async fn get_user(id: web::Path<u32>) -> HttpResponse {
    let id = id.into_inner();

    if id > 10 {
        return HttpResponse::BadRequest().json(json!({
            "message" : "User id can't exceed 10"
        }));
    }

    let user = Cache::hit(id).await;

    match user {
        Ok(user) => HttpResponse::Ok().json(user),
        Err(err) => HttpResponse::NotFound().json(json!({
           "code": 404,
            "message": err.to_string(),
        })),
    }
}

#[actix_web::main] // or #[tokio::main]
#[tracing::instrument]
async fn main() -> std::io::Result<()> {

    HttpServer::new(|| App::new().service(get_user))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}

Ainsi que les dépendances nécessaire

[dependencies]
actix-web = "4.1.0"
reqwest = { version = "0.11.11", features = ["json"] }
serde = { version = "1.0.140", features = ["derive"] }
serde_json = "1.0.82"
mime = "0.3.16"
sqlite = "0.26.0"
thiserror = "1.0.31"

Bien! Si vous exécuter ce code vous devriez voir apparaître en vous rendant sur localhost:8080 un json avec le user id 2.

Puis en réexécutant cette même requête une réponse sensiblement plus rapide.

Mais plus rapide de combien au juste ? 🧐

La télémétrie va nous venir en aide.

Nous allons outiller nos fonctions !

D’abord le cache

impl Cache {

    #[tracing::instrument]
    fn init() -> Result<Connection> {
        // ...
    }

    #[tracing::instrument(skip(user), fields(id = user.id))]
    fn set(user: &User) -> Result<()> {
        // ...
    }

    #[tracing::instrument]
    async fn hit(key: u32) -> Result<Option<User>> {
        // ...
    }
}

Puis les autres méthodes

#[tracing::instrument]
async fn fetch_user(id: u32) -> reqwest::Result<User> {
    // ...
}


#[get("/user/{id}")]
#[tracing::instrument]
async fn get_user(id: web::Path<u32>) -> HttpResponse {
    // ...
}

Et on finit par initialiser le tracing comme il se doit.

#[actix_web::main] // or #[tokio::main]
#[tracing::instrument]
async fn main() -> std::io::Result<()> {
    // Create a Jaeger Tracer
    let tracer_jaeger = opentelemetry_jaeger::new_pipeline()
        .with_service_name("cache user")
        .install_simple()
        .unwrap();

    // Create a tracing layer with the configured tracer
    let opentelemetry_layer = tracing_opentelemetry::layer()
        .with_tracer(tracer_jaeger)
        .with_threads(true);

    // Register Layers
    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(opentelemetry_layer)
        .init();
    
    // ...
}

Et de rajouter les dépendances nécessaires.

[dependencies]
opentelemetry = { version = "0.17.0", features = ["trace", "metrics"] }
opentelemetry-jaeger = { version = "0.16.0" }
tracing = "0.1.35"
tracing-opentelemetry = "0.17.4"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }

Si l’on reproduit cette expérience

Si vous éxécuter ce code vous devriez en vous rendant sur [localhost:8080](https://localhost:8080/user/2) voir apparaître 
un json avec le user id 2.

Puis en réexécutant cette même requête une réponse sensiblement plus rapide.

En ayant pris soin de supprimer la base de données sqlite.

Vous devriez avoir les spans suivantes:

spans de l'exemple

On observe un facteur x100 entre la requête du dessous et celle du dessus.

Bien évidemment, celle du dessus est la requête avec cache et celle du dessous sans cache.

Voyons ces spans en détails.

Tout d’abord sans le cache.

span sans cache

On voit bien l’ordonnencement des actions

  flowchart LR
get_user-->Cache::hit-->fetch-->Cache::set

On peut d’ailleurs détailler l’exécution grâce aux tags de nos spans.

détail de la span sans cache

On voit bien l’ID 2 être baladé dans les différents appels de fonctions.

Et maintenant si on réitère l’opération avec le cache déjà rempli.

span avec cache

On a beacoup moins de spans dont une absence de fetch_user, donc pas d’appel réseau donc pas 100 ms dans la vue en allant rechercher l’info sur le web !!

Notre appel se résume à un simple:

  flowchart LR
get_user-->Cache::hit

Pour ceux qui en ont besoin je vous met le code ici.

Conclusion

J’espère que cette très légère introduction à la télémétrie vous aura plu.

En écrivant cet article, je n’ai pu que constater le gouffre insondable de mon ignorance et le chemin qui me reste à parcourir pour commencer à maîtriser le sujet.

Voyez cet article comme une sorte d’état de l’art de mes connaissances à l’instant présent.

Je vous remercie de votre lecture et vous donne rendez-vous pour autre chose dans un autre article. ❤️

avatar

Auteur: Akanoa

Je découvre, j'apprends, je comprends et j'explique ce que j'ai compris dans ce blog.

Ce travail est sous licence CC BY-NC-SA 4.0.