a
    xd!                     @   s   d dl mZ d dlmZ d dlmZ d dlmZ d dlm	Z	 e	rhd dl
mZ d dl
mZ d dlmZmZ G d	d
 d
eZdd Zdd Zdd ZG dd deZG dd deZdS )    )configure_scope)Hub)Integration)capture_internal_exceptions)MYPY)Any)Optional)EventHintc                   @   s   e Zd ZdZedd ZdS )SparkIntegrationZsparkc                   C   s
   t   d S N)patch_spark_context_init r   r   b/var/www/html/Ranjet/env/lib/python3.9/site-packages/sentry_sdk/integrations/spark/spark_driver.py
setup_once   s    zSparkIntegration.setup_onceN)__name__
__module____qualname__
identifierstaticmethodr   r   r   r   r   r      s   r   c                  C   s6   ddl m}  | j}|r2|d|j |d|j dS )z
    Set properties in driver that propagate to worker processes, allowing for workers to have access to those properties.
    This allows worker integration to have access to app_name and application_id.
    r   SparkContextZsentry_app_nameZsentry_application_idN)pysparkr   Z_active_spark_contextZsetLocalPropertyappNameapplicationId)r   Zspark_contextr   r   r   _set_app_properties   s    r   c                 C   s4   ddl m} | j}|| t }| j | dS )zA
    Start java gateway server to add custom `SparkListener`
    r   )ensure_callback_server_startedN)Zpyspark.java_gatewayr   Z_gatewaySentryListenerZ_jscscZaddSparkListener)r   r   ZgwZlistenerr   r   r   _start_sentry_listener(   s
    r   c                     s(   ddl m}  | j  fdd}|| _d S )Nr   r   c                    st    g|R i |}t jtd u r*|S t  t  t "}|j fdd}W d    n1 sf0    Y  |S )Nc                    s<  t   tjtd u r,| W  d    S | di d   | di d jd | d d jd | d d jd	 | d d
 jd | d d j	 | d d j
 | d d j | d d j | d d j | di d j W d    n1 s.0    Y  | S )Nuseridtagszexecutor.idzspark.executor.idzspark-submit.deployModezspark.submit.deployModezdriver.hostzspark.driver.hostzdriver.portzspark.driver.portZspark_versionZapp_nameZapplication_idmasterZ
spark_homeextraZweb_url)r   r   currentget_integrationr   
setdefaultZ	sparkUserZ_confgetversionr   r   r#   Z	sparkHomeZuiWebUrl)eventhintselfr   r   process_eventG   s0    

6z[patch_spark_context_init.<locals>._sentry_patched_spark_context_init.<locals>.process_event)r   r%   r&   r   r   r   r   Zadd_event_processor)r-   argskwargsinitZscoper.   Zspark_context_initr,   r   "_sentry_patched_spark_context_init;   s    ,zDpatch_spark_context_init.<locals>._sentry_patched_spark_context_init)r   r   Z_do_init)r   r3   r   r2   r   r   5   s    .r   c                   @   s   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 ZG d1d2 d2Zd3S )4SparkListenerc                 C   s   d S r   r   )r-   ZapplicationEndr   r   r   onApplicationEndm   s    zSparkListener.onApplicationEndc                 C   s   d S r   r   )r-   ZapplicationStartr   r   r   onApplicationStartq   s    z SparkListener.onApplicationStartc                 C   s   d S r   r   )r-   ZblockManagerAddedr   r   r   onBlockManagerAddedu   s    z!SparkListener.onBlockManagerAddedc                 C   s   d S r   r   )r-   ZblockManagerRemovedr   r   r   onBlockManagerRemovedy   s    z#SparkListener.onBlockManagerRemovedc                 C   s   d S r   r   )r-   ZblockUpdatedr   r   r   onBlockUpdated}   s    zSparkListener.onBlockUpdatedc                 C   s   d S r   r   )r-   ZenvironmentUpdater   r   r   onEnvironmentUpdate   s    z!SparkListener.onEnvironmentUpdatec                 C   s   d S r   r   )r-   ZexecutorAddedr   r   r   onExecutorAdded   s    zSparkListener.onExecutorAddedc                 C   s   d S r   r   )r-   ZexecutorBlacklistedr   r   r   onExecutorBlacklisted   s    z#SparkListener.onExecutorBlacklistedc                 C   s   d S r   r   )r-   ZexecutorBlacklistedForStager   r   r   onExecutorBlacklistedForStage   s    z+SparkListener.onExecutorBlacklistedForStagec                 C   s   d S r   r   )r-   ZexecutorMetricsUpdater   r   r   onExecutorMetricsUpdate   s    z%SparkListener.onExecutorMetricsUpdatec                 C   s   d S r   r   )r-   ZexecutorRemovedr   r   r   onExecutorRemoved   s    zSparkListener.onExecutorRemovedc                 C   s   d S r   r   )r-   jobEndr   r   r   onJobEnd   s    zSparkListener.onJobEndc                 C   s   d S r   r   )r-   jobStartr   r   r   
onJobStart   s    zSparkListener.onJobStartc                 C   s   d S r   r   )r-   ZnodeBlacklistedr   r   r   onNodeBlacklisted   s    zSparkListener.onNodeBlacklistedc                 C   s   d S r   r   )r-   ZnodeBlacklistedForStager   r   r   onNodeBlacklistedForStage   s    z'SparkListener.onNodeBlacklistedForStagec                 C   s   d S r   r   )r-   ZnodeUnblacklistedr   r   r   onNodeUnblacklisted   s    z!SparkListener.onNodeUnblacklistedc                 C   s   d S r   r   )r-   r*   r   r   r   onOtherEvent   s    zSparkListener.onOtherEventc                 C   s   d S r   r   )r-   ZspeculativeTaskr   r   r   onSpeculativeTaskSubmitted   s    z(SparkListener.onSpeculativeTaskSubmittedc                 C   s   d S r   r   )r-   stageCompletedr   r   r   onStageCompleted   s    zSparkListener.onStageCompletedc                 C   s   d S r   r   )r-   stageSubmittedr   r   r   onStageSubmitted   s    zSparkListener.onStageSubmittedc                 C   s   d S r   r   )r-   ZtaskEndr   r   r   	onTaskEnd   s    zSparkListener.onTaskEndc                 C   s   d S r   r   )r-   ZtaskGettingResultr   r   r   onTaskGettingResult   s    z!SparkListener.onTaskGettingResultc                 C   s   d S r   r   )r-   Z	taskStartr   r   r   onTaskStart   s    zSparkListener.onTaskStartc                 C   s   d S r   r   )r-   ZunpersistRDDr   r   r   onUnpersistRDD   s    zSparkListener.onUnpersistRDDc                   @   s   e Zd ZdgZdS )zSparkListener.Javaz1org.apache.spark.scheduler.SparkListenerInterfaceN)r   r   r   Z
implementsr   r   r   r   Java   s   rQ   N)r   r   r   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   rA   rC   rD   rE   rF   rG   rH   rJ   rL   rM   rN   rO   rP   rQ   r   r   r   r   r4   l   s2   r4   c                   @   s4   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdS )r   c                 C   s   t j| _d S r   )r   r%   hubr,   r   r   r   __init__   s    zSentryListener.__init__c                 C   s(   d | }| jjd|d t  d S )NzJob {} Startedinfo)levelmessage)formatjobIdrR   add_breadcrumbr   )r-   rB   rV   r   r   r   rC      s    zSentryListener.onJobStartc                 C   sd   d}d}d|   i}|   dkr<d}d| }nd}d| }| jj|||d d S )	N resultZJobSucceededrT   zJob {} EndedwarningzJob {} FailedrU   rV   data)Z	jobResulttoStringrW   rX   rR   rY   )r-   r@   rU   rV   r^   r   r   r   rA      s    zSentryListener.onJobEndc                 C   sD   |  }d| }| | d}| jjd||d t  d S )NzStage {} Submitted	attemptIdnamerT   r]   )	stageInforW   stageIdra   rb   rR   rY   r   )r-   rK   
stage_inforV   r^   r   r   r   rL      s
    zSentryListener.onStageSubmittedc                 C   s   ddl m} | }d}d}| | d}z&|  |d< d| }d}W n$ |yx   d| }d	}Y n0 | j	j
|||d
 d S )Nr   )Py4JJavaErrorrZ   r`   reasonzStage {} Failedr\   zStage {} CompletedrT   r]   )Zpy4j.protocolrf   rc   ra   rb   ZfailureReasonr(   rW   rd   rR   rY   )r-   rI   rf   re   rV   rU   r^   r   r   r   rJ      s    
zSentryListener.onStageCompletedN)r   r   r   rS   rC   rA   rL   rJ   r   r   r   r   r      s
   r   N)Z
sentry_sdkr   Zsentry_sdk.hubr   Zsentry_sdk.integrationsr   Zsentry_sdk.utilsr   Zsentry_sdk._typesr   typingr   r   r	   r
   r   r   r   r   objectr4   r   r   r   r   r   <module>   s   	7g