MLOps-Tips and Tricks-75 Code Snippets | by Senthil E | Mar, 2023
MLOps and Data Engineering
Introduction:
MLOps, or Machine Learning Operations, refers to the set of practices that streamline the development, deployment, and maintenance of machine learning models, bridging the gap between data science and software engineering. This article aims to provide valuable tips and tricks for MLOps and data engineering, covering a wide range of topics such as model training, data preprocessing, performance optimization, monitoring, and deployment.
1. Dask-ML: Parallelize model training: Use Dask-ML to train and evaluate your machine-learning models in parallel. Dask-ML can scale machine learning workloads across multiple cores, processors, or clusters, which helps when training and evaluating large models on large datasets.
import dask_ml.model_selection as dcv
from sklearn.datasets import make_classification
from sklearn.svm import SVC
# Create a large dataset
X, y = make_classification(n_samples=100000, n_features=20, random_state=42)
# Define your model
model = SVC()
# Train your model in parallel using Dask-ML
# Candidate values are given as plain lists, as in scikit-learn
params = {"C": [0.1, 1, 10], "kernel": ["linear", "rbf"]}
# The grid has only 6 combinations, so sample fewer than that
search = dcv.RandomizedSearchCV(model, params, n_iter=5, cv=3)
search.fit(X, y)
2. Feature Tools: Featuretools is an open-source Python library for automated feature engineering, allowing you to generate new features from your raw data with minimal manual effort.
import featuretools as ft
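# Load your raw data into an entityset
# data is assumed to be a pandas DataFrame with a customer_id column
# Note: this is the pre-1.0 Featuretools API; in 1.0+ the equivalents are
# es.add_dataframe(dataframe_name=..., ...) and ft.dfs(..., target_dataframe_name=...)
es = ft.EntitySet(id="my_data")
es = es.entity_from_dataframe(entity_id="customers", dataframe=data, index="customer_id")
# Define relationships between entities
# …
# Automatically generate new features
feature_matrix, feature_defs = ft.dfs(entityset=es, target_entity="customers", max_depth=2)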
3. Tensorboard: TensorBoard is a powerful visualization tool for TensorFlow that allows you to monitor your model’s performance and track various metrics during training and evaluation.
import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard
# Define your model
model = tf.keras.Sequential([…])
# Compile your model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
# Create a TensorBoard callback
tensorboard_callback = TensorBoard(log_dir="./logs")
# Train your model with the TensorBoard callback
model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test), callbacks=[tensorboard_callback])
4. Tensorflow Serving:
TensorFlow Serving is a high-performance serving system for machine learning models, designed for production environments. It supports multiple models, model versioning, and automatic loading and unloading of models, which makes it easier to manage and serve your machine learning models at scale.
# Save your TensorFlow model in the SavedModel format
model.save("my_model/1/")
# Install TensorFlow Serving
echo "deb [arch=amd64] stable tensorflow-model-server tensorflow-model-server-universal" | sudo tee /etc/apt/sources.list.d/tensorflow-serving.list && \
curl | sudo apt-key add -
sudo apt-get update && sudo apt-get install tensorflow-model-server
# Start TensorFlow Serving with your model
tensorflow_model_server --rest_api_port=8501 --model_name=my_model --model_base_path=$(pwd)/my_model
5. Automate hyperparameter tuning with Optuna: Optuna is a powerful and flexible optimization library that can automatically explore and optimize hyperparameters for your machine-learning models.
import optuna
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
def objective(trial):
    # Sample hyperparameters for this trial
    n_estimators = trial.suggest_int("n_estimators", 10, 200)
    max_depth = trial.suggest_int("max_depth", 3, 20)
    clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
    score = cross_val_score(clf, X_train, y_train, cv=5).mean()
    return score
study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=50)
best_params = study.best_params
6. SHAP: Use SHAP (SHapley Additive exPlanations) to explain the output of your machine learning models and gain insights into their behavior.
import shap
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
# Load and prepare your data
# …
# Train a RandomForestRegressor
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestRegressor()
model.fit(X_train, y_train)
# Explain the model's predictions using SHAP
explainer = shap.Explainer(model)
shap_values = explainer(X_test)
# Plot the SHAP values for a single prediction
shap.plots.waterfall(shap_values[0])
7. Ray: Ray Tune is a powerful and flexible library for distributed hyperparameter tuning, allowing you to leverage the full power of your hardware to optimize your machine learning models.
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score
from sklearn.ensemble import RandomForestClassifier
def train_model(config):
    n_estimators = config["n_estimators"]
    max_depth = config["max_depth"]
    clf = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth)
    # X and y are the arrays loaded below
    score = cross_val_score(clf, X, y, cv=3).mean()
    tune.report(mean_accuracy=score)
# Load your data
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
# Define the search space for hyperparameters
config = {
    "n_estimators": tune.randint(10, 200),
    "max_depth": tune.randint(3, 20),
}
# Set up Ray Tune
scheduler = ASHAScheduler(metric="mean_accuracy", mode="max")
analysis = tune.run(train_model, config=config, scheduler=scheduler, num_samples=50)
# Get the best hyperparameters
best_params = analysis.best_config
8. Experiment tracking with MLflow: With MLflow, you can log the parameters, metrics, and model of each run, then compare runs, reproduce previous results, and share your work with others.
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
# Load your data
iris = load_iris()
X, y = iris.data, iris.target
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# Train your model and log metrics with MLflow
with mlflow.start_run():
    clf = RandomForestClassifier(n_estimators=100, max_depth=10)
    clf.fit(X_train, y_train)
    train_accuracy = clf.score(X_train, y_train)
    test_accuracy = clf.score(X_test, y_test)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 10)
    mlflow.log_metric("train_accuracy", train_accuracy)
    mlflow.log_metric("test_accuracy", test_accuracy)
    mlflow.sklearn.log_model(clf, "model")
9. Scikit-learn: Pipeline: Use Scikit-learn Pipeline to chain multiple preprocessing steps and a final estimator.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression()),
])
pipe.fit(X_train, y_train)
10. Scikit-learn: Grid search: Use GridSearchCV to perform hyperparameter tuning.
from sklearn.model_selection import GridSearchCV
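param_grid = {
    "classifier__C": [0.1, 1, 10],
    "classifier__penalty": ["l1", "l2"],
    # liblinear supports both penalties; the default lbfgs solver rejects "l1"
    "classifier__solver": ["liblinear"],
}
grid_search = GridSearchCV(pipe, param_grid, cv=5)
grid_search.fit(X_train, y_train)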
11. Joblib: joblib is a popular library for saving and loading Scikit-learn models. Use dump() to save a model to a file, and load() to restore the model from the file.
import joblib
# Save the model
joblib.dump(grid_search.best_estimator_, "model.pkl")
# Load the model
loaded_model = joblib.load("model.pkl")
12. Tensorflow: Simple neural network. Use the Keras API to define a simple feedforward neural network with dense (fully connected) layers.
import tensorflow as tf
model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation="relu", input_shape=(10,)),
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
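13. Early Stopping: Use the EarlyStopping callback to stop training once the monitored metric (here the validation loss) has not improved for a given number of epochs (the patience).
early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3)
history = model.fit(X_train, y_train, epochs=100, validation_split=0.2, callbacks=[early_stopping])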
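The patience rule can be illustrated without any framework. The following is a minimal sketch in plain Python, not Keras's actual implementation; the function name `early_stopping_epoch` and the list of validation losses are made up for illustration.

```python
def early_stopping_epoch(val_losses, patience=3, min_delta=0.0):
    """Return the 1-based epoch at which training would stop, or None."""
    best = float("inf")
    epochs_without_improvement = 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best - min_delta:
            # The monitored value improved: remember it and reset the counter
            best = loss
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                return epoch
    return None


# Hypothetical validation losses, one per epoch
losses = [0.90, 0.70, 0.65, 0.66, 0.67, 0.68, 0.50]
stop_epoch = early_stopping_epoch(losses, patience=3)
print(stop_epoch)
```

With these made-up losses, the best value is reached at epoch 3 and the following three epochs do not improve on it, so training would stop before ever seeing the better loss at epoch 7. That is the trade-off to keep in mind when choosing the patience.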
14. Tensorflow Model-Save and Load: Use the save() method to save the model architecture, weights, and optimizer state to a single file. Use load_model() to restore the saved model from the file.
# Save the model
model.save("model.h5")
# Load the model
loaded_model = tf.keras.models.load_model("model.h5")
15. Dask: Parallelize operations: Use Dask to parallelize operations on large datasets.
import dask.array as da
# Build a lazy task graph over a chunked array
x = da.ones((10000, 10000), chunks=(1000, 1000))
y = x + x.T
z = y.sum(axis=0)
# Nothing is computed until compute() is called
result = z.compute()
16. TPOT: Automated machine learning: TPOT (Tree-based Pipeline Optimization Tool) is a genetic algorithm-based automated machine learning library. Use TPOTClassifier or TPOTRegressor to optimize a machine learning pipeline for your data.
from tpot import TPOTClassifier
tpot = TPOTClassifier(generations=5, population_size=20, verbosity=2)
tpot.fit(X_train, y_train)
17. Category Encoders: Category Encoders is a library that provides various encoding methods for categorical variables, such as target encoding, one-hot encoding, and ordinal encoding.
import category_encoders as ce
encoder = ce.TargetEncoder()
# Fit on the training data only, then reuse the fitted encoder on the test data
X_train_encoded = encoder.fit_transform(X_train, y_train)
X_test_encoded = encoder.transform(X_test)
18. Imbalanced-learn: Imbalanced-learn is a library that provides various techniques for handling imbalanced datasets, such as oversampling, undersampling, and combination methods. Use an appropriate resampling technique, such as SMOTE, to balance your training set before training your model.
from imblearn.over_sampling import SMOTE
smote = SMOTE()
# Resample the training data only; leave the test data untouched
X_resampled, y_resampled = smote.fit_resample(X_train, y_train)
19. Auto-sklearn: Auto-sklearn is an automated machine learning library that wraps Scikit-learn and automates model and preprocessing selection. Use AutoSklearnClassifier or AutoSklearnRegressor to optimize a machine learning pipeline for your data.
from autosklearn.classification import AutoSklearnClassifier
# time_left_for_this_task is the total search budget in seconds
auto_classifier = AutoSklearnClassifier(time_left_for_this_task=600)
auto_classifier.fit(X_train, y_train)
20. Scikit-learn: Column Transformer: ColumnTransformer allows you to apply different preprocessing steps to different columns of your input data, which is particularly useful when dealing with mixed data types.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
preprocessor = ColumnTransformer(transformers=[
    ("num", StandardScaler(), ["numerical_feature_1", "numerical_feature_2"]),
    ("cat", OneHotEncoder(), ["categorical_feature"]),
])
X_train_transformed = preprocessor.fit_transform(X_train)
X_test_transformed = preprocessor.transform(X_test)
21. RandomizedSearchCV is an alternative to GridSearchCV that searches the parameter space more efficiently by randomly sampling a fixed number of parameter settings. Define a parameter distribution as a dictionary, where the keys are the parameter names (including the step name if using a pipeline) and the values are distributions from which to sample parameter values. Pass the model (or pipeline) and parameter distribution to RandomizedSearchCV and fit the data.
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform
# Parameter names must match the step names of the pipeline being searched;
# here pipe is the scaler + classifier pipeline from tip 9
param_dist = {
    "classifier__C": uniform(loc=0, scale=4),
    "scaler__with_mean": [True, False],
}
random_search = RandomizedSearchCV(pipe, param_dist, n_iter=10, cv=5, scoring="accuracy")
random_search.fit(X_train, y_train)
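To make the sampling step concrete, here is a rough sketch in plain Python of how a fixed number of parameter settings can be drawn from a parameter distribution. It only illustrates the idea and is not scikit-learn's implementation; `sample_settings` and the parameter names `C` and `with_mean` are hypothetical.

```python
import random


def sample_settings(param_dist, n_iter, seed=0):
    """Draw n_iter parameter settings from a dict of lists or samplers."""
    rng = random.Random(seed)
    settings = []
    for _ in range(n_iter):
        setting = {}
        for name, dist in param_dist.items():
            # A list is sampled uniformly; a callable draws from a distribution
            setting[name] = dist(rng) if callable(dist) else rng.choice(dist)
        settings.append(setting)
    return settings


# Hypothetical search space: one continuous and one categorical parameter
param_dist = {
    "C": lambda rng: rng.uniform(0, 4),
    "with_mean": [True, False],
}
settings = sample_settings(param_dist, n_iter=10)
for setting in settings[:3]:
    print(setting)
```

Unlike a grid, the cost is fixed by `n_iter` no matter how many parameters are added, and a continuous parameter such as `C` is explored at many distinct values instead of a handful of grid points.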
22. TensorFlow Data Validation: Use TensorFlow Data Validation (TFDV) to validate and explore your data.
import tensorflow_data_validation as tfdv
# Compute summary statistics, infer a schema from them, and display it
stats = tfdv.generate_statistics_from_csv(data_location="train.csv")
schema = tfdv.infer_schema(statistics=stats)
tfdv.display_schema(schema=schema)
23. TensorFlow Model Analysis: Use TensorFlow Model Analysis (TFMA) to evaluate your TensorFlow models.
import tensorflow_model_analysis as tfma
eval_shared_model = tfma.default_eval_shared_model(eval_saved_model_path="path/to/saved_model")
results = tfma.run_model_analysis(
    eval_shared_model=eval_shared_model,
    data_location="test.tfrecords",
    file_format="tfrecords",
    slice_spec=[tfma.slicer.SingleSliceSpec()],
)
tfma.view.render_slicing_metrics(results)
24. TensorFlow Transform: Use TensorFlow Transform (TFT) to preprocess your data for TensorFlow models.
import tensorflow_transform as tft
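def preprocessing_fn(inputs):
    outputs = {}
    # Standardize the numerical feature to zero mean and unit variance
    outputs["scaled_feature"] = tft.scale_to_z_score(inputs["numerical_feature"])
    # Map each category to an integer vocabulary index (this is not a one-hot vector)
    outputs["categorical_index"] = tft.compute_and_apply_vocabulary(inputs["categorical_feature"])
    return outputs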
25. TensorFlow Extended (TFX): Use TensorFlow Extended (TFX) to create end-to-end machine learning pipelines.
from tfx.components import CsvExampleGen, Trainer
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import trainer_pb2
context = InteractiveContext()
# Ingest CSV data as examples
example_gen = CsvExampleGen(input_base="path/to/data")
context.run(example_gen)
# Train a model using the training code in the module file
trainer = Trainer(
    module_file="path/to/trainer_module.py",
    examples=example_gen.outputs["examples"],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000),
)
context.run(trainer)
26. CuPy: CuPy is a library that provides a NumPy-like interface for GPU-accelerated computing. Use CuPy arrays, which have a similar interface to NumPy arrays, to perform computations on GPU. Many common NumPy functions are available in CuPy, allowing you to perform GPU-accelerated computations with familiar syntax.
import cupy as cp
x = cp.array([1, 2, 3, 4, 5])
y = cp.array([6, 7, 8, 9, 10])
z = cp.dot(x, y)
27. RAPIDS is a suite of GPU-accelerated libraries for data science, including cuDF (GPU-accelerated DataFrame library similar to Pandas) and cuML (GPU-accelerated machine learning library similar to Scikit-learn). Use cuDF DataFrames to perform data manipulation tasks on GPU, and cuML models to train and evaluate machine learning models on GPU.
import cudf
import cuml
df = cudf.read_csv("data.csv")
kmeans_model = cuml.KMeans(n_clusters=5)
kmeans_model.fit(df)
28. FastAPI is a modern, high-performance web framework for building APIs with Python, particularly suitable for machine learning models.
Create an instance of FastAPI, and define API endpoints using decorators such as @app.post(). Use uvicorn to run your FastAPI application, specifying the host and port.
from fastapi import FastAPI
import uvicorn
app = FastAPI()
# model is assumed to be a trained estimator loaded elsewhere (for example with joblib.load)
@app.post("/predict")
async def predict(text: str):
    prediction = model.predict([text])
    # Assuming predict() returns a NumPy array, convert it to a list so it can be serialized to JSON
    return {"prediction": prediction.tolist()}
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
29. Streamlit is a library for quickly creating interactive web applications for machine learning and data science, using only Python.
Use Streamlit's simple API to create user interface elements, such as text inputs and sliders, and display output or visualizations. Run the Streamlit app using the command streamlit run app.py in your terminal.
import streamlit as st
st.title("My Streamlit App")
input_text = st.text_input("Enter some text:")
st.write(f"You entered: {input_text}")
slider_value = st.slider("Select a value:", 0, 100, 50)
st.write(f"Slider value: {slider_value}")
30. Docker File: Create a Dockerfile to define a custom Docker image for your machine learning application.
FROM python:3.8
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "app.py"]
Use the FROM keyword to specify the base image, such as the official Python image.
Use the WORKDIR keyword to set the working directory for subsequent instructions.
Use the COPY keyword to copy files and directories from the host system to the image.
Use the RUN keyword to execute commands during the build process, such as installing dependencies.
Use the CMD keyword to define the default command to run when the container starts.
31. Build a docker image:
docker build -t my_ml_app:latest .
32. Run a docker container: Use the docker run command to create and start a Docker container from an image. Use the -p flag to map a host port to a container port, allowing external access to services running inside the container.
docker run -p 5000:5000 my_ml_app:latest
32. Kubernetes YAML Config File:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-ml-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-ml-app
  template:
    metadata:
      labels:
        app: my-ml-app
    spec:
      containers:
      - name: my-ml-app-container
        image: my_ml_app:latest
        ports:
        - containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
  name: my-ml-app-service
spec:
  selector:
    app: my-ml-app
  ports:
  - protocol: TCP
    port: 80
    targetPort: 5000
  type: LoadBalancer
Use apiVersion, kind, and metadata to define the Kubernetes resource type and metadata.
Use spec to define the desired state of the resource, such as the number of replicas, container images, and exposed ports.
Use the --- separator to define multiple resources in the same file, such as a Deployment and a Service.
33. kubectl: Use the kubectl command-line tool to manage the Kubernetes cluster and resources.
# Apply the Kubernetes configuration file
kubectl apply -f my_ml_app.yaml
# List all deployments
kubectl get deployments
# List all services
kubectl get services
# Scale the deployment
kubectl scale deployment my-ml-app --replicas=5
# Delete the deployment and service
kubectl delete -f my_ml_app.yaml
34. Organize your project:
my_ml_project/
|-- data/
|   |-- raw/
|   |-- processed/
|-- models/
|-- notebooks/
|-- src/
|   |-- features/
|   |-- models/
|   |-- utils/
|-- Dockerfile
|-- requirements.txt
Use separate directories for data, models, notebooks, and source code. Further subdivide directories to separate raw and processed data or different types of source code modules.
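If you create this layout often, a small script can scaffold it. The sketch below uses only the standard library; the `scaffold` helper is hypothetical, and the project is created in a temporary directory purely for demonstration.

```python
import tempfile
from pathlib import Path

# Sub-directories from the layout described above
LAYOUT = [
    "data/raw",
    "data/processed",
    "models",
    "notebooks",
    "src/features",
    "src/models",
    "src/utils",
]


def scaffold(root):
    """Create the project directories and empty top-level files under root."""
    root = Path(root)
    for sub in LAYOUT:
        (root / sub).mkdir(parents=True, exist_ok=True)
    for name in ("Dockerfile", "requirements.txt"):
        (root / name).touch(exist_ok=True)
    return root


# Demonstration in a throwaway location
project_root = scaffold(Path(tempfile.mkdtemp()) / "my_ml_project")
created = sorted(p.relative_to(project_root).as_posix() for p in project_root.rglob("*"))
print("\n".join(created))
```

Because every call uses exist_ok=True, running the script again on an existing project leaves it unchanged.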
35. Model versioning: Use model versioning tools like DVC or MLflow to track different versions of your trained machine learning models.
Store model artifacts (e.g., weights, metadata) in a centralized storage system, such as Amazon S3 or Google Cloud Storage.
Use a versioning tool to keep track of model versions, their associated training data, and hyperparameters.
Enable easy model comparison and reproducibility by tracking performance metrics and training configurations.
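The core idea behind these tools can be sketched in a few lines: tie a hash of the model artifact to the hyperparameters and metrics that produced it. The example below is a toy registry built on the standard library, not how DVC or MLflow work internally; `register_model`, the file names, and the metric values are made up for illustration.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def register_model(registry_path, artifact_path, params, metrics):
    """Append a new version entry for the artifact to a JSON registry file."""
    registry_path = Path(registry_path)
    registry = json.loads(registry_path.read_text()) if registry_path.exists() else []
    # The content hash identifies exactly which artifact this version refers to
    digest = hashlib.sha256(Path(artifact_path).read_bytes()).hexdigest()
    entry = {
        "version": len(registry) + 1,
        "sha256": digest,
        "params": params,
        "metrics": metrics,
    }
    registry.append(entry)
    registry_path.write_text(json.dumps(registry, indent=2))
    return entry


# Demonstration with placeholder artifacts and made-up metric values
workdir = Path(tempfile.mkdtemp())
registry_file = workdir / "registry.json"
artifact = workdir / "model.bin"

artifact.write_bytes(b"fake model weights v1")
first = register_model(registry_file, artifact, {"max_depth": 10}, {"accuracy": 0.91})

artifact.write_bytes(b"fake model weights v2")
second = register_model(registry_file, artifact, {"max_depth": 12}, {"accuracy": 0.93})

print(first["version"], second["version"])
```

A real tool adds what this sketch leaves out: remote storage for the artifacts themselves, links to the training data version, and a UI or CLI for comparing versions.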
36. Automated testing:
Use testing libraries like unittest or pytest to write and run tests.
Test individual functions and classes with unit tests, and test interactions between components with integration tests.
Perform end-to-end tests to ensure the entire system works as expected, including model serving and API endpoints.
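As a small illustration of the difference between unit and integration tests, the sketch below uses unittest from the standard library. The `scale` and `predict` functions are hypothetical stand-ins for a preprocessing step and a model.

```python
import unittest


def scale(values):
    """Min-max scale a list of numbers to the range [0, 1]."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def predict(values, threshold=0.5):
    """Toy 'model': label scaled values at or above the threshold as 1."""
    return [1 if v >= threshold else 0 for v in scale(values)]


class TestScale(unittest.TestCase):
    # Unit tests: exercise one function in isolation
    def test_range(self):
        self.assertEqual(scale([0, 5, 10]), [0.0, 0.5, 1.0])

    def test_constant_input(self):
        self.assertEqual(scale([3, 3]), [0.0, 0.0])


class TestPredictIntegration(unittest.TestCase):
    # Integration test: preprocessing and prediction working together
    def test_scale_then_predict(self):
        self.assertEqual(predict([0, 5, 10]), [0, 1, 1])


loader = unittest.defaultTestLoader
suite = unittest.TestSuite()
suite.addTests(loader.loadTestsFromTestCase(TestScale))
suite.addTests(loader.loadTestsFromTestCase(TestPredictIntegration))
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

In a real project the test classes would live in their own files and be run with python -m unittest or pytest; the suite is built by hand here only so the example runs as a single script.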
37. Papermill:
Papermill allows you to parameterize Jupyter Notebooks by injecting new values for specific cells. Execute Notebooks programmatically and generate reports with different parameter values without manual intervention.
import papermill as pm
pm.execute_notebook(
    input_path='input_notebook.ipynb',
    output_path='output_notebook.ipynb',
    parameters={'param1': 'value1', 'param2': 'value2'},
)
38. Environment management: Use tools like Conda or virtualenv to create isolated environments for your projects.
# Create a new Conda environment
conda create -n my_ml_env python=3.8
# Activate the environment
conda activate my_ml_env
# Install packages
conda install pandas scikit-learn
# Deactivate the environment
conda deactivate
39. Progressive data loading: Read a large dataset in chunks and train the model incrementally to reduce memory consumption. The estimator must support partial_fit; SGDRegressor does, whereas LinearRegression does not.
import pandas as pd
from sklearn.linear_model import SGDRegressor
chunksize = 10000
model = SGDRegressor()
for i, chunk in enumerate(pd.read_csv("large_dataset.csv", chunksize=chunksize)):
    X_chunk = chunk.drop("target", axis=1)
    y_chunk = chunk["target"]
    # Update the model with this chunk only; earlier chunks are not kept in memory
    model.partial_fit(X_chunk, y_chunk)
    print(f"Processed chunk {i + 1}")
40. Feature encoding:
Feature encoding techniques transform categorical variables into numerical representations that machine learning models can use. One-hot encoding creates binary columns for each category, while target encoding replaces each category with the mean of the target variable for that category.
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
data = pd.DataFrame({"Category": ["A", "B", "A", "C"]})
encoder = OneHotEncoder()
encoded_data = encoder.fit_transform(data)
print(encoded_data.toarray())
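Target encoding, mentioned above, can be sketched in plain Python as follows. This is a bare-bones illustration rather than the Category Encoders implementation (which also applies smoothing); the `target_encode` helper and the target values are made up.

```python
from collections import defaultdict


def target_encode(categories, targets):
    """Replace each category with the mean target value observed for it."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for category, target in zip(categories, targets):
        sums[category] += target
        counts[category] += 1
    means = {category: sums[category] / counts[category] for category in sums}
    return [means[category] for category in categories], means


# Same categories as the one-hot example, with hypothetical binary targets
categories = ["A", "B", "A", "C"]
targets = [1, 0, 0, 1]
encoded, means = target_encode(categories, targets)
print(means)    # {'A': 0.5, 'B': 0.0, 'C': 1.0}
print(encoded)  # [0.5, 0.0, 0.5, 1.0]
```

In practice the category means should be computed on the training data only and then applied to validation and test data, otherwise the target leaks into the features.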
41. Data validation: Validate the quality and consistency of your data using data validation frameworks like Great Expectations, Pandera, or custom validation functions.
import pandera as pa
from pandera import DataFrameSchema, Column, Check
# Checks receive the whole column (a pandas Series), so use the built-in
# vectorized checks instead of scalar comparisons in a lambda
schema = DataFrameSchema({
    "age": Column(pa.Int, Check.in_range(18, 100)),
    "income": Column(pa.Float, Check.greater_than_or_equal_to(0)),
    "gender": Column(pa.String, Check.isin(["M", "F", "Other"])),
})
# Validate your DataFrame
validated_df = schema.validate(df)
42. Data versioning: Use data versioning tools like DVC or Pachyderm to track changes to your datasets and ensure reproducibility across different experiments and model versions.
# Initialize DVC in your project
dvc init
# Add your dataset to DVC
dvc add data/my_dataset
# Commit the changes to your Git repository
git add data/my_dataset.dvc .dvc/config
git commit -m "Add my_dataset to DVC"
43. Use feature stores: Implement feature stores like Feast or Hopsworks to store, manage, and serve features for machine learning models.
from feast import FeatureStore
# Initialize the feature store
store = FeatureStore(repo_path="path/to/your/feature_store")
# Fetch features for training
training_df = store.get_historical_features(
    entity_df=entity_df,
    feature_refs=["your_feature_name"],
).to_df()
# Fetch features for serving
feature_vector = store.get_online_features(
    feature_refs=["your_feature_name"],
    entity_rows=[{"your_entity_key": "your_value"}],
).to_dict()
Feature stores can help you centralize the management of your features, ensuring consistency and reducing duplication across different models and experiments.
44. Feature scaling: Apply feature scaling techniques like MinMax scaling, standard scaling, or normalization to ensure that your features have similar scales and distributions.
from sklearn.datasets import load_iris
from sklearn.preprocessing import StandardScaler
X, y = load_iris(return_X_y=True)
# Scale features using standard scaling
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
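To see what the two most common scalers compute, here is a short sketch in plain Python. The helper names and sample values are made up; the standardization uses the population standard deviation, which is also what StandardScaler uses.

```python
from statistics import mean, pstdev


def min_max_scale(values):
    """Rescale values linearly so the minimum maps to 0 and the maximum to 1."""
    lo, hi = min(values), max(values)
    if hi == lo:
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def standard_scale(values):
    """Shift to zero mean and divide by the population standard deviation."""
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return [0.0 for _ in values]
    return [(v - mu) / sigma for v in values]


# Hypothetical single feature column
values = [2.0, 4.0, 6.0, 8.0]
min_max = min_max_scale(values)
standard = standard_scale(values)
print(min_max)
print(standard)
```

Min-max scaling bounds the feature to a fixed range but is sensitive to outliers, while standard scaling leaves the range unbounded and gives the feature zero mean and unit variance.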
45. Dimensionality reduction: Apply dimensionality reduction techniques like PCA, t-SNE, or UMAP to reduce the number of features in your dataset while preserving important patterns and relationships.
from sklearn.datasets import load_iris
from sklearn.decomposition import PCA
X, y = load_iris(return_X_y=True)
# Apply PCA to reduce the dimensionality of the dataset
pca = PCA(n_components=2)
X_reduced = pca.fit_transform(X)
46. Pandas chaining: Chain Pandas operations together to create more readable and concise data manipulation code.
import pandas as pd
data = pd.read_csv(“my_data.csv”)
# Chain Pandas operations
result = (
    data.query("age >= 30")
    .groupby("city")
    .agg({"salary": "mean"})
    .sort_values("salary", ascending=False)
)
47. Use the ‘pipe’ function: Use the pipe function to integrate custom functions or operations in your Pandas chaining workflow.
import pandas as pd
def custom_operation(df, column, value):
    # Keep only the rows where the column exceeds the given value
    return df[df[column] > value]
data = pd.read_csv("data.csv")
# Integrate custom operations using 'pipe'
result = (
    data.pipe(custom_operation, "age", 18)
    .groupby("city")
    .agg({"salary": "mean"})
    .sort_values("salary", ascending=False)
)
48. Pandas’ built-in plotting: Use Pandas’ built-in plotting functions for quick and easy data visualization.
import pandas as pd
data = pd.read_csv(“my_data.csv”)
# Create a bar plot of average salary by city
data.groupby("city")["salary"].mean().plot(kind="bar")
49. Visualize missing data with Missingno: Use the Missingno library to visualize missing data in your dataset.
import pandas as pd
import missingno as msno
data = pd.read_csv("data.csv")
# Visualize missing data
msno.matrix(data)
50. Use SQL Databases: You can use the sqlite3 library in Python to interact with an SQLite database. For example, you can create a table in an SQLite database and insert some data into it:
import sqlite3
# Connect to an SQLite database
conn = sqlite3.connect('example.db')
# Create a table
conn.execute('CREATE TABLE IF NOT EXISTS my_table (id INTEGER PRIMARY KEY, name TEXT)')
# Insert some data (the ? placeholders keep values out of the SQL string)
conn.execute('INSERT INTO my_table (id, name) VALUES (?, ?)', (1, 'John'))
conn.execute('INSERT INTO my_table (id, name) VALUES (?, ?)', (2, 'Jane'))
# Commit the changes
conn.commit()
# Retrieve data
cursor = conn.execute('SELECT * FROM my_table')
for row in cursor:
    print(row)
51. Requests Library: Use the requests library to make HTTP requests: The requests library provides a simple way to make HTTP requests to APIs or websites. Here’s an example of how to make a GET request.
import requests
# make a GET request to a website
response = requests.get('https://www.google.com')
# print the response content
print(response.content)
52. OS Library: Use the os library to manipulate files and directories: The os library provides functions for interacting with files and directories.
import os
# create a directory
os.mkdir('my_directory')
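Note that os.mkdir raises FileExistsError if the directory already exists and cannot create missing parent directories. The sketch below shows os.makedirs with exist_ok=True as one way around both; the directory and file names are arbitrary, and a temporary directory is used so the example leaves nothing behind in your project.

```python
import os
import tempfile

# Work inside a throwaway base directory
base = tempfile.mkdtemp()
target = os.path.join(base, "my_directory", "nested")

# Creates intermediate directories as needed
os.makedirs(target, exist_ok=True)
# A second call is a no-op instead of raising FileExistsError
os.makedirs(target, exist_ok=True)

# Write a file and list the directory contents
with open(os.path.join(target, "notes.txt"), "w") as f:
    f.write("hello")

entries = os.listdir(target)
print(entries)  # ['notes.txt']
```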
53. Working with JSON:
Encoding Python data to JSON format:
import json
data = {
    "name": "Mark",
    "age": 28,
    "gender": "Male",
}
json_data = json.dumps(data)
print(json_data)
Decoding JSON data to Python format:
import json
json_data = '{"name": "Mark", "age": 28, "gender": "Male"}'
data = json.loads(json_data)
print(data)
54. Working with CSV Files: Use the csv module to read and write CSV files.
import csv
# Reading a CSV file
with open('example.csv', 'r') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        print(row)
# Writing to a CSV file (newline='' prevents blank lines between rows on Windows)
with open('example.csv', 'w', newline='') as file:
    csv_writer = csv.writer(file)
    csv_writer.writerow(['Name', 'Age', 'Gender'])
    csv_writer.writerow(['John', 25, 'Male'])
    csv_writer.writerow(['Jane', 30, 'Female'])
55. Using SQLAlchemy for Database Access: SQLAlchemy is a popular Python library for working with databases. It provides a common interface for connecting to various databases and executing SQL queries.
import pandas as pd
from sqlalchemy import create_engine
# Connect to a PostgreSQL database
engine = create_engine('postgresql://username:password@host:port/database_name')
# Execute a SQL query and return the results as a dataframe
query = "SELECT * FROM table_name WHERE column_name > 100"
df = pd.read_sql(query, engine)
# Write a dataframe to a new table in the database
df.to_sql('new_table_name', engine)
56. Feature selection using Recursive Feature Elimination (RFE):
RFE helps identify the most important features, which can lead to better model performance and faster training. Feature selection can also reduce overfitting and improve the generalization of your model.
from sklearn.datasets import load_iris
from sklearn.feature_selection import RFE
from sklearn.linear_model import LogisticRegression
# Load your data
iris = load_iris()
X, y = iris.data, iris.target
# Create a Logistic Regression model
model = LogisticRegression()
# Perform Recursive Feature Elimination
rfe = RFE(model, n_features_to_select=2)
rfe.fit(X, y)
# Get the most important features (boolean mask over the columns of X)
important_features = rfe.support_
57. Use Apache Parquet for efficient storage of columnar data: Apache Parquet is a columnar storage file format that provides efficient compression and encoding schemes, making it ideal for storing large datasets used in machine learning.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Read a CSV file using pandas
data = pd.read_csv("data.csv")
# Convert the pandas DataFrame to an Apache Arrow Table
table = pa.Table.from_pandas(data)
# Write the Arrow Table to a Parquet file
pq.write_table(table, "data.parquet")
# Read the Parquet file into a pandas DataFrame
data_from_parquet = pq.read_table("data.parquet").to_pandas()
58. Use Apache Kafka for real-time data streaming: Apache Kafka is a distributed streaming platform that enables you to build real-time data pipelines and applications.
from kafka import KafkaProducer, KafkaConsumer
# Create a Kafka producer
producer = KafkaProducer(bootstrap_servers="localhost:9092")
# Send a message to a Kafka topic
producer.send("my_topic", b"Hello, Kafka!")
# Create a Kafka consumer
consumer = KafkaConsumer("my_topic", bootstrap_servers="localhost:9092")
# Consume messages from the Kafka topic
for msg in consumer:
    print(msg.value)
59. Partition your data for efficient querying: Partitioning your data can help improve query performance by reducing the amount of data that needs to be read for a given query.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
# Read a CSV file using pandas
data = pd.read_csv("data.csv")
# Convert the pandas DataFrame to an Apache Arrow Table
table = pa.Table.from_pandas(data)
# Write the Arrow Table to a partitioned Parquet dataset
pq.write_to_dataset(table, root_path="partitioned_data", partition_cols=["state"])
# Read the partitioned Parquet dataset into a pandas DataFrame
data_from_partitioned_parquet = pq.ParquetDataset("partitioned_data").read().to_pandas()
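The reason partitioning speeds up queries is visible in the directory layout: rows are grouped into one folder per partition value (for example state=CA/), so a query for a single state only has to open that folder. The sketch below imitates this layout with CSV files and the standard library; it is an illustration of the idea, not how pyarrow stores data, and the helper names and sample rows are made up.

```python
import csv
import tempfile
from pathlib import Path

# Hypothetical sample rows
rows = [
    {"state": "CA", "sales": 10},
    {"state": "NY", "sales": 7},
    {"state": "CA", "sales": 3},
]


def write_partitioned(rows, root, partition_col):
    """Write each row into a <partition_col>=<value>/ directory under root."""
    root = Path(root)
    for row in rows:
        part_dir = root / f"{partition_col}={row[partition_col]}"
        part_dir.mkdir(parents=True, exist_ok=True)
        path = part_dir / "part-0.csv"
        is_new = not path.exists()
        # The partition value lives in the directory name, not in the file
        remaining = {k: v for k, v in row.items() if k != partition_col}
        with open(path, "a", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(remaining))
            if is_new:
                writer.writeheader()
            writer.writerow(remaining)


def read_partition(root, partition_col, value):
    """Read only the files that belong to one partition value."""
    path = Path(root) / f"{partition_col}={value}" / "part-0.csv"
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


root = Path(tempfile.mkdtemp()) / "partitioned_data"
write_partitioned(rows, root, "state")
ca_rows = read_partition(root, "state", "CA")
print(sorted(p.name for p in root.iterdir()))  # ['state=CA', 'state=NY']
print(ca_rows)  # [{'sales': '10'}, {'sales': '3'}]
```

Choose a partition column that queries filter on frequently and that has a modest number of distinct values; partitioning on a high-cardinality column produces many tiny files.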
60. Use data augmentation techniques to increase dataset size: Data augmentation involves creating new training examples by applying various transformations to the existing data, which can help improve model performance.
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
# Define an image data generator for data augmentation
datagen = ImageDataGenerator(
    rotation_range=20,
    width_shift_range=0.2,
    height_shift_range=0.2,
    horizontal_flip=True,
)
# Load your data
(x_train, y_train), (_, _) = tf.keras.datasets.cifar10.load_data()
x_train = x_train.astype(np.float32) / 255.0
# Fit the data generator to your data
datagen.fit(x_train)
# Train your model with augmented data
model = create_your_model()
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])
model.fit(datagen.flow(x_train, y_train, batch_size=32), epochs=10)
61. Using Flask for model deployment: Below is an example of how to use Flask to deploy a machine learning model:
from flask import Flask, request, jsonify
import joblib
app = Flask(__name__)
# Load the model once at startup instead of on every request
model = joblib.load('model.pkl')
@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    features = [data['feature1'], data['feature2'], data['feature3']]
    prediction = model.predict([features])[0]
    response = {'prediction': int(prediction)}
    return jsonify(response)
if __name__ == '__main__':
    app.run()
62. Using Pytest for testing:
For example, we have a file called math_operations.py.
# math_operations.py
def add(a, b):
    return a + b
def multiply(a, b):
    return a * b
Next, create a test module with the same name as your module, but with a test_ prefix. In our case, we'll create a file called test_math_operations.py:
# test_math_operations.py
import math_operations
def test_add():
    assert math_operations.add(2, 3) == 5
    assert math_operations.add(-1, 1) == 0
    assert math_operations.add(0, 0) == 0
def test_multiply():
    assert math_operations.multiply(2, 3) == 6
    assert math_operations.multiply(-1, 1) == -1
    assert math_operations.multiply(0, 0) == 0
Run the tests using the pytest command
pytest test_math_operations.py
Pytest will discover and run the test functions in the test_math_operations.py module.
63. Use automated data pipelines: Automated data pipelines handle data ingestion, cleaning, and transformation without manual steps. Apache Airflow is one of the important tools for this.
Apache Airflow ML pipeline:
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path
from datetime import datetime, timedelta

def preprocess_data():
    # Preprocess data here
    pass

def train_model():
    # Train model here
    pass

default_args = {
    'owner': 'myname',
    'start_date': datetime(2023, 3, 15),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('my_dag', default_args=default_args, schedule_interval='@daily') as dag:
    preprocess_task = PythonOperator(task_id='preprocess_task', python_callable=preprocess_data)
    train_task = PythonOperator(task_id='train_task', python_callable=train_model)

    # Run preprocessing before training
    preprocess_task >> train_task
64. Use Transfer Learning: Transfer learning can help you reuse and adapt pre-trained machine learning models for your own use cases. Here’s an example of how to use transfer learning with TensorFlow:
import tensorflow as tf
from tensorflow.keras.applications import VGG16

# Load pre-trained model
base_model = VGG16(weights='imagenet', include_top=False, input_shape=(224, 224, 3))

# Freeze base layers
for layer in base_model.layers:
    layer.trainable = False

# Add custom top layers
x = base_model.output
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(256, activation='relu')(x)
predictions = tf.keras.layers.Dense(10, activation='softmax')(x)

# Create new model
model = tf.keras.models.Model(inputs=base_model.input, outputs=predictions)

# Compile and train model
# (X_train and y_train are assumed to be defined; categorical_crossentropy expects one-hot encoded labels)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=10)
65. Use automated machine learning (AutoML): By using platforms like H2O.ai or Google Cloud AutoML, you can automatically select, train, and deploy models based on your data and requirements. Here’s an example of how to use H2O.ai’s AutoML platform:

import h2o
from h2o.automl import H2OAutoML

# Start H2O cluster
h2o.init()

# Load data
data = h2o.import_file('my-data.csv')

# Define target variable and treat it as categorical (classification)
target = 'label'
data[target] = data[target].asfactor()

# Use every column except the target as a predictor
features = [col for col in data.columns if col != target]

# Split data into train and test sets
train, test = data.split_frame(ratios=[0.8])

# Define AutoML settings
automl = H2OAutoML(max_models=10, seed=1234)

# Train AutoML model
automl.train(x=features, y=target, training_frame=train)

# Evaluate AutoML model
predictions = automl.leader.predict(test)
accuracy = (predictions['predict'] == test[target]).mean()
print(f'Accuracy: {accuracy}')
66. Use anomaly detection: By using libraries like PyOD or TensorFlow, you can detect anomalies based on statistical or machine learning techniques. Here’s an example of how to use PyOD to detect anomalies in a dataset:
import numpy as np
from pyod.models.knn import KNN

# Load data
X = np.load('my-data.npy')

# Define anomaly detector
detector = KNN(n_neighbors=5)

# Train detector
detector.fit(X)

# Detect anomalies: flag the points whose outlier score is above the 95th percentile
anomaly_scores = detector.decision_scores_
threshold = np.percentile(anomaly_scores, 95)
anomalies = np.where(anomaly_scores > threshold)[0]

# Print the indices of the anomalies
print(f'Anomalies: {anomalies}')
67. Using Weights and Biases: Here’s an example of how to use Weights & Biases to run and track machine learning experiments.
import wandb
import tensorflow as tf

# Initialize W&B
wandb.init(project='my-project')

# Load data (the raw records still need to be parsed into (features, labels) pairs before training)
data = tf.data.TFRecordDataset('my-data.tfrecord')

# Define hyperparameters
config = wandb.config
config.learning_rate = 0.1
config.num_epochs = 10

# Define model
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax'),
])
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=config.learning_rate), loss='categorical_crossentropy', metrics=['accuracy'])

# Train model
history = model.fit(data.batch(32), epochs=config.num_epochs)

# Save the model, then log metrics and artifacts to W&B
model.save('my-model.h5')
wandb.log({'accuracy': history.history['accuracy'][-1]})
wandb.log_artifact('my-model.h5', type='model')
68. Important tools for managing machine learning workflows:
69. Use Data Compression: Consider using tools and libraries such as zlib, gzip, or bz2 for data compression in Python.
import zlib

# Compress data with zlib (data must be a bytes object)
data_compressed = zlib.compress(data)

# Decompress data with zlib
data_decompressed = zlib.decompress(data_compressed)
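The snippet above leaves data undefined, so here is a minimal self-contained sketch that runs the same round trip with all three modules mentioned (zlib, gzip, and bz2). The payload is a made-up example; real compression ratios depend heavily on how repetitive your data is.

```python
import bz2
import gzip
import zlib

# Hypothetical payload: repetitive text, which compresses well.
payload = ("timestamp,sensor_id,value\n" * 1000).encode("utf-8")

# Each standard-library module exposes matching compress/decompress functions.
codecs = {
    "zlib": (zlib.compress, zlib.decompress),
    "gzip": (gzip.compress, gzip.decompress),
    "bz2": (bz2.compress, bz2.decompress),
}

compressed_sizes = {}
for name, (compress, decompress) in codecs.items():
    compressed = compress(payload)
    # The round trip must return the original bytes exactly.
    assert decompress(compressed) == payload
    compressed_sizes[name] = len(compressed)
    print(f"{name}: {len(payload)} -> {len(compressed)} bytes")
```

All three functions work on bytes, so text has to be encoded first, as done for the payload here.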
70. Data serialization: Consider using tools and libraries such as JSON, YAML, or protobuf for data serialization in Python.
import json

# Serialize data to JSON
data_json = json.dumps(data)

# Deserialize data from JSON
data_deserialized = json.loads(data_json)
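json.dumps only handles basic types, and ML metadata often contains values such as timestamps. A minimal sketch of one way to handle that with the default= hook follows; the record contents and the to_serializable helper are hypothetical names chosen for illustration.

```python
import json
from datetime import datetime

def to_serializable(value):
    """Fallback for json.dumps: convert datetimes to ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")

# Hypothetical model metadata record.
record = {
    "model": "churn-classifier",
    "trained_at": datetime(2023, 3, 15, 12, 0, 0),
    "metrics": {"accuracy": 0.91},
}

# Serialize: the default hook is called for any value json cannot encode itself.
record_json = json.dumps(record, default=to_serializable, sort_keys=True)

# Deserialize: JSON has no datetime type, so the string is converted back by hand.
restored = json.loads(record_json)
restored["trained_at"] = datetime.fromisoformat(restored["trained_at"])
print(record_json)
```

The conversion back to datetime is manual because JSON itself only stores the string; this is the kind of detail where a schema-based format such as protobuf may be a better fit.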
71. Data normalization and scaling: Consider using tools and libraries such as scikit-learn, TensorFlow, or PyTorch for data normalization and scaling in Python.
import pandas as pd
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Standardize data with Z-score normalization
scaler = StandardScaler()
data_normalized = scaler.fit_transform(data)

# Scale data with min-max scaling
scaler = MinMaxScaler()
data_scaled = scaler.fit_transform(data)
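To make the two transformations concrete, the sketch below applies the underlying formulas to a single made-up column using only the standard library. It illustrates the arithmetic and is not a replacement for the scikit-learn scalers, which also remember the fitted statistics so they can be reused on new data.

```python
import statistics

# Hypothetical feature column.
values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

# Z-score normalization: subtract the mean, divide by the standard deviation.
# StandardScaler uses the population standard deviation, hence pstdev.
mean = statistics.mean(values)
std = statistics.pstdev(values)
z_scores = [(v - mean) / std for v in values]

# Min-max scaling: map the smallest value to 0 and the largest to 1.
lowest, highest = min(values), max(values)
min_max = [(v - lowest) / (highest - lowest) for v in values]

print(z_scores)
print(min_max)
```

Z-scores are centered on 0 and unbounded, while min-max values always fall in the range 0 to 1, which makes min-max scaling more sensitive to outliers.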
72. Data encryption and security: Consider using tools and libraries such as cryptography (which provides Fernet) or pyAesCrypt for data encryption and security in Python.

from cryptography.fernet import Fernet

# Generate encryption key with Fernet
key = Fernet.generate_key()

# Encrypt data with Fernet (data must be a bytes object)
cipher_suite = Fernet(key)
encrypted_data = cipher_suite.encrypt(data)

# Decrypt data with Fernet
decrypted_data = cipher_suite.decrypt(encrypted_data)
import hashlib

# Hash data with hashlib (here data is a str, so it is encoded to bytes first)
hash_value = hashlib.sha256(data.encode('utf-8')).hexdigest()
import tokenizers

# Define tokenization with tokenizers (the WordPiece vocabulary is loaded from vocab.txt)
tokenizer = tokenizers.Tokenizer(tokenizers.models.WordPiece.from_file('vocab.txt', unk_token='[UNK]'))
encoded_data = tokenizer.encode(data).ids
73. Data validation using Great Expectations:

import json
import great_expectations as ge

# Load a dataset as a Great Expectations Pandas dataset
# (this uses the legacy ge.read_csv API from Great Expectations releases before 1.0)
data = ge.read_csv("data.csv")

# Add expectations; they are recorded in the dataset's Expectation Suite
data.expect_column_values_to_be_unique("id")
data.expect_column_values_to_not_be_null("name")
data.expect_column_mean_to_be_between("age", min_value=20, max_value=40)

# Validate data against the Expectation Suite
validation_result = data.validate()

# Save the Expectation Suite and the validation result
data.save_expectation_suite("my_suite.json")
with open("my_suite_validation.json", "w") as f:
    json.dump(validation_result.to_json_dict(), f, indent=2)
74. Logging module: Use the logging module for flexible logging.

import logging

logging.basicConfig(level=logging.INFO)
logging.info("This is an info message.")
logging.error("This is an error message.")
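As a sketch of what "flexible" can mean in practice, the example below sets up a named logger with its own level, handler, and message format. The logger name, the messages, and the in-memory stream are assumptions made for illustration; in a real pipeline you would more likely attach a logging.FileHandler or a plain logging.StreamHandler().

```python
import io
import logging

# Hypothetical logger name; a named logger keeps pipeline logs separate from library logs.
logger = logging.getLogger("training_pipeline")
logger.setLevel(logging.INFO)
logger.propagate = False  # do not also send records to the root logger

# Write to an in-memory stream so the output can be inspected below.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
logger.addHandler(handler)

logger.debug("Not recorded: DEBUG is below the INFO threshold.")
logger.info("Epoch %d finished, accuracy=%.3f", 3, 0.912)
logger.error("Failed to load checkpoint %s", "model-003.ckpt")

log_output = stream.getvalue()
print(log_output)
```

Passing the values as arguments instead of formatting the string yourself means the message is only built when the record is actually emitted.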
75. Use Dask DataFrame: Dask is a powerful library for parallel and distributed computing in Python. It allows you to process large datasets that don’t fit into memory by breaking them into smaller chunks and processing them in parallel.

import dask.dataframe as dd

# Read CSV file using Dask (file is partitioned into smaller chunks)
ddf = dd.read_csv('large_file.csv')

# Perform operations on the data (lazy evaluation)
filtered_ddf = ddf[ddf['column_A'] > 10]
mean_value = filtered_ddf['column_B'].mean()

# Compute the result (operations are executed in parallel)
result = mean_value.compute()
print("Mean of column B for rows where column A > 10:", result)
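The chunk-and-combine idea behind this can be sketched in plain Python. This is only an illustration of the principle, not how Dask is implemented: it processes chunks one after another rather than in parallel, and the function name, chunk size, and sample data are all hypothetical.

```python
import csv
import io
from itertools import islice

def chunked_filtered_mean(rows, chunk_size=2):
    """Mean of column_B over rows where column_A > 10, computed chunk by chunk."""
    total, count = 0.0, 0
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            break
        # Each chunk contributes only a partial sum and a partial count,
        # so the full dataset never has to be held in memory at once.
        kept = [float(r["column_B"]) for r in chunk if float(r["column_A"]) > 10]
        total += sum(kept)
        count += len(kept)
    return total / count if count else float("nan")

# Hypothetical in-memory CSV standing in for large_file.csv.
sample_csv = io.StringIO(
    "column_A,column_B\n"
    "5,1.0\n"
    "12,2.0\n"
    "20,4.0\n"
    "8,10.0\n"
    "15,6.0\n"
)
chunked_mean = chunked_filtered_mean(csv.DictReader(sample_csv))
print(chunked_mean)
```

Because a mean can be rebuilt from partial sums and counts, the chunks are independent of each other, which is what lets a library like Dask hand them to separate workers.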
Conclusion:
I hope these tips and tricks are useful. MLOps involves many tools and processes, and the landscape changes quickly, so it is worth keeping up with the latest tools and practices.