
MLOps-Tips and Tricks-75 Code Snippets | by Senthil E | Mar, 2023


Photo by Aswathy N on Unsplash

MLOps and Data Engineering


MLOps, or Machine Learning Operations, refers to the set of practices that streamline the development, deployment, and maintenance of machine learning models, bridging the gap between data science and software engineering. This article aims to provide valuable tips and tricks for MLOps and data engineering, covering a wide range of topics such as model training, data preprocessing, performance optimization, monitoring, and deployment.

Image by the Author

1. Dask-ML: Parallelize model training: Use Dask-ML to train and evaluate your machine-learning models in parallel, leveraging the full power of your hardware. With Dask-ML, you can scale your machine-learning workloads across multiple cores, processors, or even clusters, making it easy to train and evaluate large models on large datasets.

import dask_ml.model_selection as dcv
from sklearn.datasets import make_classification
from sklearn.svm import SVC

# Create a large dataset
X, y = make_classification(n_samples=100000, n_features=20, random_state=42)

# Define your model
model = SVC()

# Train your model in parallel using Dask-ML
params = {"C": [0.1, 1, 10], "kernel": ["linear", "rbf"]}
search = dcv.RandomizedSearchCV(model, params, n_iter=10, cv=3)
search.fit(X, y)

Check the Dask-ML documentation for more information.

2. Featuretools: Featuretools is an open-source Python library for automated feature engineering, allowing you to generate new features from your raw data with minimal manual effort.

import featuretools as ft

# Load your raw data into an EntitySet
es = ft.EntitySet(id="my_data")
es = es.entity_from_dataframe(entity_id="customers", dataframe=data, index="customer_id")

# Define relationships between entities
# ...

# Automatically generate new features
feature_matrix, feature_defs = ft.dfs(entityset=es, target_entity="customers", max_depth=2)

For more information, please check the Featuretools documentation.

3. Tensorboard: TensorBoard is a powerful visualization tool for TensorFlow that allows you to monitor your model’s performance and track various metrics during training and evaluation.

import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard

# Define your model
model = tf.keras.Sequential([...])

# Compile your model
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])

# Create a TensorBoard callback
tensorboard_callback = TensorBoard(log_dir="./logs")

# Train your model with the TensorBoard callback
model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])

For more information, please check the TensorBoard documentation.

4. TensorFlow Serving: TensorFlow Serving is a high-performance serving system for machine learning models, designed for production environments. It supports multiple models, model versioning, and automatic loading and unloading of models, making it easy to manage and serve your machine learning models at scale.

# Save your TensorFlow model in the SavedModel format
model.save("my_model/1/")

# Install TensorFlow Serving (repository URL from the official install docs)
echo "deb [arch=amd64] http://storage.googleapis.com/tensorflow-serving-apt stable tensorflow-model-server tensorflow-model-server-universal" | sudo tee /etc/apt/sources.list.d/tensorflow-serving.list && \
curl https://storage.googleapis.com/tensorflow-serving-apt/tensorflow-serving.release.pub.gpg | sudo apt-key add -
sudo apt-get update && sudo apt-get install tensorflow-model-server

# Start TensorFlow Serving with your model
tensorflow_model_server --rest_api_port=8501 --model_name=my_model --model_base_path=$(pwd)/my_model

For more information, please check the TensorFlow Serving documentation.

5. Automate hyperparameter tuning with Optuna: Optuna is a powerful and flexible optimization library that can automatically explore and optimize hyperparameters for your machine-learning models.

import optuna
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

def objective(trial):
    n_estimators = trial.suggest_int("n_estimators", 10, 200)
    max_depth = trial.suggest_int("max_depth", 3, 20)

    clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
    score = cross_val_score(clf, X_train, y_train, cv=5).mean()

    return score

study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)

best_params = study.best_params

For more information, please check the Optuna documentation.

6. SHAP: Use SHAP (SHapley Additive exPlanations) to explain the output of your machine learning models and gain insights into their behavior.

import shap
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor

# Load and prepare your data
# ...

# Train a RandomForestRegressor
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestRegressor()
model.fit(X_train, y_train)

# Explain the model's predictions using SHAP
explainer = shap.Explainer(model)
shap_values = explainer(X_test)

# Plot the SHAP values for a single prediction
shap.plots.waterfall(shap_values[0])

For more information, please check the SHAP documentation.

7. Ray Tune: Ray Tune is a powerful and flexible library for distributed hyperparameter tuning, allowing you to leverage the full power of your hardware to optimize your machine learning models.

from ray import tune
from ray.tune.schedulers import ASHAScheduler
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier

# Load your data
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

def train_model(config):
    n_estimators = config["n_estimators"]
    max_depth = config["max_depth"]

    clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
    score = cross_val_score(clf, X, y, cv=3).mean()
    tune.report(mean_accuracy=score)

# Define the search space for hyperparameters
config = {"n_estimators": tune.randint(10, 200), "max_depth": tune.randint(3, 20)}

# Set up Ray Tune
scheduler = ASHAScheduler(metric="mean_accuracy", mode="max")
analysis = tune.run(train_model, config=config, scheduler=scheduler, num_samples=50)

# Get the best hyperparameters
best_params = analysis.best_config

For more information, please check the Ray Tune documentation.

8. Experiment tracking with MLflow: With MLflow, you can compare different runs, reproduce previous results, and share your work with others, making collaboration and iteration more efficient.

import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

# Load your data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Train your model and log metrics with MLflow
with mlflow.start_run():
    clf = RandomForestClassifier(n_estimators=100, max_depth=10)
    clf.fit(X_train, y_train)

    train_accuracy = clf.score(X_train, y_train)
    test_accuracy = clf.score(X_test, y_test)

    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)
    mlflow.log_metric("train_accuracy", train_accuracy)
    mlflow.log_metric("test_accuracy", test_accuracy)

    mlflow.sklearn.log_model(clf, "model")

For more information, please check the MLflow documentation.

9. Scikit-learn: Pipeline: Use a Scikit-learn Pipeline to chain multiple preprocessing steps and a final estimator.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

pipe = Pipeline([("scaler", StandardScaler()), ("classifier", LogisticRegression())])

pipe.fit(X_train, y_train)

10. Scikit-learn: Grid search: Use GridSearchCV to perform hyperparameter tuning.

from sklearn.model_selection import GridSearchCV

# Note: the "l1" penalty requires a solver that supports it,
# e.g. LogisticRegression(solver="liblinear") in the pipeline
param_grid = {"classifier__C": [0.1, 1, 10], "classifier__penalty": ["l1", "l2"]}

grid_search = GridSearchCV(pipe, param_grid, cv=5)
grid_search.fit(X_train, y_train)

11. Joblib: joblib is a popular library for saving and loading Scikit-learn models. Use dump() to save a model to a file, and load() to restore the model from the file.

import joblib

# Save the model
joblib.dump(grid_search.best_estimator_, "model.pkl")

# Load the model
loaded_model = joblib.load("model.pkl")

12. Tensorflow: Simple neural network. Use the Keras API to define a simple feedforward neural network with dense (fully connected) layers.

import tensorflow as tf

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation="relu", input_shape=(10,)),
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])

model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])

13. Early stopping: Use the EarlyStopping callback to stop training when the validation loss stops improving.

early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3)

history = model.fit(X_train, y_train, epochs=100, validation_split=0.2, callbacks=[early_stopping])

14. Tensorflow Model-Save and Load: Use the save() method to save the model architecture, weights, and optimizer state to a single file. Use load_model() to restore the saved model from the file.

# Save the model
model.save("model.h5")

# Load the model
loaded_model = tf.keras.models.load_model("model.h5")

15. Dask: Parallelize operations: Use Dask to parallelize operations on large datasets.

import dask.array as da

x = da.ones((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y.sum(axis=0)
result = z.compute()

16. TPOT: Automated machine learning: TPOT (Tree-based Pipeline Optimization Tool) is a genetic algorithm-based automated machine learning library. Use TPOTClassifier or TPOTRegressor to optimize a machine learning pipeline for your data.

from tpot import TPOTClassifier

tpot = TPOTClassifier(generations=5, population_size=20, verbosity=2)
tpot.fit(X_train, y_train)

For more information, please check the TPOT documentation.

Image by the Author

17. Category Encoders: Category Encoders is a library that provides various encoding methods for categorical variables, such as target encoding, one-hot encoding, and ordinal encoding.

import category_encoders as ce

encoder = ce.TargetEncoder()
X_train_encoded = encoder.fit_transform(X_train, y_train)
X_test_encoded = encoder.transform(X_test)

18. Imbalanced-learn: Imbalanced-learn is a library that provides various techniques for handling imbalanced datasets, such as oversampling, undersampling, and combination methods. Use an appropriate resampling technique, such as SMOTE, to balance your dataset before training your model.

from imblearn.over_sampling import SMOTE

smote = SMOTE()
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)

For more information, please check the imbalanced-learn documentation.

19. Auto-sklearn: Auto-sklearn is an automated machine learning library that wraps Scikit-learn, providing automated model and preprocessing selection. Use AutoSklearnClassifier or AutoSklearnRegressor to optimize a machine learning pipeline for your data.

from autosklearn.classification import AutoSklearnClassifier

auto_classifier = AutoSklearnClassifier(time_left_for_this_task=600)
auto_classifier.fit(X_train, y_train)

20. Scikit-learn: Column Transformer: ColumnTransformer allows you to apply different preprocessing steps to different columns of your input data, which is particularly useful when dealing with mixed data types.

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder

preprocessor = ColumnTransformer(transformers=[
    ("num", StandardScaler(), ["numerical_feature_1", "numerical_feature_2"]),
    ("cat", OneHotEncoder(), ["categorical_feature"]),
])

X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)

21. RandomizedSearchCV is an alternative to GridSearchCV that searches the parameter space more efficiently by randomly sampling a fixed number of parameter settings. Define a parameter distribution as a dictionary, where the keys are the parameter names (including the step name if using a pipeline) and the values are distributions from which to sample parameter values. Pass the model (or pipeline) and parameter distribution to RandomizedSearchCV and fit the data.

from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform

# Parameter names must match the step names of the pipeline from tip 9
param_dist = {
    "classifier__C": uniform(loc=0, scale=4),
    "scaler__with_mean": [True, False],
}

random_search = RandomizedSearchCV(pipe, param_dist, n_iter=10, cv=5, scoring="accuracy")
random_search.fit(X_train, y_train)

22. TensorFlow Data Validation: Use TensorFlow Data Validation (TFDV) to validate and explore your data.

import tensorflow_data_validation as tfdv

stats = tfdv.generate_statistics_from_csv(data_location="train.csv")
schema = tfdv.infer_schema(statistics=stats)
tfdv.display_schema(schema=schema)

23. TensorFlow Model Analysis: Use TensorFlow Model Analysis (TFMA) to evaluate your TensorFlow models.

import tensorflow_model_analysis as tfma

eval_shared_model = tfma.default_eval_shared_model(eval_saved_model_path="path/to/saved_model")
results = tfma.run_model_analysis(
    eval_shared_model=eval_shared_model,
    data_location="test.tfrecords",
    file_format="tfrecords",
    slice_spec=[tfma.slicer.SingleSliceSpec()],
)
tfma.view.render_slicing_metrics(results)

24. TensorFlow Transform: Use TensorFlow Transform (TFT) to preprocess your data for TensorFlow models.

import tensorflow_transform as tft

def preprocessing_fn(inputs):
    outputs = {}
    outputs["scaled_feature"] = tft.scale_to_z_score(inputs["numerical_feature"])
    outputs["one_hot_feature"] = tft.compute_and_apply_vocabulary(inputs["categorical_feature"])
    return outputs

25. TensorFlow Extended (TFX): Use TensorFlow Extended (TFX) to create end-to-end machine learning pipelines.

from tfx.components import CsvExampleGen, Trainer
from tfx.proto import trainer_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()

example_gen = CsvExampleGen(input_base="path/to/data")
context.run(example_gen)

trainer = Trainer(
    module_file="path/to/trainer_module.py",
    examples=example_gen.outputs["examples"],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000),
)
context.run(trainer)

Image by the Author

26. CuPy: CuPy is a library that provides a NumPy-like interface for GPU-accelerated computing. Use CuPy arrays, which have a similar interface to NumPy arrays, to perform computations on GPU. Many common NumPy functions are available in CuPy, allowing you to perform GPU-accelerated computations with familiar syntax.

import cupy as cp

x = cp.array([1, 2, 3, 4, 5])
y = cp.array([6, 7, 8, 9, 10])

z = cp.dot(x, y)

27. RAPIDS is a suite of GPU-accelerated libraries for data science, including cuDF (GPU-accelerated DataFrame library similar to Pandas) and cuML (GPU-accelerated machine learning library similar to Scikit-learn). Use cuDF DataFrames to perform data manipulation tasks on GPU, and cuML models to train and evaluate machine learning models on GPU.

import cudf
import cuml

df = cudf.read_csv("data.csv")
kmeans_model = cuml.KMeans(n_clusters=5)
kmeans_model.fit(df)

28. FastAPI is a modern, high-performance web framework for building APIs with Python, particularly suitable for machine learning models.

Create an instance of FastAPI and define API endpoints using decorators, such as @app.post(). Use uvicorn to run your FastAPI application, specifying the host and port.

from fastapi import FastAPI
import joblib
import uvicorn

app = FastAPI()

# Assumes a model saved earlier, e.g. with joblib
model = joblib.load("model.pkl")

@app.post("/predict")
async def predict(text: str):
    prediction = model.predict([text])
    return {"prediction": prediction}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

29. Streamlit is a library for quickly creating interactive web applications for machine learning and data science, using only Python.

Use Streamlit's simple API to create user interface elements, such as text inputs and sliders, and display output or visualizations. Run the Streamlit app using the command streamlit run app.py in your terminal.

import streamlit as st

st.title("My Streamlit App")

input_text = st.text_input("Enter some text:")
st.write(f"You entered: {input_text}")

slider_value = st.slider("Select a value:", 0, 100, 50)
st.write(f"Slider value: {slider_value}")

For more information, please check the Streamlit documentation.

30. Docker File: Create a Dockerfile to define a custom Docker image for your machine learning application.

FROM python:3.8

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "app.py"]

Use the FROM instruction to specify the base image, such as the official Python image. Use WORKDIR to set the working directory for subsequent instructions. Use COPY to copy files and directories from the host system into the image. Use RUN to execute commands during the build process, such as installing dependencies. Use CMD to define the default command to run when the container starts.

31. Build a docker image:

docker build -t my_ml_app:latest .

32. Run a docker container: Use the docker run command to create and start a Docker container from an image. Use the -p flag to map a host port to a container port, allowing external access to services running inside the container.

docker run -p 5000:5000 my_ml_app:latest

32. Kubernetes YAML Config File:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-ml-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-ml-app
  template:
    metadata:
      labels:
        app: my-ml-app
    spec:
      containers:
      - name: my-ml-app-container
        image: my_ml_app:latest
        ports:
        - containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
  name: my-ml-app-service
spec:
  selector:
    app: my-ml-app
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer

Use apiVersion, kind, and metadata to define the Kubernetes resource type and its metadata. Use spec to define the desired state of the resource, such as the number of replicas, container images, and exposed ports. Use the --- separator to define multiple resources in the same file, such as a Deployment and a Service.

33. kubectl: Use the kubectl command-line tool to manage the Kubernetes cluster and resources.

# Apply the Kubernetes configuration file
kubectl apply -f my_ml_app.yaml

# List all deployments
kubectl get deployments

# List all services
kubectl get services

# Scale the deployment
kubectl scale deployment my-ml-app --replicas=5

# Delete the deployment and service
kubectl delete -f my_ml_app.yaml

34. Organize your project:

my_ml_project/
|-- data/
|   |-- raw/
|   |-- processed/
|-- models/
|-- notebooks/
|-- src/
|   |-- features/
|   |-- models/
|   |-- utils/
|-- Dockerfile
|-- requirements.txt

Use separate directories for data, models, notebooks, and source code. Further subdivide directories to separate raw and processed data, or different types of source code modules.
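As a rough illustration, a layout like this can be scaffolded with a short script (the directory names below are just the ones suggested above; adapt them to your project):

```python
from pathlib import Path

def scaffold(root: str) -> None:
    """Create the suggested project skeleton under `root`."""
    base = Path(root)
    for d in ["data/raw", "data/processed", "models", "notebooks",
              "src/features", "src/models", "src/utils"]:
        (base / d).mkdir(parents=True, exist_ok=True)
    for f in ["Dockerfile", "requirements.txt"]:
        (base / f).touch()

scaffold("my_ml_project")
```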

35. Model versioning: Use model versioning tools like DVC or MLflow to track different versions of your trained machine learning models.

Store model artifacts (e.g., weights, metadata) in a centralized storage system, such as Amazon S3 or Google Cloud Storage. Use a versioning tool to keep track of model versions, their associated training data, and hyperparameters. Enable easy model comparison and reproducibility by tracking performance metrics and training configurations.
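To make the idea concrete, here is a minimal hand-rolled sketch of versioned model artifacts with tracked metadata. Real tools like DVC and MLflow do this robustly, with remote storage and lineage tracking; the registry layout and the toy "model" here are purely illustrative:

```python
import json
import pickle
from pathlib import Path

def save_model_version(model, version, metrics, params, registry="model_registry"):
    """Persist a model plus its metadata under registry/<version>/."""
    version_dir = Path(registry) / version
    version_dir.mkdir(parents=True, exist_ok=True)
    with open(version_dir / "model.pkl", "wb") as f:
        pickle.dump(model, f)
    with open(version_dir / "meta.json", "w") as f:
        json.dump({"version": version, "metrics": metrics, "params": params}, f)
    return version_dir

# Register two versions of a trivial "model" (here, just a dict of weights)
save_model_version({"coef": [0.1, 0.2]}, "v1", {"accuracy": 0.91}, {"max_depth": 5})
save_model_version({"coef": [0.3, 0.1]}, "v2", {"accuracy": 0.93}, {"max_depth": 8})

# Pick the best version by a tracked metric
versions = [json.loads((p / "meta.json").read_text())
            for p in Path("model_registry").iterdir()]
best = max(versions, key=lambda m: m["metrics"]["accuracy"])
```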

36. Automated testing:

Use testing libraries like unittest or pytest to write and run tests. Test individual functions and classes with unit tests, and test interactions between components with integration tests. Perform end-to-end tests to ensure the entire system works as expected, including model serving and API endpoints.
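For example, a unit test for a (hypothetical) preprocessing helper might look like this; pytest discovers and runs any function whose name starts with test_:

```python
# src/features/cleaning.py (hypothetical helper)
def fill_missing(values, fill=0.0):
    """Replace None entries with a fill value."""
    return [fill if v is None else v for v in values]

# tests/test_cleaning.py
def test_fill_missing_replaces_none():
    assert fill_missing([1.0, None, 3.0]) == [1.0, 0.0, 3.0]

def test_fill_missing_custom_fill():
    assert fill_missing([None], fill=-1.0) == [-1.0]

# Called directly here for illustration; normally pytest collects these
test_fill_missing_replaces_none()
test_fill_missing_custom_fill()
```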

37. Papermill:

Papermill allows you to parameterize Jupyter Notebooks by injecting new values for specific cells. Execute notebooks programmatically and generate reports with different parameter values without manual intervention.

import papermill as pm

pm.execute_notebook(
    input_path='input_notebook.ipynb',
    output_path='output_notebook.ipynb',
    parameters={'param1': 'value1', 'param2': 'value2'},
)

38. Environment management: Use tools like Conda or virtualenv to create isolated environments for your projects.

# Create a new Conda environment
conda create -n my_ml_env python=3.8

# Activate the environment
conda activate my_ml_env

# Install packages
conda install pandas scikit-learn

# Deactivate the environment
conda deactivate

39. Progressive model training: Load large datasets in chunks and update the model incrementally to reduce memory consumption.

import pandas as pd
from sklearn.linear_model import SGDRegressor

chunksize = 10000
# LinearRegression has no partial_fit; use an incremental learner such as SGDRegressor
model = SGDRegressor()

for i, chunk in enumerate(pd.read_csv("large_dataset.csv", chunksize=chunksize)):
    X_chunk = chunk.drop("target", axis=1)
    y_chunk = chunk["target"]
    model.partial_fit(X_chunk, y_chunk)
    print(f"Processed chunk {i + 1}")

40. Feature encoding:

Feature encoding techniques transform categorical variables into numerical representations that machine learning models can use. One-hot encoding creates binary columns for each category, while target encoding replaces each category with the mean of the target variable for that category.

import pandas as pd
from sklearn.preprocessing import OneHotEncoder

data = pd.DataFrame({"Category": ["A", "B", "A", "C"]})

encoder = OneHotEncoder()
encoded_data = encoder.fit_transform(data)
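The target-encoding idea mentioned above can be sketched in plain pandas (the column names here are made up; libraries like Category Encoders additionally handle leakage and smoothing for you):

```python
import pandas as pd

df = pd.DataFrame({
    "city": ["NY", "SF", "NY", "SF", "LA"],
    "price": [10.0, 20.0, 14.0, 22.0, 8.0],
})

# Replace each category with the mean of the target for that category
means = df.groupby("city")["price"].mean()
df["city_encoded"] = df["city"].map(means)
```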


41. Data validation: Validate the quality and consistency of your data using data validation frameworks like Great Expectations, Pandera, or custom validation functions.

import pandera as pa
from pandera import DataFrameSchema, Column, Check

# Built-in checks apply element-wise; a bare lambda would receive the whole Series
schema = DataFrameSchema({
    "age": Column(pa.Int, Check.in_range(18, 100)),
    "income": Column(pa.Float, Check.ge(0)),
    "gender": Column(pa.String, Check.isin(["M", "F", "Other"])),
})

# Validate your DataFrame
validated_df = schema.validate(df)

42. Data versioning: Use data versioning tools like DVC or Pachyderm to track changes to your datasets and ensure reproducibility across different experiments and model versions.

# Initialize DVC in your project
dvc init

# Add your dataset to DVC
dvc add data/my_dataset

# Commit the changes to your Git repository
git add data/my_dataset.dvc .dvc/config
git commit -m "Add my_dataset to DVC"

43. Use feature stores: Implement feature stores like Feast or Hopsworks to store, manage, and serve features for machine learning models.

from feast import FeatureStore

# Initialize the feature store
store = FeatureStore(repo_path="path/to/your/feature_store")

# Fetch features for training
training_df = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=["your_feature_name"],
).to_df()

# Fetch features for serving
feature_vector = store.get_online_features(
    feature_refs=["your_feature_name"],
    entity_rows=[{"your_entity_key": "your_value"}],
).to_dict()

Feature stores can help you centralize the management of your features, ensuring consistency and reducing duplication across different models and experiments.

44. Feature scaling: Apply feature scaling techniques like MinMax scaling, standard scaling, or normalization to ensure that your features have similar scales and distributions.

from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

# Scale features using standard scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

45. Dimensionality reduction: Apply dimensionality reduction techniques like PCA, t-SNE, or UMAP to reduce the number of features in your dataset while preserving important patterns and relationships.

from sklearn.datasets import load_iris
from sklearn.decomposition import PCA

X, y = load_iris(return_X_y=True)

# Apply PCA to reduce the dimensionality of the dataset
pca = PCA(n_components=2)
X_reduced = pca.fit_transform(X)

46. Pandas chaining: Chain Pandas operations together to create more readable and concise data manipulation code.

import pandas as pd

data = pd.read_csv("my_data.csv")

# Chain Pandas operations
result = (
    data.query("age >= 30")
    .groupby("city")
    .agg({"salary": "mean"})
    .sort_values("salary", ascending=False)
)

47. Use the ‘pipe’ function: Use the pipe function to integrate custom functions or operations in your Pandas chaining workflow.

import pandas as pd

def custom_operation(df, column, value):
    return df[df[column] > value]

data = pd.read_csv("data.csv")

# Integrate custom operations using 'pipe'
result = (
    data.pipe(custom_operation, "age", 18)
    .groupby("city")
    .agg({"salary": "mean"})
    .sort_values("salary", ascending=False)
)

48. Pandas’ built-in plotting: Use Pandas’ built-in plotting functions for quick and easy data visualization.

import pandas as pd

data = pd.read_csv("my_data.csv")

# Create a bar plot of average salary by city
data.groupby("city")["salary"].mean().plot(kind="bar")

49. Visualize missing data with Missingno: Use the Missingno library to visualize missing data in your dataset.

import pandas as pd
import missingno as msno

data = pd.read_csv("data.csv")

# Visualize missing data
msno.matrix(data)

50. Use SQL Databases: You can use the sqlite3 library in Python to interact with an SQLite database. For example, you can create a table in an SQLite database and insert some data into it:

import sqlite3

# Connect to an SQLite database
conn = sqlite3.connect('example.db')

# Create a table
conn.execute('CREATE TABLE IF NOT EXISTS my_table (id INTEGER PRIMARY KEY, name TEXT)')

# Insert some data
conn.execute('INSERT INTO my_table (id, name) VALUES (?, ?)', (1, 'John'))
conn.execute('INSERT INTO my_table (id, name) VALUES (?, ?)', (2, 'Jane'))

# Commit the changes
conn.commit()

# Retrieve data
cursor = conn.execute('SELECT * FROM my_table')
for row in cursor:
    print(row)

51. Requests Library: Use the requests library to make HTTP requests: The requests library provides a simple way to make HTTP requests to APIs or websites. Here’s an example of how to make a GET request.

import requests

# Make a GET request to a website
response = requests.get('https://www.google.com')

# Print the response content
print(response.content)

52. OS Library: Use the os library to manipulate files and directories: The os library provides functions for interacting with files and directories.

import os

# Create a directory
os.mkdir('my_directory')

53. Working with JSON:

Encoding Python data to JSON format:

import json

data = {"name": "Mark", "age": 28, "gender": "Male"}

json_data = json.dumps(data)
print(json_data)

Decoding JSON data to Python format:

import json

json_data = '{"name": "Mark", "age": 28, "gender": "Male"}'

data = json.loads(json_data)
print(data)

54. Working with CSV files: Using the csv module.

import csv

# Reading a CSV file
with open('example.csv', 'r') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        print(row)

# Writing to a CSV file
with open('example.csv', 'w', newline='') as file:
    csv_writer = csv.writer(file)
    csv_writer.writerow(['Name', 'Age', 'Gender'])
    csv_writer.writerow(['John', 25, 'Male'])
    csv_writer.writerow(['Jane', 30, 'Female'])

55. Using SQLAlchemy for database access: SQLAlchemy is a popular Python library for working with databases. It provides a simple interface for connecting to various databases and executing SQL queries.

import pandas as pd
from sqlalchemy import create_engine

# Connect to a PostgreSQL database
engine = create_engine('postgresql://username:password@host:port/database_name')

# Execute a SQL query and return the results as a DataFrame
query = "SELECT * FROM table_name WHERE column_name > 100"
df = pd.read_sql(query, engine)

# Write a DataFrame to a new table in the database
df.to_sql('new_table_name', engine)

56. Feature selection using Recursive Feature Elimination (RFE):

RFE helps identify the most important features, leading to better model performance and faster training. Feature selection can reduce overfitting and improve the generalization of your model.

from sklearn.datasets import load_iris
from sklearn.feature_selection import RFE
from sklearn.linear_model import LogisticRegression

# Load your data
iris = load_iris()
X, y = iris.data, iris.target

# Create a Logistic Regression model
model = LogisticRegression()

# Perform Recursive Feature Elimination
rfe = RFE(model, n_features_to_select=2)
rfe.fit(X, y)

# Get the most important features
important_features = rfe.support_

57. Use Apache Parquet for efficient storage of columnar data: Apache Parquet is a columnar storage file format that provides efficient compression and encoding schemes, making it ideal for storing large datasets used in machine learning.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Read a CSV file using pandas
data = pd.read_csv("data.csv")

# Convert the pandas DataFrame to an Apache Arrow Table
table = pa.Table.from_pandas(data)

# Write the Arrow Table to a Parquet file
pq.write_table(table, "data.parquet")

# Read the Parquet file into a pandas DataFrame
data_from_parquet = pq.read_table("data.parquet").to_pandas()

58. Use Apache Kafka for real-time data streaming: Apache Kafka is a distributed streaming platform that enables you to build real-time data pipelines and applications.

from kafka import KafkaProducer, KafkaConsumer

# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers="localhost:9092")

# Send a message to a Kafka topic
producer.send("my_topic", b"Hello, Kafka!")

# Create a Kafka consumer
consumer = KafkaConsumer("my_topic", bootstrap_servers="localhost:9092")

# Consume messages from the Kafka topic
for msg in consumer:
    print(msg.value)

59. Partition your data for efficient querying: Partitioning your data can help improve query performance by reducing the amount of data that needs to be read for a given query.

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Read a CSV file using pandas
data = pd.read_csv("data.csv")

# Convert the pandas DataFrame to an Apache Arrow Table
table = pa.Table.from_pandas(data)

# Write the Arrow Table to a partitioned Parquet dataset
pq.write_to_dataset(table, root_path="partitioned_data", partition_cols=["state"])

# Read the partitioned Parquet dataset into a pandas DataFrame
data_from_partitioned_parquet = pq.ParquetDataset("partitioned_data").read().to_pandas()

60. Use data augmentation techniques to increase dataset size: Data augmentation involves creating new training examples by applying various transformations to the existing data, which can help improve model performance.

import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Define an image data generator for data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
)

# Load your data
(x_train, y_train), (_, _) = tf.keras.datasets.cifar10.load_data()
x_train = x_train.astype(np.float32) / 255.0

# Fit the data generator to your data
datagen.fit(x_train)

# Train your model with augmented data
model = create_your_model()  # define your own model-building function
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
model.fit(datagen.flow(x_train, y_train, batch_size=32), epochs=10)

61. Using Flask for model deployment: Below is an example of how to use Flask to deploy a machine learning model:

from flask import Flask, request, jsonify
import joblib

app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    features = [data['feature1'], data['feature2'], data['feature3']]
    model = joblib.load('model.pkl')
    prediction = model.predict([features])[0]
    response = {'prediction': int(prediction)}
    return jsonify(response)

if __name__ == '__main__':
    app.run()

62. Using Pytest for testing:

For example, we have a file called math_operations.py.

# math_operations.py

def add(a, b):
    return a + b

def multiply(a, b):
    return a * b

Next, create a test module with the same name as your module, but with a test_ prefix. In our case, we'll create a file called test_math_operations.py:

# test_math_operations.py

import math_operations

def test_add():
    assert math_operations.add(2, 3) == 5
    assert math_operations.add(-1, 1) == 0
    assert math_operations.add(0, 0) == 0

def test_multiply():
    assert math_operations.multiply(2, 3) == 6
    assert math_operations.multiply(-1, 1) == -1
    assert math_operations.multiply(0, 0) == 0

Run the tests using the pytest command:

pytest test_math_operations.py

Pytest will discover and run the test functions in the test_math_operations.py module.
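pytest also supports table-driven tests via `@pytest.mark.parametrize`, which can replace repeated assertions like those above. A small sketch, assuming pytest is installed (the `add` function is redefined here to keep the snippet self-contained):

```python
import pytest

def add(a, b):
    return a + b

# Each tuple becomes one independent test case with its own pass/fail report
@pytest.mark.parametrize("a, b, expected", [(2, 3, 5), (-1, 1, 0), (0, 0, 0)])
def test_add(a, b, expected):
    assert add(a, b) == expected
```

Parametrized cases are reported individually, so a single failing input doesn't mask the others.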

63. Use automated data pipelines: Automated data pipelines help you automate data ingestion, cleaning, and transformation. Apache Airflow is one of the most widely used tools for this.

Apache Airflow ML pipeline example:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def preprocess_data():
    # Preprocess data here
    pass

def train_model():
    # Train model here
    pass

default_args = {
    'owner': 'myname',
    'start_date': datetime(2023, 3, 15),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('my_dag', default_args=default_args, schedule_interval='@daily') as dag:
    preprocess_task = PythonOperator(task_id='preprocess_task', python_callable=preprocess_data)
    train_task = PythonOperator(task_id='train_task', python_callable=train_model)

    preprocess_task >> train_task

64. Use Transfer Learning: Transfer learning can help you reuse and adapt pre-trained machine learning models for your own use cases. Here’s an example of how to use transfer learning with TensorFlow:

import tensorflow as tf
from tensorflow.keras.applications import VGG16

# Load pre-trained model
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze base layers
for layer in base_model.layers:
    layer.trainable = False

# Add custom top layers
x = base_model.output
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(256, activation='relu')(x)
predictions = tf.keras.layers.Dense(10, activation='softmax')(x)

# Create new model
model = tf.keras.models.Model(inputs=base_model.input, outputs=predictions)

# Compile and train model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=10)  # X_train, y_train: your own dataset

65. Use automated machine learning (AutoML): By using platforms like H2O.ai or Google Cloud AutoML, you can automatically select, train, and deploy models based on your data and requirements. Here's an example of how to use H2O.ai's AutoML platform:

import h2o
from h2o.automl import H2OAutoML

# Start H2O cluster
h2o.init()

# Load data
data = h2o.import_file('my-data.csv')

# Define target variable and features (the target must be excluded from x)
target = 'label'
features = [col for col in data.columns if col != target]

# Split data into train and test sets
train, test = data.split_frame(ratios=[0.8])

# Define AutoML settings
automl = H2OAutoML(max_models=10, seed=1234)

# Train AutoML model
automl.train(x=features, y=target, training_frame=train)

# Evaluate AutoML model
predictions = automl.leader.predict(test)
accuracy = (predictions['predict'] == test[target]).mean()
print(f'Accuracy: {accuracy}')

66. Use anomaly detection: By using libraries like PyOD or TensorFlow, you can detect anomalies based on statistical or machine learning techniques. Here’s an example of how to use PyOD to detect anomalies in a dataset:

import numpy as np
from pyod.models.knn import KNN

# Load data
X = np.load('my-data.npy')

# Define anomaly detector
detector = KNN(n_neighbors=5)

# Train detector
detector.fit(X)

# Detect anomalies: flag points whose outlier score is in the top 5%
anomaly_scores = detector.decision_scores_
threshold = np.percentile(anomaly_scores, 95)
anomalies = np.where(anomaly_scores > threshold)

# Print anomalies
print(f'Anomalies: {anomalies}')
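The thresholding step is independent of PyOD: any array of outlier scores can be cut at a percentile. A NumPy-only sketch with synthetic data (the injected offsets are purely illustrative):

```python
import numpy as np

rng = np.random.default_rng(0)
scores = rng.normal(size=1000)   # stand-in for detector.decision_scores_
scores[:5] += 10.0               # make the first five points obvious outliers

# Flag roughly the top 5% of scores as anomalies
threshold = np.percentile(scores, 95)
anomalies = np.where(scores > threshold)[0]
```

Percentile thresholds fix the alert rate rather than the severity, which is often the practical constraint in monitoring pipelines.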

67. Using Weights and Biases: Here’s an example of how to use Weights & Biases to run and track machine learning experiments.

import wandb
import tensorflow as tf

# Initialize W&B
wandb.init(project='my-project')

# Load data (in practice, parse the TFRecords into feature/label tensors first)
data = tf.data.TFRecordDataset('my-data.tfrecord')

# Define hyperparameters
config = wandb.config
config.learning_rate = 0.1
config.num_epochs = 10

# Define model
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax'),
])
model.compile(optimizer=tf.keras.optimizers.Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# Train model
history = model.fit(data.batch(32), epochs=config.num_epochs)

# Log metrics and artifacts to W&B
wandb.log({'accuracy': history.history['accuracy'][-1]})
wandb.log_artifact('my-model.h5')

68. Important tools for managing machine learning workflows include MLflow, Kubeflow, Apache Airflow, and TensorFlow Extended (TFX).

69. Use Data Compression: Consider using tools and libraries such as zlib, gzip, or bz2 for data compression in Python.

import zlib

# Compress data with zlib (data must be bytes)
data_compressed = zlib.compress(data)

# Decompress data with zlib
data_decompressed = zlib.decompress(data_compressed)
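The standard library's three compressors trade speed for ratio differently, and all of them round-trip losslessly. A quick comparison sketch (the sample payload is made up for illustration):

```python
import bz2
import gzip
import zlib

data = b"sensor_reading,42.0\n" * 1000  # repetitive data compresses well

compressed = {
    "zlib": zlib.compress(data),
    "gzip": gzip.compress(data),
    "bz2": bz2.compress(data),
}

# Compare compressed sizes against the original
for name, blob in compressed.items():
    print(f"{name}: {len(data)} -> {len(blob)} bytes")
```

gzip adds a file-format header on top of zlib's stream, while bz2 usually compresses tighter but more slowly.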

70. Data serialization: Consider using tools and libraries such as JSON, YAML, or protobuf for data serialization in Python.

import json

# Serialize data to JSON
data_json = json.dumps(data)

# Deserialize data from JSON
data_deserialized = json.loads(data_json)
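JSON only covers a handful of types; for arbitrary Python objects the standard library's pickle works, at the cost of being Python-only and unsafe to load from untrusted sources. A sketch contrasting the two (the `record` dict is made up for illustration):

```python
import json
import pickle
from datetime import date

record = {"name": "sensor-1", "reading": 3.14, "day": date(2023, 3, 15)}

# json.dumps raises TypeError on date objects unless you supply a fallback
as_json = json.dumps(record, default=str)

# pickle round-trips arbitrary Python objects, including the date
restored = pickle.loads(pickle.dumps(record))
```

Prefer JSON (or protobuf) at service boundaries and reserve pickle for trusted, internal caches.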

71. Data normalization and scaling: Consider using tools and libraries such as scikit-learn, TensorFlow, or PyTorch for data normalization and scaling in Python.

from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Standardize data with Z-score normalization
scaler = StandardScaler()
data_normalized = scaler.fit_transform(data)

# Scale data with min-max scaling
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)
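To see what StandardScaler actually computes, here is the same z-score transform written out in NumPy (the sample matrix is made up for illustration; note scikit-learn uses the population standard deviation, i.e. `ddof=0`):

```python
import numpy as np

data = np.array([[1.0, 200.0],
                 [2.0, 300.0],
                 [3.0, 400.0]])

# z-score: subtract the per-column mean, divide by the per-column std
mean = data.mean(axis=0)
std = data.std(axis=0)          # ddof=0, matching StandardScaler
normalized = (data - mean) / std
```

After the transform each column has mean 0 and standard deviation 1, so features on very different scales contribute comparably to distance-based models.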

72. Data encryption and security: Consider using tools and libraries such as cryptography, Fernet, or PyAesCrypt for data encryption and security in Python.

from cryptography.fernet import Fernet

# Generate encryption key with Fernet
key = Fernet.generate_key()

# Encrypt data with Fernet (data must be bytes)
cipher_suite = Fernet(key)
encrypted_data = cipher_suite.encrypt(data)

# Decrypt data with Fernet
decrypted_data = cipher_suite.decrypt(encrypted_data)

import hashlib

# Hash data with hashlib
hash_value = hashlib.sha256(data.encode('utf-8')).hexdigest()

import tokenizers

# Tokenize data with the tokenizers library
tokenizer = tokenizers.Tokenizer(tokenizers.models.WordPiece('vocab.txt', unk_token='[UNK]'))
encoded_data = tokenizer.encode(data).ids

73. Data Validation using Great Expectations:

import great_expectations as ge

import great_expectations as ge

# Load a dataset (e.g., a Pandas DataFrame)
data = ge.read_csv("data.csv")

# Add expectations
data.expect_column_values_to_be_unique("id")
data.expect_column_values_to_not_be_null("name")
data.expect_column_mean_to_be_between("age", min_value=20, max_value=40)

# Validate data against the accumulated expectations
validation_result = data.validate()

# Save the Expectation Suite for reuse
data.save_expectation_suite("my_suite.json")

74. logging module: Use the logging module for flexible logging.

import logging

logging.basicConfig(level=logging.INFO)
logging.info("This is an info message.")
logging.error("This is an error message.")
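Beyond basicConfig, the module's handler and formatter machinery lets you route records anywhere. A sketch that logs to an in-memory stream with a custom format (the logger name `my_app` is arbitrary):

```python
import io
import logging

# Attach a handler that writes formatted records to an in-memory stream
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))

logger = logging.getLogger("my_app")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("pipeline started")
logger.debug("skipped: below the INFO threshold")

output = stream.getvalue()
```

The same pattern works with FileHandler or a rotating handler for production pipelines, and the level filter keeps noisy debug output out of shared logs.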

75. Use Dask DataFrame: Dask is a powerful library for parallel and distributed computing in Python. It allows you to process large datasets that don't fit into memory by breaking them into smaller chunks and processing them in parallel.

import dask.dataframe as dd

# Read CSV file using Dask (file is partitioned into smaller chunks)
ddf = dd.read_csv('large_file.csv')

# Perform operations on the data (lazy evaluation)
filtered_ddf = ddf[ddf['column_A'] > 10]
mean_value = filtered_ddf['column_B'].mean()

# Compute the result (operations are executed in parallel)
result = mean_value.compute()
print("Mean of column B for rows where column A > 10:", result)


I hope the above tips and tricks are useful. There are many tools and processes in MLOps, and the landscape changes quickly, so it is worth keeping up with the latest tools and practices.


