In [2]:
import requests

class Result:
    def __init__(self, value=None, error=None):
        self.value = value
        self.error = error

    @classmethod
    def of(cls, func, *args, **kwargs):
        try:
            return cls(value=func(*args, **kwargs))
        except Exception as e:
            return cls(error=str(e))

    @staticmethod
    def success(x):
        return Result(value=x)

    @staticmethod
    def failure(error):
        return Result(error=error)

    def bind(self, func):
        if self.is_error():
            return self
        return Result.of(func, self.value)

    def is_error(self):
        return self.error is not None

    def __repr__(self):
        if self.is_error():
            return f"Error({self.error})"
        return f"Success({self.value})"
    
    def __eq__(self, other):
        return self.value == other.value
    
    def handle(self, success_func, error_func=None):
        if not self.is_error():
            return success_func(self.value)
        elif error_func:
            return error_func(self.error)
        return None
    
    def map(self, func):
        if self.is_error():
            return self
        return Result.of(func, self.value)
    
def resultify(func):
    def wrapper(*args, **kwargs):
        return Result.of(func, *args, **kwargs)
    return wrapper



In [3]:
import requests
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, progress, LocalCluster

In [4]:
from dsmlibrary.datanode import DataNode 

datanode = DataNode(
  token="eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ0b2tlbl90eXBlIjoiYWNjZXNzIiwiZXhwIjoxNjk3Njg5OTQ4LCJpYXQiOjE2OTcwOTMxOTUsImp0aSI6IjY0NGFlNGEwOWJmZDRjMjhhZDE3ZmM3ZTAwZjFlMTA0IiwidXNlcl9pZCI6Mjd9.F9zqTjtGZ1bc1tITAZkHlHkXLLGEI2mxVXwmJRnD29o",
  dataplatform_api_uri='https://api.discovery.data.storemesh.com',
  object_storage_uri="dataframe.objectstorage.data.storemesh.com",
  object_storage_secue=True,
)

Init DataNode sucessful!


  from .autonotebook import tqdm as notebook_tqdm


In [5]:
cluster = LocalCluster(dashboard_address='0.0.0.0:8788', n_workers=4, threads_per_worker=2, host='0.0.0.0')
client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 44075 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://192.168.24.68:44075/status,

0,1
Dashboard: http://192.168.24.68:44075/status,Workers: 4
Total threads: 8,Total memory: 31.36 GiB
Status: running,Using processes: True

0,1
Comm: tcp://192.168.24.68:34403,Workers: 4
Dashboard: http://192.168.24.68:44075/status,Total threads: 8
Started: Just now,Total memory: 31.36 GiB

0,1
Comm: tcp://192.168.24.68:44859,Total threads: 2
Dashboard: http://192.168.24.68:34747/status,Memory: 7.84 GiB
Nanny: tcp://192.168.24.68:35623,
Local directory: /tmp/dask-worker-space/worker-_05abs8l,Local directory: /tmp/dask-worker-space/worker-_05abs8l

0,1
Comm: tcp://192.168.24.68:34405,Total threads: 2
Dashboard: http://192.168.24.68:45363/status,Memory: 7.84 GiB
Nanny: tcp://192.168.24.68:39037,
Local directory: /tmp/dask-worker-space/worker-n0rg_w2t,Local directory: /tmp/dask-worker-space/worker-n0rg_w2t

0,1
Comm: tcp://192.168.24.68:40291,Total threads: 2
Dashboard: http://192.168.24.68:45833/status,Memory: 7.84 GiB
Nanny: tcp://192.168.24.68:33333,
Local directory: /tmp/dask-worker-space/worker-2fgbf3wq,Local directory: /tmp/dask-worker-space/worker-2fgbf3wq

0,1
Comm: tcp://192.168.24.68:43253,Total threads: 2
Dashboard: http://192.168.24.68:39031/status,Memory: 7.84 GiB
Nanny: tcp://192.168.24.68:36435,
Local directory: /tmp/dask-worker-space/worker-r3ec8sp6,Local directory: /tmp/dask-worker-space/worker-r3ec8sp6


In [7]:
import time
@resultify
def fetch_user_details(user_id):
    response = requests.get(f"https://jsonplaceholder.typicode.com/users/{user_id}")
    time.sleep(1)
    response.raise_for_status()
    return response.json()

user = fetch_user_details(0)
user.handle(lambda u: print(u['name']), lambda e: print(f"Error fetching user details: {e}"))

Error fetching user details: 404 Client Error: Not Found for url: https://jsonplaceholder.typicode.com/users/0


In [16]:
import requests
@resultify
def get_data(value):
    proxies = {
        'http': 'http://sbtmgbme-rotate:tujc456rgvgf@p.webshare.io:80',
        'https': 'http://sbtmgbme-rotate:tujc456rgvgf@p.webshare.io:80',
    }
    
    # tax_id = value.get('tax_no')
    tax_id = value
    url = f"https://dataapi.moc.go.th/juristic?juristic_id={tax_id}"
    response = requests.get(url, proxies=proxies)
    response.raise_for_status()
    return response.json()

user = get_data({'tax_no':'0775555000875'})
user.handle(lambda u: print(u['name']), lambda e: print(f"Error fetching user details: {e}"))

Error fetching user details: HTTPSConnectionPool(host='dataapi.moc.go.th', port=443): Max retries exceeded with url: /juristic?juristic_id=%7B'tax_no':%20'0775555000875'%7D (Caused by ProxyError('Unable to connect to proxy', OSError('Tunnel connection failed: 403 Forbidden')))


In [17]:
ddf = datanode.read_ddf(file_id="553")
ddf3 = ddf.tail()

ddf3['monad'] = ddf3['tax_no'].apply(get_data)

In [18]:
ddf3

Unnamed: 0,tax_no,name_th,name_en,juristic_type,register_date,status,capital,tsic_code,objective,address,province,monad
863538,105565163055,บริษัท เอ พี ที บูสเตอร์ ปั๊ม จำกัด,-,-,-,-,-,98100,-,-,-,Error(HTTPSConnectionPool(host='dataapi.moc.go...
863539,125558010842,บริษัท อะโบฟ 29 จำกัด,-,-,-,-,-,98100,-,-,-,Error(HTTPSConnectionPool(host='dataapi.moc.go...
863540,105565036606,บริษัท ฟ็อกซ์บุรี จำกัด,-,-,-,-,-,98100,-,-,-,Error(HTTPSConnectionPool(host='dataapi.moc.go...
863541,105558183798,บริษัท อัล- ระยาน เฮลท์แคร์ จำกัด,-,-,-,-,-,99000,-,-,-,Error(HTTPSConnectionPool(host='dataapi.moc.go...
863542,303559004753,ห้างหุ้นส่วนจำกัด ธนพล 08,-,-,-,-,-,99009,-,-,-,Error(HTTPSConnectionPool(host='dataapi.moc.go...
