10.1. Xorbits Data
Xorbits Data is a distributed computing framework for data science that accelerates pandas DataFrames and NumPy arrays, similar to Dask and Modin. It works by partitioning large datasets and processing each partition with pandas or NumPy. It is built on Xoscar, its own actor programming framework, and does not depend on Ray or Dask.
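The partition-then-compute idea can be illustrated without Xorbits at all. The following is a toy sketch, not Xorbits' implementation: the helpers `partition`, `partial_sum_count`, and `distributed_mean` and the `chunk_size` parameter are hypothetical. It splits a list into partitions, computes a partial (sum, count) on each one, and combines the partials into a global mean, which is the general pattern a framework like Xorbits can run in parallel across workers.

```python
from typing import List, Sequence, Tuple


def partition(data: Sequence[float], chunk_size: int) -> List[Sequence[float]]:
    """Split data into consecutive partitions of at most chunk_size items."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]


def partial_sum_count(chunk: Sequence[float]) -> Tuple[float, int]:
    """Per-partition work; in a real framework this would run on a worker."""
    return sum(chunk), len(chunk)


def distributed_mean(data: Sequence[float], chunk_size: int) -> float:
    """Combine per-partition (sum, count) pairs into one global mean."""
    partials = [partial_sum_count(c) for c in partition(data, chunk_size)]
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


# Made-up values, split into partitions of three
values = [1.9, 0.5, 3.2, 1.1, 2.4, 0.9, 4.0]
print(distributed_mean(values, chunk_size=3))
```

The mean decomposes cleanly into partial sums and counts; other statistics need their own combine step.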
Xorbits Cluster
Before running computations in a multi-node environment, Xorbits needs to initialize a cluster. On a single machine, you can simply call xorbits.init() to initialize. In a cluster environment, first start a management process (the Supervisor), and then start a Worker process on each computing node:
# Start the Supervisor on the management node first
xorbits-supervisor -H <supervisor_ip> -p <supervisor_port> -w <web_port>
# Start the Worker on each computing node
xorbits-worker -H <worker_ip> -p <worker_port> -s <supervisor_ip>:<supervisor_port>
Here, <supervisor_ip> and <supervisor_port> are the IP address and port of the management node, and <web_port> is the port of the web dashboard, which clients also use to connect to the cluster. <worker_ip> and <worker_port> are the IP address and port of each computing node. Once the Supervisor and Workers are running, call xorbits.init("<supervisor_ip>:<web_port>") in your code to connect to the cluster, so that computation tasks scale horizontally across its nodes.
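To make the role of each placeholder concrete, here is a small hypothetical helper, `cluster_commands` (not part of Xorbits), that fills in the two commands above for one Supervisor and a list of Workers and returns the address string passed to xorbits.init(). It only formats strings and does not start any process; the IP addresses and ports in the usage example are made-up placeholders.

```python
from typing import List, Tuple


def cluster_commands(
    supervisor_ip: str,
    supervisor_port: int,
    web_port: int,
    workers: List[Tuple[str, int]],
) -> Tuple[str, List[str], str]:
    """Return (supervisor command, worker commands, client address).

    Hypothetical helper: it only builds the command lines described above.
    """
    supervisor_cmd = (
        f"xorbits-supervisor -H {supervisor_ip} -p {supervisor_port} -w {web_port}"
    )
    # Every Worker points back at the Supervisor's IP and port via -s
    worker_cmds = [
        f"xorbits-worker -H {ip} -p {port} -s {supervisor_ip}:{supervisor_port}"
        for ip, port in workers
    ]
    # The client connects through the Supervisor's web port
    client_address = f"{supervisor_ip}:{web_port}"
    return supervisor_cmd, worker_cmds, client_address


# Made-up addresses and ports, for illustration only
sup, wks, addr = cluster_commands(
    "192.168.1.10", 15000, 8080, [("192.168.1.11", 16000), ("192.168.1.12", 16000)]
)
print(sup)
for cmd in wks:
    print(cmd)
print(f'xorbits.init("{addr}")')
```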
API Compatibility
In terms of pandas API compatibility, Modin > Xorbits > Dask DataFrame; in terms of performance, Xorbits > Dask DataFrame > Modin (here, > means "better than").
Xorbits also partitions data along multiple dimensions, keeps row and column labels, and provides most of the pandas API, such as iloc and median().
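A toy sketch of partitioning along both rows and columns while keeping labels, using plain Python lists and dictionaries rather than Xorbits' internal data structures (the `Block` type and `partition_2d` function are hypothetical). Each block stores the row and column labels it covers, so label-based operations can still be routed to the right block.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Block:
    """One partition: a rectangular piece of the table plus its labels."""
    row_labels: List[int]
    col_labels: List[str]
    values: List[List[int]]  # values[i][j] belongs to (row_labels[i], col_labels[j])


def partition_2d(
    index: List[int],
    columns: List[str],
    data: Dict[str, List[int]],
    row_chunk: int,
    col_chunk: int,
) -> List[List[Block]]:
    """Split a column-oriented table into a grid of labeled blocks."""
    grid: List[List[Block]] = []
    for r in range(0, len(index), row_chunk):
        rows = index[r:r + row_chunk]
        grid_row: List[Block] = []
        for c in range(0, len(columns), col_chunk):
            cols = columns[c:c + col_chunk]
            # Pull each row of this block out of the column lists by position
            values = [[data[col][r + i] for col in cols] for i in range(len(rows))]
            grid_row.append(Block(rows, cols, values))
        grid.append(grid_row)
    return grid


# Made-up toy table with non-default row labels
index = [10, 11, 12, 13]
columns = ["a", "b", "c"]
data = {"a": [1, 2, 3, 4], "b": [5, 6, 7, 8], "c": [9, 10, 11, 12]}
grid = partition_2d(index, columns, data, row_chunk=2, col_chunk=2)
for grid_row in grid:
    for block in grid_row:
        print(block.row_labels, block.col_labels, block.values)
```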
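import os
import sys
# Make the book's local `utils` helper module importable
sys.path.append("..")
from utils import nyc_taxi
# nyc_taxi() is a helper from this book's utils module that returns the dataset path
taxi_path = nyc_taxi()
import xorbits
import xorbits.pandas as pd
# Read the NYC taxi Parquet data with NumPy-backed (non-Arrow) dtypes
df = pd.read_parquet(taxi_path, use_arrow_dtype=False)
# Positional indexing works as in pandas
df.iloc[3]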
/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/xorbits/_mars/deploy/oscar/session.py:1953: UserWarning: No existing session found, creating a new local session now.
warnings.warn(warning_msg)
2024-05-09 10:13:12,400 xorbits._mars.deploy.oscar.local 43280 WARNING Web service started at http://0.0.0.0:54965
VendorID 1
tpep_pickup_datetime 2023-01-01 00:03:48
tpep_dropoff_datetime 2023-01-01 00:13:25
passenger_count 0.0
trip_distance 1.9
RatecodeID 1.0
store_and_fwd_flag N
PULocationID 138
DOLocationID 7
payment_type 1
fare_amount 12.1
extra 7.25
mta_tax 0.5
tip_amount 0.0
tolls_amount 0.0
improvement_surcharge 1.0
total_amount 20.85
congestion_surcharge 0.0
airport_fee 1.25
Name: 3, dtype: object
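Positional indexing such as df.iloc[3] has to find which partition holds a given row. A minimal sketch, assuming only that the number of rows in each partition is known (the function `locate_row` is hypothetical and the partition sizes are made up), uses cumulative sizes and a binary search:

```python
from bisect import bisect_right
from itertools import accumulate
from typing import List, Tuple


def locate_row(position: int, partition_sizes: List[int]) -> Tuple[int, int]:
    """Map a global row position to (partition number, offset in that partition)."""
    # Cumulative end offsets, e.g. sizes [3, 2, 4] -> ends [3, 5, 9]
    ends = list(accumulate(partition_sizes))
    total = ends[-1] if ends else 0
    if not 0 <= position < total:
        raise IndexError("row position out of range")
    # The first partition whose end offset is greater than position holds the row
    part = bisect_right(ends, position)
    start = ends[part - 1] if part > 0 else 0
    return part, position - start


sizes = [3, 2, 4]  # made-up partition sizes
print(locate_row(3, sizes))  # (1, 0): row 3 is the first row of partition 1
```

Only the partition that holds the row needs to be read, which keeps single-row lookups cheap on large datasets.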
df.dtypes
VendorID int64
tpep_pickup_datetime datetime64[us]
tpep_dropoff_datetime datetime64[us]
passenger_count float64
trip_distance float64
RatecodeID float64
store_and_fwd_flag object
PULocationID int64
DOLocationID int64
payment_type int64
fare_amount float64
extra float64
mta_tax float64
tip_amount float64
tolls_amount float64
improvement_surcharge float64
total_amount float64
congestion_surcharge float64
airport_fee float64
dtype: object
df['trip_distance'].median()
1.79
Deferred Execution
Like Dask, Xorbits converts every computation into a computation graph before executing it; unlike Dask, however, it does not require an explicit call to compute() to trigger execution. This is known as deferred execution: Xorbits builds the computation graph in the background and executes it only when it encounters an operation that needs to present data to the user, such as print(). This makes Xorbits' semantics closer to those of pandas and NumPy. To trigger computation manually, call xorbits.run(df).
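The following is a toy model of deferred execution in plain Python, not Xorbits' implementation. Each operation only records a node in a graph, and the graph is evaluated when the value is displayed (through `__repr__`, which `print()` falls back to) or when `run()` is called explicitly. The class `Lazy` and the function `run` are hypothetical stand-ins for Xorbits objects and xorbits.run().

```python
from typing import Any, Callable, Tuple


class Lazy:
    """A node in a computation graph: an operation plus its input nodes."""

    executions = 0  # how many nodes have actually been evaluated so far

    def __init__(self, op: Callable[..., Any], *inputs: "Lazy") -> None:
        self.op = op
        self.inputs: Tuple["Lazy", ...] = inputs
        self._result: Any = None
        self._done = False

    @staticmethod
    def constant(value: Any) -> "Lazy":
        return Lazy(lambda: value)

    def __add__(self, other: "Lazy") -> "Lazy":
        # Building an expression only adds a node; nothing is computed yet
        return Lazy(lambda a, b: a + b, self, other)

    def __mul__(self, other: "Lazy") -> "Lazy":
        return Lazy(lambda a, b: a * b, self, other)

    def evaluate(self) -> Any:
        # Evaluate inputs first, caching each node so shared inputs run once
        if not self._done:
            args = [node.evaluate() for node in self.inputs]
            self._result = self.op(*args)
            self._done = True
            Lazy.executions += 1
        return self._result

    def __repr__(self) -> str:
        # Displaying the value (print, notebook output) triggers execution
        return repr(self.evaluate())


def run(node: Lazy) -> Lazy:
    """Trigger execution explicitly, loosely analogous to xorbits.run()."""
    node.evaluate()
    return node


x = Lazy.constant(2)
y = Lazy.constant(3)
z = (x + y) * y          # only builds the graph
print(Lazy.executions)   # prints 0: nothing has been evaluated yet
print(z)                 # prints 15: display triggers evaluation
```

Caching each node's result means that a node shared by several downstream operations (here `y`) is evaluated only once.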
For example, in the visualization below, gb_time is only a reference to the computation graph, not the actual data; when Plotly needs the result of gb_time, Xorbits triggers the computation.
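# Derive the pickup day of week (0 = Monday) and hour from the timestamp
df['PU_dayofweek'] = df['tpep_pickup_datetime'].dt.dayofweek
df['PU_hour'] = df['tpep_pickup_datetime'].dt.hour
# Count trips per (day of week, hour); this only extends the computation graph
gb_time = df.groupby(by=['PU_dayofweek', 'PU_hour'], as_index=False).agg(count=('PU_dayofweek', 'count'))
import plotly.express as px
import plotly.io as pio
# Render figures inline in a Jupyter notebook
pio.renderers.default = "notebook"
# Plotly reads gb_time here, which makes Xorbits execute the graph
b = px.bar(
gb_time,
x='PU_hour',
y='count',
color='PU_dayofweek',
color_continuous_scale='sunset_r',
)
b.show()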
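The groupby count behind gb_time suits partitioned data because counts can be combined. A minimal sketch in plain Python, using made-up partitions of (day of week, hour) keys and the hypothetical helpers `count_partition` and `combine`; this is the general map-and-combine pattern, not Xorbits' internal execution plan. Each partition counts its own keys, and the partial counts are then summed.

```python
from collections import Counter
from typing import Iterable, List, Tuple

Key = Tuple[int, int]  # (PU_dayofweek, PU_hour)


def count_partition(keys: Iterable[Key]) -> Counter:
    """Map step: count rows per key within a single partition."""
    return Counter(keys)


def combine(partials: List[Counter]) -> Counter:
    """Combine step: add the per-partition counts together."""
    total: Counter = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts rather than replacing
    return total


# Made-up partitions of (day of week, hour) keys
partitions = [
    [(6, 0), (6, 0), (6, 1)],
    [(6, 1), (0, 8)],
    [(6, 0), (0, 8), (0, 8)],
]
counts = combine([count_partition(p) for p in partitions])
for (day, hour), n in sorted(counts.items()):
    print(day, hour, n)
```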
Although it relies on a computation graph, Xorbits does not require users to manage the details of that graph as Dask DataFrame does, nor does it require calls to repartition(). Xorbits optimizes graph construction and execution in the background, and when data skew occurs, it automatically adjusts the computation graph to avoid load imbalance.
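Xorbits' actual skew handling is internal and is not reproduced here. As a rough illustration of the idea only, the sketch below (the function `rebalance` and the `max_rows` threshold are hypothetical) splits any partition larger than a threshold, so that no single worker receives a disproportionately large piece of work.

```python
from typing import List, Sequence


def rebalance(partitions: List[Sequence[int]], max_rows: int) -> List[Sequence[int]]:
    """Split oversized partitions into pieces of at most max_rows rows.

    Empty partitions produce no pieces and are dropped.
    """
    if max_rows <= 0:
        raise ValueError("max_rows must be positive")
    balanced: List[Sequence[int]] = []
    for part in partitions:
        # Small partitions pass through as one piece; large ones are sliced
        for start in range(0, len(part), max_rows):
            balanced.append(part[start:start + max_rows])
    return balanced


# Made-up skewed layout: the middle partition is much larger than the others
skewed = [list(range(2)), list(range(10)), list(range(3))]
balanced = rebalance(skewed, max_rows=4)
print([len(p) for p in balanced])
```

Splitting alone suffices for row-wise operations; for aggregations, the pieces of a split partition would also need a combine step to merge their partial results.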