| | import torch |
| | import torch.nn as nn |
| | import torch.nn.functional as F |
| | from transformers import ( |
| | AutoModel, AutoConfig, AutoTokenizer, |
| | T5ForConditionalGeneration, T5Config, |
| | AutoModelForSequenceClassification, |
| | PreTrainedModel, PretrainedConfig |
| | ) |
| | from transformers.modeling_utils import ( |
| | load_state_dict, |
| | WEIGHTS_NAME, |
| | SAFE_WEIGHTS_NAME, |
| | SAFE_WEIGHTS_INDEX_NAME, |
| | WEIGHTS_INDEX_NAME |
| | ) |
| | from transformers.utils import ( |
| | is_safetensors_available, |
| | is_torch_available, |
| | logging, |
| | EntryNotFoundError, |
| | PushToHubMixin |
| | ) |
| | import os |
| | import json |
| | import numpy as np |
| |
|
# Module-level logger, created through the logging helper imported from transformers.utils.
logger = logging.get_logger(__name__)
| |
|
class BaseHateSpeechModel(nn.Module):
    """Base class for all hate speech detection models.

    Stores the number of labels and the backbone identifier, and provides a
    lenient ``load_state_dict`` plus a ``from_pretrained`` constructor that
    the concrete model classes inherit.
    """

    def __init__(self, model_name: str, num_labels: int = 3):
        """
        Args:
            model_name: Identifier of the backbone, stored as ``self.model_name``.
            num_labels: Number of output classes, stored as ``self.num_labels``.
        """
        super().__init__()
        self.num_labels = num_labels
        self.model_name = model_name

    def forward(self, input_ids, attention_mask, labels=None):
        """Run the model; concrete subclasses must override this."""
        raise NotImplementedError

    def load_state_dict(self, state_dict, strict=True, **kwargs):
        """Load ``state_dict`` into the module without raising on key mismatches.

        The parent implementation is always called with ``strict=False``;
        mismatching keys are logged instead of raised.

        Args:
            state_dict: Mapping of parameter/buffer names to tensors.
            strict: When true, missing keys are reported with a warning.
                Unexpected keys are always reported.
            **kwargs: Extra keyword arguments (for example ``assign`` on
                recent torch versions) forwarded unchanged to
                ``nn.Module.load_state_dict``.

        Returns:
            The named tuple produced by ``nn.Module.load_state_dict``. It
            still unpacks as ``(missing_keys, unexpected_keys)`` and also
            exposes ``.missing_keys`` / ``.unexpected_keys`` for callers that
            read those attributes.
        """
        result = super().load_state_dict(state_dict, strict=False, **kwargs)
        if result.missing_keys and strict:
            logger.warning(f"Missing keys when loading state_dict: {result.missing_keys}")
        if result.unexpected_keys:
            logger.warning(f"Unexpected keys when loading state_dict: {result.unexpected_keys}")
        return result

    @staticmethod
    def _config_value(config, key, default=None):
        """Read ``key`` from a config object (attribute) or a dict (item)."""
        if hasattr(config, key):
            return getattr(config, key)
        if isinstance(config, dict) and key in config:
            return config[key]
        return default

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Construct a model using the settings found in a checkpoint config.

        NOTE: this method only builds the model object. It does not read any
        weight file from ``pretrained_model_name_or_path``; fine-tuned weights
        have to be loaded separately (for example with ``load_state_dict``).

        Args:
            pretrained_model_name_or_path: Location passed to
                ``AutoConfig.from_pretrained`` when no ``config`` is supplied.
            *model_args: Accepted for signature compatibility; not used.
            **kwargs: ``config`` (config object or dict) and ``num_labels``
                are consumed here; everything else is forwarded to the class
                constructor.

        Returns:
            A new instance of ``cls``.
        """
        config = kwargs.pop("config", None)
        if config is None:
            try:
                config = AutoConfig.from_pretrained(pretrained_model_name_or_path)
            except Exception as exc:
                # Best effort: keep going with defaults, but do not hide the reason.
                logger.warning(
                    "Could not load a config from %s (%s); falling back to defaults.",
                    pretrained_model_name_or_path,
                    exc,
                )
                config = {}

        # An explicit num_labels keyword wins over the config; 3 is the last resort.
        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            num_labels = cls._config_value(config, "num_labels", 3)

        # The backbone identifier recorded in the config, when there is one.
        base_model_name = cls._config_value(config, "_name_or_path")
        if base_model_name:
            return cls(model_name=base_model_name, num_labels=num_labels, **kwargs)

        # No usable name: rely on the subclass' default ``model_name``.
        return cls(num_labels=num_labels, **kwargs)
| |
|
class PhoBERTV2Model(BaseHateSpeechModel):
    """PhoBERT-V2 backbone with dropout and a linear classification head."""

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3):
        """
        Args:
            model_name: Checkpoint passed to ``AutoConfig`` / ``AutoModel``.
            num_labels: Output size of the classification layer.
        """
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Classify a batch.

        Args:
            input_ids: Token ids forwarded to the backbone.
            attention_mask: Attention mask forwarded to the backbone.
            labels: Optional class indices; when given, a cross-entropy loss
                is computed against the logits.

        Returns:
            Dict with ``"loss"`` (``None`` when ``labels`` is ``None``) and
            ``"logits"``.
        """
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Same guard as the sibling model classes: a backbone without a pooler
        # (attribute missing or None) falls back to the mean over dimension 1
        # of the last hidden state instead of failing inside dropout.
        pooled_output = getattr(outputs, "pooler_output", None)
        if pooled_output is None:
            pooled_output = outputs.last_hidden_state.mean(dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
| |
|
class BartPhoModel(BaseHateSpeechModel):
    """BARTpho backbone with a mean-pooled linear classification head.

    The backbone's ``last_hidden_state`` is averaged over dimension 1 (every
    position, without consulting ``attention_mask``), passed through dropout
    and projected to ``num_labels`` logits.
    """

    def __init__(self, model_name: str = "vinai/bartpho-syllable-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        backbone_config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.config = backbone_config
        self.encoder = AutoModel.from_pretrained(
            model_name,
            config=backbone_config,
            ignore_mismatched_sizes=True,
        )
        self.dropout = nn.Dropout(0.1)
        # The classifier input width is the config's ``d_model`` field.
        self.classifier = nn.Linear(backbone_config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``{"loss", "logits"}``; the loss is ``None`` without labels."""
        backbone_out = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        sequence_mean = backbone_out.last_hidden_state.mean(dim=1)
        logits = self.classifier(self.dropout(sequence_mean))
        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
| |
|
class ViSoBERTModel(BaseHateSpeechModel):
    """ViSoBERT backbone followed by dropout and a linear classifier."""

    def __init__(self, model_name: str = "uitnlp/visobert", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        backbone_config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.config = backbone_config
        self.encoder = AutoModel.from_pretrained(
            model_name,
            config=backbone_config,
            ignore_mismatched_sizes=True,
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(backbone_config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``{"loss", "logits"}``; the loss is ``None`` without labels."""
        backbone_out = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Use the backbone's pooled vector when it provides one; otherwise
        # average the last hidden state over dimension 1.
        sentence_vec = getattr(backbone_out, "pooler_output", None)
        if sentence_vec is None:
            sentence_vec = backbone_out.last_hidden_state.mean(dim=1)

        logits = self.classifier(self.dropout(sentence_vec))
        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
| |
|
class PhoBERTV1Model(BaseHateSpeechModel):
    """PhoBERT-V1 backbone followed by dropout and a linear classifier."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        backbone_config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.config = backbone_config
        self.encoder = AutoModel.from_pretrained(
            model_name,
            config=backbone_config,
            ignore_mismatched_sizes=True,
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(backbone_config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``{"loss", "logits"}``; the loss is ``None`` without labels."""
        backbone_out = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Pooled vector if the backbone exposes a non-None one, else the mean
        # of the last hidden state over dimension 1.
        sentence_vec = getattr(backbone_out, "pooler_output", None)
        if sentence_vec is None:
            sentence_vec = backbone_out.last_hidden_state.mean(dim=1)

        logits = self.classifier(self.dropout(sentence_vec))
        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
| |
|
class MBERTModel(BaseHateSpeechModel):
    """mBERT (bert-base-multilingual-cased) backbone with a linear classifier."""

    def __init__(self, model_name: str = "bert-base-multilingual-cased", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        backbone_config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.config = backbone_config
        self.encoder = AutoModel.from_pretrained(
            model_name,
            config=backbone_config,
            ignore_mismatched_sizes=True,
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(backbone_config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``{"loss", "logits"}``; the loss is ``None`` without labels."""
        backbone_out = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Pooled vector if the backbone exposes a non-None one, else the mean
        # of the last hidden state over dimension 1.
        sentence_vec = getattr(backbone_out, "pooler_output", None)
        if sentence_vec is None:
            sentence_vec = backbone_out.last_hidden_state.mean(dim=1)

        logits = self.classifier(self.dropout(sentence_vec))
        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
| |
|
class SPhoBERTModel(BaseHateSpeechModel):
    """SPhoBERT (described as a syllable-level PhoBERT variant) classifier."""

    def __init__(self, model_name: str = "vinai/phobert-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        backbone_config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.config = backbone_config
        self.encoder = AutoModel.from_pretrained(
            model_name,
            config=backbone_config,
            ignore_mismatched_sizes=True,
        )
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(backbone_config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``{"loss", "logits"}``; the loss is ``None`` without labels."""
        backbone_out = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Pooled vector if the backbone exposes a non-None one, else the mean
        # of the last hidden state over dimension 1.
        sentence_vec = getattr(backbone_out, "pooler_output", None)
        if sentence_vec is None:
            sentence_vec = backbone_out.last_hidden_state.mean(dim=1)

        logits = self.classifier(self.dropout(sentence_vec))
        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
| |
|
class ViHateT5Model(BaseHateSpeechModel):
    """ViHateT5 classifier built on a T5 checkpoint.

    A ``T5ForConditionalGeneration`` model is loaded, but ``forward`` only
    invokes its ``.encoder`` submodule; the resulting hidden states are
    averaged over dimension 1 (without consulting ``attention_mask``) and fed
    to a linear classification layer.
    """

    def __init__(self, model_name: str = "VietAI/vit5-base", num_labels: int = 3):
        super().__init__(model_name, num_labels)
        t5_config = T5Config.from_pretrained(model_name)
        self.config = t5_config
        self.encoder = T5ForConditionalGeneration.from_pretrained(model_name, config=t5_config)
        self.dropout = nn.Dropout(0.1)
        # The classifier input width is the config's ``d_model`` field.
        self.classifier = nn.Linear(t5_config.d_model, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``{"loss", "logits"}``; the loss is ``None`` without labels."""
        t5_encoder = self.encoder.encoder
        encoded = t5_encoder(input_ids=input_ids, attention_mask=attention_mask)
        sequence_mean = encoded.last_hidden_state.mean(dim=1)
        logits = self.classifier(self.dropout(sequence_mean))
        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
| |
|
class XLMRModel(BaseHateSpeechModel):
    """XLM-R backbone (default ``xlm-roberta-large``) with a linear classifier."""

    def __init__(self, model_name: str = "xlm-roberta-large", num_labels: int = 3):
        """
        Args:
            model_name: Checkpoint passed to ``AutoConfig`` / ``AutoModel``.
            num_labels: Output size of the classification layer.
        """
        super().__init__(model_name, num_labels)
        self.config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.encoder = AutoModel.from_pretrained(model_name, config=self.config, ignore_mismatched_sizes=True)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Classify a batch.

        Args:
            input_ids: Token ids forwarded to the backbone.
            attention_mask: Attention mask forwarded to the backbone.
            labels: Optional class indices; when given, a cross-entropy loss
                is computed against the logits.

        Returns:
            Dict with ``"loss"`` (``None`` when ``labels`` is ``None``) and
            ``"logits"``.
        """
        outputs = self.encoder(input_ids=input_ids, attention_mask=attention_mask)

        # Same guard as the sibling model classes: a backbone without a pooler
        # (attribute missing or None) falls back to the mean over dimension 1
        # of the last hidden state instead of failing inside dropout.
        pooled_output = getattr(outputs, "pooler_output", None)
        if pooled_output is None:
            pooled_output = outputs.last_hidden_state.mean(dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
| |
|
class RoBERTaGRUModel(BaseHateSpeechModel):
    """Hybrid model: transformer backbone followed by a bidirectional GRU.

    The backbone's last hidden state is run through a 2-layer bidirectional
    GRU; the GRU outputs are averaged over dimension 1 (without consulting
    ``attention_mask``), passed through dropout and classified.
    """

    def __init__(self, model_name: str = "vinai/phobert-base-v2", num_labels: int = 3, hidden_size: int = 256):
        super().__init__(model_name, num_labels)
        backbone_config = AutoConfig.from_pretrained(model_name, ignore_mismatched_sizes=True)
        self.config = backbone_config
        self.encoder = AutoModel.from_pretrained(
            model_name,
            config=backbone_config,
            ignore_mismatched_sizes=True,
        )
        gru_options = {
            "input_size": backbone_config.hidden_size,
            "hidden_size": hidden_size,
            "num_layers": 2,
            "batch_first": True,
            "dropout": 0.1,
            "bidirectional": True,
        }
        self.gru = nn.GRU(**gru_options)
        self.dropout = nn.Dropout(0.1)
        # Bidirectional GRU: forward and backward features are concatenated.
        self.classifier = nn.Linear(2 * hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Return ``{"loss", "logits"}``; the loss is ``None`` without labels."""
        backbone_out = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        # Only the per-step GRU outputs are used; the final hidden state is dropped.
        recurrent_out, _ = self.gru(backbone_out.last_hidden_state)
        sequence_mean = recurrent_out.mean(dim=1)
        logits = self.classifier(self.dropout(sequence_mean))
        loss = nn.CrossEntropyLoss()(logits, labels) if labels is not None else None
        return {"loss": loss, "logits": logits}
| |
|
class TextCNNModel(BaseHateSpeechModel):
    """TextCNN classifier: embedding, parallel convolutions, max-pooling, linear head."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, num_labels: int = 3,
                 num_filters: int = 100, filter_sizes: list = None, dropout: float = 0.5):
        """
        Args:
            vocab_size: Number of rows of the embedding table.
            embedding_dim: Width of each embedding vector.
            num_labels: Output size of the classification layer.
            num_filters: Output channels of every convolution.
            filter_sizes: Kernel heights, one convolution per entry.
                ``None`` means ``[3, 4, 5]``.
            dropout: Dropout probability applied before the classifier.
        """
        super().__init__("textcnn", num_labels)
        # A None sentinel replaces the former mutable list default, which was
        # a single object shared by every call.
        if filter_sizes is None:
            filter_sizes = [3, 4, 5]
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (filter_size, embedding_dim))
            for filter_size in filter_sizes
        ])
        self.dropout = nn.Dropout(dropout)
        self.classifier = nn.Linear(num_filters * len(filter_sizes), num_labels)

    @staticmethod
    def _read_checkpoint_state_dict(checkpoint_dir):
        """Return the state dict stored in ``checkpoint_dir``, or ``None``.

        A safetensors file is preferred; the torch pickle file is only tried
        when no safetensors file exists. Read failures are logged, not raised.
        """
        safetensors_path = os.path.join(checkpoint_dir, SAFE_WEIGHTS_NAME)
        if os.path.isfile(safetensors_path):
            try:
                from safetensors.torch import load_file
                return load_file(safetensors_path)
            except Exception as exc:
                logger.warning("Could not read %s: %s", safetensors_path, exc)
                return None

        torch_path = os.path.join(checkpoint_dir, WEIGHTS_NAME)
        if os.path.isfile(torch_path):
            try:
                # NOTE(security): torch.load unpickles the file; only point
                # this at checkpoints from a trusted source.
                return torch.load(torch_path, map_location="cpu")
            except Exception as exc:
                logger.warning("Could not read %s: %s", torch_path, exc)
                return None
        return None

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Construct a model, detecting ``vocab_size`` from a checkpoint if needed.

        NOTE: the checkpoint is only inspected for the embedding shape; this
        method does not load the weights into the returned model.

        Args:
            pretrained_model_name_or_path: Checkpoint directory inspected when
                ``vocab_size`` is not supplied.
            *model_args: Accepted for signature compatibility; not used.
            **kwargs: ``vocab_size``, ``config`` and ``num_labels`` are
                consumed here; the rest is forwarded to the constructor.

        Returns:
            A new instance of ``cls``.
        """
        vocab_size = kwargs.pop("vocab_size", None)
        config = kwargs.pop("config", None)

        if vocab_size is None:
            state_dict = None
            if os.path.isdir(pretrained_model_name_or_path):
                state_dict = cls._read_checkpoint_state_dict(pretrained_model_name_or_path)

            if state_dict is not None and "embedding.weight" in state_dict:
                # The first dimension of the embedding matrix is the vocabulary size.
                vocab_size = state_dict["embedding.weight"].shape[0]
            else:
                vocab_size = 30000
                logger.warning(
                    "Could not detect vocab_size from %s; using the default of %d.",
                    pretrained_model_name_or_path,
                    vocab_size,
                )

        # An explicit num_labels keyword wins over the config; 3 is the last resort.
        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if config and hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif config and isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        return cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)

    def forward(self, input_ids, attention_mask, labels=None):
        """Classify a batch.

        Args:
            input_ids: Token ids; assumed to be (batch, seq_len) given the
                unsqueeze/squeeze pattern below.
            attention_mask: Accepted for interface compatibility; not used.
            labels: Optional class indices; when given, a cross-entropy loss
                is computed against the logits.

        Returns:
            Dict with ``"loss"`` (``None`` when ``labels`` is ``None``) and
            ``"logits"``.
        """
        embedded = self.embedding(input_ids)

        # A sequence shorter than the tallest kernel would make the
        # convolution fail, so zero-pad dimension 1 up to that height.
        # Sequences that are already long enough are left untouched.
        tallest_kernel = max((conv.kernel_size[0] for conv in self.convs), default=0)
        missing_steps = tallest_kernel - embedded.size(1)
        if missing_steps > 0:
            embedded = F.pad(embedded, (0, 0, 0, missing_steps))

        # Add the single input channel expected by Conv2d.
        embedded = embedded.unsqueeze(1)

        conv_outputs = []
        for conv in self.convs:
            # The kernel spans the whole embedding width, so the last
            # dimension collapses to 1 and can be squeezed away.
            feature_map = F.relu(conv(embedded)).squeeze(3)
            # Max over all remaining positions.
            pooled = F.max_pool1d(feature_map, feature_map.size(2)).squeeze(2)
            conv_outputs.append(pooled)

        concatenated = torch.cat(conv_outputs, dim=1)
        concatenated = self.dropout(concatenated)
        logits = self.classifier(concatenated)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
| |
|
class BiLSTMModel(BaseHateSpeechModel):
    """Bidirectional LSTM classifier over a learned embedding table."""

    def __init__(self, vocab_size: int, embedding_dim: int = 128, hidden_size: int = 256,
                 num_labels: int = 3, num_layers: int = 2, dropout: float = 0.5):
        """
        Args:
            vocab_size: Number of rows of the embedding table.
            embedding_dim: Width of each embedding vector.
            hidden_size: Hidden size of the LSTM, per direction.
            num_labels: Output size of the classification layer.
            num_layers: Number of stacked LSTM layers.
            dropout: Dropout probability before the classifier; also passed
                to the LSTM when ``num_layers > 1``.
        """
        super().__init__("bilstm", num_labels)
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            # The LSTM's dropout argument is only passed as non-zero for stacked layers.
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        self.dropout = nn.Dropout(dropout)
        # Bidirectional LSTM: forward and backward features are concatenated.
        self.classifier = nn.Linear(hidden_size * 2, num_labels)

    @staticmethod
    def _read_checkpoint_state_dict(checkpoint_dir):
        """Return the state dict stored in ``checkpoint_dir``, or ``None``.

        A safetensors file is preferred; the torch pickle file is only tried
        when no safetensors file exists. Read failures are logged, not raised.
        """
        safetensors_path = os.path.join(checkpoint_dir, SAFE_WEIGHTS_NAME)
        if os.path.isfile(safetensors_path):
            try:
                from safetensors.torch import load_file
                return load_file(safetensors_path)
            except Exception as exc:
                logger.warning("Could not read %s: %s", safetensors_path, exc)
                return None

        torch_path = os.path.join(checkpoint_dir, WEIGHTS_NAME)
        if os.path.isfile(torch_path):
            try:
                # NOTE(security): torch.load unpickles the file; only point
                # this at checkpoints from a trusted source.
                return torch.load(torch_path, map_location="cpu")
            except Exception as exc:
                logger.warning("Could not read %s: %s", torch_path, exc)
                return None
        return None

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        """Construct a model, detecting ``vocab_size`` from a checkpoint if needed.

        NOTE: the checkpoint is only inspected for the embedding shape; this
        method does not load the weights into the returned model.

        Args:
            pretrained_model_name_or_path: Checkpoint directory inspected when
                ``vocab_size`` is not supplied.
            *model_args: Accepted for signature compatibility; not used.
            **kwargs: ``vocab_size``, ``config`` and ``num_labels`` are
                consumed here; the rest is forwarded to the constructor.

        Returns:
            A new instance of ``cls``.
        """
        vocab_size = kwargs.pop("vocab_size", None)
        config = kwargs.pop("config", None)

        if vocab_size is None:
            state_dict = None
            if os.path.isdir(pretrained_model_name_or_path):
                state_dict = cls._read_checkpoint_state_dict(pretrained_model_name_or_path)

            if state_dict is not None and "embedding.weight" in state_dict:
                # The first dimension of the embedding matrix is the vocabulary size.
                vocab_size = state_dict["embedding.weight"].shape[0]
            else:
                vocab_size = 30000
                logger.warning(
                    "Could not detect vocab_size from %s; using the default of %d.",
                    pretrained_model_name_or_path,
                    vocab_size,
                )

        # An explicit num_labels keyword wins over the config; 3 is the last resort.
        num_labels = kwargs.pop("num_labels", None)
        if num_labels is None:
            if config and hasattr(config, "num_labels"):
                num_labels = config.num_labels
            elif config and isinstance(config, dict) and "num_labels" in config:
                num_labels = config["num_labels"]
            else:
                num_labels = 3

        return cls(vocab_size=vocab_size, num_labels=num_labels, **kwargs)

    def forward(self, input_ids, attention_mask, labels=None):
        """Classify a batch.

        Args:
            input_ids: Token ids fed to the embedding table.
            attention_mask: Accepted for interface compatibility; not used.
            labels: Optional class indices; when given, a cross-entropy loss
                is computed against the logits.

        Returns:
            Dict with ``"loss"`` (``None`` when ``labels`` is ``None``) and
            ``"logits"``.
        """
        embedded = self.embedding(input_ids)

        # Only the per-step outputs are needed; the final (hidden, cell)
        # states are discarded.
        lstm_output, _ = self.lstm(embedded)

        # Average over dimension 1. NOTE(review): attention_mask is not
        # applied, so padded positions (if any) are part of the mean.
        pooled_output = lstm_output.mean(dim=1)

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        return {"loss": loss, "logits": logits}
| |
|
class EnsembleModel(BaseHateSpeechModel):
    """Weighted ensemble that combines the logits (and losses) of several models."""

    def __init__(self, models: list, num_labels: int = 3, weights: list = None):
        """
        Args:
            models: Member models; each is called as
                ``model(input_ids, attention_mask, labels)`` and must return a
                dict with ``"loss"`` and ``"logits"``.
            num_labels: Number of output classes, stored on the instance.
            weights: One weight per model. ``None`` or an empty sequence means
                equal weights. Weights are normalised to sum to 1.

        Raises:
            ValueError: If ``weights`` is given but its length differs from
                the number of models.
        """
        super().__init__("ensemble", num_labels)
        self.models = nn.ModuleList(models)
        self.num_models = len(models)

        # Explicit None/length checks: plain truthiness is ambiguous for
        # array-like weights.
        if weights is None or len(weights) == 0:
            weights = [1.0] * self.num_models
        elif len(weights) != self.num_models:
            raise ValueError(
                f"Expected {self.num_models} weights (one per model), got {len(weights)}"
            )

        weight_tensor = torch.tensor(weights, dtype=torch.float32)
        # Registered as a non-persistent buffer: it follows the module across
        # devices but is kept out of state_dict, so checkpoints are unchanged.
        self.register_buffer("weights", weight_tensor / weight_tensor.sum(), persistent=False)

    def forward(self, input_ids, attention_mask, labels=None):
        """Run every member model and combine the results.

        Returns:
            Dict with ``"logits"`` (weighted sum of the member logits) and
            ``"loss"`` (weighted sum of the member losses, or ``None`` when no
            member returned a loss).
        """
        all_logits = []
        # None means "no member produced a loss". A numeric test is avoided
        # because a legitimate loss of exactly zero must still be returned.
        total_loss = None

        for i, model in enumerate(self.models):
            model_output = model(input_ids, attention_mask, labels)
            all_logits.append(model_output["logits"])

            member_loss = model_output["loss"]
            if member_loss is not None:
                weighted_loss = self.weights[i] * member_loss
                total_loss = weighted_loss if total_loss is None else total_loss + weighted_loss

        ensemble_logits = torch.zeros_like(all_logits[0])
        for i, logits in enumerate(all_logits):
            ensemble_logits += self.weights[i] * logits

        return {
            "loss": total_loss,
            "logits": ensemble_logits
        }
| |
|
def get_model(model_name: str, num_labels: int = 3, **kwargs):
    """Factory that builds a model from its registry key.

    Args:
        model_name: One of "phobert-v1", "phobert-v2", "bartpho", "visobert",
            "vihate-t5", "xlm-r", "mbert", "sphobert", "roberta-gru",
            "textcnn", "bilstm" or "ensemble".
        num_labels: Number of labels (3 for hate speech: CLEAN, OFFENSIVE, HATE).
        **kwargs: Extra keyword arguments forwarded to the model constructor.

    Returns:
        An instance of the selected model class.

    Raises:
        ValueError: If ``model_name`` is not a registered key.
    """
    model_mapping = dict([
        ("phobert-v1", PhoBERTV1Model),
        ("phobert-v2", PhoBERTV2Model),
        ("bartpho", BartPhoModel),
        ("visobert", ViSoBERTModel),
        ("vihate-t5", ViHateT5Model),
        ("xlm-r", XLMRModel),
        ("mbert", MBERTModel),
        ("sphobert", SPhoBERTModel),
        ("roberta-gru", RoBERTaGRUModel),
        ("textcnn", TextCNNModel),
        ("bilstm", BiLSTMModel),
        ("ensemble", EnsembleModel),
    ])

    # Every registered value is a class, so None can only mean "unknown key".
    model_class = model_mapping.get(model_name)
    if model_class is None:
        raise ValueError(f"Unknown model: {model_name}. Available models: {list(model_mapping.keys())}")

    return model_class(num_labels=num_labels, **kwargs)