https://lafor.ge/feed.xml

Partie 8 : Contraindre les données via un schéma

2024-12-16
Les articles de la série

Bonjour à toutes et tous 😃

Le système de tuple que l'on a bâti en partie 7 semble marcher, mais on a déjà relevé des problèmes sur le retour des données qui n'est pas constant.

db > CREATE TABLE Users (id INTEGER, name TEXT(50), email TEXT(128));
db > INSERT INTO Users(id, name, email) VALUES (1, 'John Doe', 'john.doe@example.com');
db > INSERT INTO Users(id, name, email) VALUES (2, 'Jane Doe', 'jane.doe@example.com');
db > SELECT * FROM Users;
[Text("John Doe"), Text("john.doe@example.com"), Integer(1)]
[Text("jane.doe@example.com"), Integer(2), Text("Jane Doe")]

Mais il y a bien pire que des champs mal ordonnés. On peut carrément rajouter de nouveau champs non-attendus.

db > INSERT INTO Users(id, name, email, phone) VALUES (3, 'Jack Doe', 'jane.doe@example.com', '+33455325.52'); 
db > SELECT * FROM Users;                                                                                      
[Integer(3), Text("jane.doe@example.com"), Text("Jack Doe"), Text("+33455325.52")]

Ou mal typés !

db > INSERT INTO Users(id, name, email) VALUES ('1', 'John Doe', 'john.doe@example.com'); 
db > SELECT * FROM Users;                                                                 
[Text("John Doe"), Text("john.doe@example.com"), Text("1")]

Bref, c'est une catastrophe !! 😱

Tout ça parce que le schéma de la commande de création de la table est juste la pour la déco. On n'en fait rien du tout!

db > CREATE TABLE Users (id INTEGER, name TEXT(50), email TEXT(128));
                        |                                          |
                        --------------------------------------------
                                            schéma

Pour remédier à tout cela, nous allons devoir contraindre nos insertions. 😎

Contraindre les tuples

Nos tuples nous ont libéré de tout usage d'entité fixée, mais la contrepartie, c'est que nous avons perdu beaucoup de garanties sur nos données.

Garanties que nous allons graduellement retrouver.

Ordre des champs

Notre structure de schéma actuelle a un problème, elle ne conserve pas l'ordre d'insertion car c'est une HashMap.

struct Schema {
    pub fields: HashMap<String, ColumnType>,
}

Et lors de l'insertion des données qui sont aussi une HashMap<String, Value> , on se content de collecter les valeurs de cette map sous la forme d'un tuple.

let values = data.into_values().collect::<Vec<Value>>();
table.insert(values)?;

Le problème c'est que l'ordre est défini par des règles que l'on ne maîtrise pas totalement.

Ce qui provoque les atrocités vu en introduction.

db > CREATE TABLE Users (id INTEGER, name TEXT(50), email TEXT(128));
db > INSERT INTO Users(id, name, email) VALUES (1, 'John Doe', 'john.doe@example.com');
db > INSERT INTO Users(id, name, email) VALUES (2, 'Jane Doe', 'jane.doe@example.com');
db > SELECT * FROM Users;
[Text("John Doe"), Text("john.doe@example.com"), Integer(1)]
[Text("jane.doe@example.com"), Integer(2), Text("Jane Doe")]

La solution est alors de venir adjoindre une liste qui elle conserve l'ordre quelque soit la valeur de nos données.

struct Schema {
    pub fields: HashMap<String, ColumnType>,
    pub columns: Vec<String>,
}

impl Schema {
    pub fn new(definition: Vec<(String, ColumnType)>) -> Self {
        Schema {
            columns: definition.iter().map(|(k, _)| k.clone()).collect(),
            fields: HashMap::from_iter(definition),
        }
    }
}

Ainsi on peut alors faire quelque chose dans ce genre :

impl Database {
    fn insert(
        &mut self,
        table: String,
        mut data: HashMap<String, Value>,
    ) -> Result<(), InsertionError> {
        match self.tables.get_mut(&table) {
            Some(table) => {
                let mut fields = vec![];
                for column in table.schema.columns.iter() {
                    let value = data
                        .remove(column)
                        .ok_or(InsertionError::MissingColumn(column.to_string()))?;
                    fields.push(value);
                }
                table.insert(fields)?;
            }
            None => {
                Err(InsertionError::TableNotExist(table))?;
            }
        }

        Ok(())
    }
}

Comme le tuples est maintenant contraint en ordre de sérialisation, si on désérialise, on obtiendra

db > CREATE TABLE Users (id INTEGER, name TEXT(50), email TEXT(128));
db > INSERT INTO Users(id, name, email) VALUES (1, 'John Doe', 'john.doe@example.com');
db > INSERT INTO Users(id, name, email) VALUES (2, 'Jane Doe', 'jane.doe@example.com');
db > SELECT * FROM Users;
[Integer(1), Text("John Doe"), Text("john.doe@example.com")]
[Integer(2), Text("Jane Doe"), Text("jane.doe@example.com")]

Car la désérialisation conserve l'ordre. Les champs ne sont plus sans-dessus-dessous, nous avons réglé notre premier problème. 😎

Respecter le nombres de champs

Nous allons maintenant nous attaquer au second problème qui est de pouvoir rajouter des champs supplémentaire ou inconnu.

db > INSERT INTO Users(id, name, email, phone) VALUES (3, 'Jack Doe', 'jane.doe@example.com', '+33455325.52'); 
db > SELECT * FROM Users;                                                                                      
[Integer(3), Text("jane.doe@example.com"), Text("Jack Doe"), Text("+33455325.52")]

Pour cela nous allons donner à notre schéma le pouvoir judiciaire de valider ou non les tuples de données.

Nous allons matérialiser le verdict sous la forme d'une nouvelle erreur CheckColumnDefinitionError.

enum CheckColumnDefinitionError {
    /// Le nombre de champs du tuple diffère de celle du schéma
    MismatchedColumnsSize {
        expected: usize,
        got: usize,
    },
    /// La colonne n'existe pas
    UnknownColumn(String),
    /// La chaîne de caractère est trop grande par rapport
    ExceededStringMaxSize {
        max_size: usize,
        got: usize,
        column_name: String,
    },
    /// Le type du champ n'est pas compatible avec le schéma
    WrongType {
        expected: ColumnType,
        got: ColumnType,
        column_name: String,
    },
}

On peut alors définir la méthodes check_values.

impl Schema {
    fn check_values(
        &self,
        values: &HashMap<String, Value>,
    ) -> Result<(), CheckColumnDefinitionError> {
        if values.len() != self.columns.len() {
            // une incohérences dans le nombre de champs a été détectée
            return Err(CheckColumnDefinitionError::MismatchedColumnsSize {
                expected: self.columns.len(),
                got: values.len(),
            });
        }

        for (key, value) in values.iter() {
            if !self.fields.contains_key(key) {
                // la clef n'existe pas dans le schéma.
                return Err(CheckColumnDefinitionError::UnknownColumn(key.to_string()));
            }
        }

        Ok(())
    }
}

Cela résout nos deux problèmes d'un coup

#[test]
fn test_schema_invalid_missing_column() {
    let schema = Schema::new(vec![
        ("id".to_string(), ColumnType::Integer),
        ("name".to_string(), ColumnType::Text(10)),
    ]);
    let values = vec![("id".to_string(), Value::Integer(1))];
    assert_eq!(
        schema.check_values(&HashMap::from_iter(values)),
        Err(CheckColumnDefinitionError::MismatchedColumnsSize {
            expected: 2,
            got: 1
        })
    );
}

#[test]
fn test_schema_invalid_unknown_column() {
    let schema = Schema::new(vec![
        ("id".to_string(), ColumnType::Integer),
        ("name".to_string(), ColumnType::Text(10)),
    ]);
    let values = vec![
        ("id".to_string(), Value::Integer(1)),
        ("unknown".to_string(), Value::Text("test".to_string())),
    ];
    assert_eq!(
        schema.check_values(&HashMap::from_iter(values)),
        Err(CheckColumnDefinitionError::UnknownColumn(
            "unknown".to_string()
        ))
    );
}

Conformité de type de données

Notre troisième problème est la non cohérence des types de données.

db > INSERT INTO Users(id, name, email) VALUES ('1', 'John Doe', 'john.doe@example.com'); 
db > SELECT * FROM Users;                                                                 
[Text("1"), Text("John Doe"), Text("john.doe@example.com")]

On va en profiter pour contraindre la taille des chaîne de caractères insérables.

Dans la même méthode on rajouter une collection vérification.

impl Schema {
    fn check_values(
        &self,
        values: &HashMap<String, Value>,
    ) -> Result<(), CheckColumnDefinitionError> {
        if values.len() != self.columns.len() {
            return Err(CheckColumnDefinitionError::MismatchedColumnsSize {
                expected: self.columns.len(),
                got: values.len(),
            });
        }

        for (key, value) in values.iter() {
            if !self.fields.contains_key(key) {
                // la clef n'existe pas dans le schéma.
                return Err(CheckColumnDefinitionError::UnknownColumn(key.to_string()));
            }
            let column_type = self.fields.get(key).unwrap();
            match (value, column_type) {
                // la valeur et la définition de la colonne coincide avec un entier
                (Value::Integer(_), ColumnType::Integer) => (),
                // la valeur et la définition de la colonne coincide bien avec une string mais
                // la taille de la valeur est trop importante
                (Value::Text(data), ColumnType::Text(max_size)) if data.len() > *max_size => {
                    return Err(CheckColumnDefinitionError::ExceededStringMaxSize {
                        max_size: *max_size,
                        got: data.len(),
                        column_name: key.to_string(),
                    })
                }
                // la valeur et la définition de la colonne coincide bien avec une string
                // et n'est pas supérieur à la taille maximale
                (Value::Text(_), ColumnType::Text(_)) => (),
                // un entier était attendu, mais une string est trouvée
                (Value::Text(data), ColumnType::Integer) => {
                    return Err(CheckColumnDefinitionError::WrongType {
                        expected: ColumnType::Integer,
                        got: ColumnType::Text(data.len()),
                        column_name: key.to_string(),
                    })
                }
                // une string était attendu, mais un entier est trouvé
                (Value::Integer(_), ColumnType::Text(max_size)) => {
                    return Err(CheckColumnDefinitionError::WrongType {
                        expected: ColumnType::Text(*max_size),
                        got: ColumnType::Integer,
                        column_name: key.to_string(),
                    })
                }
            }
        }

        Ok(())
    }
}

Notre schéma peut désormais invalider les tuples incorrects.

#[test]
fn test_schema_invalid_wrong_type() {
    let schema = Schema::new(vec![
        ("id".to_string(), ColumnType::Integer),
        ("name".to_string(), ColumnType::Text(10)),
    ]);
    let values = vec![
        ("id".to_string(), Value::Text("test".to_string())),
        ("name".to_string(), Value::Text("name".to_string())),
    ];
    assert_eq!(
        schema.check_values(&HashMap::from_iter(values)),
        Err(CheckColumnDefinitionError::WrongType {
            expected: ColumnType::Integer,
            got: ColumnType::Text("test".to_string().len()),
            column_name: "id".to_string()
        })
    );
}

#[test]
fn test_schema_invalid_too_long() {
    let schema = Schema::new(vec![
        ("id".to_string(), ColumnType::Integer),
        ("name".to_string(), ColumnType::Text(10)),
    ]);
    let values = vec![
        ("id".to_string(), Value::Integer(42)),
        (
            "name".to_string(),
            Value::Text("too long for your own good".to_string()),
        ),
    ];
    assert_eq!(
        schema.check_values(&HashMap::from_iter(values)),
        Err(CheckColumnDefinitionError::ExceededStringMaxSize {
            max_size: 10,
            got: 26,
            column_name: "name".to_string()
        })
    );
}

Avant insertion, il alors possible de demander au schéma d'interdire les données incorrectes.

// Vérification des champs
table.schema.check_values(&data).map_err(|err| {
    InsertionError::Serialization(SerializationError::ColumnDefinition(err))
})?;
let mut fields = vec![];
// Réorganisation du tuples
for column in table.schema.columns.iter() {
    let value = data
        .remove(column)
        .ok_or(InsertionError::MissingColumn(column.to_string()))?;
    fields.push(value);
}
// insertion
table.insert(fields)?;

On se rapproche du bout! 😍

Il nous reste la dernière contrainte de taille fixe de tuple. Et nous allons le voir tout de suite.

Sérialialisation a taille fixe

Nouvelle sérialisation

Dans la partie 7, je vous avais avertis que la sérialisation actuelle était temporaire.

Pour un tuple [Integer(42), Text("test")] cela donne.

0x02         0x00      0x00 0x00 0x00 0x00 0x00 0x00 0x00 0xd6   0x01       0x04       0x74 0x65 0x73 0x74
^            ^         ^                                         ^          ^          ^
taille       D=Int     42 sur 8 bytes encodé en litte-endian     D=Text     taille     "test" encodé en UTF-8
du Vec                                                                      String

Ce modèle de sérialisation est appelé "auto-porteur", car sans avoir de schéma on peut désérialiser la donnée en utilisant les différents maqueurs disséminé

  • taille du tuple
  • discriminant de la variante
  • taille de la chaîne de caractères

Ce qui est cool parce que l'on peut faire ce que l'on désire de données et stocker virtuellement n'importe quoi, pensez Mongo DB.

Mais pas glop dans une base de données relationnelle comme sqlite qui se base massivement sur des schémas pour opérer efficacement, se retrouver avec des données disparatres est un vrai problème.

Problème qui est comblé par l'utilisation d'un associé à une table.

Par exemple id INTEGER, name TEXT(50), un entier suivi d'une chaîne de maximum 50 bytes.

Nous obtenons alors deux informations cruciales de ce schéma:

  • il y a deux champs
  • le premier est un entier, le second une chaîne de caractères

Une règle arbitraire que nous allons nous fixer est que toutes les entrées font toutes la même taille. Cette règle va être très utile pour la suite de nos développements.

Cela veut dire que le mot "test" ou "anticonstitutionnellement" respectivement de 4 et 26 caractères vont être effectivement encodé dans 50 bytes.

Pour cela, il y a deux méthodes:

  • soit l'on réserve un bytes pour la taille comme précédemment
  • soit on utilise le bon vieux caractère NULL 0x00 comme caractère de fin de string

Cette série d'article n'étant qu'un gros prétexte pour essayer des choses, je opter pour la deuxième solution. 😇

Finalement, notre tuple encodé depuis le schéma nous donne:

                                                               NULL signifiant la fin de la chaîne
                                                               ˇ                                              
0x00 0x00 0x00 0x00 0x00 0x00 0x00 0xd6    0x74 0x65 0x73 0x74 0x00 0x00 0x00 ... 0x00 0x00 0x00
^                                          ^                                                   ^
42 sur 8 bytes encodé en litte-endian      "test" encodé en UTF-8                     50ème byte                 

Chaque entrée prend alors 58 bytes !! Quelque soit le contenu réel.

On verra dans le futur comment optimiser cette taille gargantuesque de tuple encodé.

Modification des traits serde

Nous allons opérer une petite modification du trait Serializable en lui permettant de renvoyer la taille encodée.

pub trait Serializable {
    fn serialize(&self, cursor: &mut Cursor<&mut [u8]>) -> Result<usize, SerializationError>;
}

Cela va être cruciale par la suite.

Modification du serde des primitives

Les entiers ne bougent pas car ils sont déjà optimalement stockés. Par contre on renvoie tout de même la taille encodée.

impl Serializable for i64 {
    fn serialize(
        &self,
        cursor: &mut std::io::Cursor<&mut [u8]>,
    ) -> Result<usize, SerializationError> {
        cursor
            .write(self.to_le_bytes().as_ref())
            .map_err(|e| SerializationError::Buffer(BufferError::BufferFull(e.to_string())))?;
        Ok(size_of::<i64>())
    }
}

Tout nos entiers sont des i64 donc sur 8 bytes, on pourra dans l'avenir définir des types plus restreints comme du u8 par exemple.

Notre String va subir une petite modification à la fois en sérialisation et en désérialisation.

Lors de la sérialisation, nous n'encodons plus la taille de la string avant la data. A la place, on encode la string puis on lui rajoute le caractère NULL.

impl Serializable for String {
    fn serialize(
        &self,
        cursor: &mut std::io::Cursor<&mut [u8]>,
    ) -> Result<usize, SerializationError> {
        // encode la string
        cursor
            .write(self.as_bytes())
            .map_err(|e| SerializationError::Buffer(BufferError::BufferFull(e.to_string())))?;
        // encode le caractère NULL de fin de chaîne
        cursor
            .write(b"\0")
            .map_err(|e| SerializationError::Buffer(BufferError::BufferFull(e.to_string())))?;
        // on n'oublie pas de rajouter 1 à la taille encodée sinon le NULL sera mangé
        Ok(self.as_bytes().len() + 1)
    }
}

Pour la désérialisation, nous pourrions nous contenter d'utiliser le read_until. Mais je ne suis pas fan de la double allocation de buffer que cela nous contraint. La première pour lire les données, la seconde pour l'encodage UTF-8.

A la place nous allons être un peu malin et lire la données sans la copier, trouver ce qui nous intéresse et puis encoder la slice, ainsi pas de copie des bytes dans deux buffer.

impl Deserializable for String {
    type Output = String;

    fn deserialize(cursor: &mut std::io::Cursor<&[u8]>) -> Result<Self, DeserializationError> {
        // on lit les données jusqu'au caractère NULL
        let data = &cursor.get_ref()[cursor.position() as usize..];

        let mut size = 0;
        // on boucle sur les caractères jusqu'à trouver le caractère NULL
        for x in &cursor.get_ref()[cursor.position() as usize..] {
            if *x == b'\0' {
                break;
            }
            size += 1;
        }

        // on déplace le curseur pour lui faire consommer les bytes
        // attention a ne pas oublier le caractère NULL
        cursor.set_position(cursor.position() + size + 1);

        // on encode en UTF-8 la slice
        String::from_utf8(data[..size as usize].to_vec())
            .map_err(DeserializationError::UnableToDeserializeString)
    }
}

Sécurité

Ce mode lecture doit-être utilisé seulement sur des données que vous connaissez, ici c'est un modèle jouet, mais dans la réalité une erreur d'encodage qui n'écrit jamais le NULL et vous êtes soumis à une erreur de type buffer overflow car les bytes renvoyés seront au-delà de la données attendue !

Les bytes suivants peuvent avoir accueilli n'importe quoi, il peut rester des traces de données précédentes !

Notre nouvelle sérialisation est prête !

#[test]
fn test_serialization_i64() {
    let mut buf = [0_u8; 1024];
    let mut writer = Cursor::new(&mut buf[..]);
    42_i64.serialize(&mut writer).expect("serialization error");
    let mut reader = Cursor::new(&buf[..]);
    assert_eq!(
        i64::deserialize(&mut reader).expect("deserialization error"),
        42_i64
    );
}

#[test]
fn test_serialization_string() {
    let mut buf = [0_u8; 1024];
    let mut writer = Cursor::new(&mut buf[..]);
    "toto"
        .to_string()
        .serialize(&mut writer)
        .expect("serialization error");
    let mut reader = Cursor::new(&buf[..]);
    assert_eq!(
        String::deserialize(&mut reader).expect("deserialization error"),
        "toto".to_string()
    );
}
#[test]
fn test_serialization_multiple() {
    let mut buf = [0_u8; 1024];
    let mut writer = Cursor::new(&mut buf[..]);
    42_i64.serialize(&mut writer).expect("serialization error");
    let _size = "toto"
        .to_string()
        .serialize(&mut writer)
        .expect("serialization error");
    let _size = "long string to test the buffer size"
        .to_string()
        .serialize(&mut writer)
        .expect("serialization error");
    let mut reader = Cursor::new(&buf[..]);
    assert_eq!(
        i64::deserialize(&mut reader).expect("deserialization error"),
        42_i64
    );
    assert_eq!(
        String::deserialize(&mut reader).expect("deserialization error"),
        "toto".to_string()
    );
    assert_eq!(
        String::deserialize(&mut reader).expect("long string to test the buffer size"),
        "long string to test the buffer size".to_string()
    );
}

Vous noterez que dans ce mode de sérialisation, les données sont à la queue leu-leu et il n'y a pas de constances dans la longeur des champs string.

Nous allons utilisé notre schéma comme guide sérialisation.

Reposer le formatage du tuple sur le schéma

Notre schéma est constitué d'une HashMap<String, ColumnType>. C'est par le biais de ColumnType qui je le rappele est une énumération.

enum ColumnType {
    Integer,
    Text(usize),
}

Que l'on va pouvoir opérer à notre mise en place d'une sérialisation a taille fixe.

Mais avant il va falloir simplifier la sérialisation de Value.

enum Value {
    Integer(i64),
    Text(String),
}

impl Serializable for Value {
    fn serialize(&self, cursor: &mut Cursor<&mut [u8]>) -> Result<usize, SerializationError> {
        let size = match self {
            Value::Integer(data) => data.serialize(cursor)?,
            Value::Text(data) => data.serialize(cursor)?,
        };

        Ok(size)
    }
}

Auparavant, nous encodions le un discriminant de type de variante.

Mais comme désormais le schéma gère le type et l'ordonnencement des champs, il devient inutile d'encoder ce discriminant.

On retire également l'implémentation de Deserializable car il n'est plus possible de désérialiser sans schéma l'énumération. Et je ne veux pas modifier sa signature.

Du coup je vais plutôt passer par le ColumnType pour réaliser la sérialisation.

impl ColumnType {
    pub fn serialize(
        &self,
        cursor: &mut Cursor<&mut [u8]>,
        value: &Value,
    ) -> Result<(), SerializationError> {
        // sérialisation de la valeur via la sérialisation de Value
        let serialized_size = value.serialize(cursor)?;

        match self {
            // sérialiser un entier ne nécessite pas plus d'actions
            ColumnType::Integer => {}
            // sérialiser une string nécessite de normaliser la taille des données encodées
            ColumnType::Text(max_size) => {
                // le delta permet de normaliser en faisant la différence de MAX - WRITTEN
                let delta = cursor.position() as usize + *max_size - serialized_size;
                // on déplace le curseur pour atteindre la taille du champ texte du schéma
                cursor.set_position(delta as u64);
            }
        }
        Ok(())
    }
}

Maintenant que la sérialisation n'encode que les données et plus le type de celles-ci. Le schéma devient crucial pour relire nos données.

impl ColumnType {
    fn deserialize(&self, cursor: &mut Cursor<&[u8]>) -> Result<Value, DeserializationError> {
        match self {
            // la désérialisation est directe pour un entier
            ColumnType::Integer => Ok(Value::Integer(i64::deserialize(cursor)?)),
            // tout comme pour la sérialisation, il faut normaliser
            ColumnType::Text(max_size) => {
                // on désérialise vers la String
                let data = String::deserialize(cursor)?;
                // attention à ne pas compter deux fois le NULL déjà consommé
                let delta = cursor.position() as usize + *max_size - data.len() - 1;
                cursor.set_position(delta as u64);
                Ok(Value::Text(data))
            }
        }
    }
}

Sérialiser au travers du schéma

On peut alors déplacer la logique de vérification et de sérialisation des données sous la responsabilité du Schema.

impl Schema {
    pub fn serialize(
        &self,
        cursor: &mut Cursor<&mut [u8]>,
        values: &HashMap<String, Value>,
    ) -> Result<(), SerializationError> {
        // vérification de la validité des données
        self.check_values(values)
            .map_err(SerializationError::ColumnDefinition)?;
        // sérialisation ordonnées
        for schema_field in self.columns.iter() {
            // récupération de la valeur
            let value = values
                .get(schema_field)
                .ok_or(SerializationError::MissingColumn(schema_field.to_string()))?;
            // récupération du ColumnType
            let definition = self
                .fields
                .get(schema_field)
                .ok_or(SerializationError::MissingColumn(schema_field.to_string()))?;
            // sérialisation de la valeur
            definition.serialize(cursor, value)?
        }
        Ok(())
    }
}

On applique de même pour la désérialisation.

impl Schema {
    pub fn deserialize(
        &self,
        cursor: &mut Cursor<&[u8]>,
    ) -> Result<Vec<Value>, DeserializationError> {
        let mut values = Vec::with_capacity(self.columns.len());
        // Le schéma connaît l'ordonnancement des champs
        for schema_field in self.columns.iter() {
            // récupération du ColumnType associé
            let definition =
                self.fields
                    .get(schema_field)
                    .ok_or(DeserializationError::MissingColumn(
                        schema_field.to_string(),
                    ))?;
            // désérialisation dans la bonne variante de Value
            let value = definition.deserialize(cursor)?;
            // accumulation dans le tuple
            values.push(value);
        }
        Ok(values)
    }
}

Déplacement de la logique d'insertion au niveau de la table

Le schéma étant présent au niveau de la Table, il est plus naturelle d'y placer la logique.

La Table qui précédemment permettait n'importe quelle données Serializable pouvait être insérées.

Désormais, nous n'accepterons plus que des HashMap<String, Value> seuls types consommable par la sérialisation par schéma.

impl Table {
    // modification du type de row  vers une HashMap
    pub fn insert(&mut self, row: &HashMap<String, Value>) -> Result<(), InsertionError> {
        let mut writer = Cursor::new(&mut self.inner[self.offset..]);
        // appel à la sérialisation au travers du schéma
        // la vérification est également faite à ce moment
        self.schema
            .serialize(&mut writer, row)
            .map_err(InsertionError::Serialization)?;
        self.offset += writer.position() as usize;
        self.row_number += 1;
        Ok(())
    }
}

On peut faire de même pour la sélection.

impl Table {
    pub fn select(&self) -> Result<Vec<Vec<Value>>, SelectError> {
        let mut reader = Cursor::new(&self.inner[..]);
        let mut rows = Vec::with_capacity(self.row_number);
        for _row_number in 0..self.row_number {
            rows.push(
                // désérialisation par le schéma
                self.schema
                    .deserialize(&mut reader)
                    .map_err(SelectError::Deserialization)?,
            )
        }
        Ok(rows)
    }
}

Modification de la Database

On peut alors enlever la boucle qui ordonnençait les champs lors de l'insertion.

impl Database {
    pub fn insert(
        &mut self,
        table: String,
        data: HashMap<String, Value>,
    ) -> Result<(), InsertionError> {
        match self.tables.get_mut(&table) {
            Some(table) => {
                // on passe directement les champs
                table.insert(&data)?;
            }
            None => {
                Err(InsertionError::TableNotExist(table))?;
            }
        }

        Ok(())
    }

    pub fn select(&mut self, table_name: String) -> Result<Vec<Vec<Value>>, SelectError> {
        match self.tables.get(&table_name) {
            Some(table) => table.select(),
            None => Err(SelectError::TableNotExist(table_name))?,
        }
    }
}

Désormais nous n'avons plus besoin de réorganiser les champs dans les tests

#[test]
fn test_database() {
    let mut database = Database::new();
    let schema = Schema::new(vec![
        ("id".to_string(), crate::data::ColumnType::Integer),
        ("name".to_string(), crate::data::ColumnType::Text(50)),
        ("email".to_string(), crate::data::ColumnType::Text(60)),
    ]);
    database
        .create_table("Users".to_string(), schema)
        .expect("create table failed");
    for i in 0..50 {
        let user = [
            ("id".to_string(), Value::Integer(i as i64)),
            ("name".to_string(), Value::Text(format!("test_{i}"))),
            (
                "email".to_string(),
                Value::Text(format!("email_{i}@example.com")),
            ),
        ];
        database
            .insert("Users".to_string(), HashMap::from(user))
            .expect("insert user failed");
    }
    let rows = database.select("Users".to_string()).expect("select failed");
    assert_eq!(rows.len(), 50);
    for (i, row) in rows.iter().enumerate() {
        let expected = vec![
            Value::Integer(i as i64),
            Value::Text(format!("test_{i}")),
            Value::Text(format!("email_{i}@example.com")),
        ];
        let row = row.to_vec();

        assert_eq!(row, expected);
    }
}

Testons tout ça!

D'abord on vérifie la cohérence d'ordre des champs par rapport au schéma

db > CREATE TABLE Users (id INTEGER, name TEXT(20), email TEXT(50));
db > INSERT INTO Users (id, name, email) VALUES (42, 'john.doe', 'john.doe@example.com');
db > INSERT INTO Users (id, name, email) VALUES (666, 'jane.doe', 'jane.doe@example.com');
db > SELECT * FROM Users;
[Integer(42), Text("john.doe"), Text("john.doe@example.com")]
[Integer(666), Text("jane.doe"), Text("jane.doe@example.com")]

Check ✅

On peut aussi vérifier la contrainte sur les tailles de chaînes de caractères.

db > INSERT INTO Users (id, name, email) VALUES (1, 'Un nom beacoup trop long enfin plus que 20 caracteres', 'email@example.com');
Insertion(Serialization(ColumnDefinition(ExceededMaxSize { max_size: 20, got: 53, column_name: "name" })))

Check ✅

L'incohérence de type de colonne par rapport au schéma.

db > INSERT INTO Users (id, name, email) VALUES ('666', 'jane.doe', 'jane.doe@example.com');
Insertion(Serialization(ColumnDefinition(WrongType { expected: Integer, got: Text(3), column_name: "id" })))

Check ✅

Des colonnes inconnues.

db > INSERT INTO Users (id, unknown, email) VALUES (666, 'jane.doe', 'jane.doe@example.com');
Insertion(Serialization(ColumnDefinition(UnknownColumn("unknown"))))

Check ✅

Check ✅, check ✅, check ✅ !!

Tout est fonctionnel !! 😍😍😍

Je ne l'ai fait que pour une table, mais bien évidemment chaque table possède son schéma de contraintes respectif.

Conclusion

Phieeeew!!!!

On a bien bossé là ! Nous avons maintenant des données contraintes et normalisées.

Cette normalisation va nous permettre d'attaquer la partie suivante qui parlera des curseurs et des pages.

Une notion qui va être centrale dans notre système de stockage.

Merci de votre lecture ❤️

Vous pouvez trouver le code la partie ici et le diff là.

avatar

Auteur: Akanoa

Je découvre, j'apprends, je comprends et j'explique ce que j'ai compris dans ce blog.

Ce travail est sous licence CC BY-NC-SA 4.0.